Source code for veryscrape.veryscrape

from collections import defaultdict
from functools import partial
from multiprocessing import cpu_count
import asyncio
import json
import logging
import signal
import threading

from proxybroker import Broker, ProxyPool

from .wrappers import ItemMerger, ItemProcessor, ItemSorter

log = logging.getLogger('veryscrape')


_mutex = threading.Lock()
_scrapers = {}
_classifying_scrapers = {}


def register(name, scraper, classify=False):
    """
    Register a scraper class so it is created automatically from keys
    in the scrape config when VeryScrape is run

    :param name: name of data source (e.g. 'twitter')
    :param scraper: scraper class
    :param classify: whether the scraper needs to classify the text topic
        after scraping
    """
    with _mutex:
        _scrapers[name] = scraper
        if classify:
            _classifying_scrapers[name] = scraper


def unregister(name):
    """
    Unregister a scraper class registered with 'veryscrape.register'

    :param name: name of data source (e.g. 'twitter'), or '*' to
        unregister all scrapers
    """
    with _mutex:
        if name == '*':
            _scrapers.clear()
            _classifying_scrapers.clear()
        else:
            del _scrapers[name]
            if name in _classifying_scrapers:
                del _classifying_scrapers[name]


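# Example (an illustrative sketch, not part of this module): registering and
# unregistering a hypothetical scraper class. ``MyScraper`` and the import
# path of the ``Scraper`` base class are assumptions for demonstration only:
#
#     from veryscrape.veryscrape import register, unregister
#     from veryscrape.scrape import Scraper  # assumed base class location
#
#     class MyScraper(Scraper):
#         async def stream(self, query, topic=None):
#             ...
#
#     register('mysource', MyScraper, classify=True)
#     # later: remove it again, or pass '*' to clear every registration
#     unregister('mysource')

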
class VeryScrape:
    """
    Many API, much data, VeryScrape!

    :param q: Queue to output data gathered from scraping
    :param loop: Event loop to run the scraping
    """
    def __init__(self, q, loop=None):
        # declaring items in __init__ allows the items to
        # be cancelled from the close method of this class
        self.items = None
        self.loop = loop or asyncio.get_event_loop()
        self.queue = q
        self.using_proxies = False
        proxy_queue = asyncio.Queue(loop=self.loop)
        self.proxies = ProxyPool(proxy_queue)
        self.proxy_broker = Broker(
            queue=proxy_queue, loop=self.loop
        )
        self.kill_event = asyncio.Event(loop=self.loop)
        self.loop.add_signal_handler(signal.SIGINT, self.close)

    async def scrape(self, config, *, n_cores=1, max_items=0, max_age=None):
        """
        Scrape, process and organize data on the web based on a scrape config

        :param config: dict: scrape configuration
            This is a map of scrape sources to data gathering information.
            The basic scheme is as follows (see the examples for a real
            configuration):

            {
                "source1": {
                    "first_authentication|split|by|pipe": {
                        "topic1": ["query1", "query2"],
                        "topic2": ["query3", "query4"]
                    },
                    "second_authentication|split|by|pipe": {
                        "topic3": ["query5", "query6"]
                    }
                },
                "source2": ...
            }
        :param n_cores: number of cores to use for processing data
            Set to 0 to use all available cores (one is reserved for the
            event loop). Set to -1 to disable processing.
        :param max_items: maximum number of items to keep in the ItemSorter;
            0 leaves the item count unlimited
        :param max_age: maximum age of items to keep in the ItemSorter;
            None leaves item age unlimited
        """
        if isinstance(config, str):
            with open(config) as f:
                config = json.load(f)
        assert isinstance(config, dict), \
            'Configuration must be a dict or a path to a json config file'

        try:
            scrapers, streams, topics = \
                self.create_all_scrapers_and_streams(config)
        except Exception as e:
            raise ValueError().with_traceback(e.__traceback__)

        self.items = ItemMerger(*[stream() for stream in streams])
        if n_cores > -1:
            self.items = ItemProcessor(
                self.items,
                # one core is needed to run the event loop
                n_cores=n_cores or cpu_count() - 1,
                loop=self.loop
            )
            # Update topics of ItemProcessor for classifying
            self.items.update_topics(**topics)
        if max_items > 0 or max_age is not None:
            self.items = ItemSorter(
                self.items, max_items=max_items, max_age=max_age
            )
        # Start finding proxies if any scrapers use proxies
        if self.using_proxies:
            asyncio.ensure_future(self._update_proxies())

        async for item in self.items:
            await self.queue.put(item)

        await asyncio.gather(*[s.close() for s in scrapers])

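    # Example configuration (illustrative only; the source name, auth string
    # layout, topics and queries below are placeholders, not real
    # credentials):
    #
    #     {
    #         "twitter": {
    #             "api_key|api_secret|access_token|access_secret": {
    #                 "politics": ["election", "senate"],
    #                 "tech": ["python"]
    #             }
    #         }
    #     }
    #
    # Passing a path to a json file with this structure works the same way,
    # since ``scrape`` loads it with ``json.load`` before validating it.
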
    def close(self):
        self.kill_event.set()
        if self.items is not None:
            self.items.cancel()
        self.proxy_broker.stop()

    def create_all_scrapers_and_streams(self, config):
        """
        Creates all scrapers and stream functions associated with a config

        A scraper is a class inheriting from scrape.Scraper
        A stream function is a function returning an items.ItemGenerator

        :param config: scrape configuration
        :return: list of scrapers, list of stream functions,
            and a dict mapping each source to its topics
        """
        scrapers = []
        streams = []
        topics_by_source = defaultdict(dict)
        for source, auth_topics in config.copy().items():
            for auth, metadata in auth_topics.items():
                args, kwargs = self._create_args_kwargs(auth, metadata)
                scraper, _streams = self._create_single_scraper_and_streams(
                    metadata, _scrapers[source], args, kwargs
                )
                topics_by_source[source].update(metadata)
                scrapers.append(scraper)
                streams.extend(_streams)
        return scrapers, streams, topics_by_source

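    # Example (a sketch of how one config entry is consumed; the source name,
    # credentials and "timeout" keyword are hypothetical): the auth key
    # "client_id|client_secret" is split on '|' into the scraper's positional
    # args, while the reserved "use_proxies" and "kwargs" entries are popped
    # before the rest is treated as topic -> queries mappings:
    #
    #     "reddit": {
    #         "client_id|client_secret": {
    #             "use_proxies": true,
    #             "kwargs": {"timeout": 30},
    #             "python": ["learnpython"]
    #         }
    #     }
    #
    # "use_proxies" routes the scraper through the shared ProxyPool and
    # "kwargs" is passed straight through to the scraper constructor.
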
    def _create_args_kwargs(self, auth, metadata):
        args = []
        if auth:
            args.extend(auth.split('|'))
        kwargs = {'proxy_pool': None}
        kwargs.update(metadata.pop('kwargs', {}))
        use_proxies = metadata.pop('use_proxies', False)
        if use_proxies:
            self.using_proxies = True
            kwargs.update(proxy_pool=self.proxies)
        return args, kwargs

    @staticmethod
    def _create_single_scraper_and_streams(topics, klass, args, kwargs):
        streams = []
        scraper = klass(*args, **kwargs)
        for topic, queries in topics.items():
            if klass in _classifying_scrapers.values():
                topic = '__classify__'
            for q in queries:
                streams.append(partial(scraper.stream, q, topic=topic))
        return scraper, streams

    async def _update_proxies(self):
        while not self.kill_event.is_set():
            await self.proxy_broker.find(
                strict=True, types=['HTTP', 'HTTPS'],
                judges=[
                    'http://httpbin.org/get?show_env',
                    'https://httpbin.org/get?show_env'
                ]
            )
            # default proxy-broker sleep cycle for continuous find
            await asyncio.sleep(180)  # pragma: nocover
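

# Example end-to-end usage (a minimal sketch, assuming the relevant scraper
# classes have already been registered and that 'config.json' follows the
# scheme documented in ``VeryScrape.scrape``):
#
#     import asyncio
#     from veryscrape.veryscrape import VeryScrape
#
#     async def main():
#         queue = asyncio.Queue()
#         vs = VeryScrape(queue)
#         scrape_task = asyncio.ensure_future(vs.scrape('config.json'))
#         try:
#             for _ in range(10):       # consume a handful of items
#                 item = await queue.get()
#                 print(item)
#         finally:
#             vs.close()                # stops proxy finding and cancels items
#             await asyncio.gather(scrape_task, return_exceptions=True)
#
#     asyncio.get_event_loop().run_until_complete(main())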