Last active
April 5, 2021 12:04
-
-
Save cmower/07b51db0b74881aa361609958fee4d6a to your computer and use it in GitHub Desktop.
ROS node base class.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Base class for ROS nodes, a ROS package using this will always require geometry_msgs and tf2_ros as deps.""" | |
import tf2_ros | |
from geometry_msg.msg import TransformStamped | |
class RosNode: | |
def __init__(self, rospy): | |
"""Child-classes need to make a call to super().__init__(rospy)""" | |
self.__rp = rospy | |
self.__tf_broadcaster = tf2_ros.TransformBroadcaster() | |
self.__tf_buffer = tf2_ros.Buffer() | |
tf2_ros.TransformListener(self.__tf_buffer) | |
self.subs = {} # Subscribers | |
self.pubs = {} # Publishers | |
self.srvs = {} # Services | |
self.timers = {} # Timers | |
self.msgs = {} # Messages | |
def getTf(self, base_frame_id, child_frame_id): | |
tf = None | |
try: | |
tf = self.__tf_buffer.lookup_transform(base_frame_id, child_frame_id, self.rp.Time()) | |
except (tf2_ros.LookupException, tf2_ros.ConnectivityException, tf2_ros.ExtrapolationException): | |
self.__rp.logwarn(f'Did not recover frame {child_frame_id} in the frame {base_frame_id}!') | |
return tf | |
def setTf(self, position, quaternion, base_frame_id, child_frame_id): | |
tf = TransformStamped() | |
tf.header.stamp = self.__rp.Time.now() | |
tf.header.frame_id = base_frame_id | |
for i, dim in enumerate('xyz'): | |
setattr(tf.transform.translation, dim, position[i]) | |
setattr(tf.transform.rotation, dim, quaternion[i]) | |
tf.transform.rotation.w = quaternion[3] | |
self.__tf_broadcaster.sendTransform(tf) | |
def startTimer(self, name, frequency, handle): | |
self.timers[name] = self.__rp.Timer(self.__rp.Duration(1.0/float(frequency)), handle) | |
def startSubscriber(self, name, topic, type): | |
self.subs[name] = self.__rp.Subscriber(topic, type, self.__callback, callback_args=name) | |
def __callback(self, msg, name): | |
self.msgs[name] = msg | |
def spin(self): | |
self.__rp.spin() | |
def base_shutdown(self): | |
for timer in self.timers.values(): | |
timer.shutdown() | |
for sub in self.subs.values(): | |
sub.unregister() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment