129 lines
4.4 KiB
Python
129 lines
4.4 KiB
Python
import asyncio
|
|
from dataclasses import dataclass
|
|
from typing import Optional
|
|
|
|
from telethon import TelegramClient, errors
|
|
|
|
from .config import get_settings
|
|
|
|
|
|
@dataclass
class LoginState:
    """Progress record for a phone-number login.

    Every field has a default, so ``LoginState()`` describes a login that
    has not been started.
    """

    # Phone number the login applies to, if one is known.
    phone_number: Optional[str] = None

    # Hash tied to the most recent code request; ``None`` when no code
    # request is pending.
    phone_code_hash: Optional[str] = None

    # Whether a verification code has been requested and not yet consumed.
    code_sent: bool = False
|
|
|
|
|
|
class TelegramService:
    """Wrap one Telethon ``TelegramClient``: connection, login and sending.

    The client's own ``is_connected()`` is the single source of truth for
    the connection state.  ``_connected`` is kept only as a mirror of the
    last state this service observed and is never used to make decisions.
    """

    def __init__(self) -> None:
        """Build the client from application settings.

        ``connect()`` is not called here; use :meth:`connect` or any method
        that goes through :meth:`ensure_connected`.
        """
        self.settings = get_settings()
        self.client = TelegramClient(
            str(self.settings.session_path),
            self.settings.api_id,
            self.settings.api_hash,
        )
        # Serialises connect/disconnect so they cannot interleave.
        self._lock = asyncio.Lock()
        self._login_state = LoginState(phone_number=self.settings.phone_number)
        self._connected = False

    async def connect(self) -> None:
        """Connect the client unless it already reports being connected."""
        async with self._lock:
            # Ask the client rather than trusting ``_connected``: the flag
            # can be stale (e.g. after a logout), and trusting it would turn
            # this call into a silent no-op on a disconnected client.
            if not self.client.is_connected():
                await self.client.connect()
            self._connected = True

    async def ensure_connected(self) -> None:
        """Connect on demand; return immediately when already connected."""
        if self.client.is_connected():
            return
        async with self._lock:
            # Re-check under the lock: another task may have connected
            # while this one was waiting to acquire it.
            if not self.client.is_connected():
                await self.client.connect()
            self._connected = True

    async def disconnect(self) -> None:
        """Disconnect the client if it is currently connected."""
        async with self._lock:
            if self.client.is_connected():
                await self.client.disconnect()
            self._connected = False

    def is_connected(self) -> bool:
        """Return the client's own view of the connection state."""
        return self.client.is_connected()

    async def is_authorized(self) -> bool:
        """Return whether the current session is logged in.

        Connects first if needed.  ``is_user_authorized()`` is awaited only
        when it hands back a coroutine, so a plain return value is accepted
        as well.
        """
        await self.ensure_connected()
        result = self.client.is_user_authorized()
        if asyncio.iscoroutine(result):
            result = await result
        return bool(result)

    async def get_user(self) -> Optional[str]:
        """Return a display name for the logged-in account.

        Returns:
            ``"first last"`` when either name part is set, otherwise the
            username, otherwise the numeric id as a string.  ``None`` when
            the session is not authorized or ``get_me()`` yields nothing.
        """
        if not await self.is_authorized():
            return None
        me = await self.client.get_me()
        if not me:
            return None
        full_name = " ".join(filter(None, [me.first_name, me.last_name])).strip()
        return full_name or me.username or str(me.id)

    async def start_login(self, phone_number: Optional[str]) -> None:
        """Request a login code and remember the resulting code hash.

        Args:
            phone_number: Number to log in with; falls back to
                ``settings.phone_number`` when empty or ``None``.

        Raises:
            ValueError: If neither the argument nor the settings supply a
                phone number.
        """
        phone = phone_number or self.settings.phone_number
        if not phone:
            raise ValueError("Phone number is required to start login")
        await self.ensure_connected()
        sent_code = await self.client.send_code_request(phone)
        self._login_state = LoginState(
            phone_number=phone,
            phone_code_hash=sent_code.phone_code_hash,
            code_sent=True,
        )

    async def verify_code(self, code: str, password: Optional[str] = None) -> None:
        """Complete the login started by :meth:`start_login`.

        Args:
            code: Verification code received by the user.
            password: Password used for a second ``sign_in`` call when the
                first one raises ``SessionPasswordNeededError``.

        Raises:
            ValueError: If no code was requested, or the code is invalid or
                expired.
            errors.SessionPasswordNeededError: If a password is required
                and none was supplied.

        The pending login state is cleared only after a successful sign-in;
        on any failure it is left untouched so the caller can retry.
        """
        state = self._login_state
        if not state.code_sent or not state.phone_code_hash:
            raise ValueError("No login code has been requested")
        await self.ensure_connected()
        try:
            await self.client.sign_in(
                phone=state.phone_number,
                code=code,
                phone_code_hash=state.phone_code_hash,
            )
        except errors.SessionPasswordNeededError:
            if not password:
                raise
            await self.client.sign_in(password=password)
        except errors.PhoneCodeInvalidError as exc:
            raise ValueError("Invalid verification code") from exc
        except errors.PhoneCodeExpiredError as exc:
            raise ValueError("Verification code has expired") from exc
        # Reached only when one of the sign_in calls above succeeded.
        self._login_state = LoginState(
            phone_number=state.phone_number,
            code_sent=False,
        )

    async def logout(self) -> None:
        """Log the session out and reset the login state to the defaults."""
        await self.ensure_connected()
        await self.client.log_out()
        # Logging out may leave the client disconnected; mirror whatever the
        # client reports instead of assuming either outcome.
        self._connected = self.client.is_connected()
        self._login_state = LoginState(phone_number=self.settings.phone_number)

    async def send_message(self, chat_id: str, message: str) -> None:
        """Send ``message`` to ``chat_id`` with ``parse_mode="md"``.

        Raises:
            PermissionError: If the session is not authorized.
        """
        if not await self.is_authorized():
            raise PermissionError("Telegram session is not authorized")
        # NOTE(review): chat_id is forwarded to Telethon unchanged as a
        # string -- confirm that numeric chat ids resolve as intended.
        await self.client.send_message(entity=chat_id, message=message, parse_mode="md")

    def get_login_state(self) -> "LoginState":
        """Return the live login-state object (not a copy)."""
        return self._login_state
|
|
|
|
|
|
# Module-level service instance, created at import time. Constructing it
# calls get_settings() and builds the TelegramClient as a side effect.
telegram_service = TelegramService()
|