Files
TgMessageHook/app/main.py
Andre Beging c7f694d820 Initial commit
2025-10-07 12:51:31 +02:00

185 lines
6.0 KiB
Python

from contextlib import asynccontextmanager
from pathlib import Path
from typing import List
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from .config import get_settings
from .models import (
HookCreate,
HookResponse,
HookUpdateId,
LoginStartRequest,
LoginVerifyRequest,
MessageTriggerResponse,
StatusResponse,
)
from .storage import (
create_hook_async,
delete_hook_async,
get_hook_async,
list_hooks_async,
update_hook_id_async,
)
from .telegram_service import telegram_service
# Fetched once when this module is imported; ``settings.base_url`` is the
# prefix used by the hook endpoints below to build each hook's action URL.
settings = get_settings()
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Tie the Telegram connection to the application's lifetime.

    Awaits ``telegram_service.ensure_connected()`` before yielding control
    to the application, and always awaits ``telegram_service.disconnect()``
    once the application is done, even if it exits with an error.

    The FastAPI instance passed in is not used.
    """
    await telegram_service.ensure_connected()
    try:
        # The application serves requests while suspended here.
        yield
    finally:
        await telegram_service.disconnect()
# Application instance; ``lifespan`` manages the Telegram connection around
# the app's lifetime.
app = FastAPI(lifespan=lifespan, title="Telegram Message Hook")

# Static assets live in the ``static`` directory next to this module.
static_directory = Path(__file__).parent.joinpath("static")
app.mount("/static", StaticFiles(directory=static_directory), name="static")

# HTML page returned by the ``/`` route.
index_path = Path(__file__).parent.joinpath("templates", "index.html")
@app.get("/", response_class=HTMLResponse)
async def index() -> HTMLResponse:
    """Serve the frontend page.

    Returns:
        The UTF-8 decoded contents of ``index_path`` as an HTML response.

    Raises:
        HTTPException: 500 with detail "Frontend not found" when the
            template file does not exist.
    """
    # Read first and handle the failure instead of checking ``exists()``
    # beforehand: the file could disappear between the check and the read,
    # which would otherwise surface as an unhandled error.
    # ``NotADirectoryError`` is included because ``Path.exists()`` also
    # reports such paths as missing.
    try:
        html = index_path.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError) as exc:
        raise HTTPException(status_code=500, detail="Frontend not found") from exc
    return HTMLResponse(html)
@app.get("/api/status", response_model=StatusResponse)
async def status() -> StatusResponse:
    """Report the current Telegram authorization and login state.

    Returns:
        A ``StatusResponse`` carrying the authorization flag, the user
        (looked up only when authorized, otherwise ``None``), whether the
        service is connected, and the phone number / code-sent flag taken
        from the service's login state.
    """
    is_authorized = await telegram_service.is_authorized()

    # The user is only fetched for an authorized session.
    current_user = None
    if is_authorized:
        current_user = await telegram_service.get_user()

    state = telegram_service.get_login_state()
    return StatusResponse(
        authorized=is_authorized,
        user=current_user,
        session_active=telegram_service.is_connected(),
        phone_number=state.phone_number,
        code_sent=state.code_sent,
    )
@app.post("/api/login/start", response_model=StatusResponse)
async def login_start(payload: LoginStartRequest) -> StatusResponse:
    """Begin a login for the phone number given in the request body.

    Args:
        payload: Request body; its ``phone_number`` is passed to
            ``telegram_service.start_login``.

    Returns:
        A ``StatusResponse`` with ``authorized=False`` and ``user=None``,
        plus the connection flag and the service's current login state.

    Raises:
        HTTPException: 400 carrying the error text when ``start_login``
            raises ``ValueError``.
    """
    try:
        await telegram_service.start_login(payload.phone_number)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    state = telegram_service.get_login_state()
    connected = telegram_service.is_connected()
    return StatusResponse(
        authorized=False,
        user=None,
        session_active=connected,
        phone_number=state.phone_number,
        code_sent=state.code_sent,
    )
@app.post("/api/login/verify", response_model=StatusResponse)
async def login_verify(payload: LoginVerifyRequest) -> StatusResponse:
    """Complete a login using the submitted code and optional password.

    Args:
        payload: Request body; its ``code`` and ``password`` are passed to
            ``telegram_service.verify_code``.

    Returns:
        A ``StatusResponse`` reflecting the authorization state after the
        verification attempt (the user is fetched only when authorized).

    Raises:
        HTTPException: 400 carrying the error text when verification raises
            any exception.
    """
    try:
        await telegram_service.verify_code(
            code=payload.code,
            password=payload.password,
        )
    # One handler is enough: ``ValueError`` is a subclass of ``Exception``
    # and both cases were reported identically as a 400, so a separate
    # ``except ValueError`` clause added nothing.
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    authorized = await telegram_service.is_authorized()
    user = await telegram_service.get_user() if authorized else None
    login_state = telegram_service.get_login_state()
    return StatusResponse(
        authorized=authorized,
        user=user,
        session_active=telegram_service.is_connected(),
        phone_number=login_state.phone_number,
        code_sent=login_state.code_sent,
    )
@app.post("/api/logout", response_model=StatusResponse)
async def logout() -> StatusResponse:
    """Log out of Telegram and report the resulting state.

    Returns:
        A ``StatusResponse`` with ``authorized=False`` and ``user=None``,
        plus the connection flag and the login state read after
        ``telegram_service.logout()`` has completed.
    """
    await telegram_service.logout()

    state = telegram_service.get_login_state()
    connected = telegram_service.is_connected()
    return StatusResponse(
        authorized=False,
        user=None,
        session_active=connected,
        phone_number=state.phone_number,
        code_sent=state.code_sent,
    )
@app.get("/api/hooks", response_model=List[HookResponse])
async def list_hooks() -> List[HookResponse]:
    """Return every stored hook together with its action URL.

    Returns:
        One ``HookResponse`` per hook yielded by ``list_hooks_async``, in
        the same order; ``action_url`` is ``settings.base_url`` followed by
        the hook's ``action_path``.
    """

    def _as_response(stored) -> HookResponse:
        # Convert one stored hook into its API representation.
        return HookResponse(
            hook_id=stored.hook_id,
            chat_id=stored.chat_id,
            message=stored.message,
            created_at=stored.created_at,
            action_url=f"{settings.base_url}{stored.action_path}",
        )

    stored_hooks = await list_hooks_async()
    return [_as_response(stored) for stored in stored_hooks]
@app.post("/api/hooks", response_model=HookResponse, status_code=201)
async def create_hook(payload: HookCreate) -> HookResponse:
    """Create a hook from the request body and return it.

    Args:
        payload: Request body, handed unchanged to ``create_hook_async``.

    Returns:
        The created hook as a ``HookResponse`` (sent with status 201),
        including its action URL built from ``settings.base_url`` and the
        hook's ``action_path``.
    """
    created = await create_hook_async(payload)
    action_url = f"{settings.base_url}{created.action_path}"
    return HookResponse(
        hook_id=created.hook_id,
        chat_id=created.chat_id,
        message=created.message,
        created_at=created.created_at,
        action_url=action_url,
    )
@app.delete("/api/hooks/{hook_id}", status_code=204)
async def delete_hook(hook_id: str) -> None:
    """Delete the hook identified by ``hook_id``.

    Args:
        hook_id: Identifier taken from the URL path.

    Raises:
        HTTPException: 404 when ``delete_hook_async`` reports that nothing
            was deleted.
    """
    if await delete_hook_async(hook_id):
        return
    raise HTTPException(status_code=404, detail="Hook not found")
@app.patch("/api/hooks/{hook_id}", response_model=HookResponse)
async def update_hook(hook_id: str, payload: HookUpdateId) -> HookResponse:
    """Change a hook's identifier and return the updated hook.

    Args:
        hook_id: Current identifier, taken from the URL path.
        payload: Request body; its ``hook_id`` is the new identifier.

    Returns:
        The updated hook as a ``HookResponse`` including its action URL.

    Raises:
        HTTPException: 404 when ``update_hook_id_async`` raises ``KeyError``;
            400 carrying the error text when it raises ``ValueError``.
    """
    try:
        hook = await update_hook_id_async(hook_id, payload.hook_id)
    except KeyError as exc:
        raise HTTPException(status_code=404, detail="Hook not found") from exc
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc
    else:
        return HookResponse(
            hook_id=hook.hook_id,
            chat_id=hook.chat_id,
            message=hook.message,
            created_at=hook.created_at,
            action_url=f"{settings.base_url}{hook.action_path}",
        )
@app.get("/action/{hook_id}", response_model=MessageTriggerResponse)
async def trigger_hook(hook_id: str) -> MessageTriggerResponse:
    """Send the message configured for a hook to the hook's chat.

    Args:
        hook_id: Identifier taken from the URL path.

    Returns:
        A ``MessageTriggerResponse`` with status "sent" and the hook's
        ``hook_id`` and ``chat_id``.

    Raises:
        HTTPException: 404 when no hook is found; 401 when sending raises
            ``PermissionError``; 500 carrying the error text for any other
            exception raised while sending.
    """
    target = await get_hook_async(hook_id)
    if not target:
        raise HTTPException(status_code=404, detail="Hook not found")

    try:
        await telegram_service.send_message(target.chat_id, target.message)
    except PermissionError as exc:
        raise HTTPException(status_code=401, detail=str(exc)) from exc
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(status_code=500, detail=str(exc)) from exc

    return MessageTriggerResponse(
        status="sent",
        hook_id=target.hook_id,
        chat_id=target.chat_id,
    )