Skip to content

Instantly share code, notes, and snippets.

@cmower
Last active April 5, 2021 12:04
Show Gist options
  • Save cmower/07b51db0b74881aa361609958fee4d6a to your computer and use it in GitHub Desktop.
Save cmower/07b51db0b74881aa361609958fee4d6a to your computer and use it in GitHub Desktop.
ROS node base class.
"""Base class for ROS nodes, a ROS package using this will always require geometry_msgs and tf2_ros as deps."""
import tf2_ros
from geometry_msgs.msg import TransformStamped
class RosNode:
    """Base class bundling the common plumbing of a ROS node.

    It wraps the ``rospy`` module handed to the constructor and offers
    helpers for looking up / broadcasting tf2 transforms, starting timers
    and subscribers, and shutting them down again.

    Public bookkeeping dictionaries (all keyed by a user-chosen name):
        subs   -- subscribers created via ``startSubscriber``
        pubs   -- publishers (not populated by this class itself)
        srvs   -- services (not populated by this class itself)
        timers -- timers created via ``startTimer``
        msgs   -- latest message received by each named subscriber
    """

    def __init__(self, rospy):
        """Child-classes need to make a call to super().__init__(rospy)"""
        self.__rp = rospy
        self.__tf_broadcaster = tf2_ros.TransformBroadcaster()
        self.__tf_buffer = tf2_ros.Buffer()
        # Keep a reference to the listener so its lifetime is tied to the
        # node instead of relying on incidental references held elsewhere.
        self.__tf_listener = tf2_ros.TransformListener(self.__tf_buffer)
        self.subs = {}  # Subscribers
        self.pubs = {}  # Publishers
        self.srvs = {}  # Services
        self.timers = {}  # Timers
        self.msgs = {}  # Messages

    def getTf(self, base_frame_id, child_frame_id):
        """Look up the transform of ``child_frame_id`` in ``base_frame_id``.

        The lookup is made with a default-constructed ``Time()`` stamp.

        Returns:
            Whatever the tf buffer's ``lookup_transform`` returns, or ``None``
            (after logging a warning) when the lookup raises a tf2
            lookup/connectivity/extrapolation error.
        """
        tf = None
        try:
            # The rospy handle lives in the private attribute ``__rp``; there
            # is no ``self.rp`` attribute on this class.
            tf = self.__tf_buffer.lookup_transform(
                base_frame_id, child_frame_id, self.__rp.Time()
            )
        except (
            tf2_ros.LookupException,
            tf2_ros.ConnectivityException,
            tf2_ros.ExtrapolationException,
        ):
            self.__rp.logwarn(f'Did not recover frame {child_frame_id} in the frame {base_frame_id}!')
        return tf

    def setTf(self, position, quaternion, base_frame_id, child_frame_id):
        """Broadcast a transform from ``base_frame_id`` to ``child_frame_id``.

        Args:
            position: indexable with at least 3 entries, read as x, y, z.
            quaternion: indexable with at least 4 entries, read as x, y, z, w.
            base_frame_id: written to ``header.frame_id``.
            child_frame_id: written to ``child_frame_id`` of the message.
        """
        tf = TransformStamped()
        tf.header.stamp = self.__rp.Time.now()
        tf.header.frame_id = base_frame_id
        # Without the child frame the broadcast transform is not attached to
        # any frame, so it must be set explicitly.
        tf.child_frame_id = child_frame_id
        for i, dim in enumerate('xyz'):
            setattr(tf.transform.translation, dim, position[i])
            setattr(tf.transform.rotation, dim, quaternion[i])
        tf.transform.rotation.w = quaternion[3]
        self.__tf_broadcaster.sendTransform(tf)

    def startTimer(self, name, frequency, handle):
        """Create a timer calling ``handle`` and store it in ``self.timers[name]``.

        The timer period passed to ``Duration`` is ``1.0 / frequency``.
        """
        period = self.__rp.Duration(1.0 / float(frequency))
        self.timers[name] = self.__rp.Timer(period, handle)

    def startSubscriber(self, name, topic, type):
        """Subscribe to ``topic`` and store the subscriber in ``self.subs[name]``.

        Every received message is stored in ``self.msgs[name]``, replacing the
        previous one. ``type`` is the message class forwarded to ``Subscriber``.
        """
        self.subs[name] = self.__rp.Subscriber(
            topic, type, self.__callback, callback_args=name
        )

    def __callback(self, msg, name):
        """Store the most recent message received for subscriber ``name``."""
        self.msgs[name] = msg

    def spin(self):
        """Delegate to the wrapped rospy module's ``spin()``."""
        self.__rp.spin()

    def base_shutdown(self):
        """Shut down every registered timer and unregister every subscriber."""
        for timer in self.timers.values():
            timer.shutdown()
        for sub in self.subs.values():
            sub.unregister()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment