Profile Image

Tetow trial

@samvadah

import logging import asyncio import json import boto3 import re import html import time from decimal import Decimal from boto3.dynamodb.conditions import Attr from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, MessageEntity from telegram.ext import ( Application, CommandHandler, MessageHandler, filters, ContextTypes, CallbackQueryHandler, ConversationHandler ) from telegram.constants import ParseMode from telegram.error import TelegramError, Forbidden import os # --- Configuration & Setup --- BOT_TOKEN = os.environ.get("BOT_TOKEN") DEV_CHAT_ID = os.environ.get("DEV_CHAT_ID") DYNAMODB_CHANNELS_TABLE = "TelegramChannelSubscriptions" DYNAMODB_MESSAGES_TABLE = "TelegramMessageMappings" AWAITING_CHANNEL_INPUT = range(1) logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO) logger = logging.getLogger(__name__) dynamodb = boto3.resource('dynamodb') channels_table = dynamodb.Table(DYNAMODB_CHANNELS_TABLE) messages_table = dynamodb.Table(DYNAMODB_MESSAGES_TABLE) # --- Database Functions --- def get_subscriptions(user_id: int) -> list: try: response = channels_table.get_item(Key={'user_id': user_id}) return response.get('Item', {}).get('channels', []) except Exception as e: logger.error(f"Error getting subs: {e}"); return [] def save_subscriptions(user_id: int, channels: list): try: channels_table.put_item(Item={'user_id': user_id, 'channels': channels}) except Exception as e: logger.error(f"Error saving subs: {e}") def save_message_mapping(user_msg_id: int, bot_msg_id: int, chat_id: int): try: expiration = int(time.time()) + (2 * 24 * 60 * 60) messages_table.put_item( Item={'user_message_id': str(user_msg_id), 'bot_message_id': bot_msg_id, 'chat_id': chat_id, 'expiration_time': expiration}) except Exception as e: logger.error(f"Error saving message mapping: {e}") def get_mapping(user_msg_id: int) -> dict: try: response = messages_table.get_item(Key={'user_message_id': str(user_msg_id)}) return response.get('Item', {}) except Exception as e: logger.error(f"Error getting message mapping: {e}"); return {} # --- Core Conversion Logic --- def convert_tg_to_wa(original_text: str, entities: list) -> str: if not original_text or not entities: return original_text text_bytes = bytearray(original_text.encode('utf-16-le')) for entity in sorted(entities, key=lambda e: e.offset, reverse=True): start_index = entity.offset * 2; end_index = (entity.offset + entity.length) * 2 entity_type_str = entity.type if entity_type_str == "blockquote": slice_to_modify = text_bytes[start_index:end_index].decode('utf-16-le') quoted_lines = [f"> {line}" for line in slice_to_modify.split('\n')] modified_slice = '\n'.join(quoted_lines).encode('utf-16-le') text_bytes[start_index:end_index] = modified_slice elif entity_type_str == "text_link": slice_to_modify = text_bytes[start_index:end_index].decode('utf-16-le') modified_slice = f"{slice_to_modify} : {entity.url}".encode('utf-16-le') text_bytes[start_index:end_index] = modified_slice else: formatting = {"bold": ("*", "*"), "italic": ("_", "_"), "strikethrough": ("~", "~"), "code": ("```", "```"), "pre": ("`", "`")}.get(entity_type_str) if formatting: open_char, close_char = formatting text_bytes[end_index:end_index] = close_char.encode('utf-16-le') text_bytes[start_index:start_index] = open_char.encode('utf-16-le') return text_bytes.decode('utf-16-le') def convert_wa_to_tg(text: str) -> str: safe_text = html.escape(text) lines = safe_text.split('\n') processed_lines = [f"<blockquote>{line[1:].strip()}</blockquote>" if re.match(r'^\s*>\s', line) else line for line in lines] safe_text = '\n'.join(processed_lines) safe_text = re.sub(r'([^\s:]+)\s*:\s*(https?://[^\s]+)', r'<a href="\2">\1</a>', safe_text) safe_text = re.sub(r'(?<!`)`(?!`)(.*?)(?<!`)`(?!`)', r'<pre>\1</pre>', safe_text, flags=re.DOTALL) safe_text = re.sub(r'```(.*?)```', r'<code>\1</code>', safe_text, flags=re.DOTALL) safe_text = re.sub(r'(?<![\w*])\*(?!\s)(.*?)(?<!\s)\*(?![\w*])', r'<b>\1</b>', safe_text) safe_text = re.sub(r'(?<![\w_])_(?!\s)(.*?)(?<!\s)_(?![\w_])', r'<i>\1</i>', safe_text) safe_text = re.sub(r'(?<![\w~])~(?!\s)(.*?)(?<!\s)~(?![\w~])', r'<s>\1</s>', safe_text) return safe_text # --- Main Handlers --- async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: bot_username = context.bot.username keyboard = [[InlineKeyboardButton("➕ Add me to a group", url=f"https://t.me/{bot_username}?startgroup=true")]] reply_markup = InlineKeyboardMarkup(keyboard) await update.message.reply_html( "Welcome to the <b>WhatsApp Formatter Bot</b>! 🤖\n\n" "This bot effortlessly transforms Telegram's formatting into a copy-paste ready format for WhatsApp.\n\n" "<b>It works in:</b>\n" "• <b>Private chat:</b> Just send me any formatted message.\n" "• <b>Group chat:</b> Use <code>/run</code> in reply to a message.\n\n" "<b>Channel Forwarding:</b>\n" "Use the <code>/channels</code> command to manage channel forwarding.\n\n" "<b>Other Commands:</b>\n" "» <code>/start</code>: Show this message.\n" "» <code>/feedback &lt;your message&gt;</code>: Send feedback.\n" "» Reply with <code>/reverse</code> to convert a message from WA to TG format.") async def handle_message_or_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: message = update.edited_message or update.message is_edit = bool(update.edited_message) text_to_convert = message.text or message.caption or "" entities_to_convert = message.entities or message.caption_entities or [] converted_text = convert_tg_to_wa(text_to_convert, entities_to_convert) if is_edit: mapping = get_mapping(message.message_id) if mapping and mapping.get('bot_message_id'): # FIX: Convert DynamoDB's Decimal type to standard int bot_message_id = int(mapping['bot_message_id']) chat_id = int(mapping['chat_id']) try: await context.bot.edit_message_text(text=converted_text, chat_id=chat_id, message_id=bot_message_id) return except TelegramError as e: logger.warning(f"Failed to edit message (ID: {bot_message_id}), sending new one. Error: {e}") bot_message = await message.reply_text(converted_text) save_message_mapping(user_msg_id=message.message_id, bot_msg_id=bot_message.message_id, chat_id=message.chat_id) async def run_in_group(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: message_to_process = update.message.reply_to_message if not message_to_process: await update.message.reply_text("Please use `/run` by **replying** to the message you want to convert.") return text = message_to_process.text or message_to_process.caption or "" entities = message_to_process.entities or message_to_process.caption_entities or [] converted_text = convert_tg_to_wa(text, entities) await update.message.reply_text(converted_text) async def reverse_message(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: replied_message = update.message.reply_to_message if not replied_message: await update.message.reply_text("Please reply to a message with `/reverse` to convert it."); return text_to_reverse = replied_message.text or replied_message.caption or "" if not text_to_reverse: await update.message.reply_text("The replied message has no text to convert."); return tg_formatted_text = convert_wa_to_tg(text_to_reverse) await replied_message.reply_html(tg_formatted_text) async def debug_info(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if str(user_id) != str(DEV_CHAT_ID): await update.message.reply_text("You are not authorized to use this command."); return try: webhook_info = await context.bot.get_webhook_info() debug_text = (f"<b>Webhook Info:</b>\n" f"<b>URL:</b> <code>{webhook_info.url or 'Not Set'}</code>\n" f"<b>Allowed Updates:</b> <code>{', '.join(webhook_info.allowed_updates) if webhook_info.allowed_updates else 'All (Default)'}</code>") await update.message.reply_html(debug_text) except Exception as e: await update.message.reply_text(f"An error occurred: {e}") async def manage_channels(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id; subscriptions = get_subscriptions(user_id) text = "Here you can manage channel subscriptions.\n\n<b>Important:</b> For forwarding to work, I must be an <b>administrator</b> in the source channel." buttons = [] if subscriptions: for channel_id in subscriptions: try: chat = await context.bot.get_chat(channel_id); buttons.append([InlineKeyboardButton(f"❌ Remove {chat.title}", callback_data=f"remove_{channel_id}")]) except (TelegramError, Forbidden): buttons.append([InlineKeyboardButton(f"🗑️ Delete Inaccessible Channel ({channel_id})", callback_data=f"remove_{channel_id}")]) buttons.append([InlineKeyboardButton("➕ Add a Channel", callback_data="add_start")]) await update.message.reply_html(text, reply_markup=InlineKeyboardMarkup(buttons)); return ConversationHandler.END async def handle_channel_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: user_id = update.effective_user.id; channel_input = update.message.text.strip() try: chat = await context.bot.get_chat(chat_id=channel_input) if chat.type != "channel": await update.message.reply_text("This is not a channel. Please try again or /cancel."); return AWAITING_CHANNEL_INPUT bot_member = await context.bot.get_chat_member(chat_id=chat.id, user_id=context.bot.id) if not bot_member.status == 'administrator': await update.message.reply_html(f"Error: I am not an admin in <b>{chat.title}</b>. Please make me an administrator there and try again, or /cancel."); return AWAITING_CHANNEL_INPUT channel_id = chat.id; subscriptions = get_subscriptions(user_id) if channel_id not in subscriptions: subscriptions.append(channel_id); save_subscriptions(user_id, subscriptions) await update.message.reply_text(f"Success! Subscribed to {chat.title}.") else: await update.message.reply_text(f"You are already subscribed to {chat.title}.") except Exception as e: logger.error(f"Error handling channel input for '{channel_input}': {e}") error_text = (f"❌ <b>Access Denied</b>\n\nI couldn't access <code>{channel_input}</code>.\n\nPlease ensure:\n1. The username/ID is correct.\n2. I am an <b>administrator</b> in that channel.\n\nOnce checked, try again or /cancel.") await update.message.reply_html(error_text); return AWAITING_CHANNEL_INPUT await manage_channels(update, context); return ConversationHandler.END async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): query = update.callback_query; await query.answer(); data = query.data; user_id = query.from_user.id if data.startswith("remove_"): channel_id_to_remove = int(data.split("_")[1]); subscriptions = get_subscriptions(user_id) if channel_id_to_remove in subscriptions: subscriptions.remove(channel_id_to_remove); save_subscriptions(user_id, subscriptions) await manage_channels_callback(query, context, "Subscription removed.") elif data == "add_start": await query.edit_message_text("Please send the channel username (e.g., @channel) or its ID."); return AWAITING_CHANNEL_INPUT async def feedback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user = update.effective_user; feedback_text = " ".join(context.args) if not feedback_text: await update.message.reply_html("Usage: <code>/feedback &lt;your message&gt;</code>"); return message = (f"📝 <b>New Feedback</b>\n\n<b>From:</b> {user.mention_html()} (ID: <code>{user.id}</code>)\n" f"<b>Message:</b>\n{html.escape(feedback_text)}") try: if not DEV_CHAT_ID: raise TelegramError("DEV_CHAT_ID not set") await context.bot.send_message(chat_id=DEV_CHAT_ID, text=message, parse_mode=ParseMode.HTML) await update.message.reply_text("Thank you for your feedback!") except Exception as e: logger.error(f"Failed to send feedback: {e}"); await update.message.reply_text("Sorry, there was an error sending your feedback.") async def manage_channels_callback(query, context, text): user_id = query.from_user.id; subscriptions = get_subscriptions(user_id); buttons = [] if subscriptions: for channel_id in subscriptions: try: chat = await context.bot.get_chat(channel_id); buttons.append([InlineKeyboardButton(f"❌ Remove {chat.title}", callback_data=f"remove_{channel_id}")]) except (TelegramError, Forbidden): buttons.append([InlineKeyboardButton(f"🗑️ Delete Inaccessible Channel ({channel_id})", callback_data=f"remove_{channel_id}")]) buttons.append([InlineKeyboardButton("➕ Add a Channel", callback_data="add_start")]) await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons)) async def cancel_conversation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int: await update.message.reply_text("Action cancelled."); await manage_channels(update, context); return ConversationHandler.END # --- This is the relevant part to fix --- async def channel_post_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: post = update.channel_post channel_id = post.chat.id logger.info(f"Received post from channel {channel_id}. Checking subscribers...") # The response from DynamoDB contains Decimal objects response = channels_table.scan(FilterExpression=Attr('channels').contains(channel_id)) subscribers = response.get('Items', []) if not subscribers: logger.info(f"No subscribers for {channel_id}.") return text = post.text or post.caption or "" entities = post.entities or post.caption_entities or [] converted_text = convert_tg_to_wa(text, entities) for item in subscribers: # FIX: Convert the user_id from Decimal to a standard integer user_id = int(item['user_id']) try: bot_message = await context.bot.send_message(chat_id=user_id, text=converted_text) # We save the mapping so that if the channel post is edited, the forwarded message can also be edited. save_message_mapping(user_msg_id=post.message_id, bot_msg_id=bot_message.message_id, chat_id=user_id) logger.info(f"Forwarded post from {channel_id} to {user_id}") except Forbidden: logger.warning(f"User {user_id} blocked bot. Removing their subscription to {channel_id}.") subscriptions = get_subscriptions(user_id) if channel_id in subscriptions: subscriptions.remove(channel_id) save_subscriptions(user_id, subscriptions) except Exception as e: logger.error(f"Failed to send to user {user_id}: {e}") application = Application.builder().token(BOT_TOKEN).build() conv_handler = ConversationHandler( entry_points=[CallbackQueryHandler(button_handler, pattern="^add_start$")], states={AWAITING_CHANNEL_INPUT: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_channel_input)],}, fallbacks=[CommandHandler("cancel", cancel_conversation), CommandHandler("channels", manage_channels)], per_message=False) application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("debug", debug_info)) application.add_handler(CommandHandler("channels", manage_channels)) application.add_handler(CommandHandler("run", run_in_group, filters=filters.ChatType.GROUPS)) application.add_handler(CommandHandler("feedback", feedback)) application.add_handler(CommandHandler("reverse", reverse_message)) application.add_handler(conv_handler) application.add_handler(CallbackQueryHandler(button_handler)) application.add_handler(MessageHandler(filters.ChatType.CHANNEL, channel_post_handler)) application.add_handler(MessageHandler(filters.UpdateType.EDITED_MESSAGE, handle_message_or_edit)) application.add_handler(MessageHandler( filters.ChatType.PRIVATE & (filters.TEXT | filters.CAPTION) & ~filters.COMMAND, handle_message_or_edit )) async def main_handler(event, context): try: await application.initialize() update = Update.de_json(json.loads(event.get("body", "{}")), application.bot) await application.process_update(update) return {"statusCode": 200, "body": "OK"} finally: await application.shutdown() def lambda_handler(event, context): return asyncio.get_event_loop().run_until_complete(main_handler(event, context))

Public Last updated: 2025-09-03 07:53:27 AM