337 lines
13 KiB
Python
337 lines
13 KiB
Python
import os
|
|
import json
|
|
import yaml
|
|
import logging
|
|
import signal
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional
|
|
|
|
from telegram import Update
|
|
from telegram.constants import ParseMode
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
ChatMemberHandler,
|
|
)
|
|
|
|
# Filesystem locations; DATA_DIR and CONFIG_FILE can be overridden through
# environment variables of the same name.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
COUNTERS_FILE = DATA_DIR.joinpath("counters.json")
CONFIG_FILE = Path(os.getenv("CONFIG_FILE", "config.yaml"))

# Module-wide logger used by every function in this file.
logger = logging.getLogger("counter_bot")
|
|
|
|
# ---------------------- Persistence Layer ----------------------
|
|
|
|
def load_config() -> Dict[str, Any]:
    """Load the YAML configuration from CONFIG_FILE.

    Returns:
        The parsed configuration mapping, or an empty dict when the file is
        missing, empty, not valid YAML, or does not contain a mapping at the
        top level.
    """
    # Open directly instead of checking exists() first: the file could
    # disappear between the check and the open.
    try:
        with CONFIG_FILE.open("r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except FileNotFoundError:
        logger.warning("Konfigurationsdatei %s nicht gefunden. Nutze Umgebungsvariablen / Defaults.", CONFIG_FILE)
        return {}
    except yaml.YAMLError as e:
        logger.error("Fehler beim Lesen der Konfigurationsdatei: %s", e)
        return {}

    if not data:
        # An empty document parses to None.
        return {}
    if not isinstance(data, dict):
        # Callers use .get() on the result, so a list or scalar document
        # must not be handed back.
        logger.error(
            "Konfigurationsdatei %s muss ein Mapping enthalten (gefunden: %s). Ignoriere Inhalt.",
            CONFIG_FILE,
            type(data).__name__,
        )
        return {}
    return data
|
|
|
|
|
|
def load_counters() -> Dict[str, int]:
    """Read the persisted counters from COUNTERS_FILE.

    Returns:
        A dict mapping counter name to integer value. An empty dict is
        returned when the file is missing, cannot be decoded, or does not
        hold a JSON object. Individual values that cannot be converted to
        int are replaced by 0 (with a warning).
    """
    try:
        with COUNTERS_FILE.open("r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        return {}
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: the file is not valid UTF-8; treat it like any
        # other unreadable counters file rather than crashing at startup.
        logger.error("Konnte %s nicht lesen (JSON Fehler). Starte mit leerem Satz.", COUNTERS_FILE)
        return {}

    if not isinstance(data, dict):
        return {}

    # Ensure every value is an int.
    fixed: Dict[str, int] = {}
    for k, v in data.items():
        try:
            fixed[k] = int(v)
        except (ValueError, TypeError, OverflowError):
            # OverflowError: json accepts "Infinity", and int(inf) overflows.
            logger.warning("Ungültiger Wert für Counter %s -> %s, setze 0", k, v)
            fixed[k] = 0
    return fixed
|
|
|
|
|
|
def atomic_write(path: Path, content: str) -> None:
    """Write *content* to *path* via a temporary sibling file and a rename.

    The data is written to ``<path>.tmp`` in the same directory, flushed to
    disk, and then moved over *path*, so readers never see a partially
    written file.

    Args:
        path: Destination file.
        content: Text to write (encoded as UTF-8).

    Raises:
        OSError: If writing or renaming fails. The temporary file is removed
            in that case and *path* is left untouched.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            f.write(content)
            # Push the data to disk before the rename so a crash cannot
            # leave an empty or truncated file under the final name.
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(path)
    finally:
        # After a successful replace the temp file no longer exists; after a
        # failure it must not be left lying around.
        try:
            tmp.unlink()
        except OSError:
            pass
|
|
|
|
|
|
def save_counters(counters: Dict[str, int]) -> None:
    """Persist *counters* as pretty-printed JSON in COUNTERS_FILE.

    DATA_DIR is created first if it does not exist yet.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(
        counters,
        indent=2,
        sort_keys=True,
        ensure_ascii=False,
    )
    atomic_write(COUNTERS_FILE, serialized)
|
|
|
|
# ---------------------- Access Control ----------------------
|
|
|
|
def is_allowed(chat_id: int, allowed_list: Optional[list]) -> bool:
    """Tell whether *chat_id* may use the bot.

    A missing (None) or empty allow-list means no restriction: every chat is
    allowed. Otherwise the chat id must be contained in the list.
    """
    if allowed_list:
        return chat_id in allowed_list
    # None or empty list => everyone is allowed.
    return True
|
|
|
|
# ---------------------- Command Handlers ----------------------
|
|
|
|
# Overview of all commands, one per line; the text itself is German because
# it is shown to the bot's users.
HELP_TEXT = "\n".join((
    "Verfügbare Befehle:",
    "/add <name> - Legt einen neuen Counter mit Wert 0 an",
    "/remove <name> - Löscht einen Counter",
    "/increment <name> - Erhöht den Counter um 1 und zeigt neuen Wert",
    "/decrement <name> - Verringert den Counter um 1 und zeigt neuen Wert",
    "/list - Listet alle Counter",
    "/help - Zeigt diese Hilfe an",
))
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to the incoming message with the command overview (HELP_TEXT)."""
    incoming = update.message
    await incoming.reply_text(HELP_TEXT)
|
|
|
|
|
|
def norm_key(raw: str) -> str:
    """Return the canonical counter name: surrounding whitespace removed, lower-cased."""
    trimmed = raw.strip()
    return trimmed.lower()
|
|
|
|
|
|
def html_escape(text: str) -> str:
    """Escape the characters that are significant in HTML markup.

    Replaces ``&``, ``<`` and ``>`` with their entities so that *text* can be
    embedded in a message sent with ``ParseMode.HTML`` without being
    interpreted as markup.

    Args:
        text: Arbitrary text, e.g. a user-supplied counter name.

    Returns:
        The escaped text.
    """
    return (
        text
        # The ampersand must go first; otherwise the '&' of the entities
        # produced below would be escaped a second time.
        .replace('&', '&amp;')
        .replace('<', '&lt;')
        .replace('>', '&gt;')
    )
|
|
|
|
|
|
async def add_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /add <name>: create a new counter with value 0.

    Returns without replying when the chat fails the access check. Replies
    with a usage hint when no name is given, and with a notice when the
    counter already exists. Only the first argument is used as the name.
    """
    sender_chat = update.effective_chat.id
    may_use = context.bot_data['access']
    if not may_use(sender_chat):
        return

    if not context.args:
        await update.message.reply_text("Bitte einen Counternamen angeben: /add <name>")
        return

    name = norm_key(context.args[0])
    store = context.bot_data['counters']
    shown = html_escape(name)

    if name not in store:
        store[name] = 0
        save_counters(store)
        text = f"Counter <b>{shown}</b> wurde angelegt (Wert 0)."
    else:
        text = f"Counter <b>{shown}</b> existiert bereits."

    await update.message.reply_text(text, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def remove_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /remove <name>: delete an existing counter.

    Returns without replying when the chat fails the access check. Replies
    with a usage hint when no name is given, and with a notice when the
    counter does not exist. Only the first argument is used as the name.
    """
    sender_chat = update.effective_chat.id
    may_use = context.bot_data['access']
    if not may_use(sender_chat):
        return

    if not context.args:
        await update.message.reply_text("Bitte einen Counternamen angeben: /remove <name>")
        return

    name = norm_key(context.args[0])
    store = context.bot_data['counters']
    shown = html_escape(name)

    if name in store:
        store.pop(name)
        save_counters(store)
        text = f"Counter <b>{shown}</b> wurde gelöscht."
    else:
        text = f"Counter <b>{shown}</b> existiert nicht."

    await update.message.reply_text(text, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def increment_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /increment <name>: add 1 to a counter and report the new value.

    Returns without replying when the chat fails the access check. Replies
    with a usage hint when no name is given, and with a hint to /add when the
    counter does not exist. Only the first argument is used as the name.
    """
    sender_chat = update.effective_chat.id
    may_use = context.bot_data['access']
    if not may_use(sender_chat):
        return

    if not context.args:
        await update.message.reply_text("Bitte einen Counternamen angeben: /increment <name>")
        return

    name = norm_key(context.args[0])
    store = context.bot_data['counters']
    shown = html_escape(name)

    if name not in store:
        text = f"Counter <b>{shown}</b> existiert nicht. Lege ihn mit /add {shown} an."
    else:
        raised = store[name] + 1
        store[name] = raised
        save_counters(store)
        text = f"Counter <b>{shown}</b> steht jetzt auf <b>{raised}</b>."

    await update.message.reply_text(text, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def decrement_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /decrement <name>: subtract 1 from a counter and report the new value.

    Returns without replying when the chat fails the access check. Replies
    with a usage hint when no name is given, and with a hint to /add when the
    counter does not exist. Only the first argument is used as the name; the
    value is allowed to become negative.
    """
    sender_chat = update.effective_chat.id
    may_use = context.bot_data['access']
    if not may_use(sender_chat):
        return

    if not context.args:
        await update.message.reply_text("Bitte einen Counternamen angeben: /decrement <name>")
        return

    name = norm_key(context.args[0])
    store = context.bot_data['counters']
    shown = html_escape(name)

    if name not in store:
        text = f"Counter <b>{shown}</b> existiert nicht. Lege ihn mit /add {shown} an."
    else:
        lowered = store[name] - 1
        store[name] = lowered
        save_counters(store)
        text = f"Counter <b>{shown}</b> steht jetzt auf <b>{lowered}</b>."

    await update.message.reply_text(text, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def list_counters(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /list: reply with all counters and their values, sorted by name.

    Returns without replying when the chat fails the access check; replies
    with a hint to /add when there are no counters.
    """
    sender_chat = update.effective_chat.id
    may_use = context.bot_data['access']
    if not may_use(sender_chat):
        return

    store = context.bot_data['counters']
    if not store:
        await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add <name>.")
        return

    rows = [f"- <b>{html_escape(name)}</b>: {store[name]}" for name in sorted(store)]
    overview = "\n".join(["<b>Aktuelle Counter:</b>", *rows])
    await update.message.reply_text(overview, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply that the command is not known and point the user to /help."""
    incoming = update.message
    await incoming.reply_text("Unbekannter Befehl. Nutze /help für eine Übersicht.")
|
|
|
|
|
|
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
    """Log an error raised while processing an update and notify the chat.

    The exception stored in ``context.error`` is logged with its traceback.
    If the failing update belongs to a chat, a short generic error message is
    sent there on a best-effort basis: a failure to deliver it is logged but
    never propagated.
    """
    logger.error("Fehler während Update Verarbeitung", exc_info=context.error)

    # Optional short feedback, only when the update can be tied to a chat.
    if not isinstance(update, Update) or not update.effective_chat:
        return

    chat_id = update.effective_chat.id
    try:
        await context.bot.send_message(chat_id, "⚠️ Interner Fehler beim Verarbeiten der Anfrage.")
    except Exception as e:
        # Deliberately broad: the error handler itself must never raise, but
        # the failure should still be visible in the logs.
        logger.warning("Konnte Fehlermeldung nicht an Chat %s senden: %s", chat_id, e)
|
|
|
|
|
|
# ---------------------- Group Events ----------------------
|
|
|
|
async def new_members(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet new chat members.

    Sends a "ready" message when the bot itself is among the new members.
    Other new members are welcomed only when the environment variable
    ANNOUNCE_ALL_JOINS is set to "1", "true" or "yes" (case-insensitive).
    """
    message = update.message
    if message is None or not message.new_chat_members:
        return

    # The bot's own id is available on the bot object, so no get_me() API
    # request is needed for every join event.
    own_id = context.bot.id
    announce_all = os.environ.get("ANNOUNCE_ALL_JOINS", "false").lower() in {"1", "true", "yes"}

    for member in message.new_chat_members:
        if member.id == own_id:
            await message.reply_text("🤖 Bot ist bereit! Verwende /help für Befehle.")
        elif announce_all:
            await message.reply_text(f"Willkommen {member.full_name}! Tippe /help für Befehle.")
|
|
|
|
|
|
async def debug_all_messages(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log chat id, user id and text of every update that carries a message (DEBUG level)."""
    if not update.message:
        return

    chat_id = update.effective_chat.id
    sender = update.effective_user
    sender_id = sender.id if sender else None
    logger.debug("Empfangene Nachricht chat=%s user=%s text=%r", chat_id, sender_id, update.message.text)
|
|
|
|
|
|
async def my_chat_member(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """React to a change of the bot's own membership status in a chat.

    Every status change is logged. An "active" message is sent only when the
    bot becomes a member or administrator coming from a status that is
    neither, i.e. when it was actually added. A promotion or demotion
    between member and administrator does not trigger a new message.
    """
    chat = update.effective_chat
    diff = update.my_chat_member
    if not diff:
        return

    old_status = diff.old_chat_member.status
    new_status = diff.new_chat_member.status
    logger.info("my_chat_member update chat=%s old=%s new=%s", chat.id if chat else None, old_status, new_status)

    active = {"member", "administrator"}
    if new_status not in active or old_status in active:
        # Bot left / was removed, or it was already active before.
        return
    if chat is None:
        return

    try:
        await context.bot.send_message(chat.id, "🤖 Bot ist jetzt aktiv in diesem Chat. /help für Befehle.")
    except Exception as e:
        # Best effort: the bot may not be allowed to post in the chat.
        logger.warning("Fehler beim Senden der Aktiv-Nachricht: %s", e)
|
|
|
|
# ---------------------- Setup & Main ----------------------
|
|
|
|
def setup_logging(level: str):
    """Configure root logging to stdout at the requested level.

    Args:
        level: Level name such as "DEBUG" or "info" (case-insensitive), or a
            numeric level. Anything that does not resolve to an integer
            logging level falls back to INFO.
    """
    if isinstance(level, int) and not isinstance(level, bool):
        # A numeric level (e.g. `log_level: 10` in YAML) is used as-is.
        numeric = level
    else:
        numeric = getattr(logging, str(level).strip().upper(), None)
        # The lookup can hit non-level attributes of the logging module
        # (e.g. BASIC_FORMAT is a string); only accept real integers.
        if not isinstance(numeric, int) or isinstance(numeric, bool):
            numeric = logging.INFO
    logging.basicConfig(stream=sys.stdout, level=numeric, format='%(asctime)s %(levelname)s %(name)s: %(message)s')
|
|
|
|
|
|
def init_counters(existing: Dict[str, int], config: Dict[str, Any]) -> Dict[str, int]:
    """Return the counters to start with.

    Already persisted counters win. Only when there are none, the
    ``initial_counters`` mapping from the configuration is normalized
    (keys via norm_key, values via int), saved, and returned.

    Args:
        existing: Counters loaded from disk (may be empty).
        config: Parsed configuration.

    Returns:
        *existing* if it is non-empty, otherwise the normalized initial
        counters (possibly empty).
    """
    if existing:
        return existing

    initial = config.get('initial_counters') or {}
    if not isinstance(initial, dict):
        logger.error("initial_counters in config.yaml muss ein Mapping sein, ignoriere Wert")
        return {}

    normalized: Dict[str, int] = {}
    for k, v in initial.items():
        # YAML keys are not necessarily strings (e.g. `2024: 5`).
        key = norm_key(str(k))
        try:
            normalized[key] = int(v)
        except (ValueError, TypeError, OverflowError):
            # Mirror load_counters: a bad value must not abort startup.
            logger.warning("Ungültiger Wert für Counter %s -> %s, setze 0", key, v)
            normalized[key] = 0

    if normalized:
        save_counters(normalized)
    return normalized
|
|
|
|
async def on_startup(app):
    """Log that the bot is up and optionally announce it in configured chats.

    The environment variable STARTUP_ANNOUNCE_CHAT_IDS may hold a
    comma-separated list of chat ids. Entries that are not integers are
    skipped with a warning; a failure to send to one chat is logged and does
    not affect the others.
    """
    logger.info("Bot gestartet und bereit.")

    configured = os.environ.get("STARTUP_ANNOUNCE_CHAT_IDS")
    if not configured:
        return

    targets = []
    for piece in (part.strip() for part in configured.split(',')):
        if not piece:
            continue
        try:
            targets.append(int(piece))
        except ValueError:
            logger.warning("Kann Chat ID %s nicht in int umwandeln", piece)

    if not targets:
        return

    me = await app.bot.get_me()
    for target in targets:
        try:
            await app.bot.send_message(target, f"🤖 {me.first_name} ist bereit. Nutze /help für Befehle.")
        except Exception as e:
            logger.warning("Konnte Startup-Nachricht an %s nicht senden: %s", target, e)
|
|
|
|
|
|
def main():
    """Build the bot application from configuration and run it until stopped.

    Exits with status 1 when no bot token is configured or when
    ``allowed_chat_ids`` is present in the configuration but invalid.
    """
    config = load_config()

    # The environment variable takes precedence over the config file.
    token = os.environ.get('BOT_TOKEN') or config.get('bot_token')
    if not token:
        print("Fehlendes Bot Token: Setze BOT_TOKEN env oder bot_token in config.yaml", file=sys.stderr)
        sys.exit(1)

    setup_logging(config.get('log_level', 'INFO'))

    counters = load_counters()
    counters = init_counters(counters, config)

    allowed_chat_ids = config.get('allowed_chat_ids')
    if allowed_chat_ids is not None:
        try:
            allowed_chat_ids = [int(x) for x in allowed_chat_ids]
        except (TypeError, ValueError):
            # Fail closed: falling back to None here would make is_allowed()
            # accept every chat, silently disabling the access restriction.
            logger.error("allowed_chat_ids in config.yaml müssen Ganzzahlen sein")
            logger.error("Start abgebrochen: ungültige allowed_chat_ids würden den Bot für alle Chats öffnen")
            sys.exit(1)

    def access(chat_id: int) -> bool:
        """Check *chat_id* against the allow-list and log refused chats."""
        if not is_allowed(chat_id, allowed_chat_ids):
            logger.info("Verweigerter Zugriff von Chat %s", chat_id)
            return False
        return True

    application = ApplicationBuilder().token(token).build()

    # bot_data shared state
    application.bot_data['counters'] = counters
    application.bot_data['access'] = access

    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("add", add_counter))
    application.add_handler(CommandHandler("remove", remove_counter))
    application.add_handler(CommandHandler("increment", increment_counter))
    application.add_handler(CommandHandler("decrement", decrement_counter))
    application.add_handler(CommandHandler("list", list_counters))
    # NOTE(review): no handler for unrecognized commands is registered here;
    # confirm whether that is intentional.

    # Group / membership events
    application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, new_members))
    application.add_handler(ChatMemberHandler(my_chat_member, ChatMemberHandler.MY_CHAT_MEMBER))

    # Debug raw messages (placed last with low priority)
    application.add_handler(MessageHandler(filters.ALL, debug_all_messages), group=100)

    # Error handler
    application.add_error_handler(error_handler)

    # Startup callback
    application.post_init = on_startup

    application.run_polling(
        stop_signals=(signal.SIGINT, signal.SIGTERM),
        allowed_updates=["message", "chat_member", "my_chat_member"],
    )


if __name__ == '__main__':
    main()
|