Source code for veryscrape.wrappers

from concurrent.futures import ProcessPoolExecutor
from collections import defaultdict
from functools import partial
import asyncio
import heapq
import logging
import time

from .process import classify_text, clean_item

log = logging.getLogger(__name__)


# the only purpose of this is to allow a defaultdict
# created with this function as the default factory
# to be passed into a ProcessPoolExecutor
def _create_list_defaultdict():
    return defaultdict(list)  # pragma: nocover


[docs]class GeneratorWrapper: def __init__(self, item_gen, loop=None): self.loop = loop or asyncio.get_event_loop() self.cancelled = False self.items = item_gen self._gen = None self._q = asyncio.Queue() self._future = None
[docs] def cancel(self): if not self.cancelled: self.cancelled = True self.items.cancel() if self._future is not None and not self._future.done(): self._future.cancel()
[docs] async def get(self): try: return self._q.get_nowait() except asyncio.QueueEmpty: await asyncio.sleep(1e-3)
[docs] async def put(self, item): await self._q.put(item)
def __aiter__(self): self._gen = self.items.__aiter__() self._future = asyncio.ensure_future(self._stream()) return self async def __anext__(self): while True: if self.cancelled: raise StopAsyncIteration else: item = await self.get() if item is not None: return item async def _stream(self): while True: try: item = await self._gen.__anext__() await self.put(item) except StopAsyncIteration: return
[docs]class ItemMerger: def __init__(self, *item_gens): self.q = asyncio.Queue() self.item_gens = item_gens self.cancelled = False self._future = None def __aiter__(self): self._future = asyncio.ensure_future(asyncio.gather(*[ self._stream(item_gen) for item_gen in self.item_gens ])) return self async def __anext__(self): while not self.cancelled: try: return self.q.get_nowait() except asyncio.QueueEmpty: await asyncio.sleep(1e-3) raise StopAsyncIteration
[docs] def cancel(self): if not self.cancelled: self.cancelled = True for gen in self.item_gens: gen.cancel() self._future.cancel()
async def _stream(self, item_gen): async for item in item_gen: await self.q.put(item)
[docs]class ItemProcessor(GeneratorWrapper): # The default classification function is simple and fast # You can change this if you want more detailed classification # classify takes two arguments - data: any, topics_to_classify: dict # see veryscrape.process.classify_text for more details classify = classify_text def __init__(self, items, n_cores=1, loop=None): super(ItemProcessor, self).__init__(items, loop=loop) self.pool = ProcessPoolExecutor(max_workers=n_cores) self.loop.set_default_executor(self.pool) self.topics_by_source = defaultdict(_create_list_defaultdict)
[docs] def cancel(self): self.pool.shutdown(wait=True) super(ItemProcessor, self).cancel()
[docs] async def put(self, item): f = self.loop.run_in_executor(self.pool, clean_item, item) if item.topic == '__classify__': f.add_done_callback(self._classify_item) else: f.add_done_callback(self._enqueue_item) await asyncio.sleep(0)
[docs] def update_topics(self, **topics_by_source): """ Update local topics by source for use in classification of items :param topics_by_source: dict[list]: associated queries by topic """ self.topics_by_source.update(topics_by_source)
def _classify_item(self, future): if self._should_continue(future): item = future.result() f = self.loop.run_in_executor( self.pool, ItemProcessor.classify, item.content, self.topics_by_source[item.source] ) f.add_done_callback(partial( self._enqueue_classified_item, item=item)) def _enqueue_classified_item(self, future, item=None): if self._should_continue(future) and item is not None: item.topic = future.result() log.debug('Queuing cleaned and classified item: %s', str(item)) self._q.put_nowait(item) def _enqueue_item(self, future): if self._should_continue(future): result = future.result() log.debug('Queuing cleaned item: %s', str(result)) self._q.put_nowait(result) def _should_continue(self, future): return ( not self.cancelled and not future.cancelled() and not future.exception() )
[docs]class ItemSorter(GeneratorWrapper): def __init__(self, items, max_items=None, max_age=None, loop=None): super(ItemSorter, self).__init__(items, loop=loop) self.max_items = max_items or 0 self.max_age = max_age or 0 self._heap = []
[docs] async def put(self, item): heapq.heappush(self._heap, (item.created_at.timestamp(), item)) await asyncio.sleep(0)
[docs] async def get(self): if self._heap and ( len(self._heap) > self.max_items or time.time() - self._heap[0][0] > self.max_age ): return heapq.heappop(self._heap)[1] await asyncio.sleep(1e-3)