Created
July 19, 2013 19:55
-
-
Save ozgurakan/6041927 to your computer and use it in GitHub Desktop.
OpenStack Marconi Python Example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import json | |
# Placeholder credentials and service endpoint used by the example script
# at the bottom of this file; replace them with real values before running.
username = 'my-user'
apikey = 'my-api-key'
url = 'https://test.my-marconi-server.com:443'
class Queue_Connection(object):
    """Authenticated HTTP helper shared by the queue clients in this file.

    On construction it posts the username and API key to the Rackspace
    identity endpoint, stores the returned token, and builds the headers
    that are sent with every later request.
    """

    def __init__(self, username, apikey):
        """Fetch an auth token and prepare the default request headers.

        Raises requests.exceptions.HTTPError if the identity service answers
        with an error status, rather than failing later on a missing key in
        the response body.
        """
        url = 'https://identity.api.rackspacecloud.com/v2.0/tokens'
        payload = {
            "auth": {
                "RAX-KSKEY:apiKeyCredentials": {
                    "username": username,
                    "apiKey": apikey,
                }
            }
        }
        headers = {'Content-Type': 'application/json'}
        r = requests.post(url, data=json.dumps(payload), headers=headers)
        r.raise_for_status()
        # Stored under a private name: assigning to ``self.token`` directly
        # used to shadow the ``token`` accessor defined on the class.
        self._token = r.json()['access']['token']['id']
        self.headers = {
            'X-Auth-Token': self._token,
            'Content-Type': 'application/json',
            'Client-ID': 'QClient1',
        }

    @property
    def token(self):
        """The auth token obtained during construction."""
        return self._token

    @token.setter
    def token(self, value):
        # Kept writable because ``token`` was a plain instance attribute.
        self._token = value

    @staticmethod
    def _encode(payload):
        """Return payload as a JSON string, or None so that no body is sent."""
        if payload is None:
            return None
        return json.dumps(payload)

    @staticmethod
    def _summarize(response):
        """Reduce a response to the [status_code, headers, content] list."""
        return [response.status_code, response.headers, response.content]

    def get(self, url, payload=None):
        """Send a GET request; return [status_code, headers, content]."""
        r = requests.get(url, data=self._encode(payload), headers=self.headers)
        return self._summarize(r)

    def post(self, url, payload=None):
        """Send a POST request; return [status_code, headers, content]."""
        r = requests.post(url, data=self._encode(payload), headers=self.headers)
        return self._summarize(r)

    def put(self, url, payload=None):
        """Send a PUT request; return [status_code, headers, content]."""
        r = requests.put(url, data=self._encode(payload), headers=self.headers)
        return self._summarize(r)

    def delete(self, url, payload=None):
        """Send a DELETE request; return [status_code, headers, content]."""
        r = requests.delete(url, data=self._encode(payload), headers=self.headers)
        return self._summarize(r)
class Producer(Queue_Connection):
    """Queue client that checks for, creates, and posts messages to a queue.

    ``queue_name`` must be assigned before any of the queue methods is
    called; they all build their URL from it.
    """

    def __init__(self, url, username, apikey):
        """Authenticate and remember the base URL of the queue service."""
        super(Producer, self).__init__(username, apikey)
        self.base_url = url

    @property
    def queue_name(self):
        """Name of the queue this producer operates on."""
        return self._queue_name

    @queue_name.setter
    def queue_name(self, value):
        self._queue_name = value

    @queue_name.deleter
    def queue_name(self):
        del self._queue_name

    def queue_exists(self):
        """Return True when the queue's stats endpoint answers with 200."""
        url = self.base_url + '/v1/queues/' + self.queue_name + '/stats'
        return self.get(url)[0] == 200

    def create_queue(self, payload=None):
        """Create the queue with a PUT request and print the outcome.

        payload is sent as the JSON request body when given.
        """
        url = self.base_url + "/v1/queues/" + self.queue_name
        status = self.put(url, payload)[0]
        # 201 is accepted alongside the original 200 so that a "Created"
        # answer is reported as success.
        if status in (200, 201):
            print('%s created' % self.queue_name)
        elif status == 204:
            print('A queue named %s is present' % self.queue_name)
        else:
            print('Problem with queue creation, status %s' % status)

    def post_messages(self, payload):
        """Post messages to the queue.

        Returns the 'resources' entry of the JSON response when the service
        answers 201; otherwise prints a notice and returns None.
        """
        url = self.base_url + '/v1/queues/' + self.queue_name + '/messages'
        res = self.post(url, payload)
        if res[0] == 201:
            return json.loads(res[2])['resources']
        print("Couldn't post messages")
        return None
class Consumer(Queue_Connection):
    """Queue client that claims messages from a queue and deletes them.

    ``queue_name`` must be assigned before claim_messages() is called; the
    claim URL is built from it.
    """

    def __init__(self, url, username, apikey):
        """Authenticate and remember the base URL of the queue service."""
        super(Consumer, self).__init__(username, apikey)
        self.base_url = url

    @property
    def queue_name(self):
        """Name of the queue this consumer reads from."""
        return self._queue_name

    @queue_name.setter
    def queue_name(self, value):
        self._queue_name = value

    @queue_name.deleter
    def queue_name(self):
        del self._queue_name

    def claim_messages(self, payload, limit=1):
        """Claim messages from the queue.

        payload is sent as the JSON body of the claim request and limit is
        passed as the ``limit`` query parameter.

        Returns the decoded JSON response on success (200 or 201), an empty
        list when the service answers 204, and None after printing a notice
        for any other status.
        """
        url = (self.base_url + '/v1/queues/' + self.queue_name
               + '/claims?limit=' + str(limit))
        res = self.post(url, payload)
        if res[0] in (200, 201):
            return json.loads(res[2])
        if res[0] == 204:
            # No body to decode; report "nothing claimed" instead of failure.
            return []
        print("Couldn't claim messages")
        return None

    def delete_message(self, url):
        """Delete one message; url is its path relative to the base URL."""
        res = self.delete(self.base_url + url)
        if res[0] == 204:
            print("Message deleted")
        else:
            print("Couldn't delete message, status %s" % res[0])
# Example run: create a queue, post two messages, then claim and delete them.

# Create a Producer instance.
pub = Producer(url, username, apikey)
pub.queue_name = 'testqueue'
if not pub.queue_exists():
    print("Creating queue %s" % pub.queue_name)
    pub.create_queue({"metadata": "My Queue"})

# Create and post two messages.
data = [{"ttl": 60, "body": {"task": "one"}}, {"ttl": 60, "body": {"task": "two"}}]
# post_messages() returns None on failure; fall back to an empty list so the
# loop is skipped instead of raising TypeError.
for message in pub.post_messages(data) or []:
    print("message:  %s" % message)

# Create a Consumer instance.
con = Consumer(url, username, apikey)

# Define ttl and grace times for the claim.
data = {"ttl": 60, "grace": 60}
con.queue_name = 'testqueue'
# claim_messages() may return None when the claim fails; treat it as empty.
messages = con.claim_messages(data, 2) or []
for message in messages:
    print("task :  %s" % message['body']['task'])
    print(message['href'])

# Do something with the messages; when done, delete them.
for message in messages:
    con.delete_message(message['href'])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment