Created
October 8, 2012 19:12
-
-
Save swarbhanu/3854310 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
@author MManning
@file ion/processes/data/transforms/ctd/ctd_L1_conductivity.py
@description Transforms CTD parsed data into L1 product for conductivity
'''
from pyon.ion.transforma import TransformDataProcess | |
from pyon.core.exception import BadRequest | |
from interface.services.dm.ipubsub_management_service import PubsubManagementServiceProcessClient | |
from ion.services.dm.utility.granule.record_dictionary import RecordDictionaryTool | |
from ion.core.function.transform_function import SimpleGranuleTransformFunction | |
# For usage: please refer to the integration tests in
# ion/processes/data/transforms/ctd/test/test_ctd_transforms.py
class CTDL1ConductivityTransform(TransformDataProcess):
    '''
    A basic transform that receives input through a subscription,
    passes each incoming packet to CTDL1ConductivityTransformAlgorithm so the
    conductivity value is scaled according to the defined algorithm, and
    publishes the resulting granule through self.conductivity.
    '''

    def on_start(self):
        '''
        Validate the process configuration and read the stream definition of
        the conductivity output stream.

        Sets self.cond_stream (the configured 'conductivity' stream id) and
        self.stream_definition (as returned by the pubsub management service).

        @throws BadRequest if CFG.process.publish_streams has no
                'conductivity' entry
        '''
        super(CTDL1ConductivityTransform, self).on_start()

        # Membership test with `in` instead of the deprecated dict.has_key().
        if 'conductivity' not in self.CFG.process.publish_streams:
            raise BadRequest("For CTD transforms, please send the stream_id using "
                             "a special keyword (ex: conductivity)")
        self.cond_stream = self.CFG.process.publish_streams.conductivity

        # Read the parameter dict from the stream def of the stream
        pubsub = PubsubManagementServiceProcessClient(process=self)
        self.stream_definition = pubsub.read_stream_definition(stream_id=self.cond_stream)

    def recv_packet(self, packet, stream_route, stream_id):
        '''
        Process one incoming packet and publish the transformed granule.

        @param packet       incoming granule; an empty dict is ignored
        @param stream_route not used by this method
        @param stream_id    not used by this method
        '''
        if packet == {}:
            return

        # The algorithm receives the id of the output stream definition as
        # `params` and uses it to build the outgoing granule.
        granule = CTDL1ConductivityTransformAlgorithm.execute(packet, params=self.stream_definition._id)

        # NOTE(review): self.conductivity is not assigned in this class;
        # presumably the TransformDataProcess base class creates a publisher
        # per entry in publish_streams -- confirm.
        self.conductivity.publish(msg=granule)
class CTDL1ConductivityTransformAlgorithm(SimpleGranuleTransformFunction):
    '''
    Granule transform function that rescales the 'conductivity' field of the
    incoming granule as (value / 100000.0) - 0.5 and copies every other field
    through unchanged.
    '''

    @staticmethod
    @SimpleGranuleTransformFunction.validate_inputs
    def execute(input=None, context=None, config=None, params=None, state=None):
        '''
        Build the output granule from an input granule.

        @param input   granule to load into a RecordDictionaryTool
        @param params  stream definition id used to build the output granule
        @param context not used by this method
        @param config  not used by this method
        @param state   not used by this method
        @retval granule produced by _build_granule
        '''
        source_rdt = RecordDictionaryTool.load_from_granule(input)
        scaled = (source_rdt['conductivity'] / 100000.0) - 0.5

        # Copy every field of the input, then overwrite conductivity with the
        # scaled values.
        fields = dict((name, data) for name, data in source_rdt.iteritems())
        fields['conductivity'] = scaled

        # build the granule for conductivity
        return CTDL1ConductivityTransformAlgorithm._build_granule(
            stream_definition_id=params,
            values=fields)

    @staticmethod
    def _build_granule(stream_definition_id=None, values=None):
        '''
        Create a RecordDictionaryTool for the given stream definition, fill it
        with the supplied field values and return it as a granule.

        @param stream_definition_id id passed to RecordDictionaryTool
        @param values               dict mapping field name to field value
        @retval result of RecordDictionaryTool.to_granule()
        '''
        out_rdt = RecordDictionaryTool(stream_definition_id=stream_definition_id)
        for name, data in values.iteritems():
            out_rdt[name] = data
        return out_rdt.to_granule()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment