Source code for glQiwiApi.utils.executor

"""
Managing polling and webhooks
"""
from __future__ import annotations

import asyncio
import inspect
import logging
import types
from datetime import datetime, timedelta
from typing import Union, Optional, List, \
    Callable, Awaitable, Dict, TypeVar, TYPE_CHECKING, Coroutine, Any, cast

from aiohttp import ClientTimeout, web

from glQiwiApi.core.builtin import BaseProxy, logger
from glQiwiApi.core.constants import DEFAULT_TIMEOUT
from glQiwiApi.core.web_hooks import server
from glQiwiApi.core.web_hooks.config import Path
from glQiwiApi.core.web_hooks.dispatcher import Dispatcher
from glQiwiApi.types import Transaction
from glQiwiApi.utils.exceptions import NoUpdatesToExecute

__all__ = ['start_webhook', 'start_polling']

if TYPE_CHECKING:
    from glQiwiApi.qiwi.client import QiwiWrapper

T = TypeVar("T")


[docs]def start_webhook(client: QiwiWrapper, *, host: str = "localhost", port: int = 8080, path: Optional[Path] = None, on_startup: Optional[ Callable[ [QiwiWrapper], Awaitable[None] ]] = None, on_shutdown: Optional[ Callable[ [QiwiWrapper], Awaitable[None] ]] = None, tg_app: Optional[BaseProxy] = None, app: Optional["web.Application"] = None): """ Blocking function that listens for webhooks :param client: :param host: server host :param port: server port that open for tcp/ip trans. :param path: path for qiwi that will send requests :param app: pass web.Application :param on_startup: coroutine,which will be executed on startup :param on_shutdown: coroutine, which will be executed on shutdown :param tg_app: builtin TelegramWebhookProxy or other or class, that inherits from BaseProxy and deal with aiogram updates """ executor = Executor(client, tg_app=tg_app) _setup_callbacks(executor, on_startup, on_shutdown) executor.start_webhook(host=host, port=port, path=path, app=app)
[docs]def start_polling(client: QiwiWrapper, *, get_updates_from: datetime = datetime.now(), timeout: Union[float, int, ClientTimeout] = 5, on_startup: Optional[ Callable[ [QiwiWrapper], Any ]] = None, on_shutdown: Optional[ Callable[ [QiwiWrapper], Any ]] = None, tg_app: Optional[BaseProxy] = None): """ Setup for long-polling mode :param client: :param get_updates_from: date from which will be polling, if it's None, polling will skip all updates :param timeout: timeout of polling in seconds, if the timeout is too small, the API can throw an exception :param on_startup: function or coroutine, which will be executed on startup :param on_shutdown: function or coroutine, which will be executed on shutdown :param tg_app: builtin TelegramPollingProxy or other class, that inherits from BaseProxy, deal with aiogram updates """ executor = Executor(client, tg_app=tg_app) _setup_callbacks(executor, on_startup, on_shutdown) executor.start_polling( get_updates_from=get_updates_from, timeout=timeout )
async def _inspect_and_execute_callback(client: "QiwiWrapper", callback: Callable[[QiwiWrapper], Any]): if inspect.iscoroutinefunction(callback): await callback(client) else: callback(client) def _setup_callbacks( executor: Executor, on_startup: Optional[Callable] = None, on_shutdown: Optional[Callable] = None ): """ Function, which setup callbacks and set it to dispatcher object :param executor: :param on_startup: :param on_shutdown: """ if on_startup is not None: executor["on_startup"] = on_startup if on_shutdown is not None: executor["on_shutdown"] = on_shutdown def parse_timeout( timeout: Union[float, int, ClientTimeout] ) -> float: """ Parse timeout :param timeout: """ if isinstance(timeout, float): return timeout elif isinstance(timeout, int): return float(timeout) elif isinstance(timeout, ClientTimeout): return timeout.total or DEFAULT_TIMEOUT.total # type: ignore else: raise TypeError("Timeout must be float, int or ClientTimeout. You have " f"passed on {type(timeout)}") class Executor: """ Provides normal work of webhooks and polling """ def __init__(self, client: QiwiWrapper, tg_app: Optional[BaseProxy], loop: Optional[asyncio.AbstractEventLoop] = None): """ :param client: instance of BaseWrapper :param tg_app: optional proxy to connect aiogram polling/webhook mode """ if loop is not None: self._loop = loop self.dispatcher: Dispatcher = client.dispatcher self._logger_config: Dict[str, Union[List[logging.Handler], int]] = { "handlers": [logger.InterceptHandler()], "level": logging.DEBUG } self.tg_app: Optional[BaseProxy] = tg_app self._polling: bool = False self.offset: int = 10 ** 6 self.offset_start_date: Optional[datetime] = None self.offset_end_date: Optional[datetime] = None self.client: QiwiWrapper = client self._on_startup_calls: List[Callable] = [] self._on_shutdown_calls: List[Callable] = [] # add wrapper to context from glQiwiApi import QiwiWrapper QiwiWrapper.set_current(client) if isinstance(self.tg_app, BaseProxy): client["dispatcher"] = self.tg_app.dispatcher @property def loop(self) -> asyncio.AbstractEventLoop: return cast(asyncio.AbstractEventLoop, getattr(self, "_loop", asyncio.get_event_loop())) def __setitem__(self, key: str, callback: Callable): if key not in ["on_shutdown", "on_startup"]: raise TypeError("to __setitem__ you can only pass callbacks") if not isinstance(callback, types.FunctionType): raise TypeError("Invalid type of callback, expected function, got %s" % type(callback)) if key == "on_shutdown": self._on_shutdown_calls.append(callback) else: self._on_startup_calls.append(callback) async def _pre_process(self, get_updates_from: Optional[datetime]): """ Preprocess method, which set start date and end date of polling :param get_updates_from: date from which will be polling """ try: current_time = datetime.now() assert isinstance(get_updates_from, datetime) assert ( current_time - get_updates_from ).total_seconds() > 0 except AssertionError as ex: raise TypeError( "Invalid value of get_updates_from, it must " f"be instance of datetime and no more than the current time," f" got {type(get_updates_from)}" ) from ex self.offset_end_date = current_time if self.offset_start_date is None: self.offset_start_date = get_updates_from else: self.offset_start_date = current_time - timedelta(milliseconds=1) async def _get_history(self) -> List[Transaction]: """ Get history by call 'transactions' method from QiwiWrapper. If history is empty or not all transactions not isinstance class Transaction - raise exception """ history = await self.client.transactions( end_date=self.offset_end_date, start_date=self.offset_start_date ) if not history or not all( isinstance(txn, Transaction) for txn in history): raise NoUpdatesToExecute() return history async def _pool_process( self, get_updates_from: Optional[datetime] ): """ Method, which manage pool process :param get_updates_from: date from which will be polling """ await self._pre_process(get_updates_from) try: history: List[Transaction] = await self._get_history() except NoUpdatesToExecute: return last_payment: Transaction = history[0] last_txn_id: int = last_payment.transaction_id if self.offset is None: first_payment: Transaction = history[-1] self.offset = first_payment.transaction_id - 1 await self._parse_history_and_process_events( history=history, last_payment_id=last_txn_id ) async def _start_polling(self, **kwargs): """ Blocking method, which start polling process :param kwargs: """ self._polling = True timeout: float = parse_timeout(kwargs.pop("timeout")) while self._polling: try: await self._pool_process(**kwargs) except Exception as ex: self.dispatcher.logger.error( "Handle `%s`. Sleeping %s seconds", repr(ex), timeout + 100 ) timeout += 100 await asyncio.sleep(timeout) def _on_shutdown(self, loop: asyncio.AbstractEventLoop): """ On shutdown, we gracefully cancel all tasks, close event loop and call `close` method to clear resources """ coroutines: List[Coroutine] = [self.goodbye(), self.client.close()] if isinstance(self.tg_app, BaseProxy): coroutines.append(self._shutdown_tg_app()) loop.run_until_complete( asyncio.gather(*coroutines, loop=loop) ) async def _shutdown_tg_app(self): """ Gracefully shutdown tg application """ self.tg_app.dispatcher.stop_polling() await self.tg_app.dispatcher.storage.close() await self.tg_app.dispatcher.storage.wait_closed() await self.tg_app.dispatcher.bot.session.close() async def _parse_history_and_process_events( self, history: List[Transaction], last_payment_id: int ): """ Processing events and send callbacks to handlers :param history: [list] list of :class:`Transaction` :param last_payment_id: id of last payment in history """ history_iterator = iter(history[::-1]) while self.offset < last_payment_id: try: payment = next(history_iterator) await self.dispatcher.process_event(payment) self.offset = payment.transaction_id self.offset_start_date = self.offset_end_date except StopIteration: # handle exhausted iterator break def start_polling( self, *, get_updates_from: datetime = datetime.now(), timeout: Union[float, int, ClientTimeout] = DEFAULT_TIMEOUT ): loop: asyncio.AbstractEventLoop = self.loop try: loop.run_until_complete(self.welcome()) loop.create_task(self._start_polling( get_updates_from=get_updates_from, timeout=timeout )) if isinstance(self.tg_app, BaseProxy): self.tg_app.setup(loop=loop) loop.run_forever() except (SystemExit, KeyboardInterrupt): # pragma: no cover # Allow to graceful shutdown pass finally: self._polling = False self._on_shutdown(loop=loop) def start_webhook( self, *, host: str = "localhost", port: int = 8080, path: Optional[Path] = None, app: Optional[web.Application] = None ): loop: asyncio.AbstractEventLoop = self.loop application = app or web.Application() hook_config, key = loop.run_until_complete(self.client.bind_webhook()) server.setup( dispatcher=self.dispatcher, app=application, path=path, secret_key=self.client.secret_p2p, base64_key=key, tg_app=self.tg_app, host=host ) try: loop.run_until_complete(self.welcome()) web.run_app(application, host=host, port=port) except (KeyboardInterrupt, SystemExit): # Allow to graceful shutdown pass finally: self._on_shutdown(loop=loop) async def welcome(self) -> None: """ Execute on_startup callback""" self.dispatcher.logger.debug("Start polling!") for callback in self._on_startup_calls: await _inspect_and_execute_callback( callback=callback, client=self.client ) async def goodbye(self) -> None: """ Execute on_shutdown callback """ self.dispatcher.logger.debug("Goodbye!") for callback in self._on_shutdown_calls: await _inspect_and_execute_callback( callback=callback, client=self.client )