from contextlib import asynccontextmanager
from pathlib import Path
from typing import List

from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles

from .config import get_settings
from .models import (
    HookCreate,
    HookResponse,
    HookUpdate,
    LoginStartRequest,
    LoginVerifyRequest,
    MessageTriggerResponse,
    RecentChat,
    StatusResponse,
)
from .storage import (
    create_hook_async,
    delete_hook_async,
    get_hook_async,
    list_hooks_async,
    record_hook_trigger_async,
    update_hook_async,
)
from .telegram_service import telegram_service

settings = get_settings()


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Application lifespan: connect the Telegram service, then disconnect on exit.

    ``ensure_connected`` runs before the application is yielded to; the
    ``finally`` clause guarantees ``disconnect`` runs once the application
    stops, even if it stops because of an error.
    """
    await telegram_service.ensure_connected()
    try:
        yield
    finally:
        await telegram_service.disconnect()


app = FastAPI(title="Telegram Message Hook", lifespan=lifespan)

# Static assets and the single-page frontend live next to this module.
static_directory = Path(__file__).parent / "static"
app.mount("/static", StaticFiles(directory=static_directory), name="static")

index_path = Path(__file__).parent / "templates" / "index.html"


def _status_response(*, authorized: bool, user) -> StatusResponse:
    """Build a ``StatusResponse`` from the given auth facts and live service state.

    Args:
        authorized: Value reported in the ``authorized`` field.
        user: Value reported in the ``user`` field (``None`` when unknown).

    Returns:
        A ``StatusResponse`` whose ``session_active``, ``phone_number`` and
        ``code_sent`` fields are read from ``telegram_service`` at call time.
    """
    login_state = telegram_service.get_login_state()
    return StatusResponse(
        authorized=authorized,
        user=user,
        session_active=telegram_service.is_connected(),
        phone_number=login_state.phone_number,
        code_sent=login_state.code_sent,
    )


async def _current_status() -> StatusResponse:
    """Query the Telegram service for authorization and report the full status.

    The user is only fetched when the session is authorized; otherwise the
    ``user`` field is ``None``.
    """
    authorized = await telegram_service.is_authorized()
    user = await telegram_service.get_user() if authorized else None
    return _status_response(authorized=authorized, user=user)


@app.get("/", response_class=HTMLResponse)
async def index() -> HTMLResponse:
    """Serve the frontend page read from ``templates/index.html``.

    Raises:
        HTTPException: 500 with detail "Frontend not found" when the file
            does not exist.
    """
    # Read first and handle the failure instead of checking ``exists()``
    # beforehand: the file could disappear between the check and the read.
    try:
        html = index_path.read_text(encoding="utf-8")
    except FileNotFoundError as exc:
        raise HTTPException(status_code=500, detail="Frontend not found") from exc
    return HTMLResponse(html)


@app.get("/api/status", response_model=StatusResponse)
async def status() -> StatusResponse:
    """Return the current authorization, connection, and login-flow state."""
    return await _current_status()


@app.post("/api/login/start", response_model=StatusResponse)
async def login_start(payload: LoginStartRequest) -> StatusResponse:
    """Start the login flow for ``payload.phone_number``.

    Returns:
        The status after starting the flow; ``authorized`` is always reported
        as ``False`` and ``user`` as ``None`` at this stage.

    Raises:
        HTTPException: 400 carrying the message of a ``ValueError`` raised by
            ``telegram_service.start_login``.
    """
    try:
        await telegram_service.start_login(payload.phone_number)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return _status_response(authorized=False, user=None)


@app.post("/api/login/verify", response_model=StatusResponse)
async def login_verify(payload: LoginVerifyRequest) -> StatusResponse:
    """Submit the login code (and optional password) and report the new status.

    Raises:
        HTTPException: 400 carrying the exception message for any failure
            raised by ``telegram_service.verify_code``.
    """
    try:
        await telegram_service.verify_code(
            code=payload.code,
            password=payload.password,
        )
    # Every verification failure, ValueError included, is reported to the
    # client as a 400, so a single broad handler covers all cases.
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    return await _current_status()


@app.post("/api/logout", response_model=StatusResponse)
async def logout() -> StatusResponse:
    """Log out of the Telegram service and return the resulting status.

    ``authorized`` is always reported as ``False`` and ``user`` as ``None``.
    """
    await telegram_service.logout()
    return _status_response(authorized=False, user=None)


@app.get("/api/hooks", response_model=List[HookResponse])
async def list_hooks() -> List[HookResponse]:
    """Return every hook from ``list_hooks_async`` with its trigger URL.

    Each ``action_url`` is ``settings.base_url`` followed by the hook's
    ``action_path``.
    """
    hooks = await list_hooks_async()
    return [
        HookResponse(
            hook_id=hook.hook_id,
            chat_id=hook.chat_id,
            message=hook.message,
            created_at=hook.created_at,
            last_triggered_at=hook.last_triggered_at,
            action_url=f"{settings.base_url}{hook.action_path}",
        )
        for hook in hooks
    ]


@app.get("/api/recent-chats", response_model=List[RecentChat])
async def recent_chats() -> List[RecentChat]:
    """Return the chats reported by ``telegram_service.fetch_recent_chats``.

    Raises:
        HTTPException: 401 when the Telegram session is not authorized.
    """
    if not await telegram_service.is_authorized():
        raise HTTPException(status_code=401, detail="Session not authorized")
    return await telegram_service.fetch_recent_chats()
def _hook_response(hook) -> HookResponse:
    """Map a stored hook onto the ``HookResponse`` API model.

    The ``action_url`` is ``settings.base_url`` followed by the hook's
    ``action_path``.
    """
    return HookResponse(
        hook_id=hook.hook_id,
        chat_id=hook.chat_id,
        message=hook.message,
        created_at=hook.created_at,
        last_triggered_at=hook.last_triggered_at,
        action_url=f"{settings.base_url}{hook.action_path}",
    )


@app.post("/api/hooks", response_model=HookResponse, status_code=201)
async def create_hook(payload: HookCreate) -> HookResponse:
    """Create a hook from ``payload`` via ``create_hook_async`` and return it."""
    created = await create_hook_async(payload)
    return _hook_response(created)


@app.delete("/api/hooks/{hook_id}", status_code=204)
async def delete_hook(hook_id: str) -> None:
    """Delete the hook identified by ``hook_id``.

    Raises:
        HTTPException: 404 when ``delete_hook_async`` reports nothing deleted.
    """
    if await delete_hook_async(hook_id):
        return
    raise HTTPException(status_code=404, detail="Hook not found")


@app.patch("/api/hooks/{hook_id}", response_model=HookResponse)
async def update_hook(hook_id: str, payload: HookUpdate) -> HookResponse:
    """Apply the fields in ``payload`` to an existing hook and return the result.

    Raises:
        HTTPException: 404 when ``update_hook_async`` raises ``KeyError``;
            400 carrying the message when it raises ``ValueError``.
    """
    try:
        refreshed = await update_hook_async(
            hook_id,
            new_hook_id=payload.hook_id,
            chat_id=payload.chat_id,
            message=payload.message,
        )
    except KeyError as missing:
        raise HTTPException(status_code=404, detail="Hook not found") from missing
    except ValueError as invalid:
        raise HTTPException(status_code=400, detail=str(invalid)) from invalid
    return _hook_response(refreshed)


@app.get("/action/{hook_id}", response_model=MessageTriggerResponse)
async def trigger_hook(hook_id: str) -> MessageTriggerResponse:
    """Send the hook's message to its chat and record the trigger.

    Raises:
        HTTPException: 404 when no hook matches ``hook_id``; 401 when sending
            raises ``PermissionError``; 500 for any other send failure.
    """
    target = await get_hook_async(hook_id)
    if not target:
        raise HTTPException(status_code=404, detail="Hook not found")

    try:
        await telegram_service.send_message(target.chat_id, target.message)
    except PermissionError as denied:
        raise HTTPException(status_code=401, detail=str(denied)) from denied
    except Exception as failure:  # noqa: BLE001
        raise HTTPException(status_code=500, detail=str(failure)) from failure
    else:
        # Only a successful send is recorded as a trigger.
        await record_hook_trigger_async(target.hook_id)

    return MessageTriggerResponse(
        status="sent",
        hook_id=target.hook_id,
        chat_id=target.chat_id,
    )