工作了一段时间了,一直在无脑使用ROS进行进程间通信,没有去了解实际的通信原理。使用而不知道原理,并不高明。最近在学习Linux上各种通信方式,顺便通过阅读ROS Master源码来解构其一些看似神奇的功能。
本文出现的源码是ros_comm
仓库地址:https://github.com/ros/ros_comm.git 版本号:1.15.11
主要是看roscpp的代码,这部分管理cpp相关节点的功能。
Master启动
启动代码位于roscpp/src/libros/init.cpp
void start() {
boost::mutex::scoped_lock lock(g_start_mutex);
if (g_started) {
return;
}
g_shutdown_requested = false;
g_shutting_down = false;
g_started = true;
g_ok = true;
//省略一些debug代码
param::param("/tcp_keepalive", TransportTCP::s_use_keepalive_, TransportTCP::s_use_keepalive_);
/* 这里有几个重要的部件
PollManager:一个信号监听器,在ros停止时发送信号停止其他ros节点
XMLRPCManager:利用XMLRPC机制处理topic和service通信的关键部件
TopicManager:管理topic的接收和发送,此处挂有各个订阅者的回调函数指针
ServiceManager:
ConnectionManager:
*/
PollManager::instance()->addPollThreadListener(checkForShutdown);
XMLRPCManager::instance()->bind("shutdown", shutdownCallback);
initInternalTimerManager();
TopicManager::instance()->start();
ServiceManager::instance()->start();
ConnectionManager::instance()->start();
PollManager::instance()->start();
XMLRPCManager::instance()->start();
//设置一些信号处理函数
if (!(g_init_options & init_options::NoSigintHandler)) {
signal(SIGINT, basicSigintHandler);
}
//初始化时钟
ros::Time::init();
//……
//初始化一个用于回调函数的任务队列,负责服务的回调,log的发送
g_internal_queue_thread = boost::thread(internalCallbackQueueThreadFunc);
getGlobalCallbackQueue()->enable();
//……
}
启动过程大概是:
- 先用线程锁保证不会有多个线程同时开始初始化
- 初始化几个Manager关键部件
- 设置log位置和定义相关Topic
- 设置clock
关键的几大部件:
- PollManager:处理网络读写
- XMLRPCManager:利用XMLRPC机制处理topic和service通信的关键部件
- TopicManager:管理各个订阅者的回调函数,当有topic发送时,调用对应的回调
- ServiceManager:服务管理器,用于管理已经发起的服务,当有节点调用服务时,负责调用对应节点的服务处理函数
- ConnectionManager:作为服务和话题的依赖,在发起服务/话题时创建节点之间的连接
服务管理器的初始化
ServiceManager
代码位于roscpp/src/libros/service_manager.cpp
void ServiceManager::start() {
shutting_down_ = false;
poll_manager_ = PollManager::instance();
connection_manager_ = ConnectionManager::instance();
xmlrpc_manager_ = XMLRPCManager::instance();
}
服务管理器的初始化:
- 创建出xmlrpc_manager_,此处只用来发起任务时获取一个url。
- 创建出poll_manager_,用于在client创建时,创建一个client-server的绑定
- connection_manager_,用于在client创建时,创建一个client-server的连接
服务的发起和呼叫
服务发起
ros中,发起服务使用nodehandle.advertiseService
方法
ServiceServer NodeHandle::advertiseService(AdvertiseServiceOptions& ops)
{
ops.service = resolveName(ops.service);
if (ops.callback_queue == 0)
{ //若用户发起服务未指定回调,则使用自身的回调
if (callback_queue_)
{
ops.callback_queue = callback_queue_;
}
else
{
ops.callback_queue = getGlobalCallbackQueue();
}
}
//本质上还是要使用ServiceManager中advertiseService发起服务
if (ServiceManager::instance()->advertiseService(ops))
{
ServiceServer srv(ops.service, *this);
{
boost::mutex::scoped_lock lock(collection_->mutex_);
collection_->srvs_.push_back(srv.impl_);
}
return srv;
}
return ServiceServer();
}
bool ServiceManager::advertiseService(const AdvertiseServiceOptions &ops) {
boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
if (shutting_down_) {
return false;
}
{
boost::mutex::scoped_lock lock(service_publications_mutex_);
//使用传入的服务参数,创建一个共享指针,放入订阅表 service_publications_ 中,这个订阅表后面呼叫服务的时候要使用
ServicePublicationPtr pub(
boost::make_shared<ServicePublication>(ops.service, ops.md5sum, ops.datatype, ops.req_datatype,
ops.res_datatype, ops.helper, ops.callback_queue,
ops.tracked_object));
service_publications_.push_back(pub);
}
//调用master的接口,注册一个Service,manager里只负责准备参数,真正发起服务的底层操作由Master负责
XmlRpcValue args, result, payload;
args[0] = this_node::getName();
args[1] = ops.service;
char uri_buf[1024];
std::snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d",
network::getHost().c_str(), connection_manager_->getTCPPort());
args[2] = string(uri_buf);
args[3] = xmlrpc_manager_->getServerURI();
//调用xmlRCP服务发现,在Master上注册要发起的服务
master::execute("registerService", args, result, payload, true);
return true;
}
ServiceManager维护一个service_publications_
保证服务的唯一性。除此之外,交给master::execute
执行具体的登记服务操作。
源码很多地方都使用了master::execute方法,这个方法本质上是按照自定的网络协议,给对端发送网络请求,不太重要,该函数实现放在附录。
服务呼叫
通过NodeHandle获得一个client对象,较为简单。
ServiceClient NodeHandle::serviceClient(ServiceClientOptions& ops)
{
ops.service = resolveName(ops.service);
ServiceClient client(ops.service, ops.persistent, ops.header, ops.md5sum);
if (client)
{
boost::mutex::scoped_lock lock(collection_->mutex_);
collection_->srv_cs_.push_back(client.impl_);
}
return client;
}
呼叫服务主要是通过ServiceClient
对象,该对象提供call方法呼叫服务。ServiceClient
用于保存于服务端的连接
bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp, const std::string& service_md5sum)
{
//此处省略了一些校验的逻辑
ServiceServerLinkPtr link;
if (impl_->persistent_)
{
//为了避免反复查表,只在第一次呼叫的时候,获得一个与服务器端的连接
if (!impl_->server_link_)
{
//创建一个与服务端的连接,该函数会通过ServiceManager,在上文提到的订阅表中,查到对应的服务,并且创建一个ServiceServerLink
impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
link = impl_->server_link_;
}
else
{
link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
}
//在使用返回的连接调用call
bool ret = link->call(req, resp);
link.reset();
//省略一些故障排除代码
return ret;
}
call方法由ServiceServerLink
实现;ServiceServerLink
封装了一些网络服务的细节,完成请求,在请求结束时提供返回给上层。
bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
{
CallInfoPtr info(boost::make_shared<CallInfo>());
info->req_ = req;
info->resp_ = &resp;
info->success_ = false;
info->finished_ = false;
info->call_finished_ = false;
info->caller_thread_id_ = boost::this_thread::get_id();
bool immediate = false;
{
//省略故障排除代码
boost::mutex::scoped_lock lock(call_queue_mutex_);
if (call_queue_.empty() && header_written_ && header_read_)
{
//队列为空则马上处理呼叫,否则卡在finished_等待服务异步处理完成
immediate = true;
}
call_queue_.push(info);
}
//如果队列不为空,队列会自动发起下个任务,所以这里只需要判断为空时启动任务
if (immediate) processNextCall();
{
boost::mutex::scoped_lock lock(info->finished_mutex_);
while (!info->finished_)
{
//等待任务完成,任务完成会通过条件变量唤起线程
info->finished_condition_.wait(lock);
}
}
info->call_finished_ = true;
//省略故障排除代码
return info->success_;
}
void ServiceServerLink::processNextCall()
{
//省略了好多故障排除代码
SerializedMessage request;
{
boost::mutex::scoped_lock lock(call_queue_mutex_);
request = current_call_->req_;
}
//本质上是网络请求,发起请求,并在收到回调后调用onRequestWritten
connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, boost::placeholders::_1));
}
//主要是先解析长度,然后交由onResponseOkAndLength处理
void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
{
(void)conn;
//ros::WallDuration(0.1).sleep();
connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
}
//读取真正的消息体
void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
{
//省略了异常处理代码
if (len > 0)
{
//从connection读取,绑定读取数据处理函数
connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
}
else
{
onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
}
}
//填写rsp返回
void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
{
//省略异常处理代码
{
boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
if (current_call_->success_)
{
//利用返回的buffer填充
*current_call_->resp_ = SerializedMessage(buffer, size);
}
}
//这个函数主要是处理标志位,并且触发队列中的下一个请求,call函数等待的标志位将在此处finished_condition_.notify_all()被唤起,call函数执行完
callFinished();
}
至此,一次服务呼叫完成。
Master在服务呼叫中的作用:
- 在服务发起时维护一个
service_publications_
,记录服务的名字,接口的格式 - 在
createServiceServerLink
时根据提供的信息创建于服务端的连接 - 新建一个client,只有首次呼叫服务是master参与的,当与服务连接创建完成后,master将不再参与其中的通信。
话题的发布和订阅
话题发布
话题的发布需要首先从nodehandle
通过advertise
中创建出Publisher
Publisher NodeHandle::advertise(AdvertiseOptions& ops)
{
ops.topic = resolveName(ops.topic);
if (ops.callback_queue == 0)
{
if (callback_queue_)
{
ops.callback_queue = callback_queue_;
}
else
{
ops.callback_queue = getGlobalCallbackQueue();
}
}
//创建一个回调,或由用户提供一个回调
SubscriberCallbacksPtr callbacks(boost::make_shared<SubscriberCallbacks>(ops.connect_cb, ops.disconnect_cb,
ops.tracked_object, ops.callback_queue));
//Publisher对象包含一个回调函数指针(暂不知道这个回调是做什么用的)
Publisher pub(ops.topic, ops.md5sum, ops.datatype, ops.latch, *this, callbacks);
//ros的设定,当该标记为true时,获取topic中最后一个发送的msg
//此处从getLastMessageCallback获得一个函数指针,指针实际指向实现:
/**
void Publisher::Impl::pushLastMessage(const SubscriberLinkPtr &sub_link) {
boost::mutex::scoped_lock lock(last_message_mutex_);
if (last_message_.buf) {
sub_link->enqueueMessage(last_message_, true, true);
}
}
**/
//当有新订阅者接入时,Publication通过此方法调用新订阅者的回调
if (ops.latch) {
callbacks->push_latched_message_ = pub.getLastMessageCallback();
}
//该函数在TopicManager中增加一个topic
if (TopicManager::instance()->advertise(ops, callbacks))
{
{
//在本node中记录此次订阅
boost::mutex::scoped_lock lock(collection_->mutex_);
collection_->pubs_.push_back(pub.impl_);
}
return pub;
}
return Publisher();
}
TopicManager中的advertise方法
bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
{
//省略一些校验
PublicationPtr pub;
{
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
//……
pub = lookupPublicationWithoutLock(ops.topic);
if (pub && pub->getNumCallbacks() == 0)
{
pub.reset();
}
if (pub)
{
//若topic已有,则对比新旧topic消息,适用于多个发布者的情况
if (pub->getMD5Sum() != ops.md5sum)
{
ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
return false;
}
//对于已有的topic,将此次的回调,加入pub对象中,不重新新建
pub->addCallbacks(callbacks);
return true;
}
//新topic则新建
pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, false, ops.has_header));
//回调中包含了前面函数提到的Publisher::Impl::pushLastMessage方法
pub->addCallbacks(callbacks);
//放入topic管理表中
advertised_topics_.push_back(pub);
}
{
boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
advertised_topic_names_.push_back(ops.topic);
}
// Check whether we've already subscribed to this topic. If so, we'll do
// the self-subscription here, to avoid the deadlock that would happen if
// the ROS thread later got the publisherUpdate with its own XMLRPC URI.
// The assumption is that advertise() is called from somewhere other
// than the ROS thread.
bool found = false;
SubscriptionPtr sub;
{
boost::mutex::scoped_lock lock(subs_mutex_);
//订阅表中查找订阅者
for (L_Subscription::iterator s = subscriptions_.begin();
s != subscriptions_.end() && !found; ++s)
{
if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
{
found = true;
sub = *s;
break;
}
}
}
if(found)
{
//如果已有订阅,则跟已有的订阅者创建连接;如无,则等订阅者接入的时候,再新建连接
sub->addLocalConnection(pub);
}
XmlRpcValue args, result, payload;
args[0] = this_node::getName();
args[1] = ops.topic;
args[2] = ops.datatype;
args[3] = xmlrpc_manager_->getServerURI();
//调用xmlRCP服务发现,在Master上注册要发布的主题
master::execute("registerPublisher", args, result, payload, true);
return true;
}
至此,新建一个发布者的流程完成,在新建发布者的过程中,Master提供的TopicManager负责管理话题的发布/订阅关系。
需要适用发布者发送一个消息时,需要使用Publisher提供的publish方法
Publisher中的publish方法
void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
//……
TopicManager::instance()->publish(impl_->topic_, serfunc, m);
if (isLatched()) {
//保存最后一条消息,下次有订阅者加入 如果设定了latch参数,则保存最后一条发的消息,等新订阅者接入时,发给订阅者
boost::mutex::scoped_lock lock(impl_->last_message_mutex_);
impl_->last_message_ = m;
}
}
Topic中的publish
函数
void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
if (isShuttingDown())
{
return;
}
//从发起topic时更新的表中,找到对应topic
PublicationPtr p = lookupPublicationWithoutLock(topic);
if (p->hasSubscribers() || p->isLatching())
{
ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());
// Determine what kinds of subscribers we're publishing to. If they're intraprocess with the same C++ type we can
// do a no-copy publish.
bool nocopy = false;
bool serialize = false;
// We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
if (m.type_info && m.message)
{
p->getPublishTypes(serialize, nocopy, *m.type_info);
}
else
{
serialize = true;
}
if (!nocopy)
{
m.message.reset();
m.type_info = 0;
}
if (serialize || p->isLatching())
{
//对msg进行序列化
SerializedMessage m2 = serfunc();
m.buf = m2.buf;
m.num_bytes = m2.num_bytes;
m.message_start = m2.message_start;
}
//交由PublicationPtr publish函数 处理对应的发送操作
p->publish(m);
// If we're not doing a serialized publish we don't need to signal the pollset. The write()
// call inside signal() is actually relatively expensive when doing a nocopy publish.
if (serialize)
{
poll_manager_->getPollSet().signal();
}
}
else
{
//若此时无订阅者,而且无需保存最后一条消息,则只做序列号增加操作
p->incrementSequence();
}
}
Publication
的publish
函数实现
void Publication::publish(SerializedMessage& m)
{
if (m.message)
{
boost::mutex::scoped_lock lock(subscriber_links_mutex_);
V_SubscriberLink::const_iterator it = subscriber_links_.begin();
V_SubscriberLink::const_iterator end = subscriber_links_.end();
for (; it != end; ++it)
{
const SubscriberLinkPtr& sub = *it;
//如果是本地的订阅者
if (sub->isIntraprocess())
{
//在topic的订阅列表中,逐个通知订阅者
sub->enqueueMessage(m, false, true);
}
}
m.message.reset();
}
if (m.buf)
{
boost::mutex::scoped_lock lock(publish_queue_mutex_);
publish_queue_.push_back(m);
}
}
enqueueMessage有两个版本的实现,在本地节点与远程节点的实现不相同,本地节点直接调用本地的回调,远程节点则调用封装的网络协议发送
//实现1:本地节点的版本
void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
boost::recursive_mutex::scoped_lock lock(drop_mutex_);
if (dropped_)
{
return;
}
ROS_ASSERT(subscriber_);
//此处直接调用本地订阅者的回调
subscriber_->handleMessage(m, ser, nocopy);
}
//实现2:远程节点的版本
void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
(void)nocopy;
if (!ser)
{
return;
}
{
boost::mutex::scoped_lock lock(outbox_mutex_);
int max_queue = 0;
if (PublicationPtr parent = parent_.lock())
{
max_queue = parent->getMaxQueue();
}
ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());
if (max_queue > 0 && (int)outbox_.size() >= max_queue)
{
if (!queue_full_)
{
ROS_DEBUG("Outgoing queue full for topic [%s]. "
"Discarding oldest message",
topic_.c_str());
}
outbox_.pop(); // toss out the oldest thing in the queue to make room for us
queue_full_ = true;
}
else
{
queue_full_ = false;
}
outbox_.push(m);
}
//此处调用connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, boost::placeholders::_1), immediate_write) 将消息发出
startMessageWrite(false);
stats_.messages_sent_++;
stats_.bytes_sent_ += m.num_bytes;
stats_.message_data_sent_ += m.num_bytes;
}
话题的订阅
话题的订阅相对简单,也是要先从nodeHander获得一个订阅者
Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
{
ops.topic = resolveName(ops.topic);
//绑定回调或者全局回调
if (ops.callback_queue == 0)
{
if (callback_queue_)
{
ops.callback_queue = callback_queue_;
}
else
{
ops.callback_queue = getGlobalCallbackQueue();
}
}
//与发布者的操作类似,都通过TopicManager形成发布者与订阅者的绑定关系
if (TopicManager::instance()->subscribe(ops))
{
Subscriber sub(ops.topic, *this, ops.helper);
{
boost::mutex::scoped_lock lock(collection_->mutex_);
//在表中记录该订阅信息
collection_->subs_.push_back(sub.impl_);
}
return sub;
}
//返回的订阅者,一般不使用
return Subscriber();
}
TopicManager的subscribe函数实现
bool TopicManager::subscribe(const SubscribeOptions& ops)
{
boost::mutex::scoped_lock lock(subs_mutex_);
//省略一些校验
const std::string& md5sum = ops.md5sum;
std::string datatype = ops.datatype;
SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);
if (!registerSubscriber(s, ops.datatype))
{
ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
s->shutdown();
return false;
}
//订阅表中加入该订阅
subscriptions_.push_back(s);
return true;
}
订阅的流程到此结束,
- TopicManager维护一张订阅关系表,当有新的订阅者时,加入订阅表中;
- 当有新的发布者,则会构建发布者和订阅者的连接
- 每次发布者发布消息,Master将消息序列化,调用订阅者的相关回调
- 如果是本地节点,则直接调用回调函数,无需复制,如是远程节点,则调用网络发送
参考网站
附录
master::execute函数的实现
本质上是把网络请求包装成函数请求,其中创建了请求头(head)和请求体(body),将获取的返回通过可读事件放入返回结构体,完成一次请求。
bool execute(const std::string& method, const XmlRpc::XmlRpcValue& request, XmlRpc::XmlRpcValue& response, XmlRpc::XmlRpcValue& payload, bool wait_for_master)
{
ros::SteadyTime start_time = ros::SteadyTime::now();
std::string master_host = getHost();
uint32_t master_port = getPort();
//做了好多花里胡哨的操作,其实就是创建一个XMLRPCClient,操作全都是由该对象负责
XmlRpc::XmlRpcClient *c = XMLRPCManager::instance()->getXMLRPCClient(master_host, master_port, "/");
bool printed = false;
bool slept = false;
bool ok = true;
bool b = false;
//最重要的一行,交给XMLRPCClient操作,最后返回操作是否成功,由于request和response都是以引用的方式传入,所以execute函数中能够直接填写response
b = c->execute(method.c_str(), request, response);
//此处忽略了一些释放XMLRPCClient和异常处理的代码
//……
return b;
}
XMLRPCClient::execute
bool
XmlRpcClient::execute(const char* method, XmlRpcValue const& params, XmlRpcValue& result)
{
XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s (_connectionState %s).", method, connectionStateStr(_connectionState));
// This is not a thread-safe operation, if you want to do multithreading, use separate
// clients for each thread. If you want to protect yourself from multiple threads
// accessing the same client, replace this code with a real mutex.
if (_executing)
return false;
_executing = true;
ClearFlagOnExit cf(_executing);
_sendAttempts = 0;
_isFault = false;
if ( ! setupConnection())
return false;
//将方法名和方法参数,以一定格式,填入 _request
if ( ! generateRequest(method, params))
return false;
result.clear();
double msTime = -1.0; // Process until exit is called
//触发一个可写事件,在回调中将准备好的_request发送网络请求,超时或得到回复后,收到一个可读事件,将返回写入_response中(这些逻辑在handleEvent函数中)
_disp.work(msTime);
if (_connectionState != IDLE || ! parseResponse(result)) {
_header = "";
return false;
}
// close() if server does not supports HTTP1.1
// otherwise, reusing the socket to write leads to a SIGPIPE because
// the remote server could shut down the corresponding socket.
if (_header.find("HTTP/1.1 200 OK", 0, 15) != 0) {
close();
}
XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s completed.", method);
_header = "";
_response = "";
return true;
}