import logging
import asyncio
import json
import boto3
from boto3.dynamodb.conditions import Attr
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, InlineQueryResultArticle, InputTextMessageContent
from telegram.ext import (
Application, CommandHandler, MessageHandler, filters, ContextTypes,
CallbackQueryHandler, ConversationHandler, InlineQueryHandler
)
# NOTICE: EditedMessageHandler has been REMOVED from this import list.
from telegram.constants import ParseMode
from telegram.error import TelegramError, Forbidden
import os
# --- Configuration ---
BOT_TOKEN = os.environ.get("BOT_TOKEN")
DEV_CHAT_ID = os.environ.get("DEV_CHAT_ID")
DYNAMODB_TABLE_NAME = "TelegramChannelSubscriptions"
# --- State for ConversationHandler ---
AWAITING_CHANNEL_INPUT = range(1)
# --- Logging Setup ---
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)
# --- DynamoDB Integration ---
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(DYNAMODB_TABLE_NAME)
def get_subscriptions(user_id: int) -> list:
try:
response = table.get_item(Key={'user_id': user_id})
return response.get('Item', {}).get('channels', [])
except Exception as e:
logger.error(f"Error getting subscriptions for {user_id}: {e}")
return []
def save_subscriptions(user_id: int, channels: list):
try:
table.put_item(Item={'user_id': user_id, 'channels': channels})
except Exception as e:
logger.error(f"Error saving subscriptions for {user_id}: {e}")
# --- Core Conversion Logic (Emoji-Proof and Compatible) ---
def convert_text_and_entities(original_text: str, entities: list) -> str:
if not original_text: return ""
if not entities: return original_text
utf16_text = original_text.encode('utf-16-le')
result_parts = []; last_offset_utf16 = 0
sorted_entities = sorted(entities, key=lambda e: e.offset)
for entity in sorted_entities:
start_bytes = entity.offset * 2; end_bytes = (entity.offset + entity.length) * 2
result_parts.append(utf16_text[last_offset_utf16:start_bytes].decode('utf-16-le'))
# The entity.type from an old library is already a string, so this is safe.
entity_type_str = entity.type
entity_text = utf16_text[start_bytes:end_bytes].decode('utf-16-le')
formatted_text = ""
if entity_type_str == "blockquote":
formatted_text = '\n'.join([f"> {line}" for line in entity_text.split('\n')])
else:
formatting = {"bold": ("*", "*"), "italic": ("_", "_"), "strikethrough": ("~", "~"), "code": ("```", "```"), "pre": ("`", "`")}.get(entity_type_str)
if formatting: formatted_text = f"{formatting[0]}{entity_text}{formatting[1]}"
else: formatted_text = entity_text
result_parts.append(formatted_text)
last_offset_utf16 = end_bytes
result_parts.append(utf16_text[last_offset_utf16:].decode('utf-16-le'))
return "".join(result_parts)
# --- Main Handlers ---
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
bot_username = context.bot.username
keyboard = [
[InlineKeyboardButton("➕ Add me to a group", url=f"https://t.me/{bot_username}?startgroup=true")],
[InlineKeyboardButton("✨ Try Inline Mode", switch_inline_query_current_chat="**bold** _italic_")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
start_text = (
f"Welcome to the <b>WhatsApp Formatter Bot</b>! 🤖\n\n"
"This bot effortlessly transforms Telegram's formatting into a copy-paste ready format for WhatsApp.\n\n"
"<b>It works in:</b>\n"
"• <b>Private chat:</b> Just send me any formatted message.\n"
"• <b>Group chat:</b> Use <code>/run</code> in reply to a message, or with text after it.\n"
"• <b>Inline mode:</b> Type my username (<code>@{bot_username}</code>) followed by your text.\n\n"
"<b>Channel Forwarding:</b>\n"
"Use the <code>/channels</code> command to manage automatic forwarding from channels.\n\n"
"<b>Other Commands:</b>\n"
"» <code>/start</code>: Show this message.\n"
"» <code>/feedback <your message></code>: Send feedback to my developer.\n\n"
"• Developed by @shantanuoak\n"
"• Supported by @samvadah"
)
await update.message.reply_text(start_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
# --- THIS IS THE UNIFIED HANDLER FOR NEW AND EDITED MESSAGES ---
async def handle_message_or_edit(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
# Determine if this is a new message or an edit
message = update.edited_message or update.message
# Prefix the reply if it's an edit
prefix = "*(Edited)*\n" if update.edited_message else ""
text_to_convert = message.text or message.caption or ""
entities_to_convert = message.entities or message.caption_entities or []
converted_text = convert_text_and_entities(text_to_convert, entities_to_convert)
# The old library might not support editing messages reliably,
# so we will just reply to the message (new or edited). This is robust.
await message.reply_text(f"{prefix}{converted_text}")
async def run_in_group(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
message_to_process = update.message.reply_to_message or update.message
text = message_to_process.text or message_to_process.caption or ""
entities = message_to_process.entities or message_to_process.caption_entities or []
if message_to_process == update.message:
text = text.replace("/run", "").strip()
if not text:
await update.message.reply_text("Usage: `/run <formatted text>` or reply to a message with `/run`.")
return
converted_text = convert_text_and_entities(text, entities)
await update.message.reply_text(converted_text)
async def feedback(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user = update.effective_user
feedback_text = " ".join(context.args)
if not feedback_text:
await update.message.reply_text("Please provide your feedback after the command, e.g., `/feedback I love this bot!`")
return
message = (f"📝 <b>New Feedback</b>\n\n<b>From:</b> {user.mention_html()} (ID: <code>{user.id}</code>)\n"
f"<b>Message:</b>\n{feedback_text}")
try:
await context.bot.send_message(chat_id=DEV_CHAT_ID, text=message, parse_mode=ParseMode.HTML)
await update.message.reply_text("Thank you for your feedback! It has been sent to the developer.")
except TelegramError:
await update.message.reply_text("Sorry, there was an error sending your feedback.")
async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
query = update.inline_query.query
if not query: return
# As the API does not provide entities for inline queries, we can't do a full conversion here.
# A simple replacement is a compromise for inline mode usability.
converted_text = query.replace("**", "*").replace("__", "_")
results = [InlineQueryResultArticle(id=query, title="Convert to WhatsApp Format",
description="Tap to send the converted message.",
input_message_content=InputTextMessageContent(converted_text))]
await update.inline_query.answer(results, cache_time=5)
# --- Channel Management (Conversation Handler) ---
# ... [This entire section remains the same as the previous correct version] ...
async def manage_channels(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id; subscriptions = get_subscriptions(user_id)
text = ("Here you can manage your channel subscriptions.\n\n<b>Important:</b> For me to see new posts, I must be added as an <b>administrator</b> to the channel.")
buttons = []
if subscriptions:
for channel_id in subscriptions:
try:
chat = await context.bot.get_chat(channel_id); buttons.append([InlineKeyboardButton(f"❌ Remove {chat.title}", callback_data=f"remove_{channel_id}")])
except (TelegramError, Forbidden):
buttons.append([InlineKeyboardButton(f"❌ Remove Inaccessible Channel ({channel_id})", callback_data=f"remove_{channel_id}")])
buttons.append([InlineKeyboardButton("➕ Add a Channel", callback_data="add_start")])
await update.message.reply_text(text, reply_markup=InlineKeyboardMarkup(buttons), parse_mode=ParseMode.HTML)
return ConversationHandler.END
async def button_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
query = update.callback_query; await query.answer(); data = query.data; user_id = query.from_user.id
if data.startswith("remove_"):
channel_id_to_remove = int(data.split("_")[1]); subscriptions = get_subscriptions(user_id)
if channel_id_to_remove in subscriptions:
subscriptions.remove(channel_id_to_remove); save_subscriptions(user_id, subscriptions)
await manage_channels_callback(query, context, "Subscription removed.")
elif data == "add_start":
await query.edit_message_text("Please send the channel username (e.g., @channel) or its ID."); return AWAITING_CHANNEL_INPUT
async def manage_channels_callback(query, context, text):
user_id = query.from_user.id; subscriptions = get_subscriptions(user_id); buttons = []
if subscriptions:
for channel_id in subscriptions:
try:
chat = await context.bot.get_chat(channel_id); buttons.append([InlineKeyboardButton(f"❌ Remove {chat.title}", callback_data=f"remove_{channel_id}")])
except (TelegramError, Forbidden):
buttons.append([InlineKeyboardButton(f"❌ Remove Inaccessible Channel ({channel_id})", callback_data=f"remove_{channel_id}")])
buttons.append([InlineKeyboardButton("➕ Add a Channel", callback_data="add_start")])
await query.edit_message_text(text, reply_markup=InlineKeyboardMarkup(buttons))
async def handle_channel_input(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
user_id = update.effective_user.id; channel_input = update.message.text
try:
chat = await context.bot.get_chat(chat_id=channel_input)
if chat.type != "channel":
await update.message.reply_text("This is not a channel. Please try again or /cancel."); return AWAITING_CHANNEL_INPUT
bot_member = await chat.get_member(context.bot.id)
if not bot_member.status == 'administrator':
await update.message.reply_text(f"Error: I must be an admin in {chat.title}. Please make me an admin there and try again, or /cancel."); return AWAITING_CHANNEL_INPUT
channel_id = chat.id; subscriptions = get_subscriptions(user_id)
if channel_id not in subscriptions:
subscriptions.append(channel_id); save_subscriptions(user_id, subscriptions)
await update.message.reply_text(f"Success! Subscribed to {chat.title}.")
else: await update.message.reply_text(f"You are already subscribed to {chat.title}.")
except TelegramError:
await update.message.reply_text(f"Could not find or access channel '{channel_input}'. Please check it and try again, or /cancel."); return AWAITING_CHANNEL_INPUT
await manage_channels(update, context); return ConversationHandler.END
async def cancel_conversation(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("Action cancelled."); await manage_channels(update, context); return ConversationHandler.END
async def channel_post_handler(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
post = update.channel_post; channel_id = post.chat.id
logger.info(f"Received post from channel {channel_id}. Checking for subscribers...")
response = table.scan(FilterExpression=Attr('channels').contains(channel_id)); subscribers = response.get('Items', [])
if not subscribers: logger.info(f"No subscribers found for channel {channel_id}."); return
converted_text = convert_text_and_entities(post.text or post.caption, post.entities or post.caption_entities)
for item in subscribers:
user_id = item['user_id']
try:
await context.bot.send_message(chat_id=user_id, text=converted_text)
logger.info(f"Forwarded post from {channel_id} to {user_id}")
except Forbidden:
logger.warning(f"User {user_id} blocked bot. Removing subscription.")
subscriptions = get_subscriptions(user_id)
if channel_id in subscriptions: subscriptions.remove(channel_id); save_subscriptions(user_id, subscriptions)
except Exception as e: logger.error(f"Failed to send to user {user_id}: {e}")
# --- Lambda Handler Setup ---
application = Application.builder().token(BOT_TOKEN).build()
conv_handler = ConversationHandler(
entry_points=[CallbackQueryHandler(button_handler, pattern="^add_start$")],
states={AWAITING_CHANNEL_INPUT: [MessageHandler(filters.TEXT & ~filters.COMMAND, handle_channel_input)],},
fallbacks=[CommandHandler("cancel", cancel_conversation), CommandHandler("channels", manage_channels)],
per_message=False)
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("channels", manage_channels))
application.add_handler(CommandHandler("run", run_in_group, filters=filters.ChatType.GROUP))
application.add_handler(CommandHandler("feedback", feedback))
application.add_handler(conv_handler)
application.add_handler(CallbackQueryHandler(button_handler))
application.add_handler(InlineQueryHandler(inline_query))
application.add_handler(MessageHandler(filters.ChatType.CHANNEL, channel_post_handler))
# --- THIS IS THE FIX ---
# This single handler now manages both new and edited messages in private chats.
unified_message_handler = MessageHandler(
filters.ChatType.PRIVATE & (filters.TEXT | filters.CAPTION) & ~filters.COMMAND,
handle_message_or_edit
)
application.add_handler(unified_message_handler)
async def main_handler(event, context):
try:
await application.initialize()
update = Update.de_json(json.loads(event.get("body", "{}")), application.bot)
await application.process_update(update)
return {"statusCode": 200, "body": "OK"}
finally:
await application.shutdown()
def lambda_handler(event, context):
return asyncio.get_event_loop().run_until_complete(main_handler(event, context))
Public Last updated: 2025-09-01 05:23:14 AM
