Created
May 31, 2012 20:23
-
-
Save swarbhanu/2845961 to your computer and use it in GitHub Desktop.
A sample script that shows how to set up email, SMS, and DetectionEvent notifications through the User Notification Service.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from interface.objects import DeliveryMode, UserInfo, DetectionFilterConfig | |
from pyon.util.int_test import IonIntegrationTestCase | |
from pyon.util.unit_test import PyonTestCase | |
from pyon.public import IonObject, RT, PRED, Container | |
from pyon.core.exception import NotFound, BadRequest | |
from nose.plugins.attrib import attr | |
from pyon.util.log import log | |
from pyon.event.event import EventPublisher | |
import gevent | |
from mock import Mock, mocksignature | |
import gevent | |
from gevent.timeout import Timeout | |
#---------------------------------------------------------------------------------
# Create email notification
#---------------------------------------------------------------------------------
# NOTE(review): `pn` is not defined anywhere in this script; it is presumably a
# handle that exposes the service clients -- confirm before running.
rr = pn.resource_registry

# A notification is tied to a user id, so create the user resource first.
user = UserInfo(name='user')
user_id, _ = rr.create(user)

uns = pn.user_notification
# NOTE(review): the email literal below is a redacted placeholder; substitute
# a real recipient address before using this sample.
notification_id = uns.create_email(event_type='ResourceLifecycleEvent',
                                   event_subtype=None,
                                   origin='Some_Resource_Agent_ID1',
                                   origin_type=None,
                                   user_id=user_id,
                                   email='[email protected]',
                                   mode=DeliveryMode.DIGEST,
                                   message_header='message_header',
                                   parser='parser',
                                   period=1)

#-------------------------------------------------------
# publish an event matching the notification to generate the email
#-------------------------------------------------------
rle_publisher = EventPublisher("ResourceLifecycleEvent")
rle_publisher.publish_event(origin='Some_Resource_Agent_ID1',
                            description="RLE test event for email")

#-------------------------------------------------------
# grab the email message generated
#-------------------------------------------------------
# Look the outbox up once instead of repeating the attribute chain.
sent_mail = uns.event_processors[notification_id].smtp_client.sentmail
msg_tuple = sent_mail.get(timeout=4)
print(msg_tuple)

# Only one event was published, so nothing else should be queued.
assert sent_mail.empty()

# Index 2 of the tuple carries the message text.
message = msg_tuple[2]
# splitlines() handles both "\n" and "\r\n", so values do not keep a stray "\r".
list_lines = message.splitlines()

#-------------------------------------------------------
# parse the message body
#-------------------------------------------------------
message_dict = {}
for line in list_lines:
    # Split on the first ": " only, so a value that itself contains ": "
    # (typically the Subject) is kept whole.
    key, separator, value = line.partition(": ")
    if not separator:
        # Blank lines and lines without a "key: value" shape carry no field.
        continue
    message_dict[key] = value

#-------------------------------------------------------
# print the parsed message body of the email generated
#-------------------------------------------------------
print("message_dict: %s" % (message_dict,))
#---------------------------------------------------------------------------------
#################################################################################
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
# Create sms notification
#---------------------------------------------------------------------------------
# Reuses `uns` and `user_id` created earlier in this script.
notification_id = uns.create_sms(event_type='ResourceLifecycleEvent',
                                 event_subtype=None,
                                 origin='Some_Resource_Agent_ID2',
                                 origin_type=None,
                                 user_id=user_id,
                                 phone='401-XXX-XXXX',
                                 provider='T-Mobile',
                                 message_header='message_header',
                                 parser='parser')

#-------------------------------------------------------
# publish an event matching the notification to generate the sms
#-------------------------------------------------------
rle_publisher = EventPublisher("ResourceLifecycleEvent")
rle_publisher.publish_event(origin='Some_Resource_Agent_ID2',
                            description="RLE test event for sms")

#-------------------------------------------------------
# grab the message generated
#-------------------------------------------------------
# The sms processor exposes its outgoing message through the same
# smtp_client.sentmail queue that the email processor uses.
sent_mail = uns.event_processors[notification_id].smtp_client.sentmail
msg_tuple = sent_mail.get(timeout=4)

# Only one event was published, so nothing else should be queued.
assert sent_mail.empty()

# Index 2 of the tuple carries the message text.
message = msg_tuple[2]
# splitlines() handles both "\n" and "\r\n", so values do not keep a stray "\r".
list_lines = message.splitlines()

#-------------------------------------------------------
# parse the message body
#-------------------------------------------------------
message_dict = {}
for line in list_lines:
    # Split on the first ": " only, so a value that itself contains ": "
    # (typically the Subject) is kept whole.
    key, separator, value = line.partition(": ")
    if not separator:
        # Blank lines and lines without a "key: value" shape carry no field.
        continue
    message_dict[key] = value

#-------------------------------------------------------
# print the parsed message body of the message generated
#-------------------------------------------------------
print("message_dict: %s" % (message_dict,))
#---------------------------------------------------------------------------------
#################################################################################
#---------------------------------------------------------------------------------
#---------------------------------------------------------------------------------
# Create DetectionEvent Notification
#---------------------------------------------------------------------------------
# Filter settings: compare the event's 'voltage' field against 5 using '>'.
dfilt = DetectionFilterConfig()
dfilt.processing['condition'] = 5
dfilt.processing['comparator'] = '>'
dfilt.processing['filter_field'] = 'voltage'
dfilt.delivery['message'] = 'I got my detection event!'

# Reuses `uns` and `user_id` created earlier in this script.
notification_id = uns.create_detection_filter(event_type='ExampleDetectableEvent',
                                              event_subtype=None,
                                              origin='Some_Resource_Agent_ID3',
                                              origin_type=None,
                                              user_id=user_id,
                                              filter_config=dfilt)

#---------------------------------------------------------------------------------
# Create event subscription for resulting detection event
#---------------------------------------------------------------------------------
# Create an email notification so that when the DetectionEventProcessor
# detects an event and fires its own output event, it is caught by an
# EmailEventProcessor and an email is sent to the user.
# NOTE(review): the email literal below is a redacted placeholder; substitute
# a real recipient address before using this sample.
notification_id_2 = uns.create_email(event_type='DetectionEvent',
                                     event_subtype=None,
                                     origin='DetectionEventProcessor',
                                     origin_type=None,
                                     user_id=user_id,
                                     email='[email protected]',
                                     mode=DeliveryMode.UNFILTERED,
                                     message_header='Detection event',
                                     parser='parser',
                                     period=1)

# Outbox of the email processor that reports detection events.
sent_mail = uns.event_processors[notification_id_2].smtp_client.sentmail

rle_publisher = EventPublisher("ExampleDetectableEvent")

# Send an event that must NOT be detected: its voltage (3) is below the
# threshold. It is published from the origin the detection filter listens on
# ('Some_Resource_Agent_ID3'); with any other origin the event would be
# ignored for the wrong reason and the voltage comparison never exercised.
rle_publisher.publish_event(origin='Some_Resource_Agent_ID3',
                            description="RLE test event",
                            voltage=3)

# No DetectionEvent should have been produced, hence no email.
# NOTE(review): this check runs right after publishing; if delivery is
# asynchronous it may pass before the event has been processed. The
# emptiness check after the get() below also covers that case.
assert sent_mail.empty()

# Send an event that IS detected: its voltage (10) is above the threshold.
rle_publisher.publish_event(origin='Some_Resource_Agent_ID3',
                            description="RLE test event",
                            voltage=10)

#-------------------------------------------------------
# make assertions
#-------------------------------------------------------
msg_tuple = sent_mail.get(timeout=4)

# The first event never triggered an email because its voltage was below 5,
# so after taking the single expected message the queue is empty.
assert sent_mail.empty()

print("The email address to which email is going: msg_tuple[1] = %s" % (msg_tuple[1],))
print("The email address from where the email is going: msg_tuple[0] = %s" % (msg_tuple[0],))

# parse the message body
message = msg_tuple[2]
# splitlines() handles both "\n" and "\r\n", so values do not keep a stray "\r".
list_lines = message.splitlines()

message_dict = {}
for line in list_lines:
    # Split on the first ": " only, so a value that itself contains ": "
    # (typically the Subject) is kept whole.
    key, separator, value = line.partition(": ")
    if not separator:
        # Blank lines and lines without a "key: value" shape carry no field.
        continue
    message_dict[key] = value

print("The parsed message body of email generated by DetectionEventProcessor: %s" % (message_dict,))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment