import sys
import io
import time
import json
import threading
import traceback
import collections
import bisect

    import Queue as queue
except ImportError:
    import queue

# Patch urllib3 for sending unicode filename
from . import hack

from . import exception

__version_info__ = (10, 5)
__version__ = '.'.join(map(str, __version_info__))

[docs]def flavor(msg): """ Return flavor of message or event. A message's flavor may be one of these: - ``chat`` - ``callback_query`` - ``inline_query`` - ``chosen_inline_result`` An event's flavor is determined by the single top-level key. """ if 'message_id' in msg: return 'chat' elif 'id' in msg and 'chat_instance' in msg: return 'callback_query' elif 'id' in msg and 'query' in msg: return 'inline_query' elif 'result_id' in msg: return 'chosen_inline_result' else: top_keys = list(msg.keys()) if len(top_keys) == 1: return top_keys[0] raise exception.BadFlavor(msg)
chat_flavors = ['chat'] inline_flavors = ['inline_query', 'chosen_inline_result'] def _find_first_key(d, keys): for k in keys: if k in d: return k raise KeyError('No suggested keys %s in %s' % (str(keys), str(d))) all_content_types = [ 'text', 'audio', 'document', 'game', 'photo', 'sticker', 'video', 'voice', 'contact', 'location', 'venue', 'new_chat_member', 'left_chat_member', 'new_chat_title', 'new_chat_photo', 'delete_chat_photo', 'group_chat_created', 'supergroup_chat_created', 'channel_chat_created', 'migrate_to_chat_id', 'migrate_from_chat_id', 'pinned_message', ]
[docs]def glance(msg, flavor='chat', long=False): """ Extract "headline" info about a message. Use parameter ``long`` to control whether a short or long tuple is returned. When ``flavor`` is ``chat`` (``msg`` being a `Message <>`_ object): - short: (content_type, ``msg['chat']['type']``, ``msg['chat']['id']``) - long: (content_type, ``msg['chat']['type']``, ``msg['chat']['id']``, ``msg['date']``, ``msg['message_id']``) *content_type* can be: ``text``, ``audio``, ``document``, ``game``, ``photo``, ``sticker``, ``video``, ``voice``, ``contact``, ``location``, ``venue``, ``new_chat_member``, ``left_chat_member``, ``new_chat_title``, ``new_chat_photo``, ``delete_chat_photo``, ``group_chat_created``, ``supergroup_chat_created``, ``channel_chat_created``, ``migrate_to_chat_id``, ``migrate_from_chat_id``, ``pinned_message``. When ``flavor`` is ``callback_query`` (``msg`` being a `CallbackQuery <>`_ object): - regardless: (``msg['id']``, ``msg['from']['id']``, ``msg['data']``) When ``flavor`` is ``inline_query`` (``msg`` being a `InlineQuery <>`_ object): - short: (``msg['id']``, ``msg['from']['id']``, ``msg['query']``) - long: (``msg['id']``, ``msg['from']['id']``, ``msg['query']``, ``msg['offset']``) When ``flavor`` is ``chosen_inline_result`` (``msg`` being a `ChosenInlineResult <>`_ object): - regardless: (``msg['result_id']``, ``msg['from']['id']``, ``msg['query']``) """ def gl_chat(): content_type = _find_first_key(msg, all_content_types) if long: return content_type, msg['chat']['type'], msg['chat']['id'], msg['date'], msg['message_id'] else: return content_type, msg['chat']['type'], msg['chat']['id'] def gl_callback_query(): return msg['id'], msg['from']['id'], msg['data'] def gl_inline_query(): if long: return msg['id'], msg['from']['id'], msg['query'], msg['offset'] else: return msg['id'], msg['from']['id'], msg['query'] def gl_chosen_inline_result(): return msg['result_id'], msg['from']['id'], msg['query'] try: fn = {'chat': gl_chat, 'callback_query': gl_callback_query, 'inline_query': gl_inline_query, 'chosen_inline_result': gl_chosen_inline_result}[flavor] except KeyError: raise exception.BadFlavor(flavor) return fn()
[docs]def flance(msg, long=False): """ A combination of :meth:`telepot.flavor` and :meth:`telepot.glance`, return a 2-tuple (flavor, headline_info), where *headline_info* is whatever extracted by :meth:`telepot.glance` depending on the message flavor and the ``long`` parameter. """ f = flavor(msg) g = glance(msg, flavor=f, long=long) return f,g
[docs]def peel(event): """ Remove an event's top-level skin (where its flavor is determined), and return the core content. """ return list(event.values())[0]
[docs]def fleece(event): """ A combination of :meth:`telepot.flavor` and :meth:`telepot.peel`, return a 2-tuple (flavor, content) of an event. """ return flavor(event), peel(event)
[docs]def is_event(msg): """ Return whether the message looks like an event. That is, whether it has a flavor that starts with an underscore. """ return flavor(msg).startswith('_')
[docs]def origin_identifier(msg): """ Extract the message identifier of a callback query's origin. Returned value is guaranteed to be a tuple. ``msg`` is expected to be ``callback_query``. """ if 'message' in msg: return msg['message']['chat']['id'], msg['message']['message_id'] elif 'inline_message_id' in msg: return msg['inline_message_id'], else: raise ValueError()
[docs]def message_identifier(msg): """ Extract an identifier for message editing. Useful with :meth:`telepot.Bot.editMessageText` and similar methods. Returned value is guaranteed to be a tuple. ``msg`` is expected to be ``chat`` or ``choson_inline_result``. """ if 'chat' in msg and 'message_id' in msg: return msg['chat']['id'], msg['message_id'] elif 'inline_message_id' in msg: return msg['inline_message_id'], else: raise ValueError()
def _dismantle_message_identifier(f): if isinstance(f, tuple): if len(f) == 2: return {'chat_id': f[0], 'message_id': f[1]} elif len(f) == 1: return {'inline_message_id': f[0]} else: raise ValueError() else: return {'inline_message_id': f} PY_3 = sys.version_info.major >= 3 _string_type = str if PY_3 else basestring _file_type = io.IOBase if PY_3 else file def _isstring(s): return isinstance(s, _string_type) def _isfile(f): return isinstance(f, _file_type) from . import helper def flavor_router(routing_table): router = helper.Router(flavor, routing_table) return router.route class _BotBase(object): def __init__(self, token): self._token = token self._file_chunk_size = 65536 def _strip(params, more=[]): return {key: value for key,value in params.items() if key not in ['self']+more} def _rectify(params): def namedtuple_to_dict(value): if isinstance(value, list): return [namedtuple_to_dict(v) for v in value] elif isinstance(value, dict): return {k:namedtuple_to_dict(v) for k,v in value.items() if v is not None} elif isinstance(value, tuple) and hasattr(value, '_asdict'): return {k:namedtuple_to_dict(v) for k,v in value._asdict().items() if v is not None} else: return value def flatten(value): v = namedtuple_to_dict(value) if isinstance(v, (dict, list)): return json.dumps(v, separators=(',',':')) else: return v # remove None, then json-serialize if needed return {k: flatten(v) for k,v in params.items() if v is not None} from . import api
[docs]class Bot(_BotBase): class Scheduler(threading.Thread): # A class that is sorted by timestamp. Use `bisect` module to ensure order in event queue. Event = collections.namedtuple('Event', ['timestamp', 'data']) Event.__eq__ = lambda self, other: self.timestamp == other.timestamp Event.__ne__ = lambda self, other: self.timestamp != other.timestamp Event.__gt__ = lambda self, other: self.timestamp > other.timestamp Event.__ge__ = lambda self, other: self.timestamp >= other.timestamp Event.__lt__ = lambda self, other: self.timestamp < other.timestamp Event.__le__ = lambda self, other: self.timestamp <= other.timestamp def __init__(self): super(Bot.Scheduler, self).__init__() self._eventq = [] self._lock = threading.RLock() # reentrant lock to allow locked method calling locked method self._output_queue = None def _locked(fn): def k(self, *args, **kwargs): with self._lock: return fn(self, *args, **kwargs) return k @_locked def _insert_event(self, data, when): ev = self.Event(when, data) bisect.insort(self._eventq, ev) return ev @_locked def _remove_event(self, event): # Find event according to its timestamp. # Index returned should be one behind. i = bisect.bisect(self._eventq, event) # Having two events with identical timestamp is unlikely but possible. # I am going to move forward and compare timestamp AND object address # to make sure the correct object is found. while i > 0: i -= 1 e = self._eventq[i] if e.timestamp != event.timestamp: raise exception.EventNotFound(event) elif id(e) == id(event): self._eventq.pop(i) return raise exception.EventNotFound(event) @_locked def _pop_expired_event(self): if not self._eventq: return None if self._eventq[0].timestamp <= time.time(): return self._eventq.pop(0) else: return None def event_at(self, when, data): """ Schedule some data to emit at an absolute timestamp. :type when: int or float :type data: dictionary :return: an internal Event object """ return self._insert_event(data, when) def event_later(self, delay, data): """ Schedule some data to emit after a number of seconds. :type delay: int or float :type data: dictionary :return: an internal Event object """ return self._insert_event(data, time.time()+delay) def event_now(self, data): """ Emit some data as soon as possible. :type data: dictionary :return: an internal Event object """ return self._insert_event(data, time.time()) def cancel(self, event): """ Cancel an event. :type event: an internal Event object """ self._remove_event(event) def run(self): while 1: e = self._pop_expired_event() while e: if callable( d = if d is not None: self._output_queue.put(d) else: self._output_queue.put( e = self._pop_expired_event() time.sleep(0.1) def __init__(self, token): super(Bot, self).__init__(token) self._scheduler = self.Scheduler() self._router = helper.Router(flavor, {'chat': lambda msg: self.on_chat_message(msg), 'callback_query': lambda msg: self.on_callback_query(msg), 'inline_query': lambda msg: self.on_inline_query(msg), 'chosen_inline_result': lambda msg: self.on_chosen_inline_result(msg)}) # use lambda to delay evaluation of self.on_ZZZ to runtime because # I don't want to require defining all methods right here. @property def scheduler(self): return self._scheduler @property def router(self): return self._router def handle(self, msg): self._router.route(msg) def _api_request(self, method, params=None, files=None, **kwargs): return api.request((self._token, method, params, files), **kwargs)
[docs] def getMe(self): """ See: """ return self._api_request('getMe')
[docs] def sendMessage(self, chat_id, text, parse_mode=None, disable_web_page_preview=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: """ p = _strip(locals()) return self._api_request('sendMessage', _rectify(p))
[docs] def forwardMessage(self, chat_id, from_chat_id, message_id, disable_notification=None): """ See: """ p = _strip(locals()) return self._api_request('forwardMessage', _rectify(p))
def _sendfile(self, inputfile, filetype, params): method = {'photo': 'sendPhoto', 'audio': 'sendAudio', 'document': 'sendDocument', 'sticker': 'sendSticker', 'video': 'sendVideo', 'voice': 'sendVoice',}[filetype] if _isstring(inputfile): params[filetype] = inputfile return self._api_request(method, _rectify(params)) else: files = {filetype: inputfile} return self._api_request(method, _rectify(params), files)
[docs] def sendPhoto(self, chat_id, photo, caption=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: :param photo: a string indicating a ``file_id`` on server, a file-like object as obtained by ``open()`` or ``urlopen()``, or a (filename, file-like object) tuple. If the file-like object is obtained by ``urlopen()``, you most likely have to supply a filename because Telegram servers require to know the file extension. If the filename contains non-ASCII characters and you are using Python 2.7, make sure the filename is a unicode string. """ p = _strip(locals(), more=['photo']) return self._sendfile(photo, 'photo', p)
[docs] def sendAudio(self, chat_id, audio, caption=None, duration=None, performer=None, title=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: :param audio: Same as ``photo`` in :meth:`telepot.Bot.sendPhoto` """ p = _strip(locals(), more=['audio']) return self._sendfile(audio, 'audio', p)
[docs] def sendDocument(self, chat_id, document, caption=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: :param document: Same as ``photo`` in :meth:`telepot.Bot.sendPhoto` """ p = _strip(locals(), more=['document']) return self._sendfile(document, 'document', p)
[docs] def sendSticker(self, chat_id, sticker, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: :param sticker: Same as ``photo`` in :meth:`telepot.Bot.sendPhoto` """ p = _strip(locals(), more=['sticker']) return self._sendfile(sticker, 'sticker', p)
[docs] def sendVideo(self, chat_id, video, duration=None, width=None, height=None, caption=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: :param video: Same as ``photo`` in :meth:`telepot.Bot.sendPhoto` """ p = _strip(locals(), more=['video']) return self._sendfile(video, 'video', p)
[docs] def sendVoice(self, chat_id, voice, caption=None, duration=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: :param voice: Same as ``photo`` in :meth:`telepot.Bot.sendPhoto` """ p = _strip(locals(), more=['voice']) return self._sendfile(voice, 'voice', p)
[docs] def sendLocation(self, chat_id, latitude, longitude, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: """ p = _strip(locals()) return self._api_request('sendLocation', _rectify(p))
[docs] def sendVenue(self, chat_id, latitude, longitude, title, address, foursquare_id=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: """ p = _strip(locals()) return self._api_request('sendVenue', _rectify(p))
[docs] def sendContact(self, chat_id, phone_number, first_name, last_name=None, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: """ p = _strip(locals()) return self._api_request('sendContact', _rectify(p))
[docs] def sendGame(self, chat_id, game_short_name, disable_notification=None, reply_to_message_id=None, reply_markup=None): """ See: """ p = _strip(locals()) return self._api_request('sendGame', _rectify(p))
[docs] def sendChatAction(self, chat_id, action): """ See: """ p = _strip(locals()) return self._api_request('sendChatAction', _rectify(p))
[docs] def getUserProfilePhotos(self, user_id, offset=None, limit=None): """ See: """ p = _strip(locals()) return self._api_request('getUserProfilePhotos', _rectify(p))
[docs] def getFile(self, file_id): """ See: """ p = _strip(locals()) return self._api_request('getFile', _rectify(p))
[docs] def kickChatMember(self, chat_id, user_id): """ See: """ p = _strip(locals()) return self._api_request('kickChatMember', _rectify(p))
[docs] def leaveChat(self, chat_id): """ See: """ p = _strip(locals()) return self._api_request('leaveChat', _rectify(p))
[docs] def unbanChatMember(self, chat_id, user_id): """ See: """ p = _strip(locals()) return self._api_request('unbanChatMember', _rectify(p))
[docs] def getChat(self, chat_id): """ See: """ p = _strip(locals()) return self._api_request('getChat', _rectify(p))
[docs] def getChatAdministrators(self, chat_id): """ See: """ p = _strip(locals()) return self._api_request('getChatAdministrators', _rectify(p))
[docs] def getChatMembersCount(self, chat_id): """ See: """ p = _strip(locals()) return self._api_request('getChatMembersCount', _rectify(p))
[docs] def getChatMember(self, chat_id, user_id): """ See: """ p = _strip(locals()) return self._api_request('getChatMember', _rectify(p))
[docs] def answerCallbackQuery(self, callback_query_id, text=None, show_alert=None, url=None, cache_time=None): """ See: """ p = _strip(locals()) return self._api_request('answerCallbackQuery', _rectify(p))
[docs] def editMessageText(self, msg_identifier, text, parse_mode=None, disable_web_page_preview=None, reply_markup=None): """ See: :param msg_identifier: a 2-tuple (``chat_id``, ``message_id``), a 1-tuple (``inline_message_id``), or simply ``inline_message_id``. You may extract this value easily with :meth:`telepot.message_identifier` """ p = _strip(locals(), more=['msg_identifier']) p.update(_dismantle_message_identifier(msg_identifier)) return self._api_request('editMessageText', _rectify(p))
[docs] def editMessageCaption(self, msg_identifier, caption=None, reply_markup=None): """ See: :param msg_identifier: Same as ``msg_identifier`` in :meth:`telepot.Bot.editMessageText` """ p = _strip(locals(), more=['msg_identifier']) p.update(_dismantle_message_identifier(msg_identifier)) return self._api_request('editMessageCaption', _rectify(p))
[docs] def editMessageReplyMarkup(self, msg_identifier, reply_markup=None): """ See: :param msg_identifier: Same as ``msg_identifier`` in :meth:`telepot.Bot.editMessageText` """ p = _strip(locals(), more=['msg_identifier']) p.update(_dismantle_message_identifier(msg_identifier)) return self._api_request('editMessageReplyMarkup', _rectify(p))
[docs] def answerInlineQuery(self, inline_query_id, results, cache_time=None, is_personal=None, next_offset=None, switch_pm_text=None, switch_pm_parameter=None): """ See: """ p = _strip(locals()) return self._api_request('answerInlineQuery', _rectify(p))
[docs] def getUpdates(self, offset=None, limit=None, timeout=None, allowed_updates=None): """ See: """ p = _strip(locals()) return self._api_request('getUpdates', _rectify(p))
[docs] def setWebhook(self, url=None, certificate=None, max_connections=None, allowed_updates=None): """ See: """ p = _strip(locals(), more=['certificate']) if certificate: files = {'certificate': certificate} return self._api_request('setWebhook', _rectify(p), files) else: return self._api_request('setWebhook', _rectify(p))
[docs] def deleteWebhook(self): """ See: """ return self._api_request('deleteWebhook')
[docs] def getWebhookInfo(self): """ See: """ return self._api_request('getWebhookInfo')
[docs] def setGameScore(self, user_id, score, game_message_identifier, force=None, disable_edit_message=None): """ See: :param game_message_identifier: Same as ``msg_identifier`` in :meth:`telepot.Bot.editMessageText` """ p = _strip(locals(), more=['game_message_identifier']) p.update(_dismantle_message_identifier(game_message_identifier)) return self._api_request('setGameScore', _rectify(p))
[docs] def getGameHighScores(self, user_id, game_message_identifier): """ See: :param game_message_identifier: Same as ``msg_identifier`` in :meth:`telepot.Bot.editMessageText` """ p = _strip(locals(), more=['game_message_identifier']) p.update(_dismantle_message_identifier(game_message_identifier)) return self._api_request('getGameHighScores', _rectify(p))
[docs] def download_file(self, file_id, dest): """ Download a file to local disk. :param dest: a path or a ``file`` object """ f = self.getFile(file_id) try: d = dest if _isfile(dest) else open(dest, 'wb') r =, f['file_path']), preload_content=False) while 1: data = if not data: break d.write(data) finally: if not _isfile(dest) and 'd' in locals(): d.close() if 'r' in locals(): r.release_conn()
[docs] def message_loop(self, callback=None, relax=0.1, timeout=20, allowed_updates=None, source=None, ordered=True, maxhold=3, run_forever=False): """ Spawn a thread to constantly ``getUpdates`` or pull updates from a queue. Apply ``callback`` to every message received. Also starts the scheduler thread for internal events. :param callback: a function that takes one argument (the message), or a routing table. If ``None``, the bot's ``handle`` method is used. A *routing table* is a dictionary of ``{flavor: function}``, mapping messages to appropriate handler functions according to their flavors. It allows you to define functions specifically to handle one flavor of messages. It usually looks like this: ``{'chat': fn1, 'callback_query': fn2, 'inline_query': fn3, ...}``. Each handler function should take one argument (the message). :param source: Source of updates. If ``None``, ``getUpdates`` is used to obtain new messages from Telegram servers. If it is a synchronized queue (``Queue.Queue`` in Python 2.7 or ``queue.Queue`` in Python 3), new messages are pulled from the queue. A web application implementing a webhook can dump updates into the queue, while the bot pulls from it. This is how telepot can be integrated with webhooks. Acceptable contents in queue: - ``str``, ``unicode`` (Python 2.7), or ``bytes`` (Python 3, decoded using UTF-8) representing a JSON-serialized `Update <>`_ object. - a ``dict`` representing an Update object. When ``source`` is ``None``, these parameters are meaningful: :type relax: float :param relax: seconds between each ``getUpdates`` :type timeout: int :param timeout: ``timeout`` parameter supplied to :meth:`telepot.Bot.getUpdates`, controlling how long to poll. :type allowed_updates: array of string :param allowed_updates: ``allowed_updates`` parameter supplied to :meth:`telepot.Bot.getUpdates`, controlling which types of updates to receive. When ``source`` is a queue, these parameters are meaningful: :type ordered: bool :param ordered: If ``True``, ensure in-order delivery of messages to ``callback`` (i.e. updates with a smaller ``update_id`` always come before those with a larger ``update_id``). If ``False``, no re-ordering is done. ``callback`` is applied to messages as soon as they are pulled from queue. :type maxhold: float :param maxhold: Applied only when ``ordered`` is ``True``. The maximum number of seconds an update is held waiting for a not-yet-arrived smaller ``update_id``. When this number of seconds is up, the update is delivered to ``callback`` even if some smaller ``update_id``\s have not yet arrived. If those smaller ``update_id``\s arrive at some later time, they are discarded. Finally, there is this parameter, meaningful always: :type run_forever: bool or str :param run_forever: If ``True`` or any non-empty string, append an infinite loop at the end of this method, so it never returns. Useful as the very last line in a program. A non-empty string will also be printed, useful as an indication that the program is listening. """ if callback is None: callback = self.handle elif isinstance(callback, dict): callback = flavor_router(callback) collect_queue = queue.Queue() def collector(): while 1: try: item = collect_queue.get(block=True) callback(item) except: # Localize error so thread can keep going. traceback.print_exc() def relay_to_collector(update): key = _find_first_key(update, ['message', 'edited_message', 'channel_post', 'edited_channel_post', 'callback_query', 'inline_query', 'chosen_inline_result']) collect_queue.put(update[key]) return update['update_id'] def get_from_telegram_server(): offset = None # running offset allowed_upd = allowed_updates while 1: try: result = self.getUpdates(offset=offset, timeout=timeout, allowed_updates=allowed_upd) # Once passed, this parameter is no longer needed. allowed_upd = None if len(result) > 0: # No sort. Trust server to give messages in correct order. # Update offset to max(update_id) + 1 offset = max([relay_to_collector(update) for update in result]) + 1 except exception.BadHTTPResponse as e: traceback.print_exc() # Servers probably down. Wait longer. if e.status == 502: time.sleep(30) except: traceback.print_exc() finally: time.sleep(relax) def dictify3(data): if type(data) is bytes: return json.loads(data.decode('utf-8')) elif type(data) is str: return json.loads(data) elif type(data) is dict: return data else: raise ValueError() def dictify27(data): if type(data) in [str, unicode]: return json.loads(data) elif type(data) is dict: return data else: raise ValueError() def get_from_queue_unordered(qu): dictify = dictify3 if sys.version_info >= (3,) else dictify27 while 1: try: data = qu.get(block=True) update = dictify(data) relay_to_collector(update) except: traceback.print_exc() def get_from_queue(qu): dictify = dictify3 if sys.version_info >= (3,) else dictify27 # Here is the re-ordering mechanism, ensuring in-order delivery of updates. max_id = None # max update_id passed to callback buffer = collections.deque() # keep those updates which skip some update_id qwait = None # how long to wait for updates, # because buffer's content has to be returned in time. while 1: try: data = qu.get(block=True, timeout=qwait) update = dictify(data) if max_id is None: # First message received, handle regardless. max_id = relay_to_collector(update) elif update['update_id'] == max_id + 1: # No update_id skipped, handle naturally. max_id = relay_to_collector(update) # clear contagious updates in buffer if len(buffer) > 0: buffer.popleft() # first element belongs to update just received, useless now. while 1: try: if type(buffer[0]) is dict: max_id = relay_to_collector(buffer.popleft()) # updates that arrived earlier, handle them. else: break # gap, no more contagious updates except IndexError: break # buffer empty elif update['update_id'] > max_id + 1: # Update arrives pre-maturely, insert to buffer. nbuf = len(buffer) if update['update_id'] <= max_id + nbuf: # buffer long enough, put update at position buffer[update['update_id'] - max_id - 1] = update else: # buffer too short, lengthen it expire = time.time() + maxhold for a in range(nbuf, update['update_id']-max_id-1): buffer.append(expire) # put expiry time in gaps buffer.append(update) else: pass # discard except queue.Empty: # debug message # print('Timeout') # some buffer contents have to be handled # flush buffer until a non-expired time is encountered while 1: try: if type(buffer[0]) is dict: max_id = relay_to_collector(buffer.popleft()) else: expire = buffer[0] if expire <= time.time(): max_id += 1 buffer.popleft() else: break # non-expired except IndexError: break # buffer empty except: traceback.print_exc() finally: try: # don't wait longer than next expiry time qwait = buffer[0] - time.time() if qwait < 0: qwait = 0 except IndexError: # buffer empty, can wait forever qwait = None # debug message # print ('Buffer:', str(buffer), ', To Wait:', qwait, ', Max ID:', max_id) collector_thread = threading.Thread(target=collector) collector_thread.daemon = True collector_thread.start() if source is None: message_thread = threading.Thread(target=get_from_telegram_server) elif isinstance(source, queue.Queue): if ordered: message_thread = threading.Thread(target=get_from_queue, args=(source,)) else: message_thread = threading.Thread(target=get_from_queue_unordered, args=(source,)) else: raise ValueError('Invalid source') message_thread.daemon = True # need this for main thread to be killable by Ctrl-C message_thread.start() self._scheduler._output_queue = collect_queue self._scheduler.daemon = True self._scheduler.start() if run_forever: if _isstring(run_forever): print(run_forever) while 1: time.sleep(10)
import inspect class SpeakerBot(Bot): def __init__(self, token): super(SpeakerBot, self).__init__(token) self._mic = helper.Microphone() @property def mic(self): return self._mic def create_listener(self): q = queue.Queue() self._mic.add(q) ln = helper.Listener(self._mic, q) return ln
[docs]class DelegatorBot(SpeakerBot): def __init__(self, token, delegation_patterns): """ :param delegation_patterns: a list of (seeder, delegator) tuples. """ super(DelegatorBot, self).__init__(token) self._delegate_records = [p+({},) for p in delegation_patterns] def _startable(self, delegate): return ((hasattr(delegate, 'start') and inspect.ismethod(delegate.start)) and (hasattr(delegate, 'is_alive') and inspect.ismethod(delegate.is_alive))) def _tuple_is_valid(self, t): return len(t) == 3 and callable(t[0]) and type(t[1]) in [list, tuple] and type(t[2]) is dict def _ensure_startable(self, delegate): if self._startable(delegate): return delegate elif callable(delegate): return threading.Thread(target=delegate) elif type(delegate) is tuple and self._tuple_is_valid(delegate): func, args, kwargs = delegate return threading.Thread(target=func, args=args, kwargs=kwargs) else: raise RuntimeError('Delegate does not have the required methods, is not callable, and is not a valid tuple.') def handle(self, msg): self._mic.send(msg) for calculate_seed, make_delegate, dict in self._delegate_records: id = calculate_seed(msg) if id is None: continue elif isinstance(id, collections.Hashable): if id not in dict or not dict[id].is_alive(): d = make_delegate((self, msg, id)) d = self._ensure_startable(d) dict[id] = d dict[id].start() else: d = make_delegate((self, msg, id)) d = self._ensure_startable(d) d.start()