Автоматизируем чтение Telegram-каналов: LLM, Telethon и грабли на пути к MVP (Claude Code опыт)
Время на прочтение7 мин
Охват и читатели6.7K
Python *
Из песочницы
Расскажу, как собрал бота для AI-суммаризации Telegram-каналов: архитектура, выбор LLM-провайдера, оптимизация скорости и неочевидные проблемы при деплое на российский VPS.
Проблема
Подписан на 50+ Telegram-каналов. Каждое утро - 200+ непрочитанных сообщений. Читаю 10%, остальное скроллю. Классический information overload.
Идея: пусть LLM читает посты и присылает выжимку. Но не просто "саммари", а двухступенчатый подход:
Real Topic - о чём пост на самом деле (1-2 предложения)
TL;DR - полная выжимка с ключевыми тезисами
Логика: даже саммари 50 постов - много. А пробежать глазами "о чём это" - секунды.
Архитектура
Почему Telethon, а не Bot API
Bot API не умеет читать сообщения из каналов, на которые бот не добавлен администратором. Telethon работает через MTProto как обычный клиент - может читать любые публичные каналы.
python
from telethon import TelegramClient
client = TelegramClient('session', api_id, api_hash)
async def get_posts(channel_username: str, limit: int = 20):
entity = await client.get_entity(channel_username)
posts = []
async for message in client.iter_messages(entity, limit=limit):
if message.text and len(message.text) > 50:
posts.append({
'id': message.id,
'text': message.text,
'date': message.date,
'url': f'https://t.me/{channel_username}/{message.id}'
})
return posts
Стек
Компонент
Технология
Почему
Bot Framework
aiogram 3.x
Async, типизация, активная разработка
Channel Parser
Telethon
MTProto, читает любые публичные каналы
LLM
DeepSeek API
Работает из РФ, дёшево ($0.14/1M input)
Database
SQLite + aiosqlite
Достаточно для MVP, zero config
Scheduler
APScheduler
Простой, async-совместимый
Config
Pydantic Settings
Валидация, типизация, .env из коробки
Промпт для анализа постов
python
ANALYZE_POST_PROMPT = """Проанализируй пост из Telegram-канала и верни JSON.
Канал: {channel_name}
Текст поста:
---
{post_content}
---
Верни JSON строго в таком формате:
{{
"real_topic": "1-2 предложения о чём НА САМОМ ДЕЛЕ этот пост",
"tldr": "Краткое изложение в 2-4 предложениях с главными фактами",
"key_insights": ["ключевой инсайт 1", "ключевой инсайт 2"],
"relevance_score": 7,
"content_type": "news"
}}
Где content_type: "news", "opinion", "tutorial", "announcement", "other"
Отвечай ТОЛЬКО валидным JSON без markdown-разметки."""
Ключевые моменты:
Явное указание формата вывода (JSON)
Примеры полей прямо в промпте
Ограничение на markdown - иначе LLM оборачивает JSON в json
Грабля #1: Groq не работает из России
Изначально выбрал Groq - бесплатный, быстрый (Llama 3.3 70B за ~1 сек). Локально всё работало. После деплоя на Timeweb (российский VPS) - 403 Forbidden.
Сравнение провайдеров
Провайдер
Скорость
Доступ из РФ
Цена (1M input)
Цена (1M output)
Groq
~1 сек
❌ Blocked
Бесплатно (лимиты)
Бесплатно
OpenAI
~2-3 сек
❌ Blocked
$2.50 (GPT-4o-mini)
$10.00
DeepSeek
~5-10 сек
✅ Работает
$0.14
$0.28
Mistral
~3-5 сек
✅ Работает
$0.25
$0.25
Выбрал DeepSeek - работает из России, адекватная цена, качество на уровне GPT-4o-mini.
Универсальный клиент
Сделал клиент с поддержкой разных провайдеров через OpenAI-совместимый API:
python
from openai import AsyncOpenAI
class LLMClient:
PROVIDERS = {
"deepseek": {
"base_url": "https://api.deepseek.com",
"default_model": "deepseek-chat"
},
"groq": {
"base_url": "https://api.groq.com/openai/v1",
"default_model": "llama-3.3-70b-versatile"
},
}
def __init__(self, provider: str, api_key: str):
config = self.PROVIDERS[provider]
self.client = AsyncOpenAI(
base_url=config["base_url"],
api_key=api_key
)
self.model = config["default_model"]
async def complete(self, prompt: str, system: str = None) -> str:
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.3
)
return response.choices[0].message.content
Грабля #2: Дайджест генерируется 2 минуты
После перехода на DeepSeek время ответа выросло с ~1 сек (Groq) до ~8-10 сек. При 9 постах последовательная обработка занимала 80+ секунд.
Диагностика
Добавил логирование времени:
python
import time
async def analyze_post(self, content: str, channel: str):
start = time.time()
result = await self.client.complete(...)
elapsed = time.time() - start
logger.info(f"LLM call took {elapsed:.2f}s for {channel}")
return result
Вывод:
LLM call took 5.39s for channel: channel_1
LLM call took 8.57s for channel: channel_1
LLM call took 9.12s for channel: channel_2
...
Total: 86.3s for 9 posts
Решение: параллельная обработка
python
import asyncio
async def analyze_batch(
self,
posts: list[dict],
concurrency: int = 5
) -> list[dict]:
semaphore = asyncio.Semaphore(concurrency)
async def process_one(post: dict):
async with semaphore:
return await self.analyze_post(
content=post["content"],
channel=post["channel"]
)
tasks = [process_one(post) for post in posts]
return await asyncio.gather(*tasks)
Результаты
Режим
Время (9 постов)
Ускорение
Последовательно
86 сек
—
concurrency=3
28 сек
3x
concurrency=5
19 сек
4.5x
Семафор нужен, чтобы не упереться в rate limit API.
Грабля #3: Один канал забивает весь дайджест
Пользователь добавил 3 канала:
Канал A: 20 постов/день
Канал B: 5 постов/день
Канал C: 3 поста/день
При лимите 15 постов на дайджест канал C не попадал вообще - посты собирались в порядке добавления каналов.
Алгоритм равномерного распределения
python
def distribute_posts_evenly(
posts_by_channel: dict[str, list],
max_total: int = 30
) -> list:
"""
Round-robin распределение постов из разных каналов.
Args:
posts_by_channel: {"channel_1": [post1, post2], ...}
max_total: максимум постов в итоговом списке
Returns:
Список постов, равномерно распределённых по каналам
"""
channels = list(posts_by_channel.keys())
if not channels:
return []
fair_share = max_total // len(channels)
result = []
# Фаза 1: берём по fair_share из каждого канала
for channel in channels:
posts = posts_by_channel[channel][:fair_share]
for post in posts:
post['_channel'] = channel
result.extend(posts)
# Фаза 2: добираем оставшиеся слоты
remaining = max_total - len(result)
if remaining > 0:
for channel in channels:
leftover = posts_by_channel[channel][fair_share:]
for post in leftover[:remaining]:
post['_channel'] = channel
result.append(post)
remaining -= 1
if remaining <= 0:
break
if remaining <= 0:
break
return result
Теперь при 3 каналах и лимите 30: каждый получает минимум 10 слотов.
Модель данных
python
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, BigInteger
from sqlalchemy.orm import relationship, declarative_base
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
telegram_id = Column(BigInteger, unique=True, index=True)
digest_time = Column(String, default="07:00") # HH:MM
timezone = Column(String, default="Europe/Moscow")
is_active = Column(Boolean, default=True)
subscriptions = relationship("UserChannel", back_populates="user")
class Channel(Base):
__tablename__ = "channels"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, index=True) # без @
title = Column(String, nullable=True)
class UserChannel(Base):
"""Many-to-many: пользователь <-> канал"""
__tablename__ = "user_channels"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"))
channel_id = Column(Integer, ForeignKey("channels.id"))
added_at = Column(DateTime, default=datetime.utcnow)
user = relationship("User", back_populates="subscriptions")
channel = relationship("Channel")
SQLite с WAL-режимом справляется с конкурентными запросами:
python
async with engine.begin() as conn:
await conn.execute(text("PRAGMA journal_mode=WAL"))
Планировщик ежедневных дайджестов
python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
scheduler = AsyncIOScheduler()
async def schedule_user_digests():
"""Планирует дайджест для каждого пользователя на его время"""
users = await get_active_users()
for user in users:
hour, minute = map(int, user.digest_time.split(':'))
scheduler.add_job(
send_digest,
CronTrigger(hour=hour, minute=minute, timezone=user.timezone),
args=[user.telegram_id],
id=f"digest_{user.telegram_id}",
replace_existing=True
)
async def send_digest(telegram_id: int):
"""Генерирует и отправляет дайджест пользователю"""
user = await get_user(telegram_id)
channels = await get_user_channels(user)
# Собираем посты
all_posts = {}
for channel in channels:
posts = await parser.get_posts(channel.username, hours_back=24)
all_posts[channel.username] = posts
# Равномерно распределяем
distributed = distribute_posts_evenly(all_posts, max_total=30)
# Анализируем через LLM (параллельно)
analyzed = await processor.analyze_batch(distributed, concurrency=5)
# Формируем и отправляем
digest_text = format_digest(analyzed)
await bot.send_message(telegram_id, digest_text)
Деплой
Systemd unit
ini
# /etc/systemd/system/briefka.service
[Unit]
Description=Briefka Telegram Bot
After=network.target
[Service]
Type=simple
User=briefka
WorkingDirectory=/opt/briefka
ExecStart=/opt/briefka/venv/bin/python scripts/run_bot.py
Restart=always
RestartSec=10
Environment=PYTHONUNBUFFERED=1
[Install]
WantedBy=multi-user.target
Бэкап базы (cron)
bash
#!/bin/bash
# /opt/briefka/scripts/backup.sh
BACKUP_DIR="/opt/briefka/backups"
DB_PATH="/opt/briefka/briefka.db"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p $BACKUP_DIR
cp $DB_PATH "$BACKUP_DIR/briefka_$DATE.db"
# Удаляем бэкапы старше 7 дней
find $BACKUP_DIR -name "*.db" -mtime +7 -delete
cron
0 3 * * * /opt/briefka/scripts/backup.sh >> /opt/briefka/logs/backup.log 2>&1
Метрики
Параметр
Значение
Время генерации дайджеста (10 постов)
15-20 сек
Потребление RAM
~95 MB
Стоимость VPS (Timeweb)
530 ₽/мес
Стоимость LLM (DeepSeek, ~100 дайджестов)
~$0.10/мес
Что дальше
Персонализация - сохранять фидбек (лайк/дизлайк), обучать ранжирование под интересы пользователя
Кэширование - не парсить каналы при каждом запросе, фоновое обновление по cron
Rate limiting - защита от абуза (сейчас лимит: 10 каналов на пользователя)
Выводы
Telethon > Bot API для чтения каналов. Bot API не умеет читать сообщения из чужих каналов.
Региональные ограничения реальны. Groq, OpenAI заблокированы в РФ. DeepSeek работает.
Параллельность решает. Простой asyncio.gather() с семафором дал ускорение в 4.5 раза.
SQLite достаточно для MVP. С WAL-режимом справляется с конкурентными запросами.
MVP можно запустить за 500 ₽/мес. Не нужен дорогой сервер для Telegram-бота.
Бот работает в бета-режиме: t.me/briefka_bot
Буду рад вопросам и фидбеку в комментариях.
Теги:claude code
Хабы:Python
Meta* (Instagram*, Facebook*) и другие признанные экстремистскими организации/ресурсы запрещены в РФ.
Упоминания иностранных агентов сопровождаются маркировкой по закону.
Информационный материал. 18+.