"""Session utilities for veryscrape: rate-limited, OAuth-capable aiohttp sessions."""

from aioauth_client import HmacSha1Signature
from aiohttp.client import _RequestContextManager
from collections import OrderedDict
from fake_useragent import UserAgent, settings
from hashlib import sha1
from time import time
from random import SystemRandom
from urllib.parse import urljoin, urlparse
import asyncio
import aiohttp
import logging
import re


log = logging.getLogger(__name__)
# SystemRandom draws from os.urandom - suitable for the OAuth1 nonces below
random = SystemRandom().random
# If fake_useragent can't connect then this allows it to finish early
settings.HTTP_DELAY = 0.01
settings.HTTP_RETRIES = 1
settings.HTTP_TIMEOUT = 0.1
# Shared factory for random user-agent strings; the fallback string is used
# when the online user-agent database cannot be reached
_agent_factory = UserAgent(fallback='python:veryscrape')


def _create_nested_metadata(parent):
    """Build a metadata tree mirroring *parent* (a rate-limit mapping).

    Each leaf (an integer limit) is replaced by a bookkeeping dict with
    ``n`` (requests counted in the current window) and ``last`` (timestamp
    of the most recent accounting refresh), used by :class:`RateLimiter`.
    """
    request_metadata = {}
    for path, limit in parent.items():
        if isinstance(limit, dict):
            request_metadata[path] = _create_nested_metadata(limit)
        else:
            request_metadata[path] = {'n': 0, 'last': time()}
    return request_metadata


class RateLimiter:
    """Tracks request counts per url pattern and sleeps to honour limits.

    ``rate_limits`` maps regex patterns (or nested pattern dicts) to the
    maximum number of requests allowed per ``period`` seconds.  The special
    key ``'*'`` defines a global fallback limit, always checked last.
    """

    def __init__(self, rate_limits, period):
        self.rate_limit_period = period
        self.rate_limits = OrderedDict()
        defined_limits = rate_limits.copy()
        global_limit = defined_limits.pop('*', None)
        # sorts paths alphabetically for deterministic execution pre-python3.6
        for k in sorted(defined_limits.keys()):
            self.rate_limits[k] = defined_limits[k]
        # set global rate limit last so it is always checked last
        if global_limit:
            assert isinstance(global_limit, int), \
                'Global rate limit must be defined directly with an integer'
            self.rate_limits['*'] = global_limit
        self._rl_meta = _create_nested_metadata(self.rate_limits)

    def get_limit(self, url, parent=None, metadata=None):
        """Return ``(limit, metadata)`` for the first pattern matching *url*.

        Recurses into nested pattern dicts; returns ``(False, {})`` when no
        pattern matches.
        """
        parent = parent or self.rate_limits
        metadata = metadata or self._rl_meta
        for path, limit in parent.items():
            # BUG FIX: '*' is not a valid regex ("nothing to repeat"), so the
            # global fallback previously raised re.error when it was reached.
            # Treat it as a match-everything key instead of a pattern.
            if path == '*' or re.search(path, url):
                if isinstance(limit, dict):
                    return self.get_limit(
                        url, parent=limit, metadata=metadata[path])
                return limit, metadata[path]
        return False, {}

    def refresh_limits(self, rate_limit, metadata):
        """Release request slots earned since the last accounting refresh.

        BUG FIX: previously ``metadata['last']`` was never advanced here, so
        repeated refreshes (e.g. the polling loop in ``_wait_limit``)
        re-counted the same elapsed time and released slots faster than the
        configured rate, letting request bursts through.  The timestamp is
        now advanced by exactly the time that was credited.
        """
        time_since = time() - metadata['last']
        released = int(time_since * rate_limit / self.rate_limit_period)
        if released:
            metadata['n'] = max(0, metadata['n'] - released)
            # keep the fractional remainder of the elapsed time for the
            # next refresh instead of discarding it
            metadata['last'] += released * self.rate_limit_period / rate_limit

    async def _wait_limit(self, rate_limit, metadata):
        # Politely poll until a request slot becomes free
        while metadata['n'] >= rate_limit:
            self.refresh_limits(rate_limit, metadata)
            # wait 1/1000 the time of one request to try again
            await asyncio.sleep(self.rate_limit_period / rate_limit / 1000)

    async def wait_limit(self, url):
        """Sleep until a request to *url* is allowed, then record it."""
        rate_limit, metadata = self.get_limit(url)
        if rate_limit:
            self.refresh_limits(rate_limit, metadata)
            await self._wait_limit(rate_limit, metadata)
            # 'last' is maintained by refresh_limits; only count the request
            metadata['n'] += 1
class OAuth1:
    """Signs http requests using the three-legged OAuth1 (HMAC-SHA1) scheme."""

    def __init__(self, client, secret, token, token_secret):
        self.signature = HmacSha1Signature()
        self.client = client
        self.secret = secret
        self.token = token
        self.token_secret = token_secret

    @property
    def oauth_params(self):
        """
        Returns dictionary of oauth1 parameters required
        for oauth1 signed http request
        """
        nonce = sha1(str(random()).encode('ascii')).hexdigest()
        return {
            'oauth_consumer_key': self.client,
            'oauth_token': self.token,
            'oauth_signature_method': self.signature.name,
            'oauth_nonce': nonce,
            'oauth_timestamp': str(int(time())),
            'oauth_version': '1.0',
        }

    async def patch_request(self, method, url, params, kwargs):
        """Merge the oauth1 parameters into *params*, sign the request and
        return the (url, params, kwargs) triple for the http call."""
        params.update(self.oauth_params)
        params['oauth_signature'] = self.signature.sign(
            self.secret, method, url, self.token_secret, **params)
        return url, params, kwargs
class OAuth2:
    """Fetches and caches a client-credentials OAuth2 bearer token."""

    def __init__(self, client, secret, token_url):
        self.client = client
        self.secret = secret
        self.token_url = token_url
        self.auth = aiohttp.BasicAuth(self.client, self.secret)
        self.token = None
        self.token_expiry = 0

    @property
    def oauth2_token_expired(self):
        """Returns true if current oauth2 token needs to be refreshed"""
        if self.token is None:
            return True
        # token_expiry == 0 means "no expiry reported" -> never stale
        return self.token_expiry and time() >= self.token_expiry

    async def auth_token(self):
        """
        Returns dictionary of oauth2 parameters required
        for oauth2 signed http request
        """
        if self.oauth2_token_expired:
            async with aiohttp.ClientSession() as sess:
                async with sess.post(
                    self.token_url,
                    data={'grant_type': 'client_credentials'},
                    auth=self.auth
                ) as resp:
                    auth = await resp.json()
            self.token = auth['access_token']
            try:
                self.token_expiry = int(time()) + int(auth['expires_in'])
            except KeyError:
                # server did not report a lifetime; keep the token forever
                self.token_expiry = 0
        return {'Authorization': 'bearer ' + self.token}

    async def patch_request(self, method, url, params, kwargs):
        """Attach a valid bearer token to the request headers and return
        the (url, params, kwargs) triple for the http call."""
        bearer = await self.auth_token()
        if kwargs.get('headers', None) is None:
            kwargs['headers'] = {}
        kwargs['headers'].update(bearer)
        return url, params, kwargs
class Session:
    """aiohttp.ClientSession wrapper adding per-url rate limiting, random
    user agents, optional proxying and a retrying :meth:`fetch`.

    Subclasses override the class attributes below to configure behaviour.
    """
    # url-regex -> max requests per rate_limit_period (see RateLimiter)
    rate_limits = {}
    rate_limit_period = 60
    # If True, one random user agent is kept for the session's lifetime
    persist_user_agent = True
    user_agent = None
    error_on_failure = True  # Is FetchError raised when Session.fetch fails
    retries_to_error = 5  # Number of retries before failing
    sleep_increment = 15  # Time to sleep between failed requests

    def __init__(self, *args, proxy_pool=None, **kwargs):
        self.limiter = RateLimiter(self.rate_limits, self.rate_limit_period)
        self._pool = proxy_pool
        self._session = aiohttp.ClientSession(**kwargs)
        # This is so you can call get, post, etc... without having to recode
        # aiohttp uses _request internally for everything, so that is saved,
        # and calls to aiohttp's _request are sent to _request of this class,
        # which uses the saved _request at the end of the method
        self._original_request = self._session._request
        self._session.__dict__['_request'] = self._request

    def __getattr__(self, item):
        # Fall back to the wrapped aiohttp session for unknown attributes
        try:
            return self.__getattribute__(item)
        except AttributeError:
            return self._session.__getattribute__(item)

    async def __aenter__(self):
        self._session = await self._session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self._session.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def _user_agent(self):
        """
        Creates new random user agent, sets self.user agent and returns,
        otherwise returns current user agent
        """
        if self.user_agent is None or not self.persist_user_agent:
            self.user_agent = _agent_factory.random
            return self.user_agent
        else:
            return self.user_agent

    async def _request(self, method, url, **kwargs):
        """Rate-limit, set user-agent and proxy, then delegate to the saved
        aiohttp ``_request``.  Installed over the wrapped session's own
        ``_request`` in ``__init__`` so all verbs route through here."""
        await self.limiter.wait_limit(url)
        if kwargs.get('headers', None) is None:
            kwargs['headers'] = {}
        kwargs['headers'].update({'user-agent': self._user_agent})
        if self._pool is not None:
            # NOTE(review): assumes the pool yields objects exposing
            # .host and .port and accepts a `scheme` kwarg - confirm
            proxy = await self._pool.get(scheme=urlparse(url).scheme)
            kwargs.update(proxy='http://%s:%d' % (proxy.host, proxy.port))
        log.debug('Requesting: \n\tMETHOD=%s, \n\tURL=%s, \n\tKWARGS=%s',
                  method, url, str(kwargs))
        return await self._original_request(method, url, **kwargs)

    def request(self, method, url, **kwargs):
        """Perform an HTTP request, usable as an async context manager."""
        return _RequestContextManager(self._request(method, url, **kwargs))

    async def fetch(self, method, url, *, params=None,
                    stream_func=None, **kwargs):
        """Request *url*, retrying up to ``retries_to_error`` times with a
        linearly increasing sleep between attempts.

        :param params: query parameters forwarded to the request
        :param stream_func: if given, called with each line of the response
            body instead of collecting the text
        :return: response text ('' when streaming or on swallowed failure)
        :raises FetchError: after all retries fail, if ``error_on_failure``
        """
        result = ''
        count = 0
        success = False
        kwargs.update(params=params, timeout=kwargs.pop('timeout', 10))
        while not success and count <= self.retries_to_error:
            async with self.request(method, url, **kwargs) as resp:
                try:
                    try:
                        resp.raise_for_status()
                    except asyncio.TimeoutError:
                        # back off; first retry sleeps 0s since count == 0
                        await asyncio.sleep(count * self.sleep_increment)
                    except aiohttp.ClientResponseError:
                        # non-2xx status: log a snippet and notify the hook
                        try:
                            error_text = await resp.text()
                        except Exception as e:
                            error_text = repr(e)
                        log.error('ERROR %d requesting %-20s - %20s',
                                  resp.status, url, error_text[:20])
                        await self.on_error(resp.status)
                    else:
                        if stream_func is not None:
                            async for line in resp.content:
                                stream_func(line)
                        else:
                            result = await resp.text()
                        success = True
                except Exception as e:
                    # network/parse errors: log and back off before retrying
                    log.error("EXCEPTION requesting %-20s: %20s",
                              url, repr(e)[:20])
                    await asyncio.sleep(count * self.sleep_increment)
            count += 1
        if not success and self.error_on_failure:
            raise FetchError
        return result

    async def on_error(self, error_code):
        """
        This is called when a request returns a non-200 status code.

        :param error_code: status code of http request
        """
        return
class OAuth1Session(Session):
    """Session whose requests are signed by an OAuth1 patcher.

    Positional constructor arguments are forwarded to ``_patcher``;
    keyword arguments go to :class:`Session`.
    """
    base_url = None
    _patcher = OAuth1

    def __init__(self, *args, **kwargs):
        super().__init__(**kwargs)
        self.patcher = self._patcher(*args)

    async def _request(self, method, url, **kwargs):
        # Resolve relative urls against base_url before signing
        if self.base_url is not None and not url.startswith('http'):
            url = urljoin(self.base_url, url)
        params = kwargs.pop('params', None) or {}
        url, params, kwargs = await self.patcher.patch_request(
            method, url, params, kwargs)
        return await super()._request(method, url, params=params, **kwargs)
class OAuth2Session(OAuth1Session):
    """OAuth1Session variant whose requests are patched with OAuth2 bearer
    tokens instead; positional constructor args are forwarded to OAuth2."""
    _patcher = OAuth2
class FetchError(Exception):
    """Raised when :meth:`Session.fetch` exhausts all retries.

    Scraper uses this exception for circuit-breaker flow control.
    """