471 lines
19 KiB
Python
471 lines
19 KiB
Python
import os
|
|
import json
|
|
import yaml
|
|
import logging
|
|
import signal
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, Any, Optional
|
|
|
|
from telegram import (
|
|
Update,
|
|
InlineQueryResultArticle,
|
|
InputTextMessageContent,
|
|
InlineKeyboardButton,
|
|
InlineKeyboardMarkup,
|
|
)
|
|
from telegram.constants import ParseMode
|
|
from telegram.ext import (
|
|
ApplicationBuilder,
|
|
CommandHandler,
|
|
ContextTypes,
|
|
MessageHandler,
|
|
filters,
|
|
ChatMemberHandler,
|
|
InlineQueryHandler,
|
|
CallbackQueryHandler,
|
|
)
|
|
|
|
# Directory holding persistent bot state; overridable through the DATA_DIR env var.
DATA_DIR = Path(os.getenv("DATA_DIR", "/data"))
# JSON file inside DATA_DIR in which the counter values are stored.
COUNTERS_FILE = DATA_DIR.joinpath("counters.json")
# YAML configuration file; overridable through the CONFIG_FILE env var.
CONFIG_FILE = Path(os.getenv("CONFIG_FILE", "config.yaml"))

# Module-wide logger shared by all handlers.
logger = logging.getLogger("counter_bot")
|
|
|
|
# ---------------------- Persistence Layer ----------------------
|
|
|
|
def load_config() -> Dict[str, Any]:
    """Load the YAML configuration from CONFIG_FILE.

    Returns:
        The parsed YAML document, or an empty dict when the file is missing,
        empty, or not valid YAML. A missing file is logged as a warning and a
        parse error is logged as an error; neither raises.
    """
    if CONFIG_FILE.exists():
        with CONFIG_FILE.open("r", encoding="utf-8") as handle:
            try:
                parsed = yaml.safe_load(handle)
            except yaml.YAMLError as exc:
                logger.error("Fehler beim Lesen der Konfigurationsdatei: %s", exc)
                return {}
        # An empty document parses to None; normalise that to an empty dict.
        return parsed or {}

    logger.warning(
        "Konfigurationsdatei %s nicht gefunden. Nutze Umgebungsvariablen / Defaults.",
        CONFIG_FILE,
    )
    return {}
|
|
|
|
|
|
def load_counters() -> Dict[str, int]:
    """Read the persisted counters from COUNTERS_FILE.

    Returns:
        A dict mapping counter name to integer value. The result is empty when
        the file does not exist, contains invalid JSON, or its top-level value
        is not an object. Values that cannot be converted with ``int()`` are
        replaced by 0 and reported with a warning.
    """
    if not COUNTERS_FILE.exists():
        return {}

    try:
        with COUNTERS_FILE.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except json.JSONDecodeError:
        logger.error("Konnte %s nicht lesen (JSON Fehler). Starte mit leerem Satz.", COUNTERS_FILE)
        return {}

    if not isinstance(payload, dict):
        return {}

    # Coerce every stored value to int so handlers can do arithmetic on it.
    coerced: Dict[str, int] = {}
    for name, raw_value in payload.items():
        try:
            number = int(raw_value)
        except (ValueError, TypeError):
            logger.warning("Ungültiger Wert für Counter %s -> %s, setze 0", name, raw_value)
            number = 0
        coerced[name] = number
    return coerced
|
|
|
|
|
|
def atomic_write(path: Path, content: str) -> None:
    """Write *content* to *path* by way of a sibling temporary file.

    The text is written (UTF-8) to ``<path>.tmp``, flushed and fsynced, and the
    temporary file is then moved over *path* with ``Path.replace``. If writing
    or replacing fails, the temporary file is removed again and the original
    exception is re-raised, so no stale ``.tmp`` file is left next to the target.

    Args:
        path: Destination file.
        content: Text to store.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
    """
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with tmp.open("w", encoding="utf-8") as f:
            f.write(content)
            # Push the data to disk before the rename makes it visible under
            # the final name.
            f.flush()
            os.fsync(f.fileno())
        tmp.replace(path)
    except BaseException:
        # Clean up the partial temp file; a cleanup failure must not mask the
        # original error.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
|
|
|
|
|
|
def save_counters(counters: Dict[str, int]) -> None:
    """Persist *counters* as pretty-printed, key-sorted JSON in COUNTERS_FILE.

    DATA_DIR is created first if it does not exist yet.
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(
        counters,
        ensure_ascii=False,
        sort_keys=True,
        indent=2,
    )
    atomic_write(COUNTERS_FILE, serialized)
|
|
|
|
# ---------------------- Access Control ----------------------
|
|
|
|
def is_allowed(chat_id: int, allowed_list: Optional[list]) -> bool:
    """Tell whether *chat_id* may use the bot.

    A missing (``None``) or empty allow-list means every chat is allowed;
    otherwise the chat id must be contained in the list.
    """
    unrestricted = not allowed_list
    return unrestricted or chat_id in allowed_list
|
|
|
|
# ---------------------- Command Handlers ----------------------
|
|
|
|
# Command overview sent by /help: one line per command, no trailing newline.
HELP_TEXT = "\n".join(
    [
        "Verfügbare Befehle:",
        "/add <name> - Legt einen neuen Counter mit Wert 0 an",
        "/remove <name> - Löscht einen Counter",
        "/increment <name> - Erhöht den Counter um 1 und zeigt neuen Wert",
        "/decrement <name> - Verringert den Counter um 1 und zeigt neuen Wert",
        "/list - Listet alle Counter",
        "/help - Zeigt diese Hilfe an",
    ]
)
|
|
|
|
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /help by replying with the static command overview."""
    incoming = update.message
    await incoming.reply_text(HELP_TEXT)
|
|
|
|
|
|
def norm_key(raw: str) -> str:
    """Normalise a counter name: trim surrounding whitespace, then lower-case it."""
    trimmed = raw.strip()
    return trimmed.lower()
|
|
|
|
|
|
def html_escape(text: str) -> str:
    """Escape *text* for embedding in a Telegram message sent with HTML parse mode.

    Replaces ``&``, ``<`` and ``>`` by the entities ``&amp;``, ``&lt;`` and
    ``&gt;``. Previously each character was replaced by itself, so user-supplied
    counter names reached the HTML markup unescaped.

    Args:
        text: Raw text, e.g. a counter name typed by a user.

    Returns:
        The text with the three markup-significant characters escaped.
    """
    # '&' has to be handled first; otherwise the ampersands introduced by the
    # '<' and '>' replacements would be escaped a second time.
    return (
        text
        .replace("&", "&amp;")
        .replace("<", "&lt;")
        .replace(">", "&gt;")
    )
|
|
|
|
|
|
async def add_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /add <name>: create a counter with value 0.

    Returns silently when the chat fails the access check. Without an argument
    a usage hint is sent. The name is normalised with ``norm_key``; if it
    already exists the user is told so, otherwise it is stored and persisted.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return

    arguments = context.args
    if not arguments:
        await update.message.reply_text("Bitte einen Counternamen angeben: /add <name>")
        return

    name = norm_key(arguments[0])
    store = context.bot_data['counters']
    if name not in store:
        store[name] = 0
        save_counters(store)
        reply = f"Counter <b>{html_escape(name)}</b> wurde angelegt (Wert 0)."
    else:
        reply = f"Counter <b>{html_escape(name)}</b> existiert bereits."
    await update.message.reply_text(reply, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def remove_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /remove <name>: delete an existing counter.

    Returns silently when the chat fails the access check. Without an argument
    a usage hint is sent. Unknown names produce a "does not exist" reply;
    otherwise the counter is removed and the change persisted.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return

    arguments = context.args
    if not arguments:
        await update.message.reply_text("Bitte einen Counternamen angeben: /remove <name>")
        return

    name = norm_key(arguments[0])
    store = context.bot_data['counters']
    if name in store:
        del store[name]
        save_counters(store)
        reply = f"Counter <b>{html_escape(name)}</b> wurde gelöscht."
    else:
        reply = f"Counter <b>{html_escape(name)}</b> existiert nicht."
    await update.message.reply_text(reply, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
def _increment_keyboard(counters: Dict[str, int]) -> InlineKeyboardMarkup:
    """Build the inline keyboard with one increment button per counter.

    Buttons are ordered by counter name, laid out three per row, labelled
    ``"<name> (<value>)"`` and carry ``"inc:<name>"`` as callback data.
    """
    # NOTE(review): Telegram limits callback_data to 64 bytes; a very long
    # counter name would exceed that - confirm names are bounded elsewhere.
    buttons = [
        InlineKeyboardButton(text=f"{name} ({counters[name]})", callback_data=f"inc:{name}")
        for name in sorted(counters)
    ]
    rows = [buttons[start:start + 3] for start in range(0, len(buttons), 3)]
    return InlineKeyboardMarkup(rows)


async def increment_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /increment [name].

    * Chat not allowed: logs the denial and tries to send a refusal message.
    * No argument: replies with an inline keyboard listing all counters, or
      with a hint if there are none.
    * With argument: increments the named counter, persists the change and
      replies with the new value, or reports that the counter does not exist.
    """
    chat_id = update.effective_chat.id
    if not context.bot_data['access'](chat_id):
        logger.info("Access denied for chat %s on /increment", chat_id)
        try:
            await update.message.reply_text("Zugriff verweigert: Dieser Chat ist nicht freigeschaltet.")
        except Exception as exc:
            # Best effort only: the refusal notice is optional, but leave a
            # trace instead of discarding the failure silently.
            logger.debug("Could not send access-denied notice to chat %s: %s", chat_id, exc)
        return

    counters = context.bot_data['counters']

    # Without an argument, offer the counters as buttons.
    if not context.args:
        logger.debug("/increment without args in chat %s -> showing keyboard (%d counters)", chat_id, len(counters))
        if not counters:
            await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add <name>.")
            return
        await update.message.reply_text(
            "Wähle einen Counter zum Inkrementieren:",
            reply_markup=_increment_keyboard(counters),
        )
        return

    key = norm_key(context.args[0])
    if key not in counters:
        esc = html_escape(key)
        await update.message.reply_text(
            f"Counter <b>{esc}</b> existiert nicht. Lege ihn mit /add {esc} an.",
            parse_mode=ParseMode.HTML,
        )
        return

    counters[key] += 1
    save_counters(counters)
    await update.message.reply_text(
        f"Counter <b>{html_escape(key)}</b> steht jetzt auf <b>{counters[key]}</b>.",
        parse_mode=ParseMode.HTML,
    )


async def handle_increment_button(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle presses on the ``inc:<name>`` buttons of the increment keyboard.

    Acknowledges the callback, checks access for the chat the keyboard message
    belongs to, increments the counter and edits the message to show the new
    value together with a refreshed keyboard. If editing fails, the new value
    is sent as a separate message instead.
    """
    query = update.callback_query
    if not query:
        return
    await query.answer()

    data = query.data or ""
    if not data.startswith("inc:"):
        return
    name = data.split(":", 1)[1]

    counters = context.bot_data['counters']
    # The access check is only possible when the callback carries its message.
    chat_id = query.message.chat_id if query.message else None
    if chat_id is not None and not context.bot_data['access'](chat_id):
        logger.info("Access denied for chat %s on increment button %s", chat_id, name)
        return

    key = norm_key(name)
    if key not in counters:
        # The counter was removed after the keyboard had been sent.
        await query.edit_message_text(
            f"Counter <b>{html_escape(key)}</b> existiert nicht mehr.",
            parse_mode=ParseMode.HTML,
        )
        return

    counters[key] += 1
    save_counters(counters)
    logger.debug("Increment via button: %s -> %d", key, counters[key])

    try:
        # Rebuild the keyboard so the button labels reflect the new values.
        await query.edit_message_text(
            f"Counter <b>{html_escape(key)}</b> steht jetzt auf <b>{counters[key]}</b>. Wähle weiteren Counter:",
            parse_mode=ParseMode.HTML,
            reply_markup=_increment_keyboard(counters),
        )
    except Exception as exc:
        # Fallback: send a separate message if the edit fails.
        logger.debug("Editing the keyboard message failed (%s); sending a separate message", exc)
        if chat_id is not None:
            await context.bot.send_message(
                chat_id,
                f"Counter <b>{html_escape(key)}</b> steht jetzt auf <b>{counters[key]}</b>.",
                parse_mode=ParseMode.HTML,
            )
|
|
|
|
|
|
async def decrement_counter(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /decrement <name>: lower a counter by one and report the new value.

    Returns silently when the chat fails the access check. Without an argument
    a usage hint is sent; an unknown name yields a hint to create it via /add.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return

    arguments = context.args
    if not arguments:
        await update.message.reply_text("Bitte einen Counternamen angeben: /decrement <name>")
        return

    name = norm_key(arguments[0])
    store = context.bot_data['counters']
    shown = html_escape(name)
    if name in store:
        store[name] -= 1
        save_counters(store)
        reply = f"Counter <b>{shown}</b> steht jetzt auf <b>{store[name]}</b>."
    else:
        reply = f"Counter <b>{shown}</b> existiert nicht. Lege ihn mit /add {shown} an."
    await update.message.reply_text(reply, parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def list_counters(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /list: reply with every counter and its value, sorted by name.

    Returns silently when the chat fails the access check; replies with a hint
    when no counters exist.
    """
    if not context.bot_data['access'](update.effective_chat.id):
        return

    store = context.bot_data['counters']
    if not store:
        await update.message.reply_text("Keine Counter vorhanden. Lege einen an mit /add <name>.")
        return

    overview = ["<b>Aktuelle Counter:</b>"]
    overview.extend(
        f"- <b>{html_escape(name)}</b>: {store[name]}" for name in sorted(store)
    )
    await update.message.reply_text("\n".join(overview), parse_mode=ParseMode.HTML)
|
|
|
|
|
|
async def unknown(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply that the command is not recognised and point the user to /help."""
    incoming = update.message
    await incoming.reply_text("Unbekannter Befehl. Nutze /help für eine Übersicht.")
|
|
|
|
|
|
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE):
    """Log the exception raised while processing an update and notify the chat.

    The notification is best effort: it is only attempted when *update* is an
    ``Update`` with an effective chat, and any failure while sending it is
    ignored.
    """
    logger.error("Fehler während Update Verarbeitung", exc_info=context.error)
    # Optional short feedback; must never raise from inside the error handler.
    try:
        target_chat = update.effective_chat if isinstance(update, Update) else None
        if target_chat:
            await context.bot.send_message(
                target_chat.id,
                "⚠️ Interner Fehler beim Verarbeiten der Anfrage.",
            )
    except Exception:
        pass
|
|
|
|
# ---------------------- Inline Query ----------------------
|
|
|
|
async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Answer an inline query with one result per (optionally filtered) counter.

    Usage: ``@YourBot [filter]``. The filter is matched as a lower-cased
    substring of the counter name.

    Choosing a counter result posts the text ``/increment <name>`` into the
    chat; this handler itself never modifies a counter. The first result is
    always an explanatory hint (or a "no counters" notice when the store is
    empty), and a "no match" entry is added when the filter excludes every
    counter.

    The total number of results is capped at 50, Telegram's documented maximum
    for a single inline query answer. Previously the hint plus 50 counters
    produced 51 results, so the whole answer was rejected once 50 or more
    counters matched.
    """
    if update.inline_query is None:
        return

    max_results = 50  # upper bound accepted by answerInlineQuery
    query_text = (update.inline_query.query or "").strip().lower()
    counters: Dict[str, int] = context.bot_data.get('counters', {})
    results = []

    if not counters:
        results.append(
            InlineQueryResultArticle(
                id="_no_counters_",
                title="Keine Counter vorhanden",
                description="/add <name> legt einen neuen Counter an",
                input_message_content=InputTextMessageContent(
                    "Noch keine Counter vorhanden. Nutze /add <name>."
                ),
            )
        )
    else:
        # The first result row is a hint explaining what a selection does.
        results.append(
            InlineQueryResultArticle(
                id="_hint_",
                title="Hinweis: Auswahl inkrementiert",
                description="Beim Antippen eines Counters wird er um 1 erhöht",
                input_message_content=InputTextMessageContent(
                    "(Inline Hinweis) — Auswahl eines Counters erhöht ihn um 1."
                ),
            )
        )
        all_items = sorted(counters.items())
        if query_text:
            all_items = [kv for kv in all_items if query_text in kv[0]]

        # Leave room for the hint row so the answer never exceeds the limit.
        # NOTE(review): Telegram also limits result ids to 64 bytes; very long
        # counter names would exceed that - confirm names are bounded elsewhere.
        for name, value in all_items[:max_results - len(results)]:
            new_val = value + 1
            results.append(
                InlineQueryResultArticle(
                    id=f"counter_{name}",
                    title=f"{name} -> {new_val}",
                    description=f"Aktuell: {value}; nach Auswahl: {new_val}",
                    input_message_content=InputTextMessageContent(
                        f"/increment {name}"
                    ),
                )
            )

        if len(results) == 1:  # only the hint, no matches
            results.append(
                InlineQueryResultArticle(
                    id="_no_match_",
                    title="Keine Treffer",
                    description="Filter ändern oder neuen Counter anlegen",
                    input_message_content=InputTextMessageContent(
                        "Keine passenden Counter gefunden."
                    ),
                )
            )

    try:
        # cache_time=0 / is_personal=True: results show live counter values.
        await update.inline_query.answer(results, cache_time=0, is_personal=True)
    except Exception as e:
        logger.warning("Fehler beim Beantworten der Inline Query: %s", e)
|
|
|
|
|
|
# ---------------------- Group Events ----------------------
|
|
|
|
async def new_members(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Greet on NEW_CHAT_MEMBERS service messages.

    Sends a "ready" message when the bot itself is among the new members.
    Other new members are welcomed only when the ANNOUNCE_ALL_JOINS env var is
    set to 1/true/yes.
    """
    message = update.message
    if message is None or not message.new_chat_members:
        return

    me = await context.bot.get_me()
    flag = os.environ.get("ANNOUNCE_ALL_JOINS", "false").lower()
    greet_everyone = flag in {"1", "true", "yes"}

    for joined in message.new_chat_members:
        if joined.id == me.id:
            await message.reply_text("🤖 Bot ist bereit! Verwende /help für Befehle.")
            continue
        if greet_everyone:
            await message.reply_text(f"Willkommen {joined.full_name}! Tippe /help für Befehle.")
|
|
|
|
|
|
async def debug_all_messages(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Log chat id, user id and text of every received message at DEBUG level."""
    message = update.message
    if not message:
        return
    sender = update.effective_user
    logger.debug(
        "Empfangene Nachricht chat=%s user=%s text=%r",
        update.effective_chat.id,
        sender.id if sender else None,
        message.text,
    )
|
|
|
|
|
|
async def my_chat_member(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """React to changes of the bot's own membership status in a chat.

    Every status change is logged. The "bot is active" message is sent only
    when the bot moves from a non-active status into ``member`` or
    ``administrator``, i.e. when it was just added. Changes between the two
    active statuses (promotion, demotion, edited admin rights) no longer
    trigger a repeated announcement. Failures while sending are logged as
    warnings.
    """
    chat = update.effective_chat
    diff = update.my_chat_member
    if not diff:
        return

    old_status = diff.old_chat_member.status
    new_status = diff.new_chat_member.status
    logger.info(
        "my_chat_member update chat=%s old=%s new=%s",
        chat.id if chat else None,
        old_status,
        new_status,
    )

    active_statuses = {"member", "administrator"}
    just_became_active = new_status in active_statuses and old_status not in active_statuses
    # Without a chat there is nowhere to send the announcement.
    if chat is None or not just_became_active:
        return

    try:
        await context.bot.send_message(chat.id, "🤖 Bot ist jetzt aktiv in diesem Chat. /help für Befehle.")
    except Exception as e:
        logger.warning("Fehler beim Senden der Aktiv-Nachricht: %s", e)
|
|
|
|
# ---------------------- Setup & Main ----------------------
|
|
|
|
def setup_logging(level: str):
    """Configure root logging to stdout at the requested level.

    Args:
        level: Level name such as ``"debug"`` or ``"WARNING"`` (case and
            surrounding whitespace are ignored). A numeric level, given as an
            int or a digit string, is accepted too, because a YAML config may
            supply ``log_level: 10``. Anything unrecognised falls back to INFO.
    """
    numeric = logging.INFO
    if isinstance(level, int) and not isinstance(level, bool):
        numeric = level
    elif isinstance(level, str):
        name = level.strip().upper()
        if name.isdigit():
            numeric = int(name)
        else:
            # getattr alone is not enough: names such as BASIC_FORMAT exist on
            # the logging module but are not levels.
            candidate = getattr(logging, name, None)
            if isinstance(candidate, int) and not isinstance(candidate, bool):
                numeric = candidate
    logging.basicConfig(
        stream=sys.stdout,
        level=numeric,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
    )
|
|
|
|
|
|
def init_counters(existing: Dict[str, int], config: Dict[str, Any]) -> Dict[str, int]:
    """Return the counters to start with, optionally seeding initial ones.

    Seeding happens only if BOTH conditions hold:
      1) *existing* is empty (no counters file or no content), and
      2) the env var USE_INITIAL_COUNTERS is truthy (1/true/yes).

    Seed values come from ``config['initial_counters']``; names are normalised
    with ``norm_key``, values converted with ``int()``, and a non-empty seed is
    persisted immediately.
    """
    if existing:
        return existing

    seeding_flag = os.environ.get("USE_INITIAL_COUNTERS", "false").lower()
    if seeding_flag not in {"1", "true", "yes"}:
        return {}

    seed = config.get('initial_counters') or {}
    normalized: Dict[str, int] = {}
    for raw_name, raw_value in seed.items():
        normalized[norm_key(raw_name)] = int(raw_value)

    if normalized:
        save_counters(normalized)
    return normalized
|
|
|
|
async def on_startup(app):
    """Startup callback: log that the bot is up. *app* is not used."""
    logger.info(
        "Bot gestartet und bereit."
    )
|
|
|
|
|
|
def main():
    """Load configuration and state, register all handlers and start polling.

    Steps:
      1. Read the config file; take the bot token from the BOT_TOKEN env var
         or ``bot_token`` in the config. Exit with status 1 if there is none.
      2. Configure logging, load the counters and optionally seed them.
      3. Build the chat allow-list from ``allowed_chat_ids``.
      4. Register command, callback, membership, inline and debug handlers.
      5. Run long polling until SIGINT/SIGTERM.

    An ``allowed_chat_ids`` value that cannot be converted to integers now
    terminates the process with status 1. Previously the allow-list was reset
    to ``None``, which silently opened the bot to every chat.
    """
    config = load_config()

    token = os.environ.get('BOT_TOKEN') or config.get('bot_token')
    if not token:
        print("Fehlendes Bot Token: Setze BOT_TOKEN env oder bot_token in config.yaml", file=sys.stderr)
        sys.exit(1)

    setup_logging(config.get('log_level', 'INFO'))

    counters = load_counters()
    counters = init_counters(counters, config)

    allowed_chat_ids = config.get('allowed_chat_ids')
    if allowed_chat_ids is not None:
        # A single scalar (e.g. `allowed_chat_ids: -100123`) is treated as a
        # one-element list; iterating a string would split it into characters.
        if isinstance(allowed_chat_ids, (str, int)):
            allowed_chat_ids = [allowed_chat_ids]
        try:
            allowed_chat_ids = [int(x) for x in allowed_chat_ids]
        except (TypeError, ValueError):
            # Fail closed: a broken allow-list must not grant access to all chats.
            logger.error("allowed_chat_ids in config.yaml müssen Ganzzahlen sein")
            sys.exit(1)

    def access(chat_id: int) -> bool:
        """Return whether *chat_id* may use the bot; log every refusal."""
        if not is_allowed(chat_id, allowed_chat_ids):
            logger.info("Verweigerter Zugriff von Chat %s", chat_id)
            return False
        return True

    application = ApplicationBuilder().token(token).build()

    # bot_data shared state
    application.bot_data['counters'] = counters
    application.bot_data['access'] = access

    application.add_handler(CommandHandler("help", help_command))
    application.add_handler(CommandHandler("add", add_counter))
    application.add_handler(CommandHandler("remove", remove_counter))
    application.add_handler(CommandHandler("increment", increment_counter))
    application.add_handler(CallbackQueryHandler(handle_increment_button, pattern=r"^inc:"))
    application.add_handler(CommandHandler("decrement", decrement_counter))
    application.add_handler(CommandHandler("list", list_counters))
    # Group / membership events
    application.add_handler(MessageHandler(filters.StatusUpdate.NEW_CHAT_MEMBERS, new_members))
    application.add_handler(ChatMemberHandler(my_chat_member, ChatMemberHandler.MY_CHAT_MEMBER))
    application.add_handler(InlineQueryHandler(inline_query))
    # Debug raw messages (registered in a separate, late handler group)
    application.add_handler(MessageHandler(filters.ALL, debug_all_messages), group=100)

    # Error handler
    application.add_error_handler(error_handler)

    # Startup callback
    application.post_init = on_startup

    application.run_polling(
        stop_signals=(signal.SIGINT, signal.SIGTERM),
        allowed_updates=["message", "chat_member", "my_chat_member", "inline_query", "callback_query"],
    )


if __name__ == '__main__':
    main()
|