# Source code for veryscrape.scrapers.twitter
from datetime import datetime
import json
from ..items import ItemGenerator
from ..process import remove_urls
from ..scrape import Scraper
from ..session import OAuth1Session
class TweetGen(ItemGenerator):
    """Item generator that turns raw Twitter stream lines into items.

    Each incoming ``text`` is expected to be a UTF-8 encoded JSON tweet
    payload from Twitter's streaming API.
    """

    # Most recently decoded tweet JSON, cached so process_time does not
    # have to decode the same payload a second time.
    last_item = None

    def process_text(self, text):
        """Decode a streamed tweet and return its text with URLs removed.

        Args:
            text: raw bytes of one streamed line (UTF-8 JSON).

        Returns:
            The tweet text with URLs stripped, or None when the payload
            is not valid JSON or has no 'text' key (e.g. keep-alive
            lines or non-tweet messages).
        """
        try:
            self.last_item = json.loads(text.decode('utf-8'))
            return remove_urls(self.last_item['text'])
        except (ValueError, KeyError):
            return

    def process_time(self, text):
        """Return the cached tweet's creation time as a datetime.

        The last decoded JSON item is cached on the instance to avoid
        decoding more than once; this relies on process_time always
        being called immediately after process_text.

        Returns:
            A naive local datetime built from 'timestamp_ms', or None
            when no decoded item or timestamp is available.
        """
        if self.last_item and self.last_item.get('timestamp_ms'):
            # 'timestamp_ms' is milliseconds since the epoch (a string),
            # so divide by 1000 for seconds.
            return datetime.fromtimestamp(
                int(self.last_item['timestamp_ms']) / 1000)
class Twitter(Scraper):
    """Scraper that streams tweets from Twitter's statuses/filter API."""

    source = 'twitter'
    item_gen = TweetGen
    # NOTE(review): TwitterSession is not defined in this view of the file,
    # while OAuth1Session is imported above and otherwise unused — presumably
    # TwitterSession is an OAuth1Session subclass defined elsewhere in this
    # module; confirm before renaming either.
    session_class = TwitterSession

    def __init__(self, key, secret, token, token_secret, *, proxy_pool=None):
        """Create a Twitter scraper.

        Args:
            key: OAuth1 consumer key.
            secret: OAuth1 consumer secret.
            token: OAuth1 access token.
            token_secret: OAuth1 access token secret.
            proxy_pool: optional proxy pool forwarded to the base Scraper.
        """
        super(Twitter, self).__init__(
            key, secret, token, token_secret,
            proxy_pool=proxy_pool
        )

    async def scrape(self, query, topic='', **kwargs):
        """Stream tweets matching ``query`` into the queue for ``topic``.

        Opens a long-lived POST request to statuses/filter.json
        (timeout=None, since the stream never completes) and enqueues
        each streamed line as it arrives.

        Args:
            query: track phrase passed to the streaming API.
            topic: key selecting which of ``self.queues`` receives lines.
            **kwargs: extra keyword arguments forwarded to the fetch call.
        """
        await self.client.fetch(
            'POST', 'statuses/filter.json',
            # each raw streamed line goes straight onto the topic queue
            stream_func=lambda line: self.queues[topic].put_nowait(line),
            params={'language': 'en', 'track': query}, timeout=None, **kwargs
        )