import os import json import yaml import logging import signal import sys from pathlib import Path from typing import Dict, Any, Optional from telegram import ( Update, InlineQueryResultArticle, InputTextMessageContent, InlineKeyboardButton, InlineKeyboardMarkup, ) from telegram.constants import ParseMode from telegram.ext import ( ApplicationBuilder, CommandHandler, ContextTypes, MessageHandler, filters, ChatMemberHandler, InlineQueryHandler, CallbackQueryHandler, ) DATA_DIR = Path(os.environ.get("DATA_DIR", "/data")) COUNTERS_FILE = DATA_DIR / "counters.json" CONFIG_FILE = Path(os.environ.get("CONFIG_FILE", "config.yaml")) logger = logging.getLogger("counter_bot") # ---------------------- Persistence Layer ---------------------- def load_config() -> Dict[str, Any]: if not CONFIG_FILE.exists(): logger.warning("Konfigurationsdatei %s nicht gefunden. Nutze Umgebungsvariablen / Defaults.", CONFIG_FILE) return {} with CONFIG_FILE.open("r", encoding="utf-8") as f: try: return yaml.safe_load(f) or {} except yaml.YAMLError as e: logger.error("Fehler beim Lesen der Konfigurationsdatei: %s", e) return {} def load_counters() -> Dict[str, int]: if not COUNTERS_FILE.exists(): return {} try: with COUNTERS_FILE.open("r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict): # ensure int values fixed = {} for k, v in data.items(): try: fixed[k] = int(v) except (ValueError, TypeError): logger.warning("Ungültiger Wert für Counter %s -> %s, setze 0", k, v) fixed[k] = 0 return fixed return {} except json.JSONDecodeError: logger.error("Konnte %s nicht lesen (JSON Fehler). Starte mit leerem Satz.", COUNTERS_FILE) return {} def atomic_write(path: Path, content: str) -> None: tmp = path.with_suffix(path.suffix + ".tmp") with tmp.open("w", encoding="utf-8") as f: f.write(content) tmp.replace(path) def save_counters(counters: Dict[str, int]) -> None: DATA_DIR.mkdir(parents=True, exist_ok=True) atomic_write(COUNTERS_FILE, json.dumps(counters, ensure_ascii=False, sort_keys=True, indent=2)) # ---------------------- Access Control ---------------------- def is_allowed(chat_id: int, allowed_list: Optional[list]) -> bool: if not allowed_list: # None oder leere Liste => alle erlaubt return True return chat_id in allowed_list # ---------------------- Command Handlers ---------------------- HELP_TEXT = ( "Verfügbare Befehle:\n" "/add - Legt einen neuen Counter mit Wert 0 an\n" "/remove - Löscht einen Counter\n" "/increment - Erhöht den Counter um 1 und zeigt neuen Wert\n" "/decrement - Verringert den Counter um 1 und zeigt neuen Wert\n" "/list - Listet alle Counter\n" "/help - Zeigt diese Hilfe an" ) async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text(HELP_TEXT) def norm_key(raw: str) -> str: return raw.strip().lower() def html_escape(text: str) -> str: return (text .replace('&', '&') .replace('<', '<') .replace('>', '>') ) async def add_counter(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id if not context.bot_data['access'](chat_id): return if not context.args: await update.message.reply_text("Bitte einen Counternamen angeben: /add ") return key = norm_key(context.args[0]) counters = context.bot_data['counters'] if key in counters: await update.message.reply_text(f"Counter {html_escape(key)} existiert bereits.", parse_mode=ParseMode.HTML) return counters[key] = 0 save_counters(counters) await update.message.reply_text(f"Counter {html_escape(key)} wurde angelegt (Wert 0).", parse_mode=ParseMode.HTML) async def remove_counter(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id if not context.bot_data['access'](chat_id): return if not context.args: await update.message.reply_text("Bitte einen Counternamen angeben: /remove ") return key = norm_key(context.args[0]) counters = context.bot_data['counters'] if key not in counters: await update.message.reply_text(f"Counter {html_escape(key)} existiert nicht.", parse_mode=ParseMode.HTML) return del counters[key] save_counters(counters) await update.message.reply_text(f"Counter {html_escape(key)} wurde gelöscht.", parse_mode=ParseMode.HTML) async def increment_counter(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id if not context.bot_data['access'](chat_id): logger.info("Access denied for chat %s on /increment", chat_id) try: await update.message.reply_text("Zugriff verweigert: Dieser Chat ist nicht freigeschaltet.") except Exception: pass return counters = context.bot_data['counters'] # If no argument: show inline keyboard with counters if not context.args: logger.debug("/increment without args in chat %s -> showing keyboard (%d counters)", chat_id, len(counters)) if not counters: await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add .") return buttons = [] row: list = [] for idx, name in enumerate(sorted(counters.keys())): row.append(InlineKeyboardButton(text=f"{name} ({counters[name]})", callback_data=f"inc:{name}")) if len(row) == 3: buttons.append(row) row = [] if row: buttons.append(row) markup = InlineKeyboardMarkup(buttons) await update.message.reply_text( "Wähle einen Counter zum Inkrementieren:", reply_markup=markup ) return key = norm_key(context.args[0]) if key not in counters: esc = html_escape(key) await update.message.reply_text(f"Counter {esc} existiert nicht. Lege ihn mit /add {esc} an.", parse_mode=ParseMode.HTML) return counters[key] += 1 save_counters(counters) await update.message.reply_text(f"Counter {html_escape(key)} steht jetzt auf {counters[key]}.", parse_mode=ParseMode.HTML) async def handle_increment_button(update: Update, context: ContextTypes.DEFAULT_TYPE): """Handle callback button presses for incrementing counters.""" query = update.callback_query if not query: return await query.answer() data = query.data or "" if not data.startswith("inc:"): return name = data.split(":", 1)[1] counters = context.bot_data['counters'] chat_id = query.message.chat_id if query.message else None if chat_id is not None and not context.bot_data['access'](chat_id): logger.info("Access denied for chat %s on increment button %s", chat_id, name) return key = norm_key(name) if key not in counters: await query.edit_message_text(f"Counter {html_escape(key)} existiert nicht mehr.", parse_mode=ParseMode.HTML) return counters[key] += 1 save_counters(counters) logger.debug("Increment via button: %s -> %d", key, counters[key]) # Rebuild keyboard to reflect new values buttons = [] row: list = [] for idx, cname in enumerate(sorted(counters.keys())): row.append(InlineKeyboardButton(text=f"{cname} ({counters[cname]})", callback_data=f"inc:{cname}")) if len(row) == 3: buttons.append(row) row = [] if row: buttons.append(row) markup = InlineKeyboardMarkup(buttons) try: await query.edit_message_text( f"Counter {html_escape(key)} steht jetzt auf {counters[key]}. Wähle weiteren Counter:", parse_mode=ParseMode.HTML, reply_markup=markup, ) except Exception: # Fallback: send separate message if edit fails if chat_id is not None: await context.bot.send_message(chat_id, f"Counter {html_escape(key)} steht jetzt auf {counters[key]}.", parse_mode=ParseMode.HTML) async def decrement_counter(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id if not context.bot_data['access'](chat_id): return if not context.args: await update.message.reply_text("Bitte einen Counternamen angeben: /decrement ") return key = norm_key(context.args[0]) counters = context.bot_data['counters'] if key not in counters: esc = html_escape(key) await update.message.reply_text(f"Counter {esc} existiert nicht. Lege ihn mit /add {esc} an.", parse_mode=ParseMode.HTML) return counters[key] -= 1 save_counters(counters) await update.message.reply_text(f"Counter {html_escape(key)} steht jetzt auf {counters[key]}.", parse_mode=ParseMode.HTML) async def list_counters(update: Update, context: ContextTypes.DEFAULT_TYPE): chat_id = update.effective_chat.id if not context.bot_data['access'](chat_id): return counters = context.bot_data['counters'] if not counters: await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add .") return lines = ["Aktuelle Counter:"] for k in sorted(counters.keys()): lines.append(f"- {html_escape(k)}: {counters[k]}") await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.HTML) async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("Unbekannter Befehl. Nutze /help für eine Übersicht.") async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE): logger.error("Fehler während Update Verarbeitung", exc_info=context.error) # Optionale kurze Rückmeldung nur bei klassischen Nachrichten try: if isinstance(update, Update) and update.effective_chat: await context.bot.send_message(update.effective_chat.id, "⚠️ Interner Fehler beim Verarbeiten der Anfrage.") except Exception: pass # ---------------------- Inline Query ---------------------- async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE): """Answer inline queries listing counters; selecting one increments it. Hinweis: Auswahl eines Counters inkrementiert ihn und sendet den neuen Wert. Usage: @DeinBot [filter] """ if update.inline_query is None: return query_text = (update.inline_query.query or "").strip().lower() counters: Dict[str, int] = context.bot_data.get('counters', {}) results = [] if not counters: results.append( InlineQueryResultArticle( id="_no_counters_", title="Keine Counter vorhanden", description="/add legt einen neuen Counter an", input_message_content=InputTextMessageContent( "Noch keine Counter vorhanden. Nutze /add ." ), ) ) else: # Erste Ergebniszeile ist ein Hinweis results.append( InlineQueryResultArticle( id="_hint_", title="Hinweis: Auswahl inkrementiert", description="Beim Antippen eines Counters wird er um 1 erhöht", input_message_content=InputTextMessageContent( "(Inline Hinweis) — Auswahl eines Counters erhöht ihn um 1." ), ) ) all_items = sorted(counters.items()) if query_text: all_items = [kv for kv in all_items if query_text in kv[0]] for name, value in all_items[:50]: new_val = value + 1 results.append( InlineQueryResultArticle( id=f"counter_{name}", title=f"{name} -> {new_val}", description=f"Aktuell: {value}; nach Auswahl: {new_val}", input_message_content=InputTextMessageContent( f"/increment {name}" ), ) ) if len(results) == 1: # nur Hinweis, keine Treffer results.append( InlineQueryResultArticle( id="_no_match_", title="Keine Treffer", description="Filter ändern oder neuen Counter anlegen", input_message_content=InputTextMessageContent( "Keine passenden Counter gefunden." ), ) ) try: await update.inline_query.answer(results, cache_time=0, is_personal=True) except Exception as e: logger.warning("Fehler beim Beantworten der Inline Query: %s", e) # ---------------------- Group Events ---------------------- async def new_members(update: Update, context: ContextTypes.DEFAULT_TYPE): """Send a ready message when the bot itself is added to a group or any new member joins (configurable).""" if update.message is None or not update.message.new_chat_members: return bot_user = (await context.bot.get_me()) announce_all = os.environ.get("ANNOUNCE_ALL_JOINS", "false").lower() in {"1", "true", "yes"} for member in update.message.new_chat_members: if member.id == bot_user.id: await update.message.reply_text("🤖 Bot ist bereit! Verwende /help für Befehle.") elif announce_all: await update.message.reply_text(f"Willkommen {member.full_name}! Tippe /help für Befehle.") async def debug_all_messages(update: Update, context: ContextTypes.DEFAULT_TYPE): if update.message: logger.debug("Empfangene Nachricht chat=%s user=%s text=%r", update.effective_chat.id, update.effective_user.id if update.effective_user else None, update.message.text) async def my_chat_member(update: Update, context: ContextTypes.DEFAULT_TYPE): # Fired when bot's status in a chat changes (e.g., added, removed, promoted) chat = update.effective_chat diff = update.my_chat_member if diff: logger.info("my_chat_member update chat=%s old=%s new=%s", chat.id if chat else None, diff.old_chat_member.status, diff.new_chat_member.status) # If bot was just added / became member try: if diff.new_chat_member.status in {"member", "administrator"}: await context.bot.send_message(chat.id, "🤖 Bot ist jetzt aktiv in diesem Chat. /help für Befehle.") except Exception as e: logger.warning("Fehler beim Senden der Aktiv-Nachricht: %s", e) # ---------------------- Setup & Main ---------------------- def setup_logging(level: str): numeric = getattr(logging, level.upper(), logging.INFO) logging.basicConfig(stream=sys.stdout, level=numeric, format='%(asctime)s %(levelname)s %(name)s: %(message)s') def init_counters(existing: Dict[str, int], config: Dict[str, Any]) -> Dict[str, int]: """Return existing counters or (optionally) seed initial ones. Seeding now only happens if BOTH conditions apply: 1) No existing counters file/content 2) Env USE_INITIAL_COUNTERS is truthy (1/true/yes) """ if existing: return existing if os.environ.get("USE_INITIAL_COUNTERS", "false").lower() not in {"1", "true", "yes"}: return {} initial = config.get('initial_counters') or {} normalized = {norm_key(k): int(v) for k, v in initial.items()} if normalized: save_counters(normalized) return normalized async def on_startup(app): logger.info("Bot gestartet und bereit.") announce_ids = os.environ.get("STARTUP_ANNOUNCE_CHAT_IDS") if announce_ids: ids = [] for raw in announce_ids.split(','): raw = raw.strip() if not raw: continue try: ids.append(int(raw)) except ValueError: logger.warning("Kann Chat ID %s nicht in int umwandeln", raw) if ids: me = await app.bot.get_me() for cid in ids: try: await app.bot.send_message(cid, f"🤖 {me.first_name} ist bereit. Nutze /help für Befehle.") except Exception as e: logger.warning("Konnte Startup-Nachricht an %s nicht senden: %s", cid, e) def main(): config = load_config() token = os.environ.get('BOT_TOKEN') or config.get('bot_token') if not token: print("Fehlendes Bot Token: Setze BOT_TOKEN env oder bot_token in config.yaml", file=sys.stderr) sys.exit(1) setup_logging(config.get('log_level', 'INFO')) counters = load_counters() counters = init_counters(counters, config) allowed_chat_ids = config.get('allowed_chat_ids') if allowed_chat_ids is not None: try: allowed_chat_ids = [int(x) for x in allowed_chat_ids] except Exception: logger.error("allowed_chat_ids in config.yaml müssen Ganzzahlen sein") allowed_chat_ids = None def access(chat_id: int) -> bool: if not is_allowed(chat_id, allowed_chat_ids): logger.info("Verweigerter Zugriff von Chat %s", chat_id) return False return True application = ApplicationBuilder().token(token).build() # bot_data shared state application.bot_data['counters'] = counters application.bot_data['access'] = access application.add_handler(CommandHandler("help", help_command)) application.add_handler(CommandHandler("add", add_counter)) application.add_handler(CommandHandler("remove", remove_counter)) application.add_handler(CommandHandler("increment", increment_counter)) application.add_handler(CallbackQueryHandler(handle_increment_button, pattern=r"^inc:")) application.add_handler(CommandHandler("decrement", decrement_counter)) application.add_handler(CommandHandler("list", list_counters)) # Group / membership events application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, new_members)) application.add_handler(ChatMemberHandler(my_chat_member, ChatMemberHandler.MY_CHAT_MEMBER)) application.add_handler(InlineQueryHandler(inline_query)) # Debug raw messages (placed last with low priority) application.add_handler(MessageHandler(filters.ALL, debug_all_messages), group=100) # Error handler application.add_error_handler(error_handler) # Startup callback application.post_init = on_startup application.run_polling( stop_signals=(signal.SIGINT, signal.SIGTERM), allowed_updates=["message", "chat_member", "my_chat_member", "inline_query", "callback_query"], ) if __name__ == '__main__': main()