-"""
-Telegram Base
--------------
-This module implements a base interface for interactions with the
-Telegram API.
-"""
-
-from pathlib import Path
-from typing import Any, Callable, Optional
-
-from chatsky.utils.devel.extra_field_helpers import grab_extra_fields
-
-from chatsky.messengers.common import MessengerInterfaceWithAttachments
-from chatsky.core.service.types import PipelineRunnerFunction
-from chatsky.core.message import (
- Animation,
- Audio,
- CallbackQuery,
- Contact,
- Document,
- Image,
- Invoice,
- Location,
- Message,
- Poll,
- PollOption,
- Sticker,
- Video,
- VideoMessage,
- VoiceMessage,
- MediaGroup,
-)
-
-try:
- from telegram import (
- InputMediaAnimation,
- InputMediaAudio,
- InputMediaDocument,
- InputMediaPhoto,
- InputMediaVideo,
- Update,
- Message as TelegramMessage,
- )
- from telegram.ext import Application, ExtBot, MessageHandler, CallbackQueryHandler
- from telegram.ext.filters import ALL
-
- telegram_available = True
-except ImportError:
- ExtBot = Any
- Update = Any
- TelegramMessage = Any
-
- telegram_available = False
-
-
-[docs]class _AbstractTelegramInterface(MessengerInterfaceWithAttachments):
-
"""
-
Messenger interface mixin for Telegram API usage.
-
"""
-
-
supported_request_attachment_types = {
-
Location,
-
Contact,
-
Poll,
-
Sticker,
-
Audio,
-
Video,
-
Animation,
-
Image,
-
Document,
-
VoiceMessage,
-
VideoMessage,
-
Invoice,
-
}
-
supported_response_attachment_types = {
-
Location,
-
Contact,
-
Poll,
-
Sticker,
-
Audio,
-
Video,
-
Animation,
-
Image,
-
Document,
-
VoiceMessage,
-
VideoMessage,
-
MediaGroup,
-
}
-
-
def __init__(self, token: str, attachments_directory: Optional[Path] = None) -> None:
-
super().__init__(attachments_directory)
-
if not telegram_available:
-
raise ImportError("`python-telegram-bot` package is missing.\nTry to run `pip install chatsky[telegram]`.")
-
-
self.application = Application.builder().token(token).build()
-
self.application.add_handler(MessageHandler(ALL, self.on_message))
-
self.application.add_handler(CallbackQueryHandler(self.on_callback))
-
-
[docs] async def get_attachment_bytes(self, source: str) -> bytes:
-
file = await self.application.bot.get_file(source)
-
data = await file.download_as_bytearray()
-
return bytes(data)
-
-
-
-
[docs] async def cast_message_to_telegram_and_send(self, bot: ExtBot, chat_id: int, message: Message) -> None:
-
"""
-
Send Chatsky message to Telegram.
-
Sometimes, if several attachments included into message can not be sent as one update,
-
several Telegram updates will be produced.
-
Sometimes, if no text and none of the supported attachments are included,
-
nothing will happen.
-
-
:param bot: Telegram bot, that is used for connection to Telegram API.
-
:param chat_id: Telegram dialog ID that the message will be sent to.
-
:param message: Chatsky message that will be processed into Telegram updates.
-
"""
-
-
if message.text is not None:
-
await bot.send_message(
-
chat_id,
-
message.text,
-
**grab_extra_fields(
-
message,
-
[
-
"parse_mode",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"message_effect_id",
-
"reply_to_message_id",
-
"disable_web_page_preview",
-
],
-
),
-
)
-
if message.attachments is not None:
-
for attachment in message.attachments:
-
if isinstance(attachment, Location):
-
await bot.send_location(
-
chat_id,
-
attachment.latitude,
-
attachment.longitude,
-
**grab_extra_fields(
-
attachment,
-
[
-
"horizontal_accuracy",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"message_effect_id",
-
"reply_to_message_id",
-
],
-
),
-
)
-
elif isinstance(attachment, Contact):
-
await bot.send_contact(
-
chat_id,
-
attachment.phone_number,
-
attachment.first_name,
-
attachment.last_name,
-
**grab_extra_fields(
-
attachment,
-
[
-
"vcard",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"message_effect_id",
-
"reply_to_message_id",
-
],
-
),
-
)
-
elif isinstance(attachment, Poll):
-
await bot.send_poll(
-
chat_id,
-
attachment.question,
-
[option.text for option in attachment.options],
-
**grab_extra_fields(
-
attachment,
-
[
-
"is_anonymous",
-
"type",
-
"allows_multiple_answers",
-
"correct_option_id",
-
"explanation",
-
"explanation_parse_mode",
-
"open_period",
-
"is_closed",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"question_parse_mode",
-
"message_effect_id",
-
"reply_to_message_id",
-
],
-
),
-
)
-
elif isinstance(attachment, Audio):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_audio(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"parse_mode",
-
"performer",
-
"title",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"thumbnail",
-
"message_effect_id",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, Video):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_video(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"parse_mode",
-
"supports_streaming",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"has_spoiler",
-
"thumbnail",
-
"message_effect_id",
-
"show_caption_above_media",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, Animation):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_animation(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"parse_mode",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"has_spoiler",
-
"thumbnail",
-
"message_effect_id",
-
"show_caption_above_media",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, Image):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_photo(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"parse_mode",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"has_spoiler",
-
"message_effect_id",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, Sticker):
-
sticker = await attachment.get_bytes(self) if attachment.id is None else attachment.id
-
if sticker is not None:
-
await bot.send_sticker(
-
chat_id,
-
sticker,
-
**grab_extra_fields(
-
attachment,
-
[
-
"emoji",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"message_effect_id",
-
"reply_to_message_id",
-
],
-
),
-
)
-
elif isinstance(attachment, Document):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_document(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"parse_mode",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"thumbnail",
-
"message_effect_id",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, VoiceMessage):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_voice(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"parse_mode",
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"message_effect_id",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, VideoMessage):
-
attachment_bytes = await attachment.get_bytes(self)
-
if attachment_bytes is not None:
-
await bot.send_video_note(
-
chat_id,
-
attachment_bytes,
-
**grab_extra_fields(
-
attachment,
-
[
-
"disable_notification",
-
"protect_content",
-
"reply_markup",
-
"thumbnail",
-
"message_effect_id",
-
"reply_to_message_id",
-
"filename",
-
],
-
),
-
)
-
elif isinstance(attachment, MediaGroup):
-
files = list()
-
for media in attachment.group:
-
if isinstance(media, Image):
-
media_bytes = await media.get_bytes(self)
-
files += [
-
InputMediaPhoto(
-
media_bytes,
-
**grab_extra_fields(
-
media,
-
[
-
"filename",
-
"caption",
-
"parse_mode",
-
"has_spoiler",
-
"show_caption_above_media",
-
],
-
),
-
),
-
]
-
elif isinstance(media, Video):
-
media_bytes = await media.get_bytes(self)
-
files += [
-
InputMediaVideo(
-
media_bytes,
-
**grab_extra_fields(
-
media,
-
[
-
"filename",
-
"caption",
-
"parse_mode",
-
"supports_streaming",
-
"has_spoiler",
-
"thumbnail",
-
"show_caption_above_media",
-
],
-
),
-
),
-
]
-
elif isinstance(media, Animation):
-
media_bytes = await media.get_bytes(self)
-
files += [
-
InputMediaAnimation(
-
media_bytes,
-
**grab_extra_fields(
-
media,
-
[
-
"filename",
-
"caption",
-
"parse_mode",
-
"has_spoiler",
-
"thumbnail",
-
"show_caption_above_media",
-
],
-
),
-
),
-
]
-
elif isinstance(media, Audio):
-
media_bytes = await media.get_bytes(self)
-
files += [
-
InputMediaAudio(
-
media_bytes,
-
**grab_extra_fields(
-
media,
-
["filename", "caption", "parse_mode", "performer", "title", "thumbnail"],
-
),
-
),
-
]
-
elif isinstance(media, Document):
-
media_bytes = await media.get_bytes(self)
-
files += [
-
InputMediaDocument(
-
media_bytes,
-
**grab_extra_fields(media, ["filename", "caption", "parse_mode", "thumbnail"]),
-
),
-
]
-
else:
-
raise ValueError(f"Attachment {type(media).__name__} can not be sent in a media group!")
-
await bot.send_media_group(
-
chat_id,
-
files,
-
**grab_extra_fields(
-
attachment,
-
[
-
"caption",
-
"disable_notification",
-
"protect_content",
-
"message_effect_id",
-
"reply_to_message_id",
-
"parse_mode",
-
],
-
),
-
)
-
else:
-
raise ValueError(f"Attachment {type(attachment).__name__} is not supported!")
-
-
[docs] async def _on_event(self, update: Update, _: Any, create_message: Callable[[Update], Message]) -> None:
-
"""
-
Process Telegram update, run pipeline and send response to Telegram.
-
-
:param update: Telegram update that will be processed.
-
:param create_message: function that converts Telegram update to Chatsky message.
-
"""
-
-
data_available = update.message is not None or update.callback_query is not None
-
if update.effective_chat is not None and data_available:
-
message = create_message(update)
-
message.original_message = update
-
resp = await self._pipeline_runner(message, update.effective_chat.id)
-
if resp.last_response is not None:
-
await self.cast_message_to_telegram_and_send(
-
self.application.bot, update.effective_chat.id, resp.last_response
-
)
-
-
[docs] async def on_message(self, update: Update, _: Any) -> None:
-
"""
-
Process normal Telegram update, extracting Chatsky message from it
-
using :py:meth:`~._AbstractTelegramInterface.extract_message_from_telegram`.
-
-
:param update: Telegram update that will be processed.
-
"""
-
-
await self._on_event(update, _, lambda s: self.extract_message_from_telegram(s.message))
-
-
[docs] async def on_callback(self, update: Update, _: Any) -> None:
-
"""
-
Process Telegram callback update, creating empty Chatsky message
-
with only one callback query attachment from `callback_query.data` field.
-
-
:param update: Telegram update that will be processed.
-
"""
-
-
await self._on_event(
-
update, _, lambda s: Message(attachments=[CallbackQuery(query_string=s.callback_query.data)])
-
)
-
-
[docs] async def connect(self, pipeline_runner: PipelineRunnerFunction, *args, **kwargs):
-
self._pipeline_runner = pipeline_runner
-
-
-