Files
TgMessageHook/app/telegram_service.py

190 lines
6.7 KiB
Python

import asyncio
from dataclasses import dataclass
from typing import List, Optional
from telethon import TelegramClient, errors
from telethon.tl import types
from telethon.utils import get_display_name, get_peer_id
from .config import get_settings
from .models import RecentChat
@dataclass
class LoginState:
    """Snapshot of where an interactive phone login currently stands.

    ``TelegramService`` replaces this object wholesale whenever a code is
    requested, a sign-in succeeds, or the session is logged out.
    """

    # Phone number the login code is requested for / was sent to.
    phone_number: Optional[str] = None
    # Hash returned together with the sent code; it is passed back on sign-in.
    phone_code_hash: Optional[str] = None
    # True between a code request and a successful sign-in.
    code_sent: bool = False
class TelegramService:
    """Asynchronous convenience wrapper around one Telethon ``TelegramClient``.

    The service owns the client, tracks the state of an interactive phone
    login, and exposes helpers for sending messages and listing recent
    dialogs.
    """

    def __init__(self) -> None:
        """Build the client from application settings without connecting."""
        self.settings = get_settings()
        self.client = TelegramClient(
            str(self.settings.session_path),
            self.settings.api_id,
            self.settings.api_hash,
        )
        # Serialises connect/disconnect so concurrent callers cannot race.
        self._lock = asyncio.Lock()
        self._login_state = LoginState(phone_number=self.settings.phone_number)
        # Best-effort mirror of the connection state set by this service.
        # The client's own ``is_connected()`` is the source of truth.
        self._connected = False

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------
    async def connect(self) -> None:
        """Connect the client if it is not currently connected.

        The decision is based on ``client.is_connected()`` rather than on the
        cached ``_connected`` flag, so a connection that dropped after an
        earlier ``connect()`` is re-established instead of being skipped.
        """
        async with self._lock:
            if not self.client.is_connected():
                await self.client.connect()
            self._connected = True

    async def ensure_connected(self) -> None:
        """Connect on demand; cheap no-op when already connected."""
        # Fast path avoids taking the lock for the common connected case;
        # ``connect()`` re-checks under the lock.
        if not self.client.is_connected():
            await self.connect()

    async def disconnect(self) -> None:
        """Disconnect the client if it is connected."""
        async with self._lock:
            if self.client.is_connected():
                await self.client.disconnect()
            self._connected = False

    def is_connected(self) -> bool:
        """Return whether the underlying client reports being connected."""
        return self.client.is_connected()

    # ------------------------------------------------------------------
    # Authorization helpers
    # ------------------------------------------------------------------
    async def is_authorized(self) -> bool:
        """Return ``True`` when the current session is logged in.

        Connects first if necessary.
        """
        await self.ensure_connected()
        result = self.client.is_user_authorized()
        # The call may hand back either a plain value or a coroutine.
        if asyncio.iscoroutine(result):
            result = await result
        return bool(result)

    async def _require_authorized(self) -> None:
        """Raise ``PermissionError`` unless the session is logged in."""
        if not await self.is_authorized():
            raise PermissionError("Telegram session is not authorized")

    async def get_user(self) -> Optional[str]:
        """Return a display label for the logged-in account.

        Returns:
            The full name if present, otherwise the username, otherwise the
            numeric id as a string; ``None`` when the session is not
            authorized or the client returns no user.
        """
        if not await self.is_authorized():
            return None
        me = await self.client.get_me()
        if not me:
            return None
        full_name = " ".join(filter(None, [me.first_name, me.last_name])).strip()
        return full_name or me.username or str(me.id)

    # ------------------------------------------------------------------
    # Login flow
    # ------------------------------------------------------------------
    async def start_login(self, phone_number: Optional[str]) -> None:
        """Request a login code and remember the hash needed to verify it.

        Args:
            phone_number: Number to log in with; falls back to the configured
                ``settings.phone_number`` when falsy.

        Raises:
            ValueError: If neither the argument nor the settings provide a
                phone number.
        """
        phone = phone_number or self.settings.phone_number
        if not phone:
            raise ValueError("Phone number is required to start login")
        await self.ensure_connected()
        sent_code = await self.client.send_code_request(phone)
        self._login_state = LoginState(
            phone_number=phone,
            phone_code_hash=sent_code.phone_code_hash,
            code_sent=True,
        )

    async def verify_code(self, code: str, password: Optional[str] = None) -> None:
        """Complete the login started by :meth:`start_login`.

        Args:
            code: Verification code received by the user.
            password: Two-step-verification password, used only when the
                client reports that one is needed.

        Raises:
            ValueError: If no code was requested, or the code is invalid or
                expired.
            errors.SessionPasswordNeededError: If a password is needed and
                none was supplied.
        """
        state = self._login_state
        if not state.code_sent or not state.phone_code_hash:
            raise ValueError("No login code has been requested")
        await self.ensure_connected()
        try:
            await self.client.sign_in(
                phone=state.phone_number,
                code=code,
                phone_code_hash=state.phone_code_hash,
            )
        except errors.SessionPasswordNeededError:
            if not password:
                raise
            await self.client.sign_in(password=password)
        except errors.PhoneCodeInvalidError as exc:
            raise ValueError("Invalid verification code") from exc
        except errors.PhoneCodeExpiredError as exc:
            raise ValueError("Verification code has expired") from exc
        # Reached only when one of the sign-in calls succeeded: the pending
        # code is consumed, so forget its hash but keep the phone number.
        self._login_state = LoginState(
            phone_number=state.phone_number,
            code_sent=False,
        )

    async def logout(self) -> None:
        """Log the session out and reset the login state to the defaults."""
        await self.ensure_connected()
        await self.client.log_out()
        self._login_state = LoginState(phone_number=self.settings.phone_number)

    def get_login_state(self) -> "LoginState":
        """Return the current login state object."""
        return self._login_state

    # ------------------------------------------------------------------
    # Messaging
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_chat_id(chat_id: object) -> object:
        """Turn a signed numeric id string into an ``int``; pass others through.

        Telethon interprets digit-like *strings* as phone numbers, so an id
        such as ``"-1001234567890"`` (the form ``fetch_recent_chats`` emits
        for groups and channels) must be handed over as an integer to be
        resolved as a peer id. Unsigned digit strings are left untouched
        because they cannot be told apart from phone numbers here.
        """
        if isinstance(chat_id, str):
            candidate = chat_id.strip()
            digits = candidate[1:]
            if candidate.startswith("-") and digits.isascii() and digits.isdigit():
                return int(candidate)
        return chat_id

    async def send_message(self, chat_id: str, message: str) -> None:
        """Send a Markdown-formatted message to ``chat_id``.

        Args:
            chat_id: Username, phone number, link, or numeric chat id. Signed
                numeric ids are converted to ``int`` before being passed on.
            message: Text to send; parsed with ``parse_mode="md"``.

        Raises:
            PermissionError: If the session is not authorized.
        """
        await self._require_authorized()
        await self.client.send_message(
            entity=self._normalize_chat_id(chat_id),
            message=message,
            parse_mode="md",
        )

    # ------------------------------------------------------------------
    # Dialog listing
    # ------------------------------------------------------------------
    async def fetch_recent_chats(self, limit: int = 50) -> "List[RecentChat]":
        """Return up to ``limit`` dialogs as ``RecentChat`` records.

        Raises:
            PermissionError: If the session is not authorized.
        """
        await self._require_authorized()
        dialogs = await self.client.get_dialogs(limit=limit)
        return [self._dialog_to_recent_chat(dialog) for dialog in dialogs]

    def _dialog_to_recent_chat(self, dialog: object) -> "RecentChat":
        """Convert one dialog into a ``RecentChat``, tolerating odd entities."""
        entity = dialog.entity
        if entity is None:
            # No entity attached: fall back to whatever the dialog exposes.
            chat_id = str(getattr(dialog, "id", ""))
            display_name = getattr(dialog, "name", chat_id) or chat_id
            chat_type = "unknown"
            username = None
            phone = None
        else:
            # Best-effort: one unusual entity must not break the whole list.
            try:
                chat_id = str(get_peer_id(entity))
            except Exception:  # noqa: BLE001
                chat_id = str(getattr(entity, "id", ""))
            try:
                display_name = get_display_name(entity) or chat_id
            except Exception:  # noqa: BLE001
                display_name = chat_id
            chat_type = self._classify_entity(entity)
            username = getattr(entity, "username", None)
            # Only user entities contribute a phone number.
            phone = (
                getattr(entity, "phone", None)
                if isinstance(entity, types.User)
                else None
            )
        return RecentChat(
            chat_id=chat_id,
            display_name=display_name,
            chat_type=chat_type,
            username=username,
            phone_number=phone,
            last_used_at=getattr(dialog, "date", None),
        )

    @staticmethod
    def _classify_entity(entity: object) -> str:
        """Map an entity to ``"bot"``, ``"user"``, ``"group"``, ``"channel"`` or ``"unknown"``."""
        if isinstance(entity, types.User):
            return "bot" if getattr(entity, "bot", False) else "user"
        if isinstance(entity, types.Chat):
            return "group"
        if isinstance(entity, types.Channel):
            # Channel entities without the broadcast flag are treated as groups.
            return "channel" if getattr(entity, "broadcast", False) else "group"
        return "unknown"
# Shared module-level instance. Constructing it here means get_settings() and
# the TelegramClient constructor run as soon as this module is imported.
telegram_service = TelegramService()