# GigalativBot/bot.py
# (File viewer metadata: 481 lines, 19 KiB, Python)
import os
import json
import yaml
import logging
import signal
import sys
from pathlib import Path
from typing import Dict, Any, Optional
from telegram import (
Update,
InlineQueryResultArticle,
InputTextMessageContent,
InlineKeyboardButton,
InlineKeyboardMarkup,
)
from telegram.constants import ParseMode
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
ContextTypes,
MessageHandler,
filters,
ChatMemberHandler,
InlineQueryHandler,
CallbackQueryHandler,
)
# Directory that holds the bot's persistent state; overridable via the DATA_DIR env variable.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
# JSON file inside DATA_DIR in which all counter values are stored.
COUNTERS_FILE = DATA_DIR.joinpath("counters.json")
# YAML configuration file; overridable via the CONFIG_FILE env variable.
CONFIG_FILE = Path(os.getenv("CONFIG_FILE", "config.yaml"))
# Module-wide logger used by all functions in this file.
logger = logging.getLogger("counter_bot")
# ---------------------- Persistence Layer ----------------------
def load_config() -> Dict[str, Any]:
    """Load the YAML configuration file named by ``CONFIG_FILE``.

    Returns:
        The parsed top-level mapping. An empty dict is returned when the file
        is missing, cannot be read or decoded, is not valid YAML, is empty, or
        when its top-level value is not a mapping, so callers can always use
        ``.get`` on the result.
    """
    if not CONFIG_FILE.exists():
        logger.warning("Konfigurationsdatei %s nicht gefunden. Nutze Umgebungsvariablen / Defaults.", CONFIG_FILE)
        return {}
    try:
        with CONFIG_FILE.open("r", encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
    except (yaml.YAMLError, OSError, UnicodeDecodeError) as e:
        logger.error("Fehler beim Lesen der Konfigurationsdatei: %s", e)
        return {}
    if not loaded:
        # Empty file (None) or an empty/falsy document: nothing configured.
        return {}
    if not isinstance(loaded, dict):
        # A list or scalar at the top level would break every later config.get() call.
        logger.error("Konfigurationsdatei %s enthält kein Mapping auf oberster Ebene, ignoriere Inhalt.", CONFIG_FILE)
        return {}
    return loaded
def load_counters() -> Dict[str, int]:
    """Read the persisted counters from ``COUNTERS_FILE``.

    Returns:
        A dict mapping counter name to integer value. An empty dict is returned
        when the file does not exist, is not valid UTF-8 JSON, or does not
        contain a JSON object. Values that cannot be converted to ``int`` are
        replaced by 0 (with a warning).
    """
    if not COUNTERS_FILE.exists():
        return {}
    try:
        with COUNTERS_FILE.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        logger.error("Konnte %s nicht lesen (JSON Fehler). Starte mit leerem Satz.", COUNTERS_FILE)
        return {}
    if not isinstance(data, dict):
        return {}
    fixed: Dict[str, int] = {}
    for k, v in data.items():
        try:
            fixed[k] = int(v)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: json.load accepts "Infinity", and int(float("inf")) overflows.
            logger.warning("Ungültiger Wert für Counter %s -> %s, setze 0", k, v)
            fixed[k] = 0
    return fixed
def atomic_write(path: Path, content: str) -> None:
    """Write ``content`` to ``path`` by writing a sibling temp file and renaming it.

    The text is written as UTF-8 to ``<path>.tmp``, flushed and fsynced, and
    then moved over ``path`` with ``Path.replace``. If any step fails, the
    temp file is removed again and the exception propagates.

    Args:
        path: Destination file.
        content: Text to store.

    Raises:
        OSError: If writing the temp file or replacing the destination fails.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    replaced = False
    try:
        with tmp.open("w", encoding="utf-8") as f:
            f.write(content)
            # Push the data to disk before the rename so the destination never
            # ends up pointing at a file whose content was not written yet.
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(path)
        replaced = True
    finally:
        if not replaced:
            # Best-effort cleanup; the original error is the one that matters.
            try:
                tmp.unlink()
            except OSError:
                pass
def save_counters(counters: Dict[str, int]) -> None:
    """Persist all counters to ``COUNTERS_FILE`` as indented, key-sorted JSON.

    ``DATA_DIR`` is created first if it does not exist yet.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(counters, indent=2, sort_keys=True, ensure_ascii=False)
    atomic_write(COUNTERS_FILE, serialized)
# ---------------------- Access Control ----------------------
def is_allowed(chat_id: int, allowed_list: Optional[list]) -> bool:
    """Return whether ``chat_id`` may use the bot.

    ``None`` or an empty list means no restriction: every chat is allowed.
    Otherwise the chat must be contained in ``allowed_list``.
    """
    return True if not allowed_list else chat_id in allowed_list
# ---------------------- Command Handlers ----------------------
# Reply text for /help: a header followed by one line per supported command.
HELP_TEXT = "\n".join(
    (
        "Verfügbare Befehle:",
        "/add <name> - Legt einen neuen Counter mit Wert 0 an",
        "/remove <name> - Löscht einen Counter",
        "/increment <name> - Erhöht den Counter um 1 und zeigt neuen Wert",
        "/decrement <name> - Verringert den Counter um 1 und zeigt neuen Wert",
        "/list - Listet alle Counter",
        "/help - Zeigt diese Hilfe an",
    )
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /help with the static command overview stored in ``HELP_TEXT``."""
    incoming = update.message
    await incoming.reply_text(HELP_TEXT)
def norm_key(raw: str) -> str:
    """Normalize a counter name: strip surrounding whitespace, then lowercase it."""
    trimmed = raw.strip()
    return trimmed.lower()
def html_escape(text: str) -> str:
    """Escape ``&``, ``<`` and ``>`` so ``text`` can be embedded in an HTML-formatted message.

    Quote characters are left untouched.
    """
    replacements = {
        ord('&'): '&amp;',
        ord('<'): '&lt;',
        ord('>'): '&gt;',
    }
    return text.translate(replacements)
async def add_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /add <name>: create a new counter with value 0.

    Does nothing when the chat fails the access check. Replies with a usage
    hint when no name is given and with a notice when the counter already
    exists. A newly created counter is persisted before the confirmation is sent.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return
    arguments = context.args
    if not arguments:
        await update.message.reply_text("Bitte einen Counternamen angeben: /add <name>")
        return
    name = norm_key(arguments[0])
    store = context.bot_data['counters']
    if name not in store:
        store[name] = 0
        save_counters(store)
        answer = f"Counter <b>{html_escape(name)}</b> wurde angelegt (Wert 0)."
    else:
        answer = f"Counter <b>{html_escape(name)}</b> existiert bereits."
    await update.message.reply_text(answer, parse_mode=ParseMode.HTML)
async def remove_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /remove <name>: delete an existing counter.

    Does nothing when the chat fails the access check. Replies with a usage
    hint when no name is given and with a notice when the counter is unknown.
    The deletion is persisted before the confirmation is sent.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return
    arguments = context.args
    if not arguments:
        await update.message.reply_text("Bitte einen Counternamen angeben: /remove <name>")
        return
    name = norm_key(arguments[0])
    store = context.bot_data['counters']
    if name in store:
        store.pop(name)
        save_counters(store)
        answer = f"Counter <b>{html_escape(name)}</b> wurde gelöscht."
    else:
        answer = f"Counter <b>{html_escape(name)}</b> existiert nicht."
    await update.message.reply_text(answer, parse_mode=ParseMode.HTML)
async def increment_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /increment [name].

    Without an argument, reply with an inline keyboard (three buttons per row)
    listing the counters; each button carries ``inc:<name>`` as callback data.
    With an argument, increase that counter by 1, persist it and reply with
    the new value. Does nothing when the chat fails the access check.
    """
    chat_id = update.effective_chat.id
    if not context.bot_data['access'](chat_id):
        return
    counters = context.bot_data['counters']
    # If no argument: show inline keyboard with counters
    if not context.args:
        if not counters:
            await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add <name>.")
            return
        # Telegram limits callback_data to 64 bytes and rejects the whole
        # keyboard otherwise, so counters with over-long names get no button
        # (they remain reachable via "/increment <name>").
        max_callback_bytes = 64
        flat_buttons = [
            InlineKeyboardButton(text=f"{name} ({counters[name]})", callback_data=f"inc:{name}")
            for name in sorted(counters)
            if len(f"inc:{name}".encode("utf-8")) <= max_callback_bytes
        ]
        if not flat_buttons:
            await update.message.reply_text("Bitte einen Counternamen angeben: /increment <name>")
            return
        # Chunk the flat button list into rows of three.
        buttons = [flat_buttons[i:i + 3] for i in range(0, len(flat_buttons), 3)]
        markup = InlineKeyboardMarkup(buttons)
        await update.message.reply_text(
            "Wähle einen Counter zum Inkrementieren:", reply_markup=markup
        )
        return
    key = norm_key(context.args[0])
    if key not in counters:
        esc = html_escape(key)
        await update.message.reply_text(f"Counter <b>{esc}</b> existiert nicht. Lege ihn mit /add {esc} an.", parse_mode=ParseMode.HTML)
        return
    counters[key] += 1
    save_counters(counters)
    await update.message.reply_text(f"Counter <b>{html_escape(key)}</b> steht jetzt auf <b>{counters[key]}</b>.", parse_mode=ParseMode.HTML)
async def handle_increment_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle presses on the ``inc:<name>`` buttons of the increment keyboard.

    Acknowledges the callback query, increments and persists the chosen
    counter, then edits the original message to show the new value together
    with a refreshed keyboard. If the edit fails, the new value is sent as a
    separate message instead. The access check is applied only when the
    query carries a message (and therefore a chat id).
    """
    query = update.callback_query
    if not query:
        return
    await query.answer()
    data = query.data or ""
    if not data.startswith("inc:"):
        return
    name = data.split(":", 1)[1]
    counters = context.bot_data['counters']
    chat_id = query.message.chat_id if query.message else None
    if chat_id is not None and not context.bot_data['access'](chat_id):
        return
    key = norm_key(name)
    if key not in counters:
        await query.edit_message_text(f"Counter <b>{html_escape(key)}</b> existiert nicht mehr.", parse_mode=ParseMode.HTML)
        return
    counters[key] += 1
    save_counters(counters)
    # Rebuild keyboard to reflect new values. Telegram limits callback_data to
    # 64 bytes and rejects the whole keyboard otherwise, so counters with
    # over-long names get no button.
    max_callback_bytes = 64
    flat_buttons = [
        InlineKeyboardButton(text=f"{cname} ({counters[cname]})", callback_data=f"inc:{cname}")
        for cname in sorted(counters)
        if len(f"inc:{cname}".encode("utf-8")) <= max_callback_bytes
    ]
    buttons = [flat_buttons[i:i + 3] for i in range(0, len(flat_buttons), 3)]
    markup = InlineKeyboardMarkup(buttons)
    try:
        await query.edit_message_text(
            f"Counter <b>{html_escape(key)}</b> steht jetzt auf <b>{counters[key]}</b>. Wähle weiteren Counter:",
            parse_mode=ParseMode.HTML,
            reply_markup=markup,
        )
    except Exception as e:
        # Record why the edit failed instead of hiding it, then fall back to
        # a separate message so the user still sees the new value.
        logger.warning("Konnte Nachricht nicht aktualisieren: %s", e)
        if chat_id is not None:
            await context.bot.send_message(chat_id, f"Counter <b>{html_escape(key)}</b> steht jetzt auf <b>{counters[key]}</b>.", parse_mode=ParseMode.HTML)
async def decrement_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /decrement <name>: decrease a counter by 1 and report the new value.

    Does nothing when the chat fails the access check. Replies with a usage
    hint when no name is given and with a notice when the counter is unknown.
    The new value is persisted before the reply is sent.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return
    arguments = context.args
    if not arguments:
        await update.message.reply_text("Bitte einen Counternamen angeben: /decrement <name>")
        return
    name = norm_key(arguments[0])
    store = context.bot_data['counters']
    if name in store:
        store[name] -= 1
        save_counters(store)
        answer = f"Counter <b>{html_escape(name)}</b> steht jetzt auf <b>{store[name]}</b>."
    else:
        label = html_escape(name)
        answer = f"Counter <b>{label}</b> existiert nicht. Lege ihn mit /add {label} an."
    await update.message.reply_text(answer, parse_mode=ParseMode.HTML)
async def list_counters(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /list: reply with all counters and their values, sorted by name.

    Does nothing when the chat fails the access check. When no counters
    exist, a hint to create one is sent instead.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return
    store = context.bot_data['counters']
    if not store:
        await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add <name>.")
        return
    entries = [f"- <b>{html_escape(name)}</b>: {store[name]}" for name in sorted(store)]
    overview = "\n".join(["<b>Aktuelle Counter:</b>", *entries])
    await update.message.reply_text(overview, parse_mode=ParseMode.HTML)
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Tell the user that the command was not recognized and point to /help."""
    notice = "Unbekannter Befehl. Nutze /help für eine Übersicht."
    await update.message.reply_text(notice)
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
    """Log an exception raised while processing an update and notify the chat.

    The error in ``context.error`` is logged with its traceback. If the
    update belongs to a chat, a short generic error message is sent there.
    A failure to send that message is logged and never re-raised.
    """
    logger.error("Fehler während Update Verarbeitung", exc_info=context.error)
    # Optional short feedback, only when the update has an associated chat.
    try:
        if isinstance(update, Update) and update.effective_chat:
            await context.bot.send_message(update.effective_chat.id, "⚠️ Interner Fehler beim Verarbeiten der Anfrage.")
    except Exception as notify_error:
        # Best effort: the notification must not raise from inside the error
        # handler, but the failure should still be visible in the log.
        logger.warning("Konnte Fehlermeldung nicht an den Chat senden: %s", notify_error)
# ---------------------- Inline Query ----------------------
async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer inline queries with the list of counters.

    Every counter result carries the message text ``/increment <name>``; its
    title previews the value after one increment. The query text acts as a
    case-insensitive substring filter on the counter names. The first result
    is always a hint entry (or a "no counters" entry when none exist).

    Usage: @YourBot [filter]
    """
    if update.inline_query is None:
        return
    # Telegram limits: at most 50 results per answer, result ids of 1-64 bytes.
    max_results = 50
    max_id_bytes = 64
    query_text = (update.inline_query.query or "").strip().lower()
    counters: Dict[str, int] = context.bot_data.get('counters', {})
    results = []
    if not counters:
        results.append(
            InlineQueryResultArticle(
                id="_no_counters_",
                title="Keine Counter vorhanden",
                description="/add <name> legt einen neuen Counter an",
                input_message_content=InputTextMessageContent(
                    "Noch keine Counter vorhanden. Nutze /add <name>."
                ),
            )
        )
    else:
        # The first result row is a hint.
        results.append(
            InlineQueryResultArticle(
                id="_hint_",
                title="Hinweis: Auswahl inkrementiert",
                description="Beim Antippen eines Counters wird er um 1 erhöht",
                input_message_content=InputTextMessageContent(
                    "(Inline Hinweis) — Auswahl eines Counters erhöht ihn um 1."
                ),
            )
        )
        # An empty filter matches every name. Names whose result id would be
        # too long are skipped, because one invalid id fails the whole answer.
        matches = [
            (name, value)
            for name, value in sorted(counters.items())
            if query_text in name and len(f"counter_{name}".encode("utf-8")) <= max_id_bytes
        ]
        # Leave room for the hint entry so the total stays within the limit.
        for name, value in matches[:max_results - len(results)]:
            new_val = value + 1
            results.append(
                InlineQueryResultArticle(
                    id=f"counter_{name}",
                    title=f"{name} -> {new_val}",
                    description=f"Aktuell: {value}; nach Auswahl: {new_val}",
                    input_message_content=InputTextMessageContent(
                        f"/increment {name}"
                    ),
                )
            )
        if len(results) == 1:  # only the hint, no matches
            results.append(
                InlineQueryResultArticle(
                    id="_no_match_",
                    title="Keine Treffer",
                    description="Filter ändern oder neuen Counter anlegen",
                    input_message_content=InputTextMessageContent(
                        "Keine passenden Counter gefunden."
                    ),
                )
            )
    try:
        await update.inline_query.answer(results, cache_time=0, is_personal=True)
    except Exception as e:
        logger.warning("Fehler beim Beantworten der Inline Query: %s", e)
# ---------------------- Group Events ----------------------
async def new_members(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet new chat members.

    Sends a ready message when the bot itself is among the new members. Other
    members are welcomed only when the env variable ANNOUNCE_ALL_JOINS is
    truthy (1/true/yes).
    """
    message = update.message
    if message is None or not message.new_chat_members:
        return
    # The bot's own id is available on the bot object; no get_me() round trip
    # to Telegram is needed for every join event.
    own_id = context.bot.id
    announce_all = os.environ.get("ANNOUNCE_ALL_JOINS", "false").lower() in {"1", "true", "yes"}
    for member in message.new_chat_members:
        if member.id == own_id:
            await message.reply_text("🤖 Bot ist bereit! Verwende /help für Befehle.")
        elif announce_all:
            await message.reply_text(f"Willkommen {member.full_name}! Tippe /help für Befehle.")
async def debug_all_messages(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log chat id, user id and text of every incoming message at DEBUG level."""
    message = update.message
    if not message:
        return
    sender = update.effective_user
    sender_id = sender.id if sender else None
    logger.debug("Empfangene Nachricht chat=%s user=%s text=%r", update.effective_chat.id, sender_id, message.text)
async def my_chat_member(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """React to changes of the bot's own status in a chat (added, removed, promoted).

    Every change is logged. The "now active" message is sent only when the
    bot moves from an inactive status to ``member`` or ``administrator``. A
    change between those two (for example a promotion) does not announce
    again. Failures while sending are logged and not re-raised.
    """
    chat = update.effective_chat
    diff = update.my_chat_member
    if not diff:
        return
    old_status = diff.old_chat_member.status
    new_status = diff.new_chat_member.status
    logger.info("my_chat_member update chat=%s old=%s new=%s", chat.id if chat else None, old_status, new_status)
    active_statuses = {"member", "administrator"}
    # Announce only if the bot was just added / became a member.
    became_active = new_status in active_statuses and old_status not in active_statuses
    if chat is None or not became_active:
        return
    try:
        await context.bot.send_message(chat.id, "🤖 Bot ist jetzt aktiv in diesem Chat. /help für Befehle.")
    except Exception as e:
        logger.warning("Fehler beim Senden der Aktiv-Nachricht: %s", e)
# ---------------------- Setup & Main ----------------------
def setup_logging(level: str):
    """Configure root logging to stdout at the given level.

    Args:
        level: A level name such as ``"debug"`` (case-insensitive) or a
            numeric level. Anything that does not resolve to an integer
            level (unknown names, ``None``, other types) falls back to INFO.
    """
    if isinstance(level, int) and not isinstance(level, bool):
        numeric = level
    else:
        numeric = getattr(logging, str(level).strip().upper(), None)
        # Some upper-case attributes of the logging module are not levels
        # (e.g. BASIC_FORMAT is a string); accept integers only.
        if not isinstance(numeric, int) or isinstance(numeric, bool):
            numeric = logging.INFO
    logging.basicConfig(stream=sys.stdout, level=numeric, format='%(asctime)s %(levelname)s %(name)s: %(message)s')
def init_counters(existing: Dict[str, int], config: Dict[str, Any]) -> Dict[str, int]:
    """Return existing counters or (optionally) seed initial ones.

    Seeding only happens if BOTH conditions apply:
    1) ``existing`` is empty (no counters file/content)
    2) Env USE_INITIAL_COUNTERS is truthy (1/true/yes)

    Seed entries come from ``config['initial_counters']``. Names are
    normalized with ``norm_key``. Entries whose value cannot be converted to
    ``int`` are skipped with a warning, and a non-mapping ``initial_counters``
    is ignored. A non-empty seed set is persisted immediately.

    Returns:
        ``existing`` if it is non-empty, otherwise the seeded counters
        (possibly empty).
    """
    if existing:
        return existing
    if os.environ.get("USE_INITIAL_COUNTERS", "false").lower() not in {"1", "true", "yes"}:
        return {}
    initial = config.get('initial_counters') or {}
    if not isinstance(initial, dict):
        logger.warning("initial_counters in der Konfiguration ist kein Mapping, ignoriere: %r", initial)
        return {}
    normalized: Dict[str, int] = {}
    for raw_name, raw_value in initial.items():
        try:
            # YAML keys may be non-strings (e.g. 2024), hence the str() call.
            normalized[norm_key(str(raw_name))] = int(raw_value)
        except (ValueError, TypeError, OverflowError):
            logger.warning("Ungültiger Startwert für Counter %s -> %s, überspringe", raw_name, raw_value)
    if normalized:
        save_counters(normalized)
    return normalized
async def on_startup(app):
    """Log that the bot is ready and optionally announce it to configured chats.

    The env variable STARTUP_ANNOUNCE_CHAT_IDS holds a comma-separated list
    of chat ids. Entries that are not integers are skipped with a warning.
    A failure to send to one chat is logged and does not stop the others.
    """
    logger.info("Bot gestartet und bereit.")
    configured = os.environ.get("STARTUP_ANNOUNCE_CHAT_IDS")
    if not configured:
        return
    targets = []
    for piece in (part.strip() for part in configured.split(',')):
        if piece:
            try:
                targets.append(int(piece))
            except ValueError:
                logger.warning("Kann Chat ID %s nicht in int umwandeln", piece)
    if not targets:
        return
    me = await app.bot.get_me()
    for target in targets:
        try:
            await app.bot.send_message(target, f"🤖 {me.first_name} ist bereit. Nutze /help für Befehle.")
        except Exception as e:
            logger.warning("Konnte Startup-Nachricht an %s nicht senden: %s", target, e)
def main():
    """Entry point: load configuration and counters, wire up handlers, start polling.

    The bot token is taken from the BOT_TOKEN env variable or ``bot_token``
    in the config file. The process exits with status 1 when the token is
    missing or when ``allowed_chat_ids`` is configured but invalid.
    """
    config = load_config()
    token = os.environ.get('BOT_TOKEN') or config.get('bot_token')
    if not token:
        print("Fehlendes Bot Token: Setze BOT_TOKEN env oder bot_token in config.yaml", file=sys.stderr)
        sys.exit(1)
    # A present-but-null log_level falls back to INFO as well.
    setup_logging(config.get('log_level') or 'INFO')
    counters = load_counters()
    counters = init_counters(counters, config)
    allowed_chat_ids = config.get('allowed_chat_ids')
    if allowed_chat_ids is not None:
        # Accept a single scalar id as a one-element list.
        if isinstance(allowed_chat_ids, (int, str)):
            allowed_chat_ids = [allowed_chat_ids]
        try:
            allowed_chat_ids = [int(x) for x in allowed_chat_ids]
        except (TypeError, ValueError):
            # Fail closed: falling back to "no restriction" would open the
            # bot to every chat because of a configuration typo.
            logger.error("allowed_chat_ids in config.yaml müssen Ganzzahlen sein")
            sys.exit(1)

    def access(chat_id: int) -> bool:
        """Return whether ``chat_id`` may use the bot; denied attempts are logged."""
        if not is_allowed(chat_id, allowed_chat_ids):
            logger.info("Verweigerter Zugriff von Chat %s", chat_id)
            return False
        return True

    application = ApplicationBuilder().token(token).build()
    # bot_data shared state
    application.bot_data['counters'] = counters
    application.bot_data['access'] = access
    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("add", add_counter))
    application.add_handler(CommandHandler("remove", remove_counter))
    application.add_handler(CommandHandler("increment", increment_counter))
    application.add_handler(CallbackQueryHandler(handle_increment_button, pattern=r"^inc:"))
    application.add_handler(CommandHandler("decrement", decrement_counter))
    application.add_handler(CommandHandler("list", list_counters))
    # Group / membership events
    application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, new_members))
    application.add_handler(ChatMemberHandler(my_chat_member, ChatMemberHandler.MY_CHAT_MEMBER))
    application.add_handler(InlineQueryHandler(inline_query))
    # Debug raw messages (placed last with low priority)
    application.add_handler(MessageHandler(filters.ALL, debug_all_messages), group=100)
    # Error handler
    application.add_error_handler(error_handler)
    # Startup callback
    application.post_init = on_startup
    application.run_polling(
        stop_signals=(signal.SIGINT, signal.SIGTERM),
        allowed_updates=["message", "chat_member", "my_chat_member", "inline_query", "callback_query"],
    )
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()