Last active
May 18, 2021 23:27
-
-
Save Carpintonto/8d61267f1861befab37c572a9b4f5421 to your computer and use it in GitHub Desktop.
mashup of 3 twitter stream routines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Twitter Stream Downloader | |
# https://gist.github.com/bonzanini/af0463b927433c73784d | |
# | |
# Carmen library for geolocating tweets | |
# https://github.com/mdredze/carmen-python | |
# | |
# From Tweets to GeoJSON | |
# https://marcobonzanini.com/2015/06/16/mining-twitter-data-with-python-and-js-part-7-geolocation-and-interactive-maps/ | |
import tweepy | |
from tweepy import Stream | |
from tweepy import OAuthHandler | |
from tweepy.streaming import StreamListener | |
import time | |
import argparse | |
import string | |
import config | |
import json | |
import carmen | |
from carmen.location import LocationEncoder | |
import io | |
import collections | |
# --- Module-level statistics counters ------------------------------------
# How specific the carmen resolution was.
city_found = 0
county_found = 0
state_found = 0
country_found = 0
# Which location hints each raw tweet carried.
has_place = 0
has_coordinates = 0
has_geo = 0
has_profile_location = 0
# Tally of carmen resolution methods (missing keys start at 0).
resolution_method_counts = collections.defaultdict(int)
skipped_tweets = 0
resolved_tweets = 0
total_tweets = 0

# Global GeoJSON accumulator: every resolved tweet is appended to "features".
geo_data = {
    "type": "FeatureCollection",
    "features": [],
}
def get_parser():
    """Build and return the command-line argument parser.

    Return:
        argparse.ArgumentParser -- accepts -q/--query (default '-') and
        -d/--data-dir (output directory for the stream file)
    """
    arg_parser = argparse.ArgumentParser(description="Twitter Downloader")
    arg_parser.add_argument(
        "-q", "--query", dest="query", help="Query/Filter", default='-')
    arg_parser.add_argument(
        "-d", "--data-dir", dest="data_dir", help="Output/Data Directory")
    return arg_parser
class MyListener(StreamListener):
    """Custom StreamListener that geolocates tweets and writes GeoJSON.

    Each incoming tweet is resolved via get_location() and appended to the
    module-global `geo_data` FeatureCollection, which is then rewritten in
    full to the output file so the file is always a valid GeoJSON document.
    """

    def __init__(self, data_dir, query):
        # BUG FIX: the original never called StreamListener.__init__,
        # which tweepy relies on to initialize listener state (self.api).
        super().__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        """Handle one raw tweet payload; return True to keep streaming."""
        global geo_data
        json_data = json.loads(data)
        geo_data['features'].append(get_location(json_data))
        try:
            # 'w' (not 'a') on purpose: rewrite the whole FeatureCollection
            # so the file stays parseable GeoJSON after every tweet.
            with open(self.outfile, 'w') as f:
                f.write(json.dumps(geo_data))
            print(f"in on_data: resolved tweets {resolved_tweets}, total tweets {total_tweets}")
            print("\n")
            return True
        except BaseException as e:
            print("Error on_data: %s" % str(e))
            time.sleep(5)  # back off briefly before resuming the stream
            return True

    def on_error(self, status):
        """Log the HTTP status code and keep the stream alive."""
        print(status)
        return True
def format_filename(fname):
    """Convert a file name into a safe string.

    Any character outside [-_.a-zA-Z0-9] becomes '_'.

    Arguments:
        fname -- the file name to convert
    Return:
        String -- converted file name
    """
    safe = "-_.%s%s" % (string.ascii_letters, string.digits)
    return ''.join(ch if ch in safe else '_' for ch in fname)
def convert_valid(one_char):
    """Convert a character into '_' if invalid.

    Arguments:
        one_char -- the char to convert
    Return:
        Character -- converted char
    """
    allowed = set("-_.") | set(string.ascii_letters) | set(string.digits)
    return one_char if one_char in allowed else '_'
def carmen_res(data):
    """Resolve a tweet's location with carmen and update global statistics.

    Arguments:
        data -- decoded tweet dict (Twitter API JSON)
    Return:
        str -- the tweet serialized as JSON; includes a 'location' entry
        (carmen Location, serialized via LocationEncoder) when resolution
        succeeded

    NOTE(review): reads the module-global `resolver` created in __main__.
    """
    global city_found, county_found, state_found, country_found
    global has_place, has_coordinates, has_geo, has_profile_location
    global resolution_method_counts, resolved_tweets, total_tweets

    # Collect statistics on which location hints the tweet carries.
    if data.get('place'):
        has_place += 1
    if data.get('coordinates'):
        has_coordinates += 1
    if data.get('geo'):
        has_geo += 1
    if data.get('user', {}).get('location', ''):
        has_profile_location += 1

    # Perform the actual resolution.
    resolution = resolver.resolve_tweet(data)
    if resolution:
        location = resolution[1]
        data['location'] = location
        # More statistics: method used and most specific level found.
        resolution_method_counts[location.resolution_method] += 1
        if location.city:
            city_found += 1
        elif location.county:
            county_found += 1
        elif location.state:
            state_found += 1
        elif location.country:
            country_found += 1
        resolved_tweets += 1
    total_tweets += 1
    print(f"in carmen_res: resolved tweets {resolved_tweets}, total tweets {total_tweets}")
    # BUG FIX: the original did json.dumps(...).encode() and then returned
    # location.decode() -- a pointless bytes round-trip that also shadowed
    # the carmen Location in `location`. Serialize once, return the str.
    return json.dumps(data, cls=LocationEncoder)
def get_location(data):
    """Convert one tweet into a GeoJSON Feature dict.

    Arguments:
        data -- decoded tweet dict; passed through carmen_res() for
        geolocation
    Return:
        dict -- a GeoJSON Feature, or {} when carmen could not resolve a
        location. The geometry is the tweet's own 'coordinates' when
        present, otherwise a Point built from the carmen resolution.
    """
    # carmen_res returns a single JSON document; parse it directly instead
    # of the original's io.StringIO + line-iteration round-trip.
    tweet = json.loads(carmen_res(data))
    geo_json_feature = {}
    if tweet.get('location'):
        # Shared between both geometry branches (was duplicated before).
        properties = {
            "text": tweet['text'],
            "created_at": tweet['created_at'],
            "carmen": tweet['location'],
            "language": tweet['lang'],
            "profile": tweet['user']['location'],
        }
        if tweet['coordinates']:
            # Tweet carries exact GeoJSON coordinates: use them as-is.
            geometry = tweet['coordinates']
        else:
            # Fall back to the carmen-resolved centroid (may be None/None).
            geometry = {
                "type": "Point",
                "coordinates": [
                    tweet.get('location', {}).get('longitude'),
                    tweet.get('location', {}).get('latitude'),
                ],
            }
        geo_json_feature = {
            "type": "Feature",
            "geometry": geometry,
            "properties": properties,
        }
    return geo_json_feature
if __name__ == '__main__':
    # Parse CLI options (-q query filter, -d output directory).
    cli = get_parser()
    args = cli.parse_args()

    # Authenticate against Twitter using credentials from config.py.
    auth_handler = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth_handler.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth_handler)

    # Build the carmen geolocation resolver; carmen_res() reads this
    # module-level name, so it must be called `resolver`.
    resolver = carmen.get_resolver()
    resolver.load_locations()

    # Start the filtered stream with our GeoJSON-writing listener.
    twitter_stream = Stream(auth_handler, MyListener(args.data_dir, args.query))
    twitter_stream.filter(track=[args.query])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment