Profile Image

T2w trial

@samvadah

import logging import asyncio import json import boto3 from boto3.dynamodb.conditions import Attr from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputTextMessageContent from telegram.ext import ( Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler, ConversationHandler, InlineQueryHandler ) # NOTICE: EditedMessageHandler has been REMOVED from this import list. from telegram.constants import ParseMode from telegram.error import TelegramError, Forbidden import os # --- Configuration --- BOT_TOKEN = os.environ.get("BOT_TOKEN") DEV_CHAT_ID = os.environ.get("DEV_CHAT_ID") DYNAMODB_TABLE_NAME = "TelegramChannelSubscriptions" # --- State for ConversationHandler --- AWAITING_CHANNEL_INPUT = range(1) # --- Logging Setup --- logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO) logger = logging.getLogger(__name__) # --- DynamoDB Integration --- dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(DYNAMODB_TABLE_NAME) def get_subscriptions(user_id: int) -> list: try: response = table.get_item(Key={'user_id': user_id}) return response.get('Item', {}).get('channels', []) except Exception as e: logger.error(f"Error getting subscriptions for {user_id}: {e}") return [] def save_subscriptions(user_id: int, channels: list): try: table.put_item(Item={'user_id': user_id, 'channels': channels}) except Exception as e: logger.error(f"Error saving subscriptions for {user_id}: {e}") # --- Core Conversion Logic (Emoji-Proof and Compatible) --- def convert_text_and_entities(original_text: str, entities: list) -> str: if not original_text: return "" if not entities: return original_text utf16_text = original_text.encode('utf-16-le') result_parts = []; last_offset_utf16 = 0 sorted_entities = sorted(entities, key=lambda e: e.offset) for entity in sorted_entities: start_bytes = entity.offset * 2; end_bytes = (entity.offset + entity.length) * 2 result_parts.append(utf16_text[last_offset_utf16:start_bytes].decode('utf-16-le')) # The entity.type from an old library is already a string, so this is safe. entity_type_str = entity.type entity_text = utf16_text[start_bytes:end_bytes].decode('utf-16-le') formatted_text = "" if entity_type_str == "blockquote": formatted_text = '\n'.join([f"> {line}" for line in entity_text.split('\n')]) else: formatting = {"bold": ("*", "*"), "italic": ("_", "_"), "strikethrough": ("~", "~"), "code": ("```", "```"), "pre": ("`", "`")}.get(entity_type_str) if formatting: formatted_text = f"{formatting[0]}{entity_text}{formatting[1]}" else: formatted_text = entity_text result_parts.append(formatted_text) last_offset_utf16 = end_bytes result_parts.append(utf16_text[last_offset_utf16:].decode('utf-16-le')) return "".join(result_parts) # --- Main Handlers --- async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: bot_username = context.bot.username keyboard = [ [InlineKeyboardButton("➕ Add me to a group", url=f"https://t.me/{bot_username}?startgroup=true")], [InlineKeyboardButton("✨ Try Inline Mode", switch_inline_query_current_chat="**bold** _italic_")] ] reply_markup = InlineKeyboardMarkup(keyboard) start_text = ( f"Welcome to the <b>WhatsApp Formatter Bot</b>! 🤖\n\n" "This bot effortlessly transforms Telegram's formatting into a copy-paste ready format for WhatsApp.\n\n" "<b>It works in:</b>\n" "• <b>Private chat:</b> Just send me any formatted message.\n" "• <b>Group chat:</b> Use <code>/run</code> in reply to a message, or with text after it.\n" "• <b>Inline mode:</b> Type my username (<code>@{bot_username}</code>) followed by your text.\n\n" "<b>Channel Forwarding:</b>\n" "Use the <code>/channels</code> command to manage automatic forwarding from channels.\n\n" "<b>Other Commands:</b>\n" "» <code>/start</code>: Show this message.\n" "» <code>/feedback &lt;your message&gt;</code>: Send feedback to my developer.\n\n" "• Developed by @shantanuoak\n" "• Supported by @samvadah" ) await update.message.reply_text(start_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML) # --- THIS IS THE UNIFIED HANDLER FOR NEW AND EDITED MESSAGES --- async def handle_message_or_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: # Determine if this is a new message or an edit message = update.edited_message or update.message # Prefix the reply if it's an edit prefix = "*(Edited)*\n" if update.edited_message else "" text_to_convert = message.text or message.caption or "" entities_to_convert = message.entities or message.caption_entities or [] converted_text = convert_text_and_entities(text_to_convert, entities_to_convert) # The old library might not support editing messages reliably, # so we will just reply to the message (new or edited). This is robust. await message.reply_text(f"{prefix}{converted_text}") async def run_in_group(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: message_to_process = update.message.reply_to_message or update.message text = message_to_process.text or message_to_process.caption or "" entities = message_to_process.entities or message_to_process.caption_entities or [] if message_to_process == update.message: text = text.replace("/run", "").strip() if not text: await update.message.reply_text("Usage: `/run <formatted text>` or reply to a message with `/run`.") return converted_text = convert_text_and_entities(text, entities) await update.message.reply_text(converted_text) async def feedback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user feedback_text = " ".join(context.args) if not feedback_text: await update.message.reply_text("Please provide your feedback after the command, e.g., `/feedback I love this bot!`") return message = (f"📝 <b>New Feedback</b>\n\n<b>From:</b> {user.mention_html()} (ID: <code>{user.id}</code>)\n" f"<b>Message:</b>\n{feedback_text}") try: await context.bot.send_message(chat_id=DEV_CHAT_ID, text=message, parse_mode=ParseMode.HTML) await update.message.reply_text("Thank you for your feedback! It has been sent to the developer.") except TelegramError: await update.message.reply_text("Sorry, there was an error sending your feedback.") async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: query = update.inline_query.query if not query: return # As the API does not provide entities for inline queries, we can't do a full conversion here. # A simple replacement is a compromise for inline mode usability. converted_text = query.replace("**", "*").replace("__", "_") results = [InlineQueryResultArticle(id=query, title="Convert to WhatsApp Format", description="Tap to send the converted message.", input_message_content=InputTextMessageContent(converted_text))] await update.inline_query.answer(results, cache_time=5) # --- Channel Management (Conversation Handler) --- # ... [This entire section remains the same as the previous correct version] ... async def manage_channels(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id; subscriptions = get_subscriptions(user_id) text = ("Here you can manage your channel subscriptions.\n\n<b>Important:</b> For me to see new posts, I must be added as an <b>administrator</b> to the channel.") buttons = [] if subscriptions: for channel_id in subscriptions: try: chat = await context.bot.get_chat(channel_id); buttons.append([InlineKeyboardButton(f"❌ Remove {chat.title}", callback_data=f"remove_{channel_id}")]) except (TelegramError, Forbidden): buttons.append([InlineKeyboardButton(f"❌ Remove Inaccessible Channel ({channel_id})", callback_data=f"remove_{channel_id}")]) buttons.append([InlineKeyboardButton("➕ Add a Channel", callback_data="add_start")]) await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode=ParseMode.HTML) return ConversationHandler.END async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query; await query.answer(); data = query.data; user_id = query.from_user.id if data.startswith("remove_"): channel_id_to_remove = int(data.split("_")[1]); subscriptions = get_subscriptions(user_id) if channel_id_to_remove in subscriptions: subscriptions.remove(channel_id_to_remove); save_subscriptions(user_id, subscriptions) await manage_channels_callback(query, context, "Subscription removed.") elif data == "add_start": await query.edit_message_text("Please send the channel username (e.g., @channel) or its ID."); return AWAITING_CHANNEL_INPUT async def manage_channels_callback(query, context, text): user_id = query.from_user.id; subscriptions = get_subscriptions(user_id); buttons = [] if subscriptions: for channel_id in subscriptions: try: chat = await context.bot.get_chat(channel_id); buttons.append([InlineKeyboardButton(f"❌ Remove {chat.title}", callback_data=f"remove_{channel_id}")]) except (TelegramError, Forbidden): buttons.append([InlineKeyboardButton(f"❌ Remove Inaccessible Channel ({channel_id})", callback_data=f"remove_{channel_id}")]) buttons.append([InlineKeyboardButton("➕ Add a Channel", callback_data="add_start")]) await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons)) async def handle_channel_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: user_id = update.effective_user.id; channel_input = update.message.text try: chat = await context.bot.get_chat(chat_id=channel_input) if chat.type != "channel": await update.message.reply_text("This is not a channel. Please try again or /cancel."); return AWAITING_CHANNEL_INPUT bot_member = await chat.get_member(context.bot.id) if not bot_member.status == 'administrator': await update.message.reply_text(f"Error: I must be an admin in {chat.title}. Please make me an admin there and try again, or /cancel."); return AWAITING_CHANNEL_INPUT channel_id = chat.id; subscriptions = get_subscriptions(user_id) if channel_id not in subscriptions: subscriptions.append(channel_id); save_subscriptions(user_id, subscriptions) await update.message.reply_text(f"Success! Subscribed to {chat.title}.") else: await update.message.reply_text(f"You are already subscribed to {chat.title}.") except TelegramError: await update.message.reply_text(f"Could not find or access channel '{channel_input}'. Please check it and try again, or /cancel."); return AWAITING_CHANNEL_INPUT await manage_channels(update, context); return ConversationHandler.END async def cancel_conversation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: await update.message.reply_text("Action cancelled."); await manage_channels(update, context); return ConversationHandler.END async def channel_post_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: post = update.channel_post; channel_id = post.chat.id logger.info(f"Received post from channel {channel_id}. Checking for subscribers...") response = table.scan(FilterExpression=Attr('channels').contains(channel_id)); subscribers = response.get('Items', []) if not subscribers: logger.info(f"No subscribers found for channel {channel_id}."); return converted_text = convert_text_and_entities(post.text or post.caption, post.entities or post.caption_entities) for item in subscribers: user_id = item['user_id'] try: await context.bot.send_message(chat_id=user_id, text=converted_text) logger.info(f"Forwarded post from {channel_id} to {user_id}") except Forbidden: logger.warning(f"User {user_id} blocked bot. Removing subscription.") subscriptions = get_subscriptions(user_id) if channel_id in subscriptions: subscriptions.remove(channel_id); save_subscriptions(user_id, subscriptions) except Exception as e: logger.error(f"Failed to send to user {user_id}: {e}") # --- Lambda Handler Setup --- application = Application.builder().token(BOT_TOKEN).build() conv_handler = ConversationHandler( entry_points=[CallbackQueryHandler(button_handler, pattern="^add_start$")], states={AWAITING_CHANNEL_INPUT: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_channel_input)],}, fallbacks=[CommandHandler("cancel", cancel_conversation), CommandHandler("channels", manage_channels)], per_message=False) application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("channels", manage_channels)) application.add_handler(CommandHandler("run", run_in_group, filters=filters.ChatType.GROUP)) application.add_handler(CommandHandler("feedback", feedback)) application.add_handler(conv_handler) application.add_handler(CallbackQueryHandler(button_handler)) application.add_handler(InlineQueryHandler(inline_query)) application.add_handler(MessageHandler(filters.ChatType.CHANNEL, channel_post_handler)) # --- THIS IS THE FIX --- # This single handler now manages both new and edited messages in private chats. unified_message_handler = MessageHandler( filters.ChatType.PRIVATE & (filters.TEXT | filters.CAPTION) & ~filters.COMMAND, handle_message_or_edit ) application.add_handler(unified_message_handler) async def main_handler(event, context): try: await application.initialize() update = Update.de_json(json.loads(event.get("body", "{}")), application.bot) await application.process_update(update) return {"statusCode": 200, "body": "OK"} finally: await application.shutdown() def lambda_handler(event, context): return asyncio.get_event_loop().run_until_complete(main_handler(event, context))

Public Last updated: 2025-09-01 05:23:14 AM