ROS2 Publisher-Subscriber源码解析
发布时间
阅读量:
阅读量
#include <chrono>
#include <memory>
#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"
using namespace std::chrono_literals;
class MinimalPublisher : public rclcpp::Node
{
public:
MinimalPublisher()
: Node("minimal_publisher"), count_(0)
{
publisher_ = this->create_publisher<std_msgs::msg::String>("topic", 10);
auto timer_callback =
[this]() -> void {
auto message = std_msgs::msg::String();
message.data = "Hello, world! " + std::to_string(this->count_++);
RCLCPP_INFO(this->get_logger(), "Publishing: '%s'", message.data.c_str());
this->publisher_->publish(message);
};
timer_ = this->create_wall_timer(500ms, timer_callback);
}
private:
rclcpp::TimerBase::SharedPtr timer_;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_;
size_t count_;
};
int main(int argc, char * argv[])
{
rclcpp::init(argc, argv);
rclcpp::spin(std::make_shared<MinimalPublisher>());
rclcpp::shutdown();
return 0;
}
cpp

MinimalPublisher必须继承rclcpp::Node类,这样才能让executor去spin它。MinimalPublisher声明了一个rclcpp::Publisher<std_msgs::msg::String>::SharedPtr成员,在构造函数中调用了rclcpp::Node::create_publisher<>进行了初始化。create_publisher的定义看一下
template <
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string &topic_name,
const rclcpp::QoS &qos,
const PublisherOptionsWithAllocator<AllocatorT> &options =
PublisherOptionsWithAllocator<AllocatorT>());
cpp

emplate<typename MessageT, typename AllocatorT, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name,
const rclcpp::QoS & qos,
const PublisherOptionsWithAllocator<AllocatorT> & options)
{
// first to check whitelist of topic name
if (!rclcpp::node_interfaces::topology_validate_topic_name(topic_name)) {
throw rclcpp::exceptions::InvalidTopicNameError(
topic_name.c_str(),
"not in the whitelist",
0);
}
return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>(
*this,
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos,
options);
}
cpp

template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeT>
std::shared_ptr<PublisherT>
create_publisher(
NodeT & node,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
)
)
{
return detail::create_publisher<MessageT, AllocatorT, PublisherT>(
node, node, topic_name, qos, options);
}
cpp

template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = rclcpp::Publisher<MessageT, AllocatorT>,
typename NodeParametersT,
typename NodeTopicsT>
std::shared_ptr<PublisherT>
create_publisher(
NodeParametersT & node_parameters,
NodeTopicsT & node_topics,
const std::string & topic_name,
const rclcpp::QoS & qos,
const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options = (
rclcpp::PublisherOptionsWithAllocator<AllocatorT>()
)
)
{
auto node_topics_interface = rclcpp::node_interfaces::get_node_topics_interface(node_topics);
const rclcpp::QoS & actual_qos = options.qos_overriding_options.get_policy_kinds().size() ?
rclcpp::detail::declare_qos_parameters(
options.qos_overriding_options, node_parameters,
node_topics_interface->resolve_topic_name(topic_name),
qos, rclcpp::detail::PublisherQosParametersTraits{}) :
qos;
// Create the publisher.
auto pub = node_topics_interface->create_publisher(
topic_name,
rclcpp::create_publisher_factory<MessageT, AllocatorT, PublisherT>(options),
actual_qos
);
// Add the publisher to the node topics interface.
node_topics_interface->add_publisher(pub, options.callback_group);
return std::dynamic_pointer_cast<PublisherT>(pub);
}
cpp

create_publisher借助一个rclcpp::PublisherFactory中的函数成员来构造PublisherBase
node_topics.cpp
rclcpp::PublisherBase::SharedPtr
NodeTopics::create_publisher(
const std::string & topic_name,
const rclcpp::PublisherFactory & publisher_factory,
const rclcpp::QoS & qos)
{
rclcpp::QoS & qos_adapter = const_cast<rclcpp::QoS &>(qos);
qos_adapter.reliable();
// Create the MessageT specific Publisher using the factory, but return it as PublisherBase.
return publisher_factory.create_typed_publisher(node_base_, topic_name, qos_adapter);
}
cpp

create_typed_publisher成员是一个lambda函数构造的函数对象,这个函数为:
[options](
rclcpp::node_interfaces::NodeBaseInterface * node_base,
const std::string & topic_name,
const rclcpp::QoS & qos
) -> std::shared_ptr<PublisherT>
{
auto publisher = std::make_shared<PublisherT>(node_base, topic_name, qos, options);
// This is used for setting up things like intra process comms which
// require this->shared_from_this() which cannot be called from
// the constructor.
publisher->post_init_setup(node_base, topic_name, qos, options);
return publisher;
}
cpp

node 初始化过程:
ret = rcl_node_init(
rcl_node.get(),
node_name.c_str(), namespace_.c_str(),
context_->get_rcl_context().get(), &rcl_node_options);
cpp
node->impl->rmw_node_handle = rmw_create_node(
&(node->context->impl->rmw_context),
name, local_namespace_);
cpp
rmw_node_t *
rmw_api_connextdds_create_node(
rmw_context_t * context,
const char * name,
const char * ns)
cpp
rmw_ret_t
rmw_context_impl_t::initialize_node(
const char * const node_namespace,
const char * const node_name,
const bool localhost_only)
cpp
rmw_ret_t
rmw_context_impl_t::initialize_participant(
const char * const node_namespace,
const char * const node_name,
const bool localhost_only)
cpp
RMW_Connext_Node *
RMW_Connext_Node::create(
rmw_context_impl_t * const ctx)
cpp
并初始化和更新了graph_cache
rmw_ret_t
rmw_context_impl_t::enable_participant()
{
if (DDS_RETCODE_OK !=
DDS_Entity_enable(
DDS_DomainParticipant_as_entity(this->participant)))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to enable participant")
return RMW_RET_ERROR;
}
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_Subscriber_as_entity(this->dds_sub)))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
return RMW_RET_ERROR;
}
if (DDS_RETCODE_OK !=
DDS_Entity_enable(DDS_Publisher_as_entity(this->dds_pub)))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to enable dds subscriber")
return RMW_RET_ERROR;
}
if (RMW_RET_OK != rmw_connextdds_graph_enable(this)) {
RMW_CONNEXT_LOG_ERROR("failed to enable graph cache")
return RMW_RET_ERROR;
}
return RMW_RET_OK;
}
cpp

并在此时启动了discover线程
rmw_ret_t
rmw_connextdds_graph_enable(rmw_context_impl_t * const ctx)
{
auto pub = reinterpret_cast<RMW_Connext_Publisher *>(ctx->common.pub->data);
if (RMW_RET_OK != pub->enable()) {
return RMW_RET_ERROR;
}
auto sub = reinterpret_cast<RMW_Connext_Subscriber *>(ctx->common.sub->data);
if (RMW_RET_OK != sub->enable()) {
return RMW_RET_ERROR;
}
if (RMW_RET_OK != rmw_connextdds_enable_builtin_readers(ctx)) {
return RMW_RET_ERROR;
}
rmw_ret_t ret = rmw_connextdds_discovery_thread_start(ctx);
if (RMW_RET_OK != ret) {
RMW_CONNEXT_LOG_ERROR("failed to start discovery thread")
return ret;
}
return RMW_RET_OK;
}
cpp

rmw_ret_t
rmw_connextdds_discovery_thread_start(rmw_context_impl_t * ctx)
{
rmw_dds_common::Context * const common_ctx = &ctx->common;
RMW_CONNEXT_LOG_DEBUG("starting discovery thread...")
common_ctx->listener_thread_gc =
rmw_connextdds_create_guard_condition(true /* internal */);
if (nullptr == common_ctx->listener_thread_gc) {
RMW_CONNEXT_LOG_ERROR(
"failed to create discovery thread condition")
return RMW_RET_ERROR;
}
common_ctx->thread_is_running.store(true);
try {
common_ctx->listener_thread =
std::thread(rmw_connextdds_discovery_thread, ctx);
const char* THREAD_NAME = "ROSdiscovery";
//pthread_setname_np(common_ctx->listener_thread.native_handle(), THREAD_NAME);
ros::common::utils::SetThreadName(common_ctx->listener_thread.native_handle(), THREAD_NAME);
RMW_CONNEXT_LOG_DEBUG("discovery thread started")
return RMW_RET_OK;
} catch (const std::exception & exc) {
RMW_CONNEXT_LOG_ERROR_A_SET("Failed to create std::thread: %s", exc.what())
} catch (...) {
RMW_CONNEXT_LOG_ERROR_SET("Failed to create std::thread")
}
/* We'll get here only on error, so clean up things accordingly */
common_ctx->thread_is_running.store(false);
if (RMW_RET_OK !=
rmw_connextdds_destroy_guard_condition(
common_ctx->listener_thread_gc))
{
RMW_CONNEXT_LOG_ERROR(
"Failed to destroy discovery thread guard condition")
}
return RMW_RET_ERROR;
}
cpp

全部评论 (0)
还没有任何评论哟~
