Created
November 4, 2010 07:44
-
-
Save yoshiori/662214 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
import time, random, urllib, urllib2, cgi, hmac, hashlib, commands, simplejson | |
from pit import Pit | |
class MyTwitter(object): | |
request_token_url = 'http://twitter.com/oauth/request_token' | |
access_token_url = 'http://twitter.com/oauth/access_token' | |
_URL_BASE = 'http://api.twitter.com/' | |
def __init__(self): | |
twitter_config = Pit.get('twitter.com', | |
{'require' : | |
{'consumer_key':'Your twitter Consumer key', | |
'consumer_secret':'Your twitter consumer_secret'}}) | |
self.consumer_key = twitter_config['consumer_key'] | |
self.consumer_secret = twitter_config['consumer_secret'] | |
self.oauth_token = twitter_config.get('oauth_token') | |
self.oauth_token_secret = twitter_config.get('oauth_token_secret') | |
if not self.oauth_token or not self.oauth_token_secret: | |
self.get_oauth_token() | |
def get_init_params(self): | |
params = { | |
"oauth_consumer_key": self.consumer_key, | |
"oauth_signature_method": "HMAC-SHA1", | |
"oauth_timestamp": str(int(time.time())), | |
"oauth_nonce": str(random.getrandbits(64)), | |
"oauth_version": "1.0" | |
} | |
if self.oauth_token: | |
params['oauth_token'] = self.oauth_token | |
return params | |
def encode(self, text): | |
return urllib.quote(str(text), '~') | |
def _create_param_str(self,params,sep='&'): | |
for k,v in params.items(): | |
if isinstance(v, unicode): | |
params[k] = v.encode('utf8') | |
return sep.join(['%s=%s' % (self.encode(key),self.encode(params[key])) | |
for key in sorted(params)]) | |
def _get_signature(self, method, url, init_params, user_params = None): | |
_params = {} | |
_params.update(init_params) | |
if user_params: | |
_params.update(user_params) | |
params_str = self._create_param_str(_params) | |
message = "&".join([method, self.encode(url), self.encode(params_str)]) | |
key = "%s&%s" % (self.consumer_secret, self.oauth_token_secret if self.oauth_token_secret else '') | |
signature = hmac.new(key, message, hashlib.sha1) | |
return signature.digest().encode("base64").strip() | |
def get_oauth_token(self): | |
params = self.get_init_params() | |
params['oauth_signature'] = self._get_signature('GET', self.request_token_url, params) | |
opener = urllib2.build_opener() | |
_url = self.request_token_url + '?' + urllib.urlencode(params) | |
result = opener.open(_url).read() | |
data = cgi.parse_qs(result) | |
_url = 'http://twitter.com/oauth/authorize?oauth_token='+data['oauth_token'][0] | |
print 'open %s' % _url | |
commands.getoutput('%s %s' % ('xdg-open',_url)) | |
self.oauth_token = data['oauth_token'][0] | |
self.oauth_token_secret = data['oauth_token_secret'][0] | |
params = self.get_init_params() | |
params['oauth_verifier'] = raw_input('PID > ') | |
params['oauth_signature'] = self._get_signature('GET', self.access_token_url,params) | |
opener = urllib2.build_opener() | |
opener.addheaders = [('Authorization','OAuth')] | |
_url = self.access_token_url + '?' + urllib.urlencode(params) | |
result = opener.open(_url).read() | |
data = cgi.parse_qs(result) | |
self.oauth_token = data['oauth_token'][0] | |
self.oauth_token_secret = data['oauth_token_secret'][0] | |
twitter_config = Pit.get('twitter.com') | |
twitter_config['oauth_token'] = self.oauth_token | |
twitter_config['oauth_token_secret'] = self.oauth_token_secret | |
Pit.set('twitter.com',{'data':twitter_config}) | |
def get_auth_opener(self, methon, url, params=None): | |
base_params = self.get_init_params() | |
base_params['oauth_signature'] = self._get_signature(methon, url, base_params, params) | |
opener = urllib2.build_opener() | |
opener.addheaders = [('Authorization','OAuth %s'% self._create_param_str(base_params,','))] | |
return opener | |
def auth_get(self, url, params=None): | |
_url = url | |
if params: | |
_url += '?' + urllib.urlencode(params) | |
result = self.get_auth_opener('GET', url,params).open(_url) | |
return result | |
def auth_post(self, url, params={}): | |
result = self.get_auth_opener('POST', url,params).open(url, urllib.urlencode(params)) | |
return result | |
def get_friend_timeline(self): | |
url = self._URL_BASE + 'statuses/friends_timeline.json' | |
r = self.auth_get(url) | |
data = simplejson.loads(r.read()) | |
return data | |
def get_friends(self,cursor = -1): | |
params = {'cursor':cursor} | |
url = self._URL_BASE + 'statuses/friends.json' | |
r = self.auth_get(url,params) | |
data = simplejson.loads(r.read()) | |
return data | |
def destroy_friend(self,id): | |
url = self._URL_BASE + ('friendships/destroy/%s.json' % id) | |
r = self.auth_post(url) | |
data = simplejson.loads(r.read()) | |
return data | |
def create_friend(self,id): | |
url = self._URL_BASE + ('friendships/create/%s.json' % id) | |
r = self.auth_post(url,) | |
data = simplejson.loads(r.read()) | |
return data | |
def show(self,screen_name): | |
params = {'screen_name':screen_name} | |
url = self._URL_BASE + 'users/show.json' | |
r = self.auth_get(url,params) | |
data = simplejson.loads(r.read()) | |
return data | |
def get_list(self,screen_name ,cursor = -1): | |
params = {'cursor':cursor} | |
url = self._URL_BASE + '1/%s/lists.json' % screen_name | |
r = self.auth_get(url,params) | |
data = simplejson.loads(r.read()) | |
return data | |
def get_list_menbers(self, user, list_name,cursor = -1): | |
url = self._URL_BASE + '1/%s/%s/members.json' % (user,list_name) | |
r = self.auth_get(url,{'cursor':cursor}) | |
data = simplejson.loads(r.read()) | |
return data | |
def get_list_menbers_all(self, user, list_name): | |
cursor = -1 | |
users = [] | |
while cursor: | |
data = self.get_list_menbers(user,list_name, cursor) | |
cursor = data['next_cursor'] | |
users += data['users'] | |
return users | |
def get_list_all(self,screen_name): | |
cursor = -1 | |
lists = [] | |
while cursor: | |
data = self.get_list(screen_name, cursor) | |
cursor = data['next_cursor'] | |
lists += data['lists'] | |
return lists | |
def get_friends_ids(self,screen_name): | |
param = {'screen_name':screen_name} | |
url = self._URL_BASE + 'friends/ids.json' | |
r = self.auth_get(url,param) | |
data = simplejson.loads(r.read()) | |
return data | |
def update(self,status): | |
if isinstance(status, unicode): | |
status = status.encode('utf8') | |
param = {'status':status} | |
url = self._URL_BASE + 'statuses/update.json' | |
r = self.auth_post(url,param) | |
data = simplejson.loads(r.read()) | |
return data | |
def get_replies(self): | |
url = self._URL_BASE + 'statuses/replies.json' | |
r = self.auth_get(url) | |
data = simplejson.loads(r.read()) | |
return data | |
if __name__ == "__main__": | |
import readline | |
readline.parse_and_bind("tab: complete") | |
twitter = MyTwitter() | |
friends = set() | |
def complete(text, status): | |
results = [x for x in friends if x.startswith(text)] + [None] | |
return results[status] | |
readline.set_completer(complete) | |
while True: | |
input = raw_input('> ') | |
input = unicode(input,'utf-8') | |
if input: | |
twitter.update(input) | |
print 'update : ' + input | |
else: | |
for data in reversed(twitter.get_replies()): | |
print '%-12s : %s' % (data['user']['screen_name'] ,data['text']) | |
friends.add(data['user']['screen_name']) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment