Source code for telepot.loop

import sys
import time
import json
import threading
import traceback
import collections

try:
    import Queue as queue
except ImportError:
    import queue

from . import exception
from . import _find_first_key, flavor_router


class RunForeverAsThread(object):
    def run_as_thread(self, *args, **kwargs):
        t = threading.Thread(target=self.run_forever, args=args, kwargs=kwargs)
        t.daemon = True
        t.start()


class CollectLoop(RunForeverAsThread):
    def __init__(self, handle):
        self._handle = handle
        self._inqueue = queue.Queue()

    @property
    def input_queue(self):
        return self._inqueue

    def run_forever(self):
        while 1:
            try:
                msg = self._inqueue.get(block=True)
                self._handle(msg)
            except:
                traceback.print_exc()


class GetUpdatesLoop(RunForeverAsThread):
    def __init__(self, bot, on_update):
        self._bot = bot
        self._update_handler = on_update

    def run_forever(self, relax=0.1, offset=None, timeout=20, allowed_updates=None):
        """
        Process new updates in infinity loop

        :param relax: float
        :param offset: int
        :param timeout: int
        :param allowed_updates: bool
        """
        while 1:
            try:
                result = self._bot.getUpdates(offset=offset,
                                              timeout=timeout,
                                              allowed_updates=allowed_updates)

                # Once passed, this parameter is no longer needed.
                allowed_updates = None

                # No sort. Trust server to give messages in correct order.
                for update in result:
                    self._update_handler(update)
                    offset = update['update_id'] + 1

            except exception.BadHTTPResponse as e:
                traceback.print_exc()

                # Servers probably down. Wait longer.
                if e.status == 502:
                    time.sleep(30)
            except:
                traceback.print_exc()
            finally:
                time.sleep(relax)


def _dictify3(data):
    if type(data) is bytes:
        return json.loads(data.decode('utf-8'))
    elif type(data) is str:
        return json.loads(data)
    elif type(data) is dict:
        return data
    else:
        raise ValueError()

def _dictify27(data):
    if type(data) in [str, unicode]:
        return json.loads(data)
    elif type(data) is dict:
        return data
    else:
        raise ValueError()

_dictify = _dictify3 if sys.version_info >= (3,) else _dictify27

def _extract_message(update):
    key = _find_first_key(update, ['message',
                                   'edited_message',
                                   'channel_post',
                                   'edited_channel_post',
                                   'callback_query',
                                   'inline_query',
                                   'chosen_inline_result',
                                   'shipping_query',
                                   'pre_checkout_query'])
    return key, update[key]

def _infer_handler_function(bot, h):
    if h is None:
        return bot.handle
    elif isinstance(h, dict):
        return flavor_router(h)
    else:
        return h


[docs]class MessageLoop(RunForeverAsThread): def __init__(self, bot, handle=None): self._bot = bot self._handle = _infer_handler_function(bot, handle)
[docs] def run_forever(self, *args, **kwargs): """ :type relax: float :param relax: seconds between each :meth:`.getUpdates` :type offset: int :param offset: initial ``offset`` parameter supplied to :meth:`.getUpdates` :type timeout: int :param timeout: ``timeout`` parameter supplied to :meth:`.getUpdates`, controlling how long to poll. :type allowed_updates: array of string :param allowed_updates: ``allowed_updates`` parameter supplied to :meth:`.getUpdates`, controlling which types of updates to receive. Calling this method will block forever. Use :meth:`.run_as_thread` to run it non-blockingly. """ collectloop = CollectLoop(self._handle) updatesloop = GetUpdatesLoop(self._bot, lambda update: collectloop.input_queue.put(_extract_message(update)[1])) # feed messages to collect loop # feed events to collect loop self._bot.scheduler.on_event(collectloop.input_queue.put) self._bot.scheduler.run_as_thread() updatesloop.run_as_thread(*args, **kwargs) collectloop.run_forever() # blocking
[docs]class Webhook(RunForeverAsThread): def __init__(self, bot, handle=None): self._bot = bot self._collectloop = CollectLoop(_infer_handler_function(bot, handle))
[docs] def run_forever(self): # feed events to collect loop self._bot.scheduler.on_event(self._collectloop.input_queue.put) self._bot.scheduler.run_as_thread() self._collectloop.run_forever()
[docs] def feed(self, data): update = _dictify(data) self._collectloop.input_queue.put(_extract_message(update)[1])
class Orderer(RunForeverAsThread): def __init__(self, on_ordered_update): self._on_ordered_update = on_ordered_update self._inqueue = queue.Queue() @property def input_queue(self): return self._inqueue def run_forever(self, maxhold=3): def handle(update): self._on_ordered_update(update) return update['update_id'] # Here is the re-ordering mechanism, ensuring in-order delivery of updates. max_id = None # max update_id passed to callback buffer = collections.deque() # keep those updates which skip some update_id qwait = None # how long to wait for updates, # because buffer's content has to be returned in time. while 1: try: update = self._inqueue.get(block=True, timeout=qwait) if max_id is None: # First message received, handle regardless. max_id = handle(update) elif update['update_id'] == max_id + 1: # No update_id skipped, handle naturally. max_id = handle(update) # clear contagious updates in buffer if len(buffer) > 0: buffer.popleft() # first element belongs to update just received, useless now. while 1: try: if type(buffer[0]) is dict: max_id = handle(buffer.popleft()) # updates that arrived earlier, handle them. else: break # gap, no more contagious updates except IndexError: break # buffer empty elif update['update_id'] > max_id + 1: # Update arrives pre-maturely, insert to buffer. nbuf = len(buffer) if update['update_id'] <= max_id + nbuf: # buffer long enough, put update at position buffer[update['update_id'] - max_id - 1] = update else: # buffer too short, lengthen it expire = time.time() + maxhold for a in range(nbuf, update['update_id']-max_id-1): buffer.append(expire) # put expiry time in gaps buffer.append(update) else: pass # discard except queue.Empty: # debug message # print('Timeout') # some buffer contents have to be handled # flush buffer until a non-expired time is encountered while 1: try: if type(buffer[0]) is dict: max_id = handle(buffer.popleft()) else: expire = buffer[0] if expire <= time.time(): max_id += 1 buffer.popleft() else: break # non-expired except IndexError: break # buffer empty except: traceback.print_exc() finally: try: # don't wait longer than next expiry time qwait = buffer[0] - time.time() if qwait < 0: qwait = 0 except IndexError: # buffer empty, can wait forever qwait = None # debug message # print ('Buffer:', str(buffer), ', To Wait:', qwait, ', Max ID:', max_id)
[docs]class OrderedWebhook(RunForeverAsThread): def __init__(self, bot, handle=None): self._bot = bot self._collectloop = CollectLoop(_infer_handler_function(bot, handle)) self._orderer = Orderer(lambda update: self._collectloop.input_queue.put(_extract_message(update)[1])) # feed messages to collect loop
[docs] def run_forever(self, *args, **kwargs): """ :type maxhold: float :param maxhold: The maximum number of seconds an update is held waiting for a not-yet-arrived smaller ``update_id``. When this number of seconds is up, the update is delivered to the message-handling function even if some smaller ``update_id``\s have not yet arrived. If those smaller ``update_id``\s arrive at some later time, they are discarded. Calling this method will block forever. Use :meth:`.run_as_thread` to run it non-blockingly. """ # feed events to collect loop self._bot.scheduler.on_event(self._collectloop.input_queue.put) self._bot.scheduler.run_as_thread() self._orderer.run_as_thread(*args, **kwargs) self._collectloop.run_forever()
[docs] def feed(self, data): """ :param data: One of these: - ``str``, ``unicode`` (Python 2.7), or ``bytes`` (Python 3, decoded using UTF-8) representing a JSON-serialized `Update <https://core.telegram.org/bots/api#update>`_ object. - a ``dict`` representing an Update object. """ update = _dictify(data) self._orderer.input_queue.put(update)