AllInfo
Main: Info Blog Temp Mail


wrk 2024-12-25 20-44-38

 Simple Telegram bot (runs commands like a terminal)
 
import asyncio
import functools
import subprocess

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes

# Replace 'YOUR_API_TOKEN' with your actual API token.
# The value is passed to ApplicationBuilder().token(...) in main().
TOKEN = "YOUR_API_TOKEN"

# List of allowed user IDs; run_command compares update.effective_user.id
# against this list before executing anything.
# NOTE: xxxxxxxxxx / yyyyyyyyyy are bare identifiers, not values. Unless they
# are defined elsewhere, this line raises NameError at import time, so replace
# them with the numeric Telegram user IDs that may use /run.
white_list = [xxxxxxxxxx, yyyyyyyyyy]

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet the user and point them at the /run command."""
    greeting = 'Hello! You can run shell commands with /run.'
    message = update.effective_message
    await message.reply_text(greeting)

async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: reply with the list of commands the bot understands.

    Args:
        update: Incoming update; the reply is sent to its effective message.
        context: Handler context (unused).
    """
    # Each entry must end in a real newline escape ("\n"). With a bare "n"
    # the whole help text is sent as a single run-on line.
    help_text = (
        "Available commands:\n"
        "/start - Start the bot\n"
        "/help - Show this help message\n"
        "/run <command> - Execute a shell command and return the output\n"
    )
    await update.effective_message.reply_text(help_text)

# Telegram rejects text messages longer than this many characters, so the
# reply is trimmed to fit instead of letting the send fail on large output.
_MAX_MESSAGE_LENGTH = 4096
_TRUNCATION_MARKER = "\n... [output truncated]\n"


def _format_result(stdout: str, stderr: str, user_id: int) -> str:
    """Build the reply text for a finished command, trimmed to Telegram's limit."""
    sections = []
    if stdout:
        sections.append(f"Output:\n{stdout}\n")
    if stderr:
        sections.append(f"Error:\n{stderr}\n")
    body = "".join(sections)

    # The user ID footer is always kept; only the command output is cut.
    footer = f"User ID: {user_id}"
    room = _MAX_MESSAGE_LENGTH - len(footer)
    if len(body) > room:
        body = body[: room - len(_TRUNCATION_MARKER)] + _TRUNCATION_MARKER
    return body + footer


async def run_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /run: execute the given command for whitelisted users.

    The command is executed without a shell, wrapped in the external
    ``timeout`` utility so it is killed (SIGKILL) after 10 seconds. Captured
    stdout and stderr are sent back, followed by the caller's user ID.

    Args:
        update: Incoming update; supplies the calling user and the message
            that replies are sent to.
        context: Handler context; ``context.args`` holds the command and its
            arguments, already split on whitespace.
    """
    message = update.effective_message
    user = update.effective_user
    if message is None or user is None:
        # Nothing to reply to, or no identifiable sender to authorise.
        return
    user_id = user.id

    # Only users in the white list may execute commands. Everyone else is
    # told their ID (so it can be added to the list) and nothing is run.
    if user_id not in white_list:
        await message.reply_text("You do not have permission to run this command.")
        await message.reply_text(f"User ID: {user_id}")  # Send user ID
        return

    if not context.args:
        await message.reply_text("Please provide a command to run.")
        await message.reply_text(f"User ID: {user_id}")  # Send user ID
        return

    # context.args is already a whitespace-split list, so it is used directly
    # as argv; no shell is involved.
    argv = ['timeout', '--signal=9', '10', *context.args]

    try:
        # subprocess.run blocks until the child exits (up to 10 s); running it
        # in the default executor keeps the event loop free to serve other
        # updates in the meantime.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None,
            functools.partial(
                subprocess.run,
                argv,
                capture_output=True,
                text=True,
                errors="replace",  # undecodable output must not abort the reply
            ),
        )
    except (OSError, ValueError, subprocess.SubprocessError) as e:
        # E.g. the `timeout` binary is missing or an argument is invalid.
        await message.reply_text(f"An error occurred while executing the command: {e}")
        await message.reply_text(f"User ID: {user_id}")  # Send user ID
        return

    await message.reply_text(_format_result(result.stdout, result.stderr, user_id))

def main() -> None:
    """Build the bot application, register the command handlers, and poll."""
    # Command name -> handler coroutine, registered in this order.
    commands = {
        'start': start,
        'help': help_command,
        'run': run_command,
    }

    application = ApplicationBuilder().token(TOKEN).build()
    for name, callback in commands.items():
        application.add_handler(CommandHandler(name, callback))

    # run_polling() drives the event loop itself, so no asyncio.run() wrapper.
    application.run_polling()

# Start the bot only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()


18.222.106.87 / 2025-02-05_06-36-07 UTC.