import time
import traceback
import threading
import logging
import collections
import re
import inspect
from functools import partial
from . import filtering, exception
from . import (
flavor, chat_flavors, inline_flavors, is_event,
message_identifier, origin_identifier)
try:
import Queue as queue
except ImportError:
import queue
class Microphone(object):
def __init__(self):
self._queues = set()
self._lock = threading.Lock()
def _locked(func):
def k(self, *args, **kwargs):
with self._lock:
return func(self, *args, **kwargs)
return k
@_locked
def add(self, q):
self._queues.add(q)
@_locked
def remove(self, q):
self._queues.remove(q)
@_locked
def send(self, msg):
for q in self._queues:
try:
q.put_nowait(msg)
except queue.Full:
traceback.print_exc()
[docs]class Listener(object):
def __init__(self, mic, q):
self._mic = mic
self._queue = q
self._patterns = []
def __del__(self):
self._mic.remove(self._queue)
[docs] def capture(self, pattern):
"""
Add a pattern to capture.
:param pattern: a list of templates.
A template may be a function that:
- takes one argument - a message
- returns ``True`` to indicate a match
A template may also be a dictionary whose:
- **keys** are used to *select* parts of message. Can be strings or
regular expressions (as obtained by ``re.compile()``)
- **values** are used to match against the selected parts. Can be
typical data or a function.
All templates must produce a match for a message to be considered a match.
"""
self._patterns.append(pattern)
[docs] def wait(self):
"""
Block until a matched message appears.
"""
if not self._patterns:
raise RuntimeError('Listener has nothing to capture')
while 1:
msg = self._queue.get(block=True)
if any(map(lambda p: filtering.match_all(msg, p), self._patterns)):
return msg
[docs]class Sender(object):
"""
When you are dealing with a particular chat, it is tedious to have to supply
the same ``chat_id`` every time to send a message, or to send anything.
This object is a proxy to a bot's ``send*`` and ``forwardMessage`` methods,
automatically fills in a fixed chat id for you. Available methods have
identical signatures as those of the underlying bot, **except there is no need
to supply the aforementioned** ``chat_id``:
- :meth:`.Bot.sendMessage`
- :meth:`.Bot.forwardMessage`
- :meth:`.Bot.sendPhoto`
- :meth:`.Bot.sendAudio`
- :meth:`.Bot.sendDocument`
- :meth:`.Bot.sendSticker`
- :meth:`.Bot.sendVideo`
- :meth:`.Bot.sendVoice`
- :meth:`.Bot.sendVideoNote`
- :meth:`.Bot.sendMediaGroup`
- :meth:`.Bot.sendLocation`
- :meth:`.Bot.sendVenue`
- :meth:`.Bot.sendContact`
- :meth:`.Bot.sendGame`
- :meth:`.Bot.sendChatAction`
"""
def __init__(self, bot, chat_id):
for method in ['sendMessage',
'forwardMessage',
'sendPhoto',
'sendAudio',
'sendDocument',
'sendSticker',
'sendVideo',
'sendVoice',
'sendVideoNote',
'sendMediaGroup',
'sendLocation',
'sendVenue',
'sendContact',
'sendGame',
'sendChatAction',]:
setattr(self, method, partial(getattr(bot, method), chat_id))
# Essentially doing:
# self.sendMessage = partial(bot.sendMessage, chat_id)
[docs]class Administrator(object):
"""
When you are dealing with a particular chat, it is tedious to have to supply
the same ``chat_id`` every time to get a chat's info or to perform administrative
tasks.
This object is a proxy to a bot's chat administration methods,
automatically fills in a fixed chat id for you. Available methods have
identical signatures as those of the underlying bot, **except there is no need
to supply the aforementioned** ``chat_id``:
- :meth:`.Bot.kickChatMember`
- :meth:`.Bot.unbanChatMember`
- :meth:`.Bot.restrictChatMember`
- :meth:`.Bot.promoteChatMember`
- :meth:`.Bot.exportChatInviteLink`
- :meth:`.Bot.setChatPhoto`
- :meth:`.Bot.deleteChatPhoto`
- :meth:`.Bot.setChatTitle`
- :meth:`.Bot.setChatDescription`
- :meth:`.Bot.pinChatMessage`
- :meth:`.Bot.unpinChatMessage`
- :meth:`.Bot.leaveChat`
- :meth:`.Bot.getChat`
- :meth:`.Bot.getChatAdministrators`
- :meth:`.Bot.getChatMembersCount`
- :meth:`.Bot.getChatMember`
- :meth:`.Bot.setChatStickerSet`
- :meth:`.Bot.deleteChatStickerSet`
"""
def __init__(self, bot, chat_id):
for method in ['kickChatMember',
'unbanChatMember',
'restrictChatMember',
'promoteChatMember',
'exportChatInviteLink',
'setChatPhoto',
'deleteChatPhoto',
'setChatTitle',
'setChatDescription',
'pinChatMessage',
'unpinChatMessage',
'leaveChat',
'getChat',
'getChatAdministrators',
'getChatMembersCount',
'getChatMember',
'setChatStickerSet',
'deleteChatStickerSet']:
setattr(self, method, partial(getattr(bot, method), chat_id))
[docs]class Editor(object):
"""
If you want to edit a message over and over, it is tedious to have to supply
the same ``msg_identifier`` every time.
This object is a proxy to a bot's message-editing methods, automatically fills
in a fixed message identifier for you. Available methods have
identical signatures as those of the underlying bot, **except there is no need
to supply the aforementioned** ``msg_identifier``:
- :meth:`.Bot.editMessageText`
- :meth:`.Bot.editMessageCaption`
- :meth:`.Bot.editMessageReplyMarkup`
- :meth:`.Bot.deleteMessage`
- :meth:`.Bot.editMessageLiveLocation`
- :meth:`.Bot.stopMessageLiveLocation`
A message's identifier can be easily extracted with :func:`telepot.message_identifier`.
"""
def __init__(self, bot, msg_identifier):
"""
:param msg_identifier:
a message identifier as mentioned above, or a message (whose
identifier will be automatically extracted).
"""
# Accept dict as argument. Maybe expand this convenience to other cases in future.
if isinstance(msg_identifier, dict):
msg_identifier = message_identifier(msg_identifier)
for method in ['editMessageText',
'editMessageCaption',
'editMessageReplyMarkup',
'deleteMessage',
'editMessageLiveLocation',
'stopMessageLiveLocation']:
setattr(self, method, partial(getattr(bot, method), msg_identifier))
[docs]class Answerer(object):
"""
When processing inline queries, ensure **at most one active thread** per user id.
"""
def __init__(self, bot):
self._bot = bot
self._workers = {} # map: user id --> worker thread
self._lock = threading.Lock() # control access to `self._workers`
[docs] def answer(outerself, inline_query, compute_fn, *compute_args, **compute_kwargs):
"""
Spawns a thread that calls ``compute fn`` (along with additional arguments
``*compute_args`` and ``**compute_kwargs``), then applies the returned value to
:meth:`.Bot.answerInlineQuery` to answer the inline query.
If a preceding thread is already working for a user, that thread is cancelled,
thus ensuring at most one active thread per user id.
:param inline_query:
The inline query to be processed. The originating user is inferred from ``msg['from']['id']``.
:param compute_fn:
A **thread-safe** function whose returned value is given to :meth:`.Bot.answerInlineQuery` to send.
May return:
- a *list* of `InlineQueryResult <https://core.telegram.org/bots/api#inlinequeryresult>`_
- a *tuple* whose first element is a list of `InlineQueryResult <https://core.telegram.org/bots/api#inlinequeryresult>`_,
followed by positional arguments to be supplied to :meth:`.Bot.answerInlineQuery`
- a *dictionary* representing keyword arguments to be supplied to :meth:`.Bot.answerInlineQuery`
:param \*compute_args: positional arguments to ``compute_fn``
:param \*\*compute_kwargs: keyword arguments to ``compute_fn``
"""
from_id = inline_query['from']['id']
class Worker(threading.Thread):
def __init__(innerself):
super(Worker, innerself).__init__()
innerself._cancelled = False
def cancel(innerself):
innerself._cancelled = True
def run(innerself):
try:
query_id = inline_query['id']
if innerself._cancelled:
return
# Important: compute function must be thread-safe.
ans = compute_fn(*compute_args, **compute_kwargs)
if innerself._cancelled:
return
if isinstance(ans, list):
outerself._bot.answerInlineQuery(query_id, ans)
elif isinstance(ans, tuple):
outerself._bot.answerInlineQuery(query_id, *ans)
elif isinstance(ans, dict):
outerself._bot.answerInlineQuery(query_id, **ans)
else:
raise ValueError('Invalid answer format')
finally:
with outerself._lock:
# Delete only if I have NOT been cancelled.
if not innerself._cancelled:
del outerself._workers[from_id]
# If I have been cancelled, that position in `outerself._workers`
# no longer belongs to me. I should not delete that key.
# Several threads may access `outerself._workers`. Use `outerself._lock` to protect.
with outerself._lock:
if from_id in outerself._workers:
outerself._workers[from_id].cancel()
outerself._workers[from_id] = Worker()
outerself._workers[from_id].start()
[docs]class AnswererMixin(object):
"""
Install an :class:`.Answerer` to handle inline query.
"""
Answerer = Answerer # let subclass customize Answerer class
def __init__(self, *args, **kwargs):
self._answerer = self.Answerer(self.bot)
super(AnswererMixin, self).__init__(*args, **kwargs)
@property
def answerer(self):
return self._answerer
[docs]class CallbackQueryCoordinator(object):
def __init__(self, id, origin_set, enable_chat, enable_inline):
"""
:param origin_set:
Callback query whose origin belongs to this set will be captured
:param enable_chat:
- ``False``: Do not intercept *chat-originated* callback query
- ``True``: Do intercept
- Notifier function: Do intercept and call the notifier function
on adding or removing an origin
:param enable_inline:
Same meaning as ``enable_chat``, but apply to *inline-originated*
callback query
Notifier functions should have the signature ``notifier(origin, id, adding)``:
- On adding an origin, ``notifier(origin, my_id, True)`` will be called.
- On removing an origin, ``notifier(origin, my_id, False)`` will be called.
"""
self._id = id
self._origin_set = origin_set
def dissolve(enable):
if not enable:
return False, None
elif enable is True:
return True, None
elif callable(enable):
return True, enable
else:
raise ValueError()
self._enable_chat, self._chat_notify = dissolve(enable_chat)
self._enable_inline, self._inline_notify = dissolve(enable_inline)
def _chat_origin_included(self, msg):
try:
return (msg['chat']['id'], msg['message_id']) in self._origin_set
except KeyError:
return False
def _inline_origin_included(self, inline_message_id):
return (inline_message_id,) in self._origin_set
def _rectify(self, msg_identifier):
if isinstance(msg_identifier, tuple):
if len(msg_identifier) == 2:
return msg_identifier, self._chat_notify
elif len(msg_identifier) == 1:
return msg_identifier, self._inline_notify
else:
raise ValueError()
else:
return (msg_identifier,), self._inline_notify
[docs] def capture_origin(self, msg_identifier, notify=True):
msg_identifier, notifier = self._rectify(msg_identifier)
self._origin_set.add(msg_identifier)
notify and notifier and notifier(msg_identifier, self._id, True)
[docs] def uncapture_origin(self, msg_identifier, notify=True):
msg_identifier, notifier = self._rectify(msg_identifier)
self._origin_set.discard(msg_identifier)
notify and notifier and notifier(msg_identifier, self._id, False)
def _contains_callback_data(self, message_kw):
def contains(obj, key):
if isinstance(obj, dict):
return key in obj
else:
return hasattr(obj, key)
if contains(message_kw, 'reply_markup'):
reply_markup = filtering.pick(message_kw, 'reply_markup')
if contains(reply_markup, 'inline_keyboard'):
inline_keyboard = filtering.pick(reply_markup, 'inline_keyboard')
for array in inline_keyboard:
if any(filter(lambda button: contains(button, 'callback_data'), array)):
return True
return False
[docs] def augment_send(self, send_func):
"""
:param send_func:
a function that sends messages, such as :meth:`.Bot.send\*`
:return:
a function that wraps around ``send_func`` and examines whether the
sent message contains an inline keyboard with callback data. If so,
future callback query originating from the sent message will be captured.
"""
def augmented(*aa, **kw):
sent = send_func(*aa, **kw)
if self._enable_chat and self._contains_callback_data(kw):
self.capture_origin(message_identifier(sent))
return sent
return augmented
[docs] def augment_edit(self, edit_func):
"""
:param edit_func:
a function that edits messages, such as :meth:`.Bot.edit*`
:return:
a function that wraps around ``edit_func`` and examines whether the
edited message contains an inline keyboard with callback data. If so,
future callback query originating from the edited message will be captured.
If not, such capturing will be stopped.
"""
def augmented(msg_identifier, *aa, **kw):
edited = edit_func(msg_identifier, *aa, **kw)
if (edited is True and self._enable_inline) or (isinstance(edited, dict) and self._enable_chat):
if self._contains_callback_data(kw):
self.capture_origin(msg_identifier)
else:
self.uncapture_origin(msg_identifier)
return edited
return augmented
[docs] def augment_delete(self, delete_func):
"""
:param delete_func:
a function that deletes messages, such as :meth:`.Bot.deleteMessage`
:return:
a function that wraps around ``delete_func`` and stops capturing
callback query originating from that deleted message.
"""
def augmented(msg_identifier, *aa, **kw):
deleted = delete_func(msg_identifier, *aa, **kw)
if deleted is True:
self.uncapture_origin(msg_identifier)
return deleted
return augmented
[docs] def augment_on_message(self, handler):
"""
:param handler:
an ``on_message()`` handler function
:return:
a function that wraps around ``handler`` and examines whether the
incoming message is a chosen inline result with an ``inline_message_id``
field. If so, future callback query originating from this chosen
inline result will be captured.
"""
def augmented(msg):
if (self._enable_inline
and flavor(msg) == 'chosen_inline_result'
and 'inline_message_id' in msg):
inline_message_id = msg['inline_message_id']
self.capture_origin(inline_message_id)
return handler(msg)
return augmented
[docs] def augment_bot(self, bot):
"""
:return:
a proxy to ``bot`` with these modifications:
- all ``send*`` methods augmented by :meth:`augment_send`
- all ``edit*`` methods augmented by :meth:`augment_edit`
- ``deleteMessage()`` augmented by :meth:`augment_delete`
- all other public methods, including properties, copied unchanged
"""
# Because a plain object cannot be set attributes, we need a class.
class BotProxy(object):
pass
proxy = BotProxy()
send_methods = ['sendMessage',
'forwardMessage',
'sendPhoto',
'sendAudio',
'sendDocument',
'sendSticker',
'sendVideo',
'sendVoice',
'sendVideoNote',
'sendLocation',
'sendVenue',
'sendContact',
'sendGame',
'sendInvoice',
'sendChatAction',]
for method in send_methods:
setattr(proxy, method, self.augment_send(getattr(bot, method)))
edit_methods = ['editMessageText',
'editMessageCaption',
'editMessageReplyMarkup',]
for method in edit_methods:
setattr(proxy, method, self.augment_edit(getattr(bot, method)))
delete_methods = ['deleteMessage']
for method in delete_methods:
setattr(proxy, method, self.augment_delete(getattr(bot, method)))
def public_untouched(nv):
name, value = nv
return (not name.startswith('_')
and name not in send_methods + edit_methods + delete_methods)
for name, value in filter(public_untouched, inspect.getmembers(bot)):
setattr(proxy, name, value)
return proxy
[docs]class SafeDict(dict):
"""
A subclass of ``dict``, thread-safety added::
d = SafeDict() # Thread-safe operations include:
d['a'] = 3 # key assignment
d['a'] # key retrieval
del d['a'] # key deletion
"""
def __init__(self, *args, **kwargs):
super(SafeDict, self).__init__(*args, **kwargs)
self._lock = threading.Lock()
def _locked(func):
def k(self, *args, **kwargs):
with self._lock:
return func(self, *args, **kwargs)
return k
@_locked
def __getitem__(self, key):
return super(SafeDict, self).__getitem__(key)
@_locked
def __setitem__(self, key, value):
return super(SafeDict, self).__setitem__(key, value)
@_locked
def __delitem__(self, key):
return super(SafeDict, self).__delitem__(key)
_cqc_origins = SafeDict()
[docs]class InterceptCallbackQueryMixin(object):
"""
Install a :class:`.CallbackQueryCoordinator` to capture callback query
dynamically.
Using this mixin has one consequence. The :meth:`self.bot` property no longer
returns the original :class:`.Bot` object. Instead, it returns an augmented
version of the :class:`.Bot` (augmented by :class:`.CallbackQueryCoordinator`).
The original :class:`.Bot` can be accessed with ``self.__bot`` (double underscore).
"""
CallbackQueryCoordinator = CallbackQueryCoordinator
def __init__(self, intercept_callback_query, *args, **kwargs):
"""
:param intercept_callback_query:
a 2-tuple (enable_chat, enable_inline) to pass to
:class:`.CallbackQueryCoordinator`
"""
global _cqc_origins
# Restore origin set to CallbackQueryCoordinator
if self.id in _cqc_origins:
origin_set = _cqc_origins[self.id]
else:
origin_set = set()
_cqc_origins[self.id] = origin_set
if isinstance(intercept_callback_query, tuple):
cqc_enable = intercept_callback_query
else:
cqc_enable = (intercept_callback_query,) * 2
self._callback_query_coordinator = self.CallbackQueryCoordinator(self.id, origin_set, *cqc_enable)
cqc = self._callback_query_coordinator
cqc.configure(self.listener)
self.__bot = self._bot # keep original version of bot
self._bot = cqc.augment_bot(self._bot) # modify send* and edit* methods
self.on_message = cqc.augment_on_message(self.on_message) # modify on_message()
super(InterceptCallbackQueryMixin, self).__init__(*args, **kwargs)
def __del__(self):
global _cqc_origins
if self.id in _cqc_origins and not _cqc_origins[self.id]:
del _cqc_origins[self.id]
# Remove empty set from dictionary
@property
def callback_query_coordinator(self):
return self._callback_query_coordinator
[docs]class IdleEventCoordinator(object):
def __init__(self, scheduler, timeout):
self._scheduler = scheduler
self._timeout_seconds = timeout
self._timeout_event = None
[docs] def refresh(self):
""" Refresh timeout timer """
try:
if self._timeout_event:
self._scheduler.cancel(self._timeout_event)
# Timeout event has been popped from queue prematurely
except exception.EventNotFound:
pass
# Ensure a new event is scheduled always
finally:
self._timeout_event = self._scheduler.event_later(
self._timeout_seconds,
('_idle', {'seconds': self._timeout_seconds}))
[docs] def augment_on_message(self, handler):
"""
:return:
a function wrapping ``handler`` to refresh timer for every
non-event message
"""
def augmented(msg):
# Reset timer if this is an external message
is_event(msg) or self.refresh()
# Ignore timeout event that have been popped from queue prematurely
if flavor(msg) == '_idle' and msg is not self._timeout_event.data:
return
return handler(msg)
return augmented
[docs] def augment_on_close(self, handler):
"""
:return:
a function wrapping ``handler`` to cancel timeout event
"""
def augmented(ex):
try:
if self._timeout_event:
self._scheduler.cancel(self._timeout_event)
self._timeout_event = None
# This closing may have been caused by my own timeout, in which case
# the timeout event can no longer be found in the scheduler.
except exception.EventNotFound:
self._timeout_event = None
return handler(ex)
return augmented
[docs]class IdleTerminateMixin(object):
"""
Install an :class:`.IdleEventCoordinator` to manage idle timeout. Also define
instance method ``on__idle()`` to handle idle timeout events.
"""
IdleEventCoordinator = IdleEventCoordinator
def __init__(self, timeout, *args, **kwargs):
self._idle_event_coordinator = self.IdleEventCoordinator(self.scheduler, timeout)
idlec = self._idle_event_coordinator
idlec.refresh() # start timer
self.on_message = idlec.augment_on_message(self.on_message)
self.on_close = idlec.augment_on_close(self.on_close)
super(IdleTerminateMixin, self).__init__(*args, **kwargs)
@property
def idle_event_coordinator(self):
return self._idle_event_coordinator
[docs] def on__idle(self, event):
"""
Raise an :class:`.IdleTerminate` to close the delegate.
"""
raise exception.IdleTerminate(event['_idle']['seconds'])
[docs]class StandardEventScheduler(object):
"""
A proxy to the underlying :class:`.Bot`\'s scheduler, this object implements
the *standard event format*. A standard event looks like this::
{'_flavor': {
'source': {
'space': event_space, 'id': source_id}
'custom_key1': custom_value1,
'custom_key2': custom_value2,
... }}
- There is a single top-level key indicating the flavor, starting with an _underscore.
- On the second level, there is a ``source`` key indicating the event source.
- An event source consists of an *event space* and a *source id*.
- An event space is shared by all delegates in a group. Source id simply refers
to a delegate's id. They combine to ensure a delegate is always able to capture
its own events, while its own events would not be mistakenly captured by others.
Events scheduled through this object always have the second-level ``source`` key fixed,
while the flavor and other data may be customized.
"""
def __init__(self, scheduler, event_space, source_id):
self._base = scheduler
self._event_space = event_space
self._source_id = source_id
@property
def event_space(self):
return self._event_space
[docs] def make_event_data(self, flavor, data):
"""
Marshall ``flavor`` and ``data`` into a standard event.
"""
if not flavor.startswith('_'):
raise ValueError('Event flavor must start with _underscore')
d = {'source': {'space': self._event_space, 'id': self._source_id}}
d.update(data)
return {flavor: d}
[docs] def event_at(self, when, data_tuple):
"""
Schedule an event to be emitted at a certain time.
:param when: an absolute timestamp
:param data_tuple: a 2-tuple (flavor, data)
:return: an event object, useful for cancelling.
"""
return self._base.event_at(when, self.make_event_data(*data_tuple))
[docs] def event_later(self, delay, data_tuple):
"""
Schedule an event to be emitted after a delay.
:param delay: number of seconds
:param data_tuple: a 2-tuple (flavor, data)
:return: an event object, useful for cancelling.
"""
return self._base.event_later(delay, self.make_event_data(*data_tuple))
[docs] def event_now(self, data_tuple):
"""
Schedule an event to be emitted now.
:param data_tuple: a 2-tuple (flavor, data)
:return: an event object, useful for cancelling.
"""
return self._base.event_now(self.make_event_data(*data_tuple))
[docs] def cancel(self, event):
""" Cancel an event. """
return self._base.cancel(event)
[docs]class StandardEventMixin(object):
"""
Install a :class:`.StandardEventScheduler`.
"""
StandardEventScheduler = StandardEventScheduler
def __init__(self, event_space, *args, **kwargs):
self._scheduler = self.StandardEventScheduler(self.bot.scheduler, event_space, self.id)
self._scheduler.configure(self.listener)
super(StandardEventMixin, self).__init__(*args, **kwargs)
@property
def scheduler(self):
return self._scheduler
[docs]class ListenerContext(object):
def __init__(self, bot, context_id, *args, **kwargs):
# Initialize members before super() so mixin could use them.
self._bot = bot
self._id = context_id
self._listener = bot.create_listener()
super(ListenerContext, self).__init__(*args, **kwargs)
@property
def bot(self):
"""
The underlying :class:`.Bot` or an augmented version thereof
"""
return self._bot
@property
def id(self):
return self._id
@property
def listener(self):
""" See :class:`.Listener` """
return self._listener
[docs]class ChatContext(ListenerContext):
def __init__(self, bot, context_id, *args, **kwargs):
super(ChatContext, self).__init__(bot, context_id, *args, **kwargs)
self._chat_id = context_id
self._sender = Sender(self.bot, self._chat_id)
self._administrator = Administrator(self.bot, self._chat_id)
@property
def chat_id(self):
return self._chat_id
@property
def sender(self):
""" A :class:`.Sender` for this chat """
return self._sender
@property
def administrator(self):
""" An :class:`.Administrator` for this chat """
return self._administrator
[docs]class UserContext(ListenerContext):
def __init__(self, bot, context_id, *args, **kwargs):
super(UserContext, self).__init__(bot, context_id, *args, **kwargs)
self._user_id = context_id
self._sender = Sender(self.bot, self._user_id)
@property
def user_id(self):
return self._user_id
@property
def sender(self):
""" A :class:`.Sender` for this user """
return self._sender
[docs]class CallbackQueryOriginContext(ListenerContext):
def __init__(self, bot, context_id, *args, **kwargs):
super(CallbackQueryOriginContext, self).__init__(bot, context_id, *args, **kwargs)
self._origin = context_id
self._editor = Editor(self.bot, self._origin)
@property
def origin(self):
""" Mesasge identifier of callback query's origin """
return self._origin
@property
def editor(self):
""" An :class:`.Editor` to the originating message """
return self._editor
[docs]class InvoiceContext(ListenerContext):
def __init__(self, bot, context_id, *args, **kwargs):
super(InvoiceContext, self).__init__(bot, context_id, *args, **kwargs)
self._payload = context_id
@property
def payload(self):
return self._payload
[docs]def openable(cls):
"""
A class decorator to fill in certain methods and properties to ensure
a class can be used by :func:`.create_open`.
These instance methods and property will be added, if not defined
by the class:
- ``open(self, initial_msg, seed)``
- ``on_message(self, msg)``
- ``on_close(self, ex)``
- ``close(self, ex=None)``
- property ``listener``
"""
def open(self, initial_msg, seed):
pass
def on_message(self, msg):
raise NotImplementedError()
def on_close(self, ex):
logging.error('on_close() called due to %s: %s', type(ex).__name__, ex)
def close(self, ex=None):
raise ex if ex else exception.StopListening()
@property
def listener(self):
raise NotImplementedError()
def ensure_method(name, fn):
if getattr(cls, name, None) is None:
setattr(cls, name, fn)
# set attribute if no such attribute
ensure_method('open', open)
ensure_method('on_message', on_message)
ensure_method('on_close', on_close)
ensure_method('close', close)
ensure_method('listener', listener)
return cls
[docs]class Router(object):
"""
Map a message to a handler function, using a **key function** and
a **routing table** (dictionary).
A *key function* digests a message down to a value. This value is treated
as a key to the *routing table* to look up a corresponding handler function.
"""
def __init__(self, key_function, routing_table):
"""
:param key_function:
A function that takes one argument (the message) and returns
one of the following:
- a key to the routing table
- a 1-tuple (key,)
- a 2-tuple (key, (positional, arguments, ...))
- a 3-tuple (key, (positional, arguments, ...), {keyword: arguments, ...})
Extra arguments, if returned, will be applied to the handler function
after using the key to look up the routing table.
:param routing_table:
A dictionary of ``{key: handler}``. A ``None`` key acts as a default
catch-all. If the key being looked up does not exist in the routing
table, the ``None`` key and its corresponding handler is used.
"""
super(Router, self).__init__()
self.key_function = key_function
self.routing_table = routing_table
[docs] def map(self, msg):
"""
Apply key function to ``msg`` to obtain a key. Return the routing table entry.
"""
k = self.key_function(msg)
key = k[0] if isinstance(k, (tuple, list)) else k
return self.routing_table[key]
[docs] def route(self, msg, *aa, **kw):
"""
Apply key function to ``msg`` to obtain a key, look up routing table
to obtain a handler function, then call the handler function with
positional and keyword arguments, if any is returned by the key function.
``*aa`` and ``**kw`` are dummy placeholders for easy chaining.
Regardless of any number of arguments returned by the key function,
multi-level routing may be achieved like this::
top_router.routing_table['key1'] = sub_router1.route
top_router.routing_table['key2'] = sub_router2.route
"""
k = self.key_function(msg)
if isinstance(k, (tuple, list)):
key, args, kwargs = {1: tuple(k) + ((),{}),
2: tuple(k) + ({},),
3: tuple(k),}[len(k)]
else:
key, args, kwargs = k, (), {}
try:
fn = self.routing_table[key]
except KeyError as e:
# Check for default handler, key=None
if None in self.routing_table:
fn = self.routing_table[None]
else:
raise RuntimeError('No handler for key: %s, and default handler not defined' % str(e.args))
return fn(msg, *args, **kwargs)
[docs]class DefaultRouterMixin(object):
"""
Install a default :class:`.Router` and the instance method ``on_message()``.
"""
def __init__(self, *args, **kwargs):
self._router = Router(flavor, {'chat': lambda msg: self.on_chat_message(msg),
'callback_query': lambda msg: self.on_callback_query(msg),
'inline_query': lambda msg: self.on_inline_query(msg),
'chosen_inline_result': lambda msg: self.on_chosen_inline_result(msg),
'shipping_query': lambda msg: self.on_shipping_query(msg),
'pre_checkout_query': lambda msg: self.on_pre_checkout_query(msg),
'_idle': lambda event: self.on__idle(event)})
# use lambda to delay evaluation of self.on_ZZZ to runtime because
# I don't want to require defining all methods right here.
super(DefaultRouterMixin, self).__init__(*args, **kwargs)
@property
def router(self):
return self._router
[docs] def on_message(self, msg):
""" Call :meth:`.Router.route` to handle the message. """
self._router.route(msg)
[docs]@openable
class Monitor(ListenerContext, DefaultRouterMixin):
def __init__(self, seed_tuple, capture, **kwargs):
"""
A delegate that never times-out, probably doing some kind of background monitoring
in the application. Most naturally paired with :func:`.per_application`.
:param capture: a list of patterns for :class:`.Listener` to capture
"""
bot, initial_msg, seed = seed_tuple
super(Monitor, self).__init__(bot, seed, **kwargs)
for pattern in capture:
self.listener.capture(pattern)
[docs]@openable
class ChatHandler(ChatContext,
DefaultRouterMixin,
StandardEventMixin,
IdleTerminateMixin):
def __init__(self, seed_tuple,
include_callback_query=False, **kwargs):
"""
A delegate to handle a chat.
"""
bot, initial_msg, seed = seed_tuple
super(ChatHandler, self).__init__(bot, seed, **kwargs)
self.listener.capture([{'chat': {'id': self.chat_id}}])
if include_callback_query:
self.listener.capture([{'message': {'chat': {'id': self.chat_id}}}])
[docs]@openable
class UserHandler(UserContext,
DefaultRouterMixin,
StandardEventMixin,
IdleTerminateMixin):
def __init__(self, seed_tuple,
include_callback_query=False,
flavors=chat_flavors+inline_flavors, **kwargs):
"""
A delegate to handle a user's actions.
:param flavors:
A list of flavors to capture. ``all`` covers all flavors.
"""
bot, initial_msg, seed = seed_tuple
super(UserHandler, self).__init__(bot, seed, **kwargs)
if flavors == 'all':
self.listener.capture([{'from': {'id': self.user_id}}])
else:
self.listener.capture([lambda msg: flavor(msg) in flavors, {'from': {'id': self.user_id}}])
if include_callback_query:
self.listener.capture([{'message': {'chat': {'id': self.user_id}}}])
[docs]class InlineUserHandler(UserHandler):
def __init__(self, seed_tuple, **kwargs):
"""
A delegate to handle a user's inline-related actions.
"""
super(InlineUserHandler, self).__init__(seed_tuple, flavors=inline_flavors, **kwargs)
[docs]@openable
class CallbackQueryOriginHandler(CallbackQueryOriginContext,
DefaultRouterMixin,
StandardEventMixin,
IdleTerminateMixin):
def __init__(self, seed_tuple, **kwargs):
"""
A delegate to handle callback query from one origin.
"""
bot, initial_msg, seed = seed_tuple
super(CallbackQueryOriginHandler, self).__init__(bot, seed, **kwargs)
self.listener.capture([
lambda msg:
flavor(msg) == 'callback_query' and origin_identifier(msg) == self.origin
])
[docs]@openable
class InvoiceHandler(InvoiceContext,
DefaultRouterMixin,
StandardEventMixin,
IdleTerminateMixin):
def __init__(self, seed_tuple, **kwargs):
"""
A delegate to handle messages related to an invoice.
"""
bot, initial_msg, seed = seed_tuple
super(InvoiceHandler, self).__init__(bot, seed, **kwargs)
self.listener.capture([{'invoice_payload': self.payload}])
self.listener.capture([{'successful_payment': {'invoice_payload': self.payload}}])