"""Async service object wrapping a single Telethon client.

Exposes connection management, the phone-code login flow, message sending
and a listing of recent dialogs through the module-level
``telegram_service`` instance.
"""

import asyncio
from dataclasses import dataclass
from typing import List, Optional

from telethon import TelegramClient, errors
from telethon.tl import types
from telethon.utils import get_display_name, get_peer_id

from .config import get_settings
from .models import RecentChat


@dataclass
class LoginState:
    """Progress of the phone-code login flow."""

    # Phone number the code was (or will be) requested for.
    phone_number: Optional[str] = None
    # Hash returned by the code request; handed back to Telethon on sign-in.
    phone_code_hash: Optional[str] = None
    # True between a successful code request and a successful sign-in.
    code_sent: bool = False


class TelegramService:
    """Owns one ``TelegramClient`` and serialises (re)connection attempts."""

    def __init__(self) -> None:
        """Build the client from application settings; does not connect."""
        self.settings = get_settings()
        self.client = TelegramClient(
            str(self.settings.session_path),
            self.settings.api_id,
            self.settings.api_hash,
        )
        self._lock = asyncio.Lock()
        self._login_state = LoginState(phone_number=self.settings.phone_number)
        self._connected = False

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------

    async def connect(self) -> None:
        """Connect the client unless it is already connected.

        The decision is based on the client's live state rather than on the
        cached ``_connected`` flag, so calling this after the connection was
        lost reconnects instead of silently doing nothing.
        """
        async with self._lock:
            if not self.client.is_connected():
                await self.client.connect()
            self._connected = True

    async def ensure_connected(self) -> None:
        """Connect on demand; cheap no-op when already connected."""
        # Unlocked fast path; connect() re-checks under the lock.
        if not self.client.is_connected():
            await self.connect()

    async def disconnect(self) -> None:
        """Disconnect the client if it is currently connected."""
        async with self._lock:
            if self.client.is_connected():
                await self.client.disconnect()
            self._connected = False

    def is_connected(self) -> bool:
        """Return the client's current connection state."""
        return self.client.is_connected()

    # ------------------------------------------------------------------
    # Authorization helpers
    # ------------------------------------------------------------------

    async def is_authorized(self) -> bool:
        """Return True when the stored session is logged in.

        Connects first if needed.
        """
        await self.ensure_connected()
        result = self.client.is_user_authorized()
        # Tolerate both coroutine and plain return values from the client.
        if asyncio.iscoroutine(result):
            result = await result
        return bool(result)

    async def _require_authorized(self) -> None:
        """Raise ``PermissionError`` unless the session is logged in."""
        if not await self.is_authorized():
            raise PermissionError("Telegram session is not authorized")

    async def get_user(self) -> Optional[str]:
        """Return a display label for the logged-in account.

        Returns:
            The full name, else the username, else the numeric id as a
            string; ``None`` when not authorized or when the client returns
            no account.
        """
        if not await self.is_authorized():
            return None
        me = await self.client.get_me()
        if not me:
            return None
        full_name = " ".join(filter(None, [me.first_name, me.last_name])).strip()
        return full_name or me.username or str(me.id)

    # ------------------------------------------------------------------
    # Login flow
    # ------------------------------------------------------------------

    async def start_login(self, phone_number: Optional[str]) -> None:
        """Request a login code and remember the resulting code hash.

        Args:
            phone_number: Number to log in with; falls back to the number in
                settings when falsy.

        Raises:
            ValueError: If neither the argument nor settings supply a number.
        """
        phone = phone_number or self.settings.phone_number
        if not phone:
            raise ValueError("Phone number is required to start login")
        await self.ensure_connected()
        sent_code = await self.client.send_code_request(phone)
        self._login_state = LoginState(
            phone_number=phone,
            phone_code_hash=sent_code.phone_code_hash,
            code_sent=True,
        )

    async def verify_code(self, code: str, password: Optional[str] = None) -> None:
        """Complete the login started by :meth:`start_login`.

        Args:
            code: Verification code received by the user.
            password: Account password, used only when the account asks for
                one after the code is accepted.

        Raises:
            ValueError: If no code was requested, or the code is invalid or
                expired.
            errors.SessionPasswordNeededError: If a password is required and
                none was supplied.
        """
        state = self._login_state
        if not state.code_sent or not state.phone_code_hash:
            raise ValueError("No login code has been requested")
        await self.ensure_connected()
        try:
            await self.client.sign_in(
                phone=state.phone_number,
                code=code,
                phone_code_hash=state.phone_code_hash,
            )
        except errors.SessionPasswordNeededError:
            if not password:
                raise
            await self.client.sign_in(password=password)
        except errors.PhoneCodeInvalidError as exc:
            raise ValueError("Invalid verification code") from exc
        except errors.PhoneCodeExpiredError as exc:
            raise ValueError("Verification code has expired") from exc
        # Reached only when one of the sign_in calls succeeded: drop the
        # used code hash but keep the phone number.
        self._login_state = LoginState(
            phone_number=self._login_state.phone_number,
            code_sent=False,
        )

    async def logout(self) -> None:
        """Log the session out and reset the login state to the defaults."""
        await self.ensure_connected()
        await self.client.log_out()
        self._login_state = LoginState(phone_number=self.settings.phone_number)
        # Keep the cached flag in line with whatever log_out left behind.
        self._connected = self.client.is_connected()

    def get_login_state(self) -> LoginState:
        """Return the current login-flow state object."""
        return self._login_state

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------

    async def send_message(self, chat_id: str, message: str) -> None:
        """Send a markdown-parsed message to a chat.

        Args:
            chat_id: Username, phone number, or a numeric peer id as a
                string (the form produced by :meth:`fetch_recent_chats`).
            message: Message text.

        Raises:
            PermissionError: If the session is not authorized.
        """
        await self._require_authorized()
        entity = await self._resolve_entity(chat_id)
        await self.client.send_message(entity=entity, message=message, parse_mode="md")

    async def _resolve_entity(self, chat_id: str) -> object:
        """Turn a digit-only ``chat_id`` into a resolved peer when possible.

        Telethon resolves digit-only strings as phone numbers, so a peer id
        passed as a string is not looked up by id. The integer form is tried
        first; when Telethon cannot resolve it the original string is
        returned so phone numbers written as bare digits keep working.
        """
        numeric_id = self._parse_numeric_id(chat_id)
        if numeric_id is None:
            return chat_id
        try:
            return await self.client.get_input_entity(numeric_id)
        except ValueError:
            return chat_id

    @staticmethod
    def _parse_numeric_id(chat_id: object) -> Optional[int]:
        """Return ``chat_id`` as an int if it is an optionally negative run of ASCII digits."""
        if not isinstance(chat_id, str):
            return None
        text = chat_id.strip()
        digits = text[1:] if text.startswith("-") else text
        # A leading "+" is deliberately not accepted: that marks a phone number.
        if digits and digits.isascii() and digits.isdigit():
            return int(text)
        return None

    # ------------------------------------------------------------------
    # Dialog listing
    # ------------------------------------------------------------------

    async def fetch_recent_chats(self, limit: int = 50) -> List[RecentChat]:
        """Return up to ``limit`` dialogs as ``RecentChat`` records.

        Raises:
            PermissionError: If the session is not authorized.
        """
        await self._require_authorized()
        dialogs = await self.client.get_dialogs(limit=limit)
        return [self._to_recent_chat(dialog) for dialog in dialogs]

    def _to_recent_chat(self, dialog: object) -> RecentChat:
        """Convert one dialog into a ``RecentChat``, tolerating missing data."""
        entity = dialog.entity
        if entity is None:
            chat_id = str(getattr(dialog, "id", ""))
            display_name = getattr(dialog, "name", chat_id) or chat_id
            chat_type = "unknown"
            username = None
            phone = None
        else:
            # Best effort: fall back to raw attributes rather than dropping
            # the dialog when Telethon's helpers reject the entity.
            try:
                chat_id = str(get_peer_id(entity))
            except Exception:  # noqa: BLE001
                chat_id = str(getattr(entity, "id", ""))
            try:
                display_name = get_display_name(entity) or chat_id
            except Exception:  # noqa: BLE001
                display_name = chat_id
            chat_type = self._classify_entity(entity)
            username = getattr(entity, "username", None)
            phone = (
                getattr(entity, "phone", None)
                if isinstance(entity, types.User)
                else None
            )
        return RecentChat(
            chat_id=chat_id,
            display_name=display_name,
            chat_type=chat_type,
            username=username,
            phone_number=phone,
            last_used_at=getattr(dialog, "date", None),
        )

    @staticmethod
    def _classify_entity(entity: object) -> str:
        """Map an entity to ``"bot"``, ``"user"``, ``"group"``, ``"channel"`` or ``"unknown"``."""
        if isinstance(entity, types.User):
            return "bot" if getattr(entity, "bot", False) else "user"
        if isinstance(entity, types.Chat):
            return "group"
        if isinstance(entity, types.Channel):
            # Channel objects without the broadcast flag are reported as groups.
            return "channel" if getattr(entity, "broadcast", False) else "group"
        return "unknown"


telegram_service = TelegramService()