Source code for glQiwiApi.utils.basics

import asyncio
import base64
import binascii
import concurrent.futures as futures
import datetime
import functools as ft
import hashlib
import hmac
import pathlib
import re
import time
from contextvars import ContextVar
from copy import deepcopy

import aiofiles
import pytz
from pydantic import ValidationError, BaseModel
from pytz.reference import LocalTimezone

try:
    import orjson  # type: ignore
except (ModuleNotFoundError, ImportError):  # pragma: no cover # type: ignore
    import json as orjson

# Gets your local time zone
Local = LocalTimezone()


def measure_time(func):
    """
    Декоратор для замера времени выполнения функции

    :param func:
    """

    @ft.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.monotonic()
        result = await func(*args, **kwargs)
        execute_time = time.monotonic() - start_time
        print(
            f'Function `{func.__name__}` executed for {execute_time} secs')
        return result

    return wrapper


def datetime_to_str_in_iso(obj, *, yoo_money_format=False):
    """
    Converts a date to a standard format for API's

    :param obj: datetime object to parse to string
    :param yoo_money_format: boolean
    :return: string - parsed date
    """
    if not isinstance(obj, datetime.datetime):
        return ''
    if yoo_money_format:
        # Приводим время к UTC,
        # так как yoomoney апи принимает именно в таком формате
        return pytz.utc.localize(obj).replace(tzinfo=None).isoformat(
            ' ').replace(" ", "T") + "Z"
    local_date = str(datetime.datetime.now(tz=Local))
    pattern = re.compile(r'[+]\d{2}[:]\d{2}')
    from_pattern = re.findall(pattern, local_date)[0]
    return obj.isoformat(' ').replace(" ", "T") + from_pattern


def parse_auth_link(response_data):
    """
    Parse link for getting code, which needs to be entered in the method
    get_access_token

    :param response_data:
    """
    regexp = re.compile(
        r'https://yoomoney.ru/oauth2/authorize[?]requestid[=]\w+'
    )
    return re.findall(regexp, str(response_data))[0]


def check_dates(start_date, end_date, payload_data):
    """ Check correctness of transferred dates and add it to request """
    if isinstance(
            start_date, (datetime.datetime, datetime.timedelta)
    ) and isinstance(
        end_date, (datetime.datetime, datetime.timedelta)
    ):
        if (end_date - start_date).total_seconds() > 0:
            payload_data.update(
                {
                    'startDate': datetime_to_str_in_iso(
                        start_date
                    ),
                    'endDate': datetime_to_str_in_iso(
                        end_date
                    )
                }
            )
        else:
            raise RuntimeError(
                'end_date не может быть больше чем start_date'
            )
    return payload_data


def parse_commission_request_payload(
        default_data,
        auth_maker,
        pay_sum,
        to_account
):
    """Set commission payload"""
    payload = deepcopy(default_data)
    payload.headers = auth_maker(payload.headers)
    payload.json['purchaseTotals']['total']['amount'] = pay_sum
    payload.json['account'] = to_account
    return payload, "99" if len(to_account) <= 15 else None


def parse_card_data(
        default_data,
        trans_sum,
        to_card,
        auth_maker,
):
    """Set card data payload"""
    data = deepcopy(default_data)
    data.json['sum']['amount'] = trans_sum
    data.json['fields']['account'] = to_card
    data.headers = auth_maker(headers=data.headers)
    return data


def parse_headers(content_json=False, auth=False):
    """
    Функция для добавления некоторых заголовков в запрос
    """
    headers = {
        'Host': 'yoomoney.ru',
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    if content_json:
        headers.update(
            {'Accept': 'application/json'}
        )
    if auth:
        headers.update(
            {'Authorization': 'Bearer {token}'}
        )
    return headers


def set_data_to_wallet(data, to_number, trans_sum, comment, currency):
    """
    Setting data for "wallet to wallet" transfer

    :param data:
    :param trans_sum:
    :param to_number:
    :param comment:
    :param currency:
    """
    data.json['sum']['amount'] = str(trans_sum)
    data.json['sum']['currency'] = currency
    data.json['fields']['account'] = to_number
    data.json['comment'] = comment
    data.headers.update({'User-Agent': 'Android v3.2.0 MKT'})
    return data


def set_data_p2p_create(wrapped_data, amount, life_time, comment, theme_code,
                        pay_source_filter):
    """
    Setting data for p2p form creation transfer

    :param wrapped_data:
    :param amount:
    :param life_time:
    :param comment:
    :param theme_code:
    :param pay_source_filter:
    """
    wrapped_data.json['amount']['value'] = str(amount)
    wrapped_data.json['comment'] = comment
    wrapped_data.json['expirationDateTime'] = life_time
    if isinstance(pay_source_filter, list):
        source_filters = " ".join(pay_source_filter).replace(" ", ",")
        wrapped_data.json['customFields'][
            'paySourcesFilter'
        ] = source_filters
    if isinstance(theme_code, str):
        wrapped_data.json['customFields']['themeCode'] = theme_code
    if not isinstance(theme_code, str) and pay_source_filter not in [
        'qw',
        'card',
        'mobile'
    ]:
        wrapped_data.json.pop('customFields')
    return wrapped_data.json


#
def multiply_objects_parse(lst_of_objects, model):
    """
    Function to handle list of objects

    :param lst_of_objects: usually its response.response_data
    :param model: pydantic model, which will parse data
    """
    objects = []
    for obj in lst_of_objects:
        try:
            if isinstance(obj, dict):
                obj: str
                objects.append(model.parse_obj(obj))
                continue
            for detached_obj in obj:
                objects.append(model.parse_obj(detached_obj))
        except ValidationError as ex:
            print(ex.json(indent=4))
    return objects


def simple_multiply_parse(lst_of_objects, model):
    """
    Parse simple objects, which cant raise ValidationError

    :param lst_of_objects: usually its response.response_data
    :param model: pydantic model, which will parse data
    """
    objects = []
    for obj in lst_of_objects:
        objects.append(model.parse_obj(obj))
    return objects


def take_event_loop(set_debug: bool = False):
    """
    Get new or running event loop
    !THIS FUNCTION IS IRRELEVANT, DON'T USE IT!

    :param set_debug:
    """
    try:
        loop = asyncio.get_running_loop()
        if loop.is_closed():
            raise RuntimeError()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    loop.set_debug(set_debug)
    return loop


def hmac_key(key, amount, status, bill_id, site_id):
    """
    Функция расшифровки подписи webhook

    :param key: ключ webhook, закодированный в Base64
    :param amount: сумма p2p платежа
    :param status: статус платежа
    :param bill_id: unique p2p id
    :param site_id:
    """
    byte_key = binascii.unhexlify(key)
    invoice_params = (
        f"{amount.currency}|{amount.value}|{bill_id}|{site_id}|{status.value}"
    ).encode("utf-8")

    return hmac.new(
        byte_key, invoice_params, hashlib.sha256
    ).hexdigest().upper()


def hmac_for_transaction(
        webhook_key_base64,
        amount,
        txn_type,
        account,
        txn_id,
        txn_hash
):
    invoice_params = (
        f"{amount.currency}|{amount.amount}|{txn_type}|{account}|{txn_id}"
    )
    webhook_key = base64.b64decode(bytes(webhook_key_base64, 'utf-8'))
    return hmac.new(webhook_key, invoice_params.encode('utf-8'),
                    hashlib.sha256).hexdigest() == txn_hash


def custom_load(data):
    """
    Custom loads for each pydantic model, because
    it guard API from different errors

    :param data: class data
    """
    return orjson.loads(orjson.dumps(data))


class Parser(BaseModel):
    """ Модель pydantic для перевода строки в datetime """
    dt: datetime.datetime


def allow_response_code(status_code):
    """
    Декоратор, который позволяет разрешить определенный код ответа от апи

    :param status_code: статус код, который будет рассмотрен как правильный,
     кроме 200
    """

    def wrap_func(func):
        async def wrapper(*args, **kwargs):
            from glQiwiApi import RequestError
            try:
                await func(*args, **kwargs)
            except RequestError as error:
                if error.status_code == str(status_code):
                    info = error.traceback_info
                    return {"success": True} if not info else info
                return {"success": False}

        return wrapper

    return wrap_func


def qiwi_master_data(ph_number, data):
    payload = deepcopy(data)
    payload["fields"]["account"] = ph_number
    return payload


def to_datetime(string_representation):
    """
    Вспомогательная функция для перевода строки во время

    :param string_representation: дата в виде строки
    :return: datetime representation
    """
    try:
        dictionary = {'dt': string_representation}
        return Parser.parse_obj(dictionary).dt
    except (ValidationError, orjson.JSONDecodeError) as ex:
        return ex.json(indent=4)


def new_card_data(ph_number, order_id, data):
    payload = deepcopy(data)
    url = "https://edge.qiwi.com" + "/sinap/test_api/v2/terms/32064/payments"
    payload["fields"].pop("vas_alias")
    payload["fields"].update(order_id=order_id)
    payload["fields"]["account"] = ph_number
    return url, payload


def sync_measure_time(func):
    """
    Decorator, which measures time your synchronous functions

    :param func:
    """

    @ft.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.monotonic()
        result = func(*args, **kwargs)
        execute_time = time.monotonic() - start_time
        print(
            f'Function `{func.__name__}` executed for {execute_time} secs')
        return result

    return wrapper


def parse_amount(txn_type, txn):
    amount = txn.amount if txn_type == 'in' else txn.amount_due
    comment = txn.comment if txn_type == 'in' else txn.message
    return amount, comment


def check_params(amount_, amount, txn, transaction_type):
    if amount is not None:
        if amount_ >= amount:
            if txn.direction == transaction_type:
                return True
    return False


def run_forever_safe(loop) -> None:
    """ run a loop for ever and clean up after being stopped """

    loop.run_forever()
    # NOTE: loop.run_forever returns after calling loop.stop

    safe_cancel(loop=loop)


def safe_cancel(loop) -> None:
    """cancel all tasks and close the loop gracefully"""

    loop_tasks_all = asyncio.all_tasks(loop=loop)

    for task in loop_tasks_all:
        task.cancel()
    # NOTE: `cancel` does not guarantee that the task will be cancelled

    for task in loop_tasks_all:
        if not (task.done() or task.cancelled()):
            try:
                # wait for task cancellations
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
    # Finally, close event loop
    loop.close()


[docs]def _await_sync(future): """ synchronously waits for a task """ return future.result()
def _cancel_future(loop, future, executor) -> None: """ cancels future if any exception occurred """ executor.submit(loop.call_soon_threadsafe, future.cancel) def _stop_loop(loop) -> None: """ stops an event loop """ loop.stop() class AdaptiveExecutor(futures.ThreadPoolExecutor): """ object: AdaptiveExecutor """ def __init__(self, max_workers=None, **kwargs): super().__init__(max_workers, 'sync_adapter_', **kwargs) self.max_workers = max_workers self.is_from_running_loop = ContextVar('Adapter_', default=False) def _construct_all(): """ Get or create new event loop """ loop = take_event_loop() executor = AdaptiveExecutor() loop.set_default_executor(executor) return loop, executor def _on_shutdown(executor, loop): """ Do some cleanup """ if not executor.is_from_running_loop.get(): loop.call_soon_threadsafe(_stop_loop, loop) executor.shutdown(wait=True)
[docs]def sync(func, *args, **kwargs): """ Function to execute async functions synchronously :param func: Async function, which you want to execute in synchronous code :param args: args, which need your async func :param kwargs: kwargs, which need your async func """ # construct an event loop and executor loop, executor = _construct_all() # Run our coroutine thread safe wrapped_future = asyncio.run_coroutine_threadsafe( func(*args, **kwargs), loop=loop ) # Run loop.run_forever(), but do it safely executor.submit(run_forever_safe, loop) try: # Get result return _await_sync(wrapped_future) finally: # Cleanup _on_shutdown(executor, loop)
def async_as_sync(func): """ Decorator, which use `sync` function to implement async to sync """ @ft.wraps(func) def wrapper(*args, **kwargs): return sync(func, *args, **kwargs) return wrapper def check_transaction( transactions, amount, transaction_type, sender, comment ): for txn in transactions: if float(txn.sum.amount) >= amount: if txn.type == transaction_type: if txn.comment == comment and txn.to_account == sender: return True elif comment and sender: continue elif txn.to_account == sender: return True elif sender: continue elif txn.comment == comment: return True return False def parse_limits(response, model): limits = {} for code, limit in response.response_data.get("limits").items(): if not limit: continue limits.update({code: model.parse_obj(limit[0])}) return limits def override_error_messages(status_codes): """ Function, which helps to override error message :param status_codes: special dictionary """ def functor(func): @ft.wraps(func) async def wrapper(*args, **kwargs): from glQiwiApi import RequestError try: return await func(*args, **kwargs) except RequestError as ex: if int(ex.status_code) in status_codes.keys(): error = status_codes.get(int(ex.status_code)) ex = RequestError( message=error.get("message"), status_code=ex.status_code, traceback_info=error.get("json_info"), additional_info=ex.additional_info ) raise ex return wrapper return functor def check_api_method(api_method): if not isinstance(api_method, str): raise RuntimeError(f"Invalid type of api_method(must be string)." f" Passed {type(api_method)}") def check_dates_for_statistic_request(start_date, end_date): first_expression = isinstance( start_date, datetime.datetime ) and isinstance(end_date, datetime.datetime) second_expression = isinstance( start_date, datetime.timedelta ) and isinstance(end_date, datetime.timedelta) if first_expression or second_expression: delta: datetime.timedelta = end_date - start_date if delta.days > 90: raise ValueError( 'Максимальный период для выгрузки' ' статистики - 90 календарных дней.' ) else: raise ValueError( 'Вы передали значения начальной ' 'и конечной даты в неправильном формате.' ) async def save_file(dir_path, file_name, data): """ Saving file in dir_path/file_name.pdf with some data :param dir_path: :param file_name: :param data: """ if isinstance(dir_path, str): dir_path = pathlib.Path(dir_path) if not dir_path.is_dir(): raise RuntimeError("Invalid path to save, its not directory!") path_to_file: pathlib.Path = dir_path / (file_name + '.pdf') async with aiofiles.open(path_to_file, 'wb') as file: return await file.write(data)