#!/usr/bin/env python # -*- coding: utf-8 -*- import time, random, urllib, urllib2, cgi, hmac, hashlib, commands, simplejson from pit import Pit class MyTwitter(object): request_token_url = 'http://twitter.com/oauth/request_token' access_token_url = 'http://twitter.com/oauth/access_token' _URL_BASE = 'http://api.twitter.com/' def __init__(self): twitter_config = Pit.get('twitter.com', {'require' : {'consumer_key':'Your twitter Consumer key', 'consumer_secret':'Your twitter consumer_secret'}}) self.consumer_key = twitter_config['consumer_key'] self.consumer_secret = twitter_config['consumer_secret'] self.oauth_token = twitter_config.get('oauth_token') self.oauth_token_secret = twitter_config.get('oauth_token_secret') if not self.oauth_token or not self.oauth_token_secret: self.get_oauth_token() def get_init_params(self): params = { "oauth_consumer_key": self.consumer_key, "oauth_signature_method": "HMAC-SHA1", "oauth_timestamp": str(int(time.time())), "oauth_nonce": str(random.getrandbits(64)), "oauth_version": "1.0" } if self.oauth_token: params['oauth_token'] = self.oauth_token return params def encode(self, text): return urllib.quote(str(text), '~') def _create_param_str(self,params,sep='&'): for k,v in params.items(): if isinstance(v, unicode): params[k] = v.encode('utf8') return sep.join(['%s=%s' % (self.encode(key),self.encode(params[key])) for key in sorted(params)]) def _get_signature(self, method, url, init_params, user_params = None): _params = {} _params.update(init_params) if user_params: _params.update(user_params) params_str = self._create_param_str(_params) message = "&".join([method, self.encode(url), self.encode(params_str)]) key = "%s&%s" % (self.consumer_secret, self.oauth_token_secret if self.oauth_token_secret else '') signature = hmac.new(key, message, hashlib.sha1) return signature.digest().encode("base64").strip() def get_oauth_token(self): params = self.get_init_params() params['oauth_signature'] = self._get_signature('GET', self.request_token_url, params) opener = urllib2.build_opener() _url = self.request_token_url + '?' + urllib.urlencode(params) result = opener.open(_url).read() data = cgi.parse_qs(result) _url = 'http://twitter.com/oauth/authorize?oauth_token='+data['oauth_token'][0] print 'open %s' % _url commands.getoutput('%s %s' % ('xdg-open',_url)) self.oauth_token = data['oauth_token'][0] self.oauth_token_secret = data['oauth_token_secret'][0] params = self.get_init_params() params['oauth_verifier'] = raw_input('PID > ') params['oauth_signature'] = self._get_signature('GET', self.access_token_url,params) opener = urllib2.build_opener() opener.addheaders = [('Authorization','OAuth')] _url = self.access_token_url + '?' + urllib.urlencode(params) result = opener.open(_url).read() data = cgi.parse_qs(result) self.oauth_token = data['oauth_token'][0] self.oauth_token_secret = data['oauth_token_secret'][0] twitter_config = Pit.get('twitter.com') twitter_config['oauth_token'] = self.oauth_token twitter_config['oauth_token_secret'] = self.oauth_token_secret Pit.set('twitter.com',{'data':twitter_config}) def get_auth_opener(self, methon, url, params=None): base_params = self.get_init_params() base_params['oauth_signature'] = self._get_signature(methon, url, base_params, params) opener = urllib2.build_opener() opener.addheaders = [('Authorization','OAuth %s'% self._create_param_str(base_params,','))] return opener def auth_get(self, url, params=None): _url = url if params: _url += '?' + urllib.urlencode(params) result = self.get_auth_opener('GET', url,params).open(_url) return result def auth_post(self, url, params={}): result = self.get_auth_opener('POST', url,params).open(url, urllib.urlencode(params)) return result def get_friend_timeline(self): url = self._URL_BASE + 'statuses/friends_timeline.json' r = self.auth_get(url) data = simplejson.loads(r.read()) return data def get_friends(self,cursor = -1): params = {'cursor':cursor} url = self._URL_BASE + 'statuses/friends.json' r = self.auth_get(url,params) data = simplejson.loads(r.read()) return data def destroy_friend(self,id): url = self._URL_BASE + ('friendships/destroy/%s.json' % id) r = self.auth_post(url) data = simplejson.loads(r.read()) return data def create_friend(self,id): url = self._URL_BASE + ('friendships/create/%s.json' % id) r = self.auth_post(url,) data = simplejson.loads(r.read()) return data def show(self,screen_name): params = {'screen_name':screen_name} url = self._URL_BASE + 'users/show.json' r = self.auth_get(url,params) data = simplejson.loads(r.read()) return data def get_list(self,screen_name ,cursor = -1): params = {'cursor':cursor} url = self._URL_BASE + '1/%s/lists.json' % screen_name r = self.auth_get(url,params) data = simplejson.loads(r.read()) return data def get_list_menbers(self, user, list_name,cursor = -1): url = self._URL_BASE + '1/%s/%s/members.json' % (user,list_name) r = self.auth_get(url,{'cursor':cursor}) data = simplejson.loads(r.read()) return data def get_list_menbers_all(self, user, list_name): cursor = -1 users = [] while cursor: data = self.get_list_menbers(user,list_name, cursor) cursor = data['next_cursor'] users += data['users'] return users def get_list_all(self,screen_name): cursor = -1 lists = [] while cursor: data = self.get_list(screen_name, cursor) cursor = data['next_cursor'] lists += data['lists'] return lists def get_friends_ids(self,screen_name): param = {'screen_name':screen_name} url = self._URL_BASE + 'friends/ids.json' r = self.auth_get(url,param) data = simplejson.loads(r.read()) return data def update(self,status): if isinstance(status, unicode): status = status.encode('utf8') param = {'status':status} url = self._URL_BASE + 'statuses/update.json' r = self.auth_post(url,param) data = simplejson.loads(r.read()) return data def get_replies(self): url = self._URL_BASE + 'statuses/replies.json' r = self.auth_get(url) data = simplejson.loads(r.read()) return data if __name__ == "__main__": import readline readline.parse_and_bind("tab: complete") twitter = MyTwitter() friends = set() def complete(text, status): results = [x for x in friends if x.startswith(text)] + [None] return results[status] readline.set_completer(complete) while True: input = raw_input('> ') input = unicode(input,'utf-8') if input: twitter.update(input) print 'update : ' + input else: for data in reversed(twitter.get_replies()): print '%-12s : %s' % (data['user']['screen_name'] ,data['text']) friends.add(data['user']['screen_name'])