from __future__ import annotations
from asyncio import as_completed, set_event_loop_policy
from itertools import repeat
from typing import (
AsyncGenerator
)
from typing import Dict, Optional, Any, Union, Tuple, Type, Iterable, cast, List
import aiohttp
from aiohttp import (
ClientTimeout,
ClientProxyConnectionError,
ServerDisconnectedError,
ClientConnectionError, ClientSession
)
from aiohttp.typedefs import LooseCookies
from glQiwiApi.core import AbstractParser
from glQiwiApi.core.constants import DEFAULT_TIMEOUT
from glQiwiApi.types import Response
from glQiwiApi.types.basics import Cached
from glQiwiApi.utils.exceptions import RequestError
_ProxyBasic = Union[str, Tuple[str, aiohttp.BasicAuth]]
_ProxyChain = Iterable[_ProxyBasic]
_ProxyType = Union[_ProxyChain, _ProxyBasic]
def _retrieve_basic(basic: _ProxyBasic) -> Dict[str, Any]:
from aiohttp_socks.utils import parse_proxy_url # type: ignore
proxy_auth: Optional[aiohttp.BasicAuth] = None
if isinstance(basic, str):
proxy_url = basic
else:
proxy_url, proxy_auth = basic
proxy_type, host, port, username, password = parse_proxy_url(proxy_url)
if isinstance(proxy_auth, aiohttp.BasicAuth):
username = proxy_auth.login
password = proxy_auth.password
return dict(
proxy_type=proxy_type,
host=host,
port=port,
username=username,
password=password,
rdns=True,
)
def _prepare_connector(
chain_or_plain: _ProxyType
) -> Tuple[Type["aiohttp.TCPConnector"], Dict[str, Any]]:
from aiohttp_socks import ChainProxyConnector, ProxyConnector, ProxyInfo # type: ignore
# since tuple is Iterable(compatible with _ProxyChain) object, we assume that
# user wants chained proxies if tuple is a pair of string(url) and BasicAuth
if isinstance(chain_or_plain, str) or (
isinstance(chain_or_plain, tuple) and len(chain_or_plain) == 2
):
chain_or_plain = cast(_ProxyBasic, chain_or_plain)
return ProxyConnector, _retrieve_basic(chain_or_plain)
chain_or_plain = cast(_ProxyChain, chain_or_plain)
infos: List[ProxyInfo] = []
for basic in chain_or_plain:
infos.append(ProxyInfo(**_retrieve_basic(basic)))
return ChainProxyConnector, dict(proxy_infos=infos)
[docs]class HttpXParser(AbstractParser):
"""
Aiohttp wrapper, implements the method of sending a request
"""
def __init__(self, proxy: Optional[_ProxyType] = None,
messages: Optional[Dict[str, str]] = None) -> None:
self.base_headers = {
'User-Agent': "glQiwiApi/1.0beta",
'Accept-Language': "ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7"
}
self.messages = messages
self._timeout = ClientTimeout(
total=5,
connect=None,
sock_connect=5,
sock_read=None
)
self._session: Optional[ClientSession] = None
self._connector_type: Type[aiohttp.TCPConnector] = aiohttp.TCPConnector
self._connector_init: Dict[str, Any] = {}
self._should_reset_connector = False # flag determines connector state
self._proxy: Optional[_ProxyType] = None
if proxy is not None:
try:
self._setup_proxy_connector(proxy)
except ImportError as exc: # pragma: no cover
raise RuntimeError(
"In order to use aiohttp client for proxy requests, install "
"https://pypi.org/project/aiohttp-socks/"
) from exc
def _setup_proxy_connector(self, proxy: _ProxyType) -> None:
self._connector_type, self._connector_init = _prepare_connector(proxy)
self._proxy = proxy
@property
def proxy(self) -> Optional[_ProxyType]:
return self._proxy
@proxy.setter
def proxy(self, proxy: _ProxyType) -> None:
self._setup_proxy_connector(proxy)
self._should_reset_connector = True
async def _make_request(
self,
url: str,
get_json: bool = False,
method: str = 'POST',
set_timeout: bool = True,
cookies: Optional[LooseCookies] = None,
json: Optional[dict] = None,
data: Optional[Dict[str, Union[
str, int, List[
Union[str, int]
]]]
] = None,
headers: Optional[dict] = None,
params: Optional[
Dict[str, Union[str, int, List[
Union[str, int]
]]]
] = None,
get_bytes: bool = False,
**kwargs) -> Union[Response, Cached]:
"""
Send request to some url. Method has a similar signature with the `aiohttp.request`
:param url: ссылка, куда вы хотите отправить ваш запрос
:param get_json: указывает на то, хотите ли вы получить ответ
в формате json
:param method: Тип запроса
:param data: payload data
:param set_timeout:
:param json:
:param cookies: куки запроса
:param headers: заголовки запроса
:param params:
:param get_bytes: указывает на то, хотите ли вы получить ответ
в байтах
:param kwargs:
:return: Response instance
"""
headers = headers or self.base_headers
# Create new session if old was closed
session = await self.create_session(
timeout=self._timeout if set_timeout else DEFAULT_TIMEOUT
)
# sending query to some endpoint url
try:
response = await session.request(
method=method,
url=url,
data=data,
headers=headers,
json=json if isinstance(json, dict) else None,
cookies=cookies,
params=params,
**kwargs
)
except (
ClientProxyConnectionError,
ServerDisconnectedError,
ClientConnectionError
):
raise self.make_exception(status_code=500)
# Get content and return response
try:
if get_json:
resp_data = await response.json(encoding="utf-8")
elif get_bytes:
resp_data = await response.read()
else:
resp_data = await response.text(encoding="utf-8")
except aiohttp.ContentTypeError as ex:
# For better traceback we raising a new exception from aiohttp.ContentTypeError
raise self.make_exception(
status_code=response.status,
traceback_info=ex.request_info
) from None
return Response(
status_code=response.status,
response_data=resp_data,
raw_headers=response.raw_headers,
cookies=response.cookies,
ok=response.ok,
content_type=response.content_type,
host=response.host,
url=response.url.__str__()
)
[docs] async def fetch(
self,
*,
times: int = 1,
**kwargs
) -> AsyncGenerator[Union[Response, Cached], None]:
"""
Basic usage: \n
parser = HttpXParser() \n
async for response in parser.fetch():
print(response)
:param times: quantity requests
:param kwargs: HttpXParser._request kwargs
:return:
"""
coroutines = [self._make_request(**kwargs) for _ in repeat(None, times)]
for future in as_completed(fs=coroutines):
yield await future
[docs] def fast(self) -> HttpXParser:
"""
Method to fetching faster with using faster event loop(uvloop) \n
USE IT ONLY ON LINUX SYSTEMS,
on Windows or Mac its dont give performance!
:return:
"""
try:
from uvloop import EventLoopPolicy # type: ignore
set_event_loop_policy(EventLoopPolicy())
except ImportError:
# Catching import error and forsake standard policy
from asyncio import DefaultEventLoopPolicy as EventLoopPolicy # type: ignore
set_event_loop_policy(EventLoopPolicy())
return self
[docs] async def create_session(self, **kwargs) -> aiohttp.ClientSession:
""" Creating new session if old was close or it's None """
if self.proxy is not None:
kwargs.update(connector=self._connector_type(**self._connector_init))
if self._should_reset_connector and isinstance(self._session, ClientSession):
await self._session.close()
if not isinstance(self._session, ClientSession):
self._session = ClientSession(**kwargs)
self._should_reset_connector = False
elif isinstance(self._session, ClientSession):
if self._session.closed:
self._session = ClientSession(**kwargs)
self._should_reset_connector = False
return self._session
[docs] async def close(self) -> None:
""" close aiohttp session"""
if isinstance(self._session, ClientSession):
if not self._session.closed:
await self._session.close()
async def stream_content(
self, url: str, timeout: int, chunk_size: int
) -> AsyncGenerator[bytes, None]:
session = await self.create_session()
async with session.get(url, timeout=timeout) as resp:
async for chunk in resp.content.iter_chunked(chunk_size):
yield chunk
[docs] def make_exception(
self,
status_code: int,
traceback_info: Optional[Union[aiohttp.RequestInfo, dict, str, bytes]] = None,
message: Optional[str] = None
) -> RequestError:
""" Raise :class:`RequestError` exception with pretty explanation """
from glQiwiApi import __version__
if not isinstance(message, str):
if isinstance(self.messages, dict):
message = self.messages.get(status_code, "Unknown") # type: ignore
return RequestError(
message,
status_code,
additional_info=f"{__version__} version api",
traceback_info=traceback_info
)