Skip to content

Instantly share code, notes, and snippets.

@Carpintonto
Last active May 18, 2021 23:27
Show Gist options
  • Save Carpintonto/8d61267f1861befab37c572a9b4f5421 to your computer and use it in GitHub Desktop.
Mashup of three Twitter stream routines.
# Twitter Stream Downloader
# https://gist.github.com/bonzanini/af0463b927433c73784d
#
# Carmen library for geolocating tweets
# https://github.com/mdredze/carmen-python
#
# From Tweets to GeoJSON
# https://marcobonzanini.com/2015/06/16/mining-twitter-data-with-python-and-js-part-7-geolocation-and-interactive-maps/
import tweepy
from tweepy import Stream
from tweepy import OAuthHandler
from tweepy.streaming import StreamListener
import time
import argparse
import string
import config
import json
import carmen
from carmen.location import LocationEncoder
import io
import collections
# Module-level statistics counters; carmen_res() mutates these via `global`.
city_found = county_found = state_found = country_found = 0
# Counts of which geolocation fields incoming tweets actually carried.
has_place = has_coordinates = has_geo = has_profile_location = 0
# Tally of how often each carmen resolution method fired.
resolution_method_counts = collections.defaultdict(int)
# NOTE(review): skipped_tweets is never incremented anywhere in this file.
skipped_tweets = resolved_tweets = total_tweets = 0
# Global GeoJSON FeatureCollection; MyListener.on_data appends one feature
# per tweet and rewrites the whole structure to disk each time.
geo_data = {
"type": "FeatureCollection",
"features": []
}
def get_parser():
    """Build the command-line parser for the downloader.

    Options:
        -q / --query    -- stream filter term (default: '-')
        -d / --data-dir -- directory the output JSON file is written to

    Return:
        argparse.ArgumentParser -- the configured parser
    """
    parser = argparse.ArgumentParser(description="Twitter Downloader")
    parser.add_argument(
        "-q", "--query",
        dest="query",
        default='-',
        help="Query/Filter",
    )
    parser.add_argument(
        "-d", "--data-dir",
        dest="data_dir",
        help="Output/Data Directory",
    )
    return parser
class MyListener(StreamListener):
    """Custom StreamListener for streaming data.

    Each incoming tweet is geolocated (via get_location) and appended to the
    global GeoJSON FeatureCollection, which is rewritten to the output file
    on every tweet so the file on disk is always a complete document.
    """

    def __init__(self, data_dir, query):
        # Initialize the tweepy base class; the original omitted this call,
        # leaving StreamListener's own setup un-run.
        super().__init__()
        query_fname = format_filename(query)
        self.outfile = "%s/stream_%s.json" % (data_dir, query_fname)

    def on_data(self, data):
        """Handle one raw tweet payload; return True to keep the stream open."""
        global geo_data
        json_data = json.loads(data)
        geo_data['features'].append(get_location(json_data))
        try:
            # Rewrite the whole FeatureCollection each time ('w' truncates).
            with open(self.outfile, 'w') as f:
                f.write(json.dumps(geo_data))
            print(f"in on_data: resolved tweets {resolved_tweets}, total tweets {total_tweets}")
            print("\n")
            return True
        except Exception as e:
            # Narrowed from BaseException so KeyboardInterrupt/SystemExit can
            # still stop the stream; back off briefly on I/O errors.
            print("Error on_data: %s" % str(e))
            time.sleep(5)
            return True

    def on_error(self, status):
        # NOTE(review): returning True keeps streaming even on HTTP 420
        # (rate limiting); consider returning False there to disconnect.
        print(status)
        return True
def format_filename(fname):
    """Convert file name into a safe string.

    Arguments:
        fname -- the file name to convert
    Return:
        String -- converted file name, with each invalid character
                  replaced by '_' (see convert_valid)
    """
    return ''.join(map(convert_valid, fname))
def convert_valid(one_char):
    """Convert a character into '_' if invalid.

    Valid characters are letters, digits, '-', '_' and '.'.

    Arguments:
        one_char -- the char to convert
    Return:
        Character -- converted char
    """
    allowed = "-_." + string.ascii_letters + string.digits
    return one_char if one_char in allowed else '_'
def carmen_res(data):
    """Geolocate one tweet with carmen and update module statistics.

    Arguments:
        data -- a decoded tweet dict (Twitter API JSON)
    Return:
        str -- the tweet re-serialized as JSON; when resolution succeeded
               the carmen Location object is attached under 'location' and
               serialized by LocationEncoder.

    Side effects: increments the module-level counters declared below
    (`global`) and prints a running total. Relies on the module-level
    `resolver` created in the __main__ block.
    """
    global city_found, county_found, state_found, country_found
    global has_place, has_coordinates, has_geo, has_profile_location
    global resolution_method_counts, resolved_tweets, total_tweets

    # Collect statistics on which geo fields the tweet carried.
    if data.get('place'):
        has_place += 1
    if data.get('coordinates'):
        has_coordinates += 1
    if data.get('geo'):
        has_geo += 1
    if data.get('user', {}).get('location', ''):
        has_profile_location += 1

    # Perform the actual resolution.
    resolution = resolver.resolve_tweet(data)
    if resolution:
        location = resolution[1]
        data['location'] = location
        # More statistics: most specific admin level found wins.
        resolution_method_counts[location.resolution_method] += 1
        if location.city:
            city_found += 1
        elif location.county:
            county_found += 1
        elif location.state:
            state_found += 1
        elif location.country:
            country_found += 1
        resolved_tweets += 1

    total_tweets += 1
    print(f"in carmen_res: resolved tweets {resolved_tweets}, total tweets {total_tweets}")
    # The original encoded to bytes and immediately decoded back; return the
    # JSON string directly.
    return json.dumps(data, cls=LocationEncoder)
def get_location(data):
    """Build a GeoJSON Feature for one tweet.

    Arguments:
        data -- a decoded tweet dict; it is run through carmen_res first
    Return:
        dict -- a GeoJSON Feature using the tweet's own 'coordinates' when
                present, otherwise a Point built from the carmen-resolved
                location's longitude/latitude. Returns {} when carmen found
                no location. NOTE(review): callers append this {} to the
                FeatureCollection, which produces an invalid Feature —
                consider filtering those out upstream.
    """
    geo_json_feature = {}
    # carmen_res returns one JSON document and json.dumps never emits raw
    # newlines, so the original StringIO line-by-line scan only ever saw a
    # single line; parse it directly.
    tweet = json.loads(carmen_res(data))
    if tweet.get('location'):
        properties = {
            "text": tweet['text'],
            "created_at": tweet['created_at'],
            "carmen": tweet['location'],
            "language": tweet['lang'],
            "profile": tweet['user']['location']
        }
        if tweet['coordinates']:
            # Tweet carries native GeoJSON coordinates; use them as-is.
            geometry = tweet['coordinates']
        else:
            # Fall back to the carmen-resolved point.
            # NOTE(review): assumes the serialized Location dict exposes
            # 'longitude'/'latitude' keys — confirm against LocationEncoder.
            geometry = {
                "type": "Point",
                "coordinates": [
                    tweet.get('location', {}).get('longitude'),
                    tweet.get('location', {}).get('latitude')
                ]
            }
        geo_json_feature = {
            "type": "Feature",
            "geometry": geometry,
            "properties": properties
        }
    return geo_json_feature
if __name__ == '__main__':
    # Parse CLI options (-q query, -d output directory).
    parser = get_parser()
    args = parser.parse_args()
    # Credentials come from the local config module.
    auth = OAuthHandler(config.consumer_key, config.consumer_secret)
    auth.set_access_token(config.access_token, config.access_secret)
    api = tweepy.API(auth)
    # Module-level resolver used by carmen_res().
    resolver = carmen.get_resolver()
    resolver.load_locations()
    # Stream tweets matching the query into data_dir/stream_<query>.json.
    twitter_stream = Stream(auth, MyListener(args.data_dir, args.query))
    twitter_stream.filter(track=[args.query])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment