Advertisement

ROS2 Publisher-Subscriber源码解析

阅读量:
复制代码
 #include <chrono>

    
 #include <memory>
    
 #include "rclcpp/rclcpp.hpp"
    
 #include "std_msgs/msg/string.hpp"
    
  
    
 using namespace std::chrono_literals;
    
 class MinimalPublisher : public rclcpp::Node
    
 {
    
 public:
    
   MinimalPublisher()
    
   : Node("minimal_publisher"), count_(0)
    
   {
    
     publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
    
     auto timer_callback =
    
       [this]() -> void {
    
     auto message = std_msgs::msg::String();
    
     message.data = "Hello, world! " + std::to_string(this->count_++);
    
     RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
    
     this->publisher_->publish(message);
    
       };
    
     timer_ = this->create_wall_timer(500ms, timer_callback);
    
   }
    
  
    
 private:
    
   rclcpp::TimerBase::SharedPtr timer_;
    
   rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
    
   size_t count_;
    
 };
    
  
    
 int main(int argc, char * argv[])
    
 {
    
   rclcpp::init(argc, argv);
    
   rclcpp::spin(std::make_shared<MinimalPublisher>());
    
   rclcpp::shutdown();
    
   return 0;
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/m1DgjV9nFR5w3SXrHsL2lApUak4x.png)

MinimalPublisher必须继承rclcpp::Node类,这样才能让executor去spin它。MinimalPublisher声明了一个rclcpp::Publisher<std_msgs::msg::String>::SharedPtr成员,在构造函数中调用了rclcpp::Node::create_publisher<>进行了初始化。create_publisher的定义看一下

复制代码
     template <

    
     typename MessageT,
    
     typename AllocatorT = std::allocator<void>,
    
     typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>>
    
     std::shared_ptr<PublisherT> 
    
     create_publisher(
    
     const std::string &topic_name,
    
     const rclcpp::QoS &qos,
    
     const PublisherOptionsWithAllocator<AllocatorT> &options =
    
         PublisherOptionsWithAllocator<AllocatorT>());
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/rPs5lHkn8wDTNb14RBgjUOCaKM2X.png)
复制代码
 emplate<typename MessageT, typename AllocatorT, typename PublisherT>

    
 std::shared_ptr<PublisherT>
    
 Node::create_publisher(
    
   const std::string & topic_name,
    
   const rclcpp::QoS & qos,
    
   const PublisherOptionsWithAllocator<AllocatorT> & options)
    
 {
    
   // first to check whitelist of topic name
    
   if (!rclcpp::node_interfaces::topology_validate_topic_name(topic_name)) {
    
     throw rclcpp::exceptions::InvalidTopicNameError(
    
             topic_name.c_str(),
    
             "not in the whitelist",
    
             0);
    
   }
    
   return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>(
    
     *this,
    
     extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
    
     qos,
    
     options);
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/iqYpSzNdT7mXA06UDQyVxEgLtKFO.png)
复制代码
 template<

    
   typename MessageT,
    
   typename AllocatorT = std::allocator<void>,
    
   typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
    
   typename NodeT>
    
 std::shared_ptr<PublisherT>
    
 create_publisher(
    
   NodeT & node,
    
   const std::string & topic_name,
    
   const rclcpp::QoS & qos,
    
   const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
    
     rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
    
   )
    
 )
    
 {
    
   return detail::create_publisher<MessageT, AllocatorT, PublisherT>(
    
     node, node, topic_name, qos, options);
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/FZI7CecRSEKihBm0pVo2rqdbJaNM.png)
复制代码
 template<

    
   typename MessageT,
    
   typename AllocatorT = std::allocator<void>,
    
   typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
    
   typename NodeParametersT,
    
   typename NodeTopicsT>
    
 std::shared_ptr<PublisherT>
    
 create_publisher(
    
   NodeParametersT & node_parameters,
    
   NodeTopicsT & node_topics,
    
   const std::string & topic_name,
    
   const rclcpp::QoS & qos,
    
   const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
    
     rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
    
   )
    
 )
    
 {
    
   auto node_topics_interface = rclcpp::node_interfaces::get_node_topics_interface(node_topics);
    
   const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ?
    
     rclcpp::detail::declare_qos_parameters(
    
     options.qos_overriding_options, node_parameters,
    
     node_topics_interface->resolve_topic_name(topic_name),
    
     qos, rclcpp::detail::PublisherQosParametersTraits{}) :
    
     qos;
    
  
    
   // Create the publisher.
    
   auto pub = node_topics_interface->create_publisher(
    
     topic_name,
    
     rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
    
     actual_qos
    
   );
    
  
    
   // Add the publisher to the node topics interface.
    
   node_topics_interface->add_publisher(pub, options.callback_group);
    
  
    
   return std::dynamic_pointer_cast<PublisherT>(pub);
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/jQlrAxta4Z98me13vbncBwNLHVWd.png)

create_publisher借助一个rclcpp::PublisherFactory中的函数成员来构造PublisherBase

node_topics.cpp

复制代码
 rclcpp::PublisherBase::SharedPtr

    
 NodeTopics::create_publisher(
    
   const std::string & topic_name,
    
   const rclcpp::PublisherFactory & publisher_factory,
    
   const rclcpp::QoS & qos)
    
 {
    
   rclcpp::QoS & qos_adapter = const_cast<rclcpp::QoS &>(qos);
    
   qos_adapter.reliable();
    
  
    
   // Create the MessageT specific Publisher using the factory, but return it as PublisherBase.
    
   return publisher_factory.create_typed_publisher(node_base_, topic_name, qos_adapter);
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/idMqZFRXIzsN0xT13vrBLt7eouD4.png)

create_typed_publisher成员是一个lambda函数构造的函数对象,这个函数为:

复制代码
     [options](

    
       rclcpp::node_interfaces::NodeBaseInterface * node_base,
    
       const std::string & topic_name,
    
       const rclcpp::QoS & qos
    
     ) -> std::shared_ptr<PublisherT>
    
     {
    
       auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
    
       // This is used for setting up things like intra process comms which
    
       // require this->shared_from_this() which cannot be called from
    
       // the constructor.
    
       publisher->post_init_setup(node_base, topic_name, qos, options);
    
       return publisher;
    
     }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/anyNQuzjpKV3Cm9cHSYWRg4G2bkU.png)

node 初始化过程:

复制代码
 ret = rcl_node_init(

    
       rcl_node.get(),
    
       node_name.c_str(), namespace_.c_str(),
    
       context_->get_rcl_context().get(), &rcl_node_options);
    
    
    
    
    cpp
    
    
复制代码
 node->impl->rmw_node_handle = rmw_create_node(

    
     &(node->context->impl->rmw_context),
    
     name, local_namespace_);
    
    
    
    
    cpp
    
    
复制代码
 rmw_node_t *

    
 rmw_api_connextdds_create_node(
    
   rmw_context_t * context,
    
   const char * name,
    
   const char * ns)
    
    
    
    
    cpp
    
    
复制代码
 rmw_ret_t

    
 rmw_context_impl_t::initialize_node(
    
   const char * const node_namespace,
    
   const char * const node_name,
    
   const bool localhost_only)
    
    
    
    
    cpp
    
    
复制代码
 rmw_ret_t

    
 rmw_context_impl_t::initialize_participant(
    
   const char * const node_namespace,
    
   const char * const node_name,
    
   const bool localhost_only)
    
    
    
    
    cpp
    
    
复制代码
 RMW_Connext_Node *

    
 RMW_Connext_Node::create(
    
   rmw_context_impl_t * const ctx)
    
    
    
    
    cpp
    
    

并初始化和更新了graph_cache

复制代码
 rmw_ret_t

    
 rmw_context_impl_t::enable_participant()
    
 {
    
   if (DDS_RETCODE_OK !=
    
     DDS_Entity_enable(
    
       DDS_DomainParticipant_as_entity(this->participant)))
    
   {
    
     RMW_CONNEXT_LOG_ERROR_SET("failed to enable participant")
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   if (DDS_RETCODE_OK !=
    
     DDS_Entity_enable(DDS_Subscriber_as_entity(this->dds_sub)))
    
   {
    
     RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   if (DDS_RETCODE_OK !=
    
     DDS_Entity_enable(DDS_Publisher_as_entity(this->dds_pub)))
    
   {
    
     RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   if (RMW_RET_OK != rmw_connextdds_graph_enable(this)) {
    
     RMW_CONNEXT_LOG_ERROR("failed to enable graph cache")
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   return RMW_RET_OK;
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/t5cC7PlG8VROqFHNKpQmEg6w3iuY.png)

并在此时启动了discover线程

复制代码
  
    
 rmw_ret_t
    
 rmw_connextdds_graph_enable(rmw_context_impl_t * const ctx)
    
 {
    
   auto pub = reinterpret_cast<RMW_Connext_Publisher *>(ctx->common.pub->data);
    
   if (RMW_RET_OK != pub->enable()) {
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   auto sub = reinterpret_cast<RMW_Connext_Subscriber *>(ctx->common.sub->data);
    
   if (RMW_RET_OK != sub->enable()) {
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   if (RMW_RET_OK != rmw_connextdds_enable_builtin_readers(ctx)) {
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   rmw_ret_t ret = rmw_connextdds_discovery_thread_start(ctx);
    
   if (RMW_RET_OK != ret) {
    
     RMW_CONNEXT_LOG_ERROR("failed to start discovery thread")
    
     return ret;
    
   }
    
  
    
   return RMW_RET_OK;
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/KOHRzaMX1qNgc85o9peVuntiA74b.png)
复制代码
  
    
 rmw_ret_t
    
 rmw_connextdds_discovery_thread_start(rmw_context_impl_t * ctx)
    
 {
    
   rmw_dds_common::Context * const common_ctx = &ctx->common;
    
  
    
   RMW_CONNEXT_LOG_DEBUG("starting discovery thread...")
    
  
    
   common_ctx->listener_thread_gc =
    
     rmw_connextdds_create_guard_condition(true /* internal */);
    
   if (nullptr == common_ctx->listener_thread_gc) {
    
     RMW_CONNEXT_LOG_ERROR(
    
       "failed to create discovery thread condition")
    
     return RMW_RET_ERROR;
    
   }
    
  
    
   common_ctx->thread_is_running.store(true);
    
  
    
   try {
    
     common_ctx->listener_thread =
    
       std::thread(rmw_connextdds_discovery_thread, ctx);
    
     const char* THREAD_NAME = "ROSdiscovery";
    
     //pthread_setname_np(common_ctx->listener_thread.native_handle(), THREAD_NAME);
    
     ros::common::utils::SetThreadName(common_ctx->listener_thread.native_handle(), THREAD_NAME);
    
  
    
     RMW_CONNEXT_LOG_DEBUG("discovery thread started")
    
  
    
     return RMW_RET_OK;
    
   } catch (const std::exception & exc) {
    
     RMW_CONNEXT_LOG_ERROR_A_SET("Failed to create std::thread: %s", exc.what())
    
   } catch (...) {
    
     RMW_CONNEXT_LOG_ERROR_SET("Failed to create std::thread")
    
   }
    
  
    
   /* We'll get here only on error, so clean up things accordingly */
    
  
    
   common_ctx->thread_is_running.store(false);
    
   if (RMW_RET_OK !=
    
     rmw_connextdds_destroy_guard_condition(
    
       common_ctx->listener_thread_gc))
    
   {
    
     RMW_CONNEXT_LOG_ERROR(
    
       "Failed to destroy discovery thread guard condition")
    
   }
    
  
    
   return RMW_RET_ERROR;
    
 }
    
    
    
    
    cpp
    
    
![](https://ad.itadn.com/c/weblog/blog-img/images/2025-08-17/8b9NCnacfUPldDWTEhqxBS4RpoYu.png)

全部评论 (0)

还没有任何评论哟~