from aioauth_client import HmacSha1Signature
from aiohttp.client import _RequestContextManager
from collections import OrderedDict
from fake_useragent import UserAgent, settings
from hashlib import sha1
from time import time
from random import SystemRandom
from urllib.parse import urljoin, urlparse
import asyncio
import aiohttp
import logging
import re
# Module-level logger; request details are emitted at DEBUG level.
log = logging.getLogger(__name__)
# Cryptographically-seeded float generator used for OAuth1 nonces.
# NOTE(review): this shadows the stdlib `random` module name in this file.
random = SystemRandom().random
# If fake_useragent can't connect then this allows it to finish early
settings.HTTP_DELAY = 0.01
settings.HTTP_RETRIES = 1
settings.HTTP_TIMEOUT = 0.1
# Shared factory for random User-Agent strings; the fallback string is used
# when fake_useragent cannot fetch real agent data.
_agent_factory = UserAgent(fallback='python:veryscrape')
def _create_nested_metadata(parent):
request_metadata = {}
for path, limit in parent.items():
if isinstance(limit, dict):
request_metadata[path] = _create_nested_metadata(limit)
else:
request_metadata[path] = {'n': 0, 'last': time()}
return request_metadata
class RateLimiter:
    """Decaying-counter rate limiter keyed by regex URL patterns.

    ``rate_limits`` maps regex path patterns to either an integer limit
    (max requests per ``period`` seconds) or a nested dict of sub-patterns.
    The special key ``'*'`` is a catch-all limit applied to any URL that
    matches no other pattern; it is always checked last.
    """

    def __init__(self, rate_limits, period):
        self.rate_limit_period = period
        self.rate_limits = OrderedDict()
        defined_limits = rate_limits.copy()
        global_limit = defined_limits.pop('*', None)
        # sorts paths alphabetically for deterministic execution pre-python3.6
        for k in sorted(list(defined_limits.keys())):
            self.rate_limits[k] = defined_limits[k]
        # set global rate limit last so it is always checked last
        if global_limit:
            assert isinstance(global_limit, int), \
                'Global rate limit must be defined directly with an integer'
            self.rate_limits['*'] = global_limit
        # Per-pattern counters: {'n': requests used, 'last': last refresh time}
        self._rl_meta = _create_nested_metadata(self.rate_limits)

    def get_limit(self, url, parent=None, metadata=None):
        """Return ``(limit, counter_dict)`` for the first pattern matching
        *url*, recursing into nested pattern dicts; ``(False, {})`` when
        nothing matches.
        """
        parent = parent or self.rate_limits
        metadata = metadata or self._rl_meta
        for path, limit in parent.items():
            # BUGFIX: '*' is the catch-all key, not a regex —
            # re.search('*', url) raises re.error ('nothing to repeat'),
            # which crashed the global fallback. Match it unconditionally.
            if path == '*' or re.search(path, url):
                if isinstance(limit, dict):
                    return self.get_limit(url, parent=limit,
                                          metadata=metadata[path])
                return limit, metadata[path]
        return False, {}

    def refresh_limits(self, rate_limit, metadata):
        """Decay the used-request counter by the time elapsed since 'last'."""
        time_since = time() - metadata['last']
        metadata['n'] = max(0, metadata['n'] - int(
            time_since * rate_limit / self.rate_limit_period))

    async def _wait_limit(self, rate_limit, metadata):
        # Poll until decayed usage drops below the limit.
        while metadata['n'] >= rate_limit:
            self.refresh_limits(rate_limit, metadata)
            # wait 1/1000 the time of one request to try again
            await asyncio.sleep(self.rate_limit_period / rate_limit / 1000)

    async def wait_limit(self, url):
        """Block until a request to *url* is permitted, then record it."""
        rate_limit, metadata = self.get_limit(url)
        if rate_limit:
            self.refresh_limits(rate_limit, metadata)
            await self._wait_limit(rate_limit, metadata)
            metadata['last'] = time()
            metadata['n'] += 1
class OAuth1:
    """Signs outgoing requests using the OAuth1 (HMAC-SHA1) protocol."""

    def __init__(self, client, secret, token, token_secret):
        self.signature = HmacSha1Signature()
        self.client = client
        self.secret = secret
        self.token = token
        self.token_secret = token_secret

    @property
    def oauth_params(self):
        """
        Returns dictionary of oauth1 parameters
        required for oauth1 signed http request
        """
        # Fresh nonce per access; derived from a random float's sha1 digest.
        nonce = sha1(str(random()).encode('ascii')).hexdigest()
        timestamp = str(int(time()))
        return {
            'oauth_consumer_key': self.client,
            'oauth_token': self.token,
            'oauth_signature_method': self.signature.name,
            'oauth_nonce': nonce,
            'oauth_timestamp': timestamp,
            'oauth_version': '1.0',
        }

    async def patch_request(self, method, url, params, kwargs):
        """Merge the oauth1 parameters into *params* (in place) and attach
        the request signature; returns the patched (url, params, kwargs).
        """
        params.update(self.oauth_params)
        params['oauth_signature'] = self.signature.sign(
            self.secret, method, url, self.token_secret, **params)
        return url, params, kwargs
class OAuth2:
    """Signs requests with OAuth2 bearer tokens obtained via the
    client-credentials grant, refreshing when the token expires."""

    def __init__(self, client, secret, token_url):
        self.client = client
        self.secret = secret
        self.token_url = token_url
        self.auth = aiohttp.BasicAuth(self.client, self.secret)
        self.token = None
        self.token_expiry = 0

    @property
    def oauth2_token_expired(self):
        """Returns true if current oauth2 token needs to be refreshed"""
        missing = self.token is None
        # token_expiry == 0 means "no expiry known": never treated as expired
        expired = self.token_expiry and time() >= self.token_expiry
        return missing or expired

    async def _refresh_token(self):
        # Request a new token from the provider and record its expiry.
        async with aiohttp.ClientSession() as sess:
            async with sess.post(
                self.token_url,
                data={'grant_type': 'client_credentials'},
                auth=self.auth
            ) as resp:
                payload = await resp.json()
        self.token = payload['access_token']
        try:
            self.token_expiry = int(time()) + int(payload['expires_in'])
        except KeyError:
            self.token_expiry = 0

    async def auth_token(self):
        """
        Returns dictionary of oauth2 parameters
        required for oauth2 signed http request
        """
        if self.oauth2_token_expired:
            await self._refresh_token()
        return {'Authorization': 'bearer ' + self.token}

    async def patch_request(self, method, url, params, kwargs):
        """Attach the bearer Authorization header to the request kwargs."""
        token_header = await self.auth_token()
        headers = kwargs.get('headers')
        if headers is None:
            headers = kwargs['headers'] = {}
        headers.update(token_header)
        return url, params, kwargs
class Session:
    """aiohttp.ClientSession wrapper adding per-URL rate limiting, optional
    proxying, random user agents and a retrying :meth:`fetch` helper."""

    rate_limits = {}
    rate_limit_period = 60
    persist_user_agent = True
    user_agent = None
    error_on_failure = True  # Is FetchError raised when Session.fetch fails
    retries_to_error = 5  # Number of retries before failing
    sleep_increment = 15  # Time to sleep between failed requests

    def __init__(self, *args, proxy_pool=None, **kwargs):
        self.limiter = RateLimiter(self.rate_limits, self.rate_limit_period)
        self._pool = proxy_pool
        self._session = aiohttp.ClientSession(**kwargs)
        # This is so you can call get, post, etc... without having to recode
        # aiohttp uses _request internally for everything, so that is saved,
        # and calls to aiohttp's _request are sent to _request of this class,
        # which uses the saved _request at the end of the method
        self._original_request = self._session._request
        self._session.__dict__['_request'] = self._request

    def __getattr__(self, name):
        # Fall through to the wrapped aiohttp session for unknown attributes.
        try:
            return self.__getattribute__(name)
        except AttributeError:
            return self._session.__getattribute__(name)

    async def __aenter__(self):
        self._session = await self._session.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        return await self._session.__aexit__(exc_type, exc_val, exc_tb)

    @property
    def _user_agent(self):
        """Return the current user agent; generate a fresh random one when
        none is set or when persistence is disabled."""
        if self.user_agent is not None and self.persist_user_agent:
            return self.user_agent
        self.user_agent = _agent_factory.random
        return self.user_agent

    async def _request(self, method, url, **kwargs):
        # Honor rate limits before the request leaves the process.
        await self.limiter.wait_limit(url)
        headers = kwargs.get('headers')
        if headers is None:
            headers = kwargs['headers'] = {}
        headers.update({'user-agent': self._user_agent})
        if self._pool is not None:
            proxy = await self._pool.get(scheme=urlparse(url).scheme)
            kwargs.update(proxy='http://%s:%d' % (proxy.host, proxy.port))
        log.debug('Requesting: \n\tMETHOD=%s, \n\tURL=%s, \n\tKWARGS=%s',
                  method, url, str(kwargs))
        return await self._original_request(method, url, **kwargs)

    def request(self, method, url, **kwargs):
        return _RequestContextManager(self._request(method, url, **kwargs))

    async def fetch(self, method, url, *,
                    params=None, stream_func=None, **kwargs):
        """Request *url*, retrying on failure with an increasing backoff.

        When *stream_func* is given each response line is passed to it and
        '' is returned; otherwise the response text is returned. Raises
        FetchError after ``retries_to_error`` failures if ``error_on_failure``.
        """
        result = ''
        attempt = 0
        success = False
        kwargs.update(params=params, timeout=kwargs.pop('timeout', 10))
        while not success and attempt <= self.retries_to_error:
            async with self.request(method, url, **kwargs) as resp:
                try:
                    try:
                        resp.raise_for_status()
                    except asyncio.TimeoutError:
                        await asyncio.sleep(attempt * self.sleep_increment)
                    except aiohttp.ClientResponseError:
                        try:
                            error_text = await resp.text()
                        except Exception as exc:
                            error_text = repr(exc)
                        log.error('ERROR %d requesting %-20s - %20s',
                                  resp.status, url, error_text[:20])
                        await self.on_error(resp.status)
                    else:
                        if stream_func is not None:
                            async for line in resp.content:
                                stream_func(line)
                        else:
                            result = await resp.text()
                        success = True
                except Exception as exc:
                    log.error("EXCEPTION requesting %-20s: %20s",
                              url, repr(exc)[:20])
                    await asyncio.sleep(attempt * self.sleep_increment)
                attempt += 1
        if not success and self.error_on_failure:
            raise FetchError
        return result

    async def on_error(self, error_code):
        """
        This is called when a request returns a non-200 status code.
        :param error_code: status code of http request
        """
        return
class OAuth1Session(Session):
    """Session whose requests are signed by an OAuth1 patcher; relative
    URLs are resolved against ``base_url`` when one is configured."""

    base_url = None
    _patcher = OAuth1

    def __init__(self, *args, **kwargs):
        super(OAuth1Session, self).__init__(**kwargs)
        # Positional args are the credentials for the signing patcher.
        self.patcher = self._patcher(*args)

    async def _request(self, method, url, **kwargs):
        if self.base_url is not None and not url.startswith('http'):
            url = urljoin(self.base_url, url)
        params = kwargs.pop('params', None) or {}
        url, params, kwargs = await self.patcher.patch_request(
            method, url, params, kwargs
        )
        return await super(OAuth1Session, self)._request(
            method, url, params=params, **kwargs
        )
class OAuth2Session(OAuth1Session):
    """OAuth1Session variant that signs requests with OAuth2 bearer tokens."""

    _patcher = OAuth2
class FetchError(Exception):
    """Raised when ``Session.fetch`` exhausts its retries without success.

    Used for flow control of circuit-breaker logic in Scraper.
    """