Source code for veryscrape.items
import asyncio
import logging
import re
from collections import deque
from datetime import datetime
from hashlib import md5
# Module-wide logger named after this module; ItemGenerator.filter reports
# skipped duplicate items through it at DEBUG level.
log = logging.getLogger(name=__name__)
class Item:
    """A single scraped piece of content tagged with its topic and source.

    :param content: the scraped payload (any object; rendered via ``str()``
        in ``__str__``)
    :param topic: topic label attached to the item
    :param source: label of the producer the item came from
    :param created_at: creation timestamp; when omitted (``None``) the naive
        local time from ``datetime.now()`` is used
    """

    # Newline, carriage-return and tab characters stripped from content when
    # rendering, so one Item always prints on a single line.
    _CONTROL_WS = re.compile(r'[\n\r\t]')

    def __init__(self, content='', topic='', source='', created_at=None):
        self.content = content
        self.topic = topic
        self.source = source
        self.created_at = datetime.now() if created_at is None else created_at

    def __str__(self):
        # str() on every field: the ':5s'/':7s'/':50s' specs raise TypeError
        # for non-str values such as None.
        return "Item({:5s}, {:7s}, {:50s})".format(
            str(self.topic),
            str(self.source),
            self._CONTROL_WS.sub('', str(self.content)),
        )
class ItemGenerator:
    """Async iterator that turns raw queue entries into de-duplicated Items.

    Raw entries are pulled from ``q`` with ``get_nowait()``. When the queue
    is empty the iterator sleeps ``poll_interval`` seconds and retries,
    re-checking ``cancelled`` on every pass, so :meth:`cancel` stops
    iteration even while the queue stays empty.

    Subclasses customise the pipeline by overriding :meth:`process_text`,
    :meth:`process_time` and :meth:`filter`.
    """

    # Upper bound on how many content hashes are remembered for de-duplication.
    max_seen_items = 50000
    # Seconds to sleep between polls of an empty queue.
    poll_interval = 1e-2

    def __init__(self, q, topic='', source=''):
        """
        :param q: queue-like object exposing ``get_nowait()`` that raises
            ``asyncio.QueueEmpty`` when empty (e.g. an ``asyncio.Queue``)
        :param topic: topic attached to every produced Item
        :param source: source label attached to every produced Item
        """
        self.q = q
        self.topic = topic
        self.source = source
        # md5 hex digests of accepted items, for O(1) membership tests.
        self.seen = set()
        # The same digests in insertion order, so the oldest can be evicted.
        self._seen_order = deque()
        self.cancelled = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next Item whose processed text passes :meth:`filter`.

        :raises StopAsyncIteration: once :meth:`cancel` has been called
        """
        while True:
            if self.cancelled:
                raise StopAsyncIteration
            try:
                raw = self.q.get_nowait()
            except asyncio.QueueEmpty:
                await asyncio.sleep(self.poll_interval)
                continue
            text = self.process_text(raw)
            created_at = self.process_time(raw)
            # filter() runs first, as before; None text is never emitted even
            # if an overriding filter() were to accept it.
            if self.filter(text) and text is not None:
                return Item(content=text, topic=self.topic,
                            source=self.source, created_at=created_at)

    def process_text(self, text):
        """Hook: convert a raw queue entry into item content (identity here)."""
        return text

    def process_time(self, text):
        """Hook: extract a creation time from a raw queue entry.

        Returns ``None`` here, which makes ``Item`` fall back to
        ``datetime.now()``.
        """
        return None

    def filter(self, text):
        """Return True the first time *text* is seen, False otherwise.

        ``None`` is always rejected. Accepted items are remembered by the md5
        of ``str(text)``. Once more than ``max_seen_items`` are remembered,
        the oldest are forgotten, so a very old duplicate may be accepted
        again.
        """
        if text is None:
            return False
        hsh = md5(str(text).encode()).hexdigest()
        if hsh in self.seen:
            # str() first: text need not be a str (it is hashed via str()
            # above), and slicing/replace would fail on e.g. dicts or bytes.
            log.debug('Filtering already seen item: %s',
                      str(text)[:50].replace('\n', ''))
            return False
        self.seen.add(hsh)
        self._seen_order.append(hsh)
        # Evict oldest-first. set.pop() would drop an arbitrary hash, possibly
        # the one just added, letting a duplicate through.
        while len(self.seen) > self.max_seen_items and self._seen_order:
            self.seen.discard(self._seen_order.popleft())
        return True

    def cancel(self):
        """Stop iteration; checked before every queue read in __anext__."""
        self.cancelled = True