Created
February 1, 2012 23:22
-
-
Save swarbhanu/1720114 to your computer and use it in GitHub Desktop.
Integration test for ingestion workers
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
@file ion/services/dm/ingestion/test/test_ingestion.py | |
@author Swarbhanu Chatterjee | |
@test ion.services.dm.ingestion.ingestion_management_service test suite to cover all ingestion mgmt service code | |
''' | |
import gevent | |
from mock import Mock, sentinel, patch | |
from pyon.util.unit_test import PyonTestCase | |
from pyon.util.int_test import IonIntegrationTestCase | |
from ion.services.dm.ingestion.ingestion_management_service import IngestionManagementService, IngestionManagementServiceException | |
from nose.plugins.attrib import attr | |
from pyon.core.exception import NotFound, BadRequest | |
import unittest | |
from pyon.public import CFG, IonObject, log, RT, PRED, LCS, StreamPublisher, StreamSubscriber | |
from pyon.public import Container | |
from pyon.public import Container | |
from pyon.util.containers import DotDict | |
from interface.objects import ProcessDefinition, StreamQuery, ExchangeQuery | |
from interface.services.icontainer_agent import ContainerAgentClient | |
from interface.services.dm.iingestion_management_service import IngestionManagementServiceClient | |
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceClient | |
from interface.services.dm.itransform_management_service import TransformManagementServiceClient | |
from interface.services.coi.iresource_registry_service import ResourceRegistryServiceClient | |
from interface.objects import BlogPost, BlogComment | |
from pyon.datastore.couchdb.couchdb_dm_datastore import CouchDB_DM_DataStore | |
@attr('INT', group='dm') | |
class IngestionManagementServiceIntTest(IonIntegrationTestCase): | |
def setUp(self): | |
# set up the container for testing | |
#------------------------------------------------------------------------ | |
# Container | |
#---------------------------------------------------------------------- | |
self._start_container() | |
self.cc = ContainerAgentClient(node=self.container.node,name=self.container.name) | |
self.cc.start_rel_from_url('res/deploy/r2dm.yml') | |
#------------------------------------------------------------------------ | |
# Service clients | |
#---------------------------------------------------------------------- | |
self.pubsub_cli = PubsubManagementServiceClient(node=self.cc.node) | |
self.tms_cli = TransformManagementServiceClient(node=self.cc.node) | |
self.ingestion_cli = IngestionManagementServiceClient(node=self.cc.node) | |
self.rr_cli = ResourceRegistryServiceClient(node=self.cc.node) | |
#------------------------------------------------------------------------ | |
# Configuration parameters | |
#---------------------------------------------------------------------- | |
self.exchange_point_id = 'science_data' | |
self.datastore_name = 'dm_datastore' | |
self.number_of_workers = 2 | |
self.hdf_storage = {'root_path': '', 'filesystem' : ''} | |
self.couch_storage = {'server': '', 'couchstorage': '', 'database': self.datastore_name } | |
self.default_policy = {} | |
self.XP = 'science_data' | |
self.exchange_name = 'ingestion_queue' | |
#------------------------------------------------------------------------ | |
# Refresh datastore before testing | |
#---------------------------------------------------------------------- | |
self.db = CouchDB_DM_DataStore() | |
self.db.delete_datastore(self.datastore_name) | |
#------------------------------------------------------------------------ | |
# Stream publisher | |
#---------------------------------------------------------------------- | |
self.input_stream_id = self.pubsub_cli.create_stream(name='input_stream',original=True) | |
stream_route = self.pubsub_cli.register_producer(exchange_name=self.exchange_name, stream_id=self.input_stream_id) | |
self.ctd_stream1_publisher = StreamPublisher(node=self.cc.node, name=('science_data',stream_route.routing_key), \ | |
process=self.cc) | |
def tearDown(self): | |
""" | |
Cleanup. Delete Subscription, Stream, Process Definition | |
""" | |
self.pubsub_cli.delete_stream(self.input_stream_id) | |
self._stop_container() | |
def test_ingestion_workers_writes_to_couch(self): | |
""" | |
Test that the ingestion workers are writing messages to couch | |
""" | |
#------------------------------------------------------------------------ | |
# Create ingestion configuration and activate it | |
#---------------------------------------------------------------------- | |
ingestion_configuration_id = self.ingestion_cli.create_ingestion_configuration(self.exchange_point_id, \ | |
self.couch_storage, self.hdf_storage, self.number_of_workers, self.default_policy) | |
self.ingestion_cli.activate_ingestion_configuration(ingestion_configuration_id) | |
#------------------------------------------------------------------------ | |
# Publish messages | |
#---------------------------------------------------------------------- | |
post = BlogPost( post_id = '1234', title = 'The beautiful life',author = {}, updated = 'too early', content ='summer' ) | |
self.ctd_stream1_publisher.publish(post) | |
comment = BlogComment(ref_id = '1234',author = {}, updated = 'too late',content = 'when summer comes') | |
self.ctd_stream1_publisher.publish(comment) | |
#------------------------------------------------------------------------ | |
# List the posts and the comments that should have been written to couch | |
#---------------------------------------------------------------------- | |
objs = self.db.list_objects(self.couch_storage['database']) | |
# the list of ion_objects... in our case BlogPost and BlogComment | |
ion_objs = [] | |
for obj in objs: | |
# read the document returned by list | |
result = self.db.read_doc(objs[0], '', 'dm_datastore') | |
# convert the persistence dict to an ion_object | |
ion_obj = self.db._persistence_dict_to_ion_object(result) | |
if isinstance(ion_obj, BlogPost): | |
log.debug("ION OBJECT: %s\n" % ion_obj) | |
log.debug("POST: %s\n" % post) | |
# since the retrieved document has an extra attribute, rev_id, which the orginal post did not have | |
# it is easier to compare the attributes than the whole objects | |
self.assertTrue(ion_obj.author == post.author), "The post is not to be found in couch storage" | |
self.assertTrue(ion_obj.title == post.title), "The post is not to be found in couch storage" | |
self.assertTrue(ion_obj.post_id == post.post_id), "The post is not to be found in couch storage" | |
elif isinstance(ion_obj, BlogComment): | |
log.debug("ION OBJECT: %s\n" % ion_obj) | |
log.debug("COMMENT: %s\n" % comment) | |
# since the retrieved document has an extra attribute, rev_id, which the orginal post did not have | |
# it is easier to compare the attributes than the whole objects | |
self.assertTrue(ion_obj.author == comment.author), "The comment is not to be found in couch storage" | |
self.assertTrue(ion_obj.content == comment.content), "The comment is not to be found in couch storage" | |
self.assertTrue(ion_obj.ref_id == comment.ref_id), "The comment is not to be found in couch storage" | |
#------------------------------------------------------------------------ | |
# Cleanup | |
#---------------------------------------------------------------------- | |
self.ingestion_cli.deactivate_ingestion_configuration(ingestion_configuration_id) | |
self.ingestion_cli.delete_ingestion_configuration(ingestion_configuration_id) | |
def test_ingestion_worker_receives_message(self): | |
""" | |
Test the activation of the ingestion configuration | |
""" | |
pass | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment