wrk 2024-12-25 20-44-38
"""Simple Telegram bot that runs shell commands like a terminal.

Whitelisted users can send ``/run <command>``; the bot executes the command
on the host (wrapped in the external ``timeout`` utility) and replies with
the captured stdout/stderr.

SECURITY NOTE: this bot executes arbitrary commands on the machine it runs
on.  Keep ``white_list`` as small as possible and never share the bot token.
"""

import asyncio
import functools
import shlex
import subprocess

from telegram import Update
from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes

# Replace 'YOUR_API_TOKEN' with your actual API token
TOKEN = "YOUR_API_TOKEN"

# List of allowed user IDs (integers), e.g. [123456789, 987654321].
# The pasted source had bare placeholders (xxxxxxxxxx, yyyyyyyyyy) here, which
# are undefined names in Python.  An empty list denies everyone until real
# IDs are filled in.
white_list = []

# Seconds a command may run before `timeout` kills it with SIGKILL.
COMMAND_TIMEOUT_SECONDS = 10

# Telegram rejects message texts longer than this many characters.
MAX_MESSAGE_LENGTH = 4096

# Appended to the command output when it had to be shortened.
TRUNCATION_MARKER = "\n[output truncated]\n"


async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /start with a short greeting."""
    await update.effective_message.reply_text(
        'Hello! You can run shell commands with /run.'
    )


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Reply to /help with the list of supported commands."""
    help_text = (
        "Available commands:\n"
        "/start - Start the bot\n"
        "/help - Show this help message\n"
        "/run <command> - Execute a shell command and return the output\n"
    )
    await update.effective_message.reply_text(help_text)


def _execute(argv):
    """Run *argv* under `timeout` and return the CompletedProcess.

    Blocking; call it from a worker thread, not from the event loop.
    """
    return subprocess.run(
        ['timeout', '--signal=9', str(COMMAND_TIMEOUT_SECONDS)] + list(argv),
        capture_output=True,
        text=True,
        # Binary output must not abort the reply with UnicodeDecodeError.
        errors="replace",
    )


def _format_output(stdout, stderr, user_id):
    """Build the reply text, shortened to fit Telegram's message limit."""
    body = ""
    if stdout:
        body += f"Output:\n{stdout}\n"
    if stderr:
        body += f"Error:\n{stderr}\n"

    footer = f"User ID: {user_id}"
    room = MAX_MESSAGE_LENGTH - len(footer)
    if len(body) > room:
        # Keep the footer intact and cut the command output instead.
        body = body[:room - len(TRUNCATION_MARKER)] + TRUNCATION_MARKER
    return body + footer


async def run_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /run: execute the given command for whitelisted users.

    The arguments after /run are re-joined and parsed with shell-like
    quoting rules, executed without a shell under a timeout, and the captured
    stdout/stderr are sent back.  Every reply path also reports the caller's
    user ID (useful for finding the ID to add to ``white_list``).
    """
    message = update.effective_message
    user = update.effective_user
    # Updates without a sender get user_id None, which is never whitelisted.
    user_id = user.id if user is not None else None

    # Check if the user is in the white list
    if user_id not in white_list:
        await message.reply_text("You do not have permission to run this command.")
        await message.reply_text(f"User ID: {user_id}")  # Send user ID
        return

    if not context.args:
        await message.reply_text("Please provide a command to run.")
        await message.reply_text(f"User ID: {user_id}")  # Send user ID
        return

    # Join all arguments into a single command string
    command = ' '.join(context.args)

    try:
        # shlex honours quotes, so `/run echo "a b"` passes one argument.
        argv = shlex.split(command)
        # subprocess.run blocks; run it in the default executor so the bot
        # keeps processing other updates while the command runs.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            None, functools.partial(_execute, argv)
        )
    except Exception as e:
        # Handler boundary: whatever went wrong (bad quoting, `timeout` not
        # installed, ...), the user should get an answer.
        await message.reply_text(
            f"An error occurred while executing the command: {str(e)}"
        )
        await message.reply_text(f"User ID: {user_id}")  # Send user ID
        return

    await message.reply_text(
        _format_output(result.stdout, result.stderr, user_id)
    )


def main() -> None:
    """Build the application, register the command handlers and start polling."""
    app = ApplicationBuilder().token(TOKEN).build()

    app.add_handler(CommandHandler('start', start))
    app.add_handler(CommandHandler('help', help_command))
    app.add_handler(CommandHandler('run', run_command))  # Adding /run handler

    app.run_polling()  # Start the bot without asyncio.run()


if __name__ == '__main__':
    main()