Created
June 27, 2018 16:55
-
-
Save Rurik/e55d0b4e11470e4a3c5b4218d2c291b2 to your computer and use it in GitHub Desktop.
Tracks a public Twitter List and posts updates to a given Slack channel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
### Tracks a public Twitter List and posts updates to a given Slack channel | |
### Example: https://i.imgur.com/RMQB27N.png | |
import datetime | |
import time | |
import twitter | |
from slackclient import SlackClient | |
slack_bot_id = '<FILL OUT>' | |
slack_channel = '<FILL OUT>' | |
slack_client = SlackClient(slack_bot_id) | |
twitter_consumer_key = '<FILL OUT>' | |
twitter_consumer_secret = '<FILL OUT>' | |
twitter_access_token = '<FILL OUT>' | |
twitter_access_secret = '<FILL OUT>' | |
sinceID = None | |
counter = 0 | |
twitter_list = '<FILL OUT>' | |
list_owner = 'bbaskin' | |
def read_tweets(list_slug, sinceID): | |
if sinceID: | |
statuses = t.lists.statuses(owner_screen_name=list_owner, slug=list_slug, since_id=sinceID) | |
else: | |
statuses = t.lists.statuses(owner_screen_name=list_owner, slug=list_slug) | |
return statuses | |
def parse_tweets(statuses): | |
global sinceID | |
global counter | |
for tweet in statuses: | |
text = tweet['text'] | |
user = tweet['user']['screen_name'] | |
url = 'https://twitter.com/{}/status/{}'.format(user, tweet['id']) | |
post = '[{}] @{}: {} (Source: {} )'.format(counter, user, text, url) | |
slack_post_channel(post, slack_channel) | |
print('[{}] {}'.format(counter, url)) | |
counter += 1 | |
if statuses: | |
sinceID = statuses[0]['id'] | |
return True | |
return False | |
def slack_post_channel(msg, room): | |
slack_client.api_call('chat.postMessage', channel=room, text=msg) | |
if __name__ == '__main__': | |
global t | |
t = twitter.Twitter(auth=twitter.OAuth(twitter_access_token, twitter_access_secret, twitter_consumer_key, twitter_consumer_secret)) | |
if slack_client.rtm_connect(with_team_state=False): | |
slack_client.api_call('auth.test')['user_id'] | |
while True: | |
statuses = read_tweets(twitter_list, sinceID) | |
parse_tweets(statuses) | |
time.sleep(60) | |
else: | |
print('[*] Slack connection failed') | |
quit() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment