A Source-Code Look at the ROS Communication Mechanism

Louis 2051 2023-03-19

I have been using ROS for inter-process communication at work for a while now without ever looking into how the communication actually works. Using a tool without understanding it is not a great position to be in. I have recently been studying the various IPC mechanisms on Linux, so along the way I read the ROS Master-related source code to demystify some of its seemingly magical features.

The source code in this article comes from the ros_comm repository (https://github.com/ros/ros_comm.git), version 1.15.11. We mainly look at the roscpp code, the part that implements the client-side functionality of C++ nodes.

Node startup

The startup code lives in roscpp/src/libros/init.cpp:

void start() {
  boost::mutex::scoped_lock lock(g_start_mutex);
  if (g_started) {
    return;
  }

  g_shutdown_requested = false;
  g_shutting_down = false;
  g_started = true;
  g_ok = true;

  // some debug code omitted

  param::param("/tcp_keepalive", TransportTCP::s_use_keepalive_, TransportTCP::s_use_keepalive_);

  /* Several key components come together here:
  PollManager: runs the poll thread; its listeners are invoked every cycle, used here to signal shutdown when ROS stops
  XMLRPCManager: the key component that handles topic and service negotiation over the XML-RPC mechanism
  TopicManager: manages topic publishing and receiving; the subscribers' callback pointers hang off of it
  ServiceManager: manages advertised services and dispatches incoming service calls to their handlers
  ConnectionManager: creates the connections between nodes that services and topics rely on
  */
  PollManager::instance()->addPollThreadListener(checkForShutdown);
  XMLRPCManager::instance()->bind("shutdown", shutdownCallback);

  initInternalTimerManager();

  TopicManager::instance()->start();
  ServiceManager::instance()->start();
  ConnectionManager::instance()->start();
  PollManager::instance()->start();
  XMLRPCManager::instance()->start();

  // install signal handlers
  if (!(g_init_options & init_options::NoSigintHandler)) {
    signal(SIGINT, basicSigintHandler);
  }

  // initialize the clock
  ros::Time::init();

  //……

  // start the internal callback-queue thread, which handles service callbacks and log delivery
  g_internal_queue_thread = boost::thread(internalCallbackQueueThreadFunc);
  getGlobalCallbackQueue()->enable();
  //……
}

The startup sequence, roughly:

  1. Take a lock so that multiple threads cannot start initializing at once
  2. Initialize the key manager components
  3. Set up logging and define the related topics
  4. Set up the clock

The key components (exercised by the minimal node sketched after this list):

  • PollManager: handles network reads and writes
  • XMLRPCManager: the key component that handles topic and service negotiation over the XML-RPC mechanism
  • TopicManager: manages each subscriber's callbacks; when a message is published on a topic, the matching callbacks are invoked
  • ServiceManager: the service manager; it keeps track of advertised services and, when a node calls a service, dispatches to that node's service handler
  • ConnectionManager: the dependency of both services and topics; it creates the connections between nodes when a service or topic is set up
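
All of these are singletons created during ros::start(). For reference, a minimal sketch of a node that triggers this whole startup path (the node name demo_node is an arbitrary placeholder):

#include <ros/ros.h>

int main(int argc, char** argv)
{
  // ros::init only records the init options; the ros::start() shown above
  // actually runs when the first NodeHandle is constructed
  ros::init(argc, argv, "demo_node");
  ros::NodeHandle nh;  // instantiates the managers and starts the XML-RPC server
  ros::spin();         // services the global callback queue enabled in start()
  return 0;
}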

ServiceManager initialization

The ServiceManager code lives in roscpp/src/libros/service_manager.cpp:

void ServiceManager::start() {
    shutting_down_ = false;
    poll_manager_ = PollManager::instance();
    connection_manager_ = ConnectionManager::instance();
    xmlrpc_manager_ = XMLRPCManager::instance();
}

ServiceManager initialization:

  • grabs the xmlrpc_manager_ singleton, used here only to obtain the node's XML-RPC URI when a service is advertised
  • grabs the poll_manager_ singleton, used to drive the client-server binding when a client is created
  • grabs the connection_manager_ singleton, used to establish the client-server connection when a client is created

Advertising and calling services

Advertising a service

In ROS, a service is advertised with NodeHandle::advertiseService:

ServiceServer NodeHandle::advertiseService(AdvertiseServiceOptions& ops)
{
  ops.service = resolveName(ops.service);
  if (ops.callback_queue == 0)
  { // if the caller did not specify a callback queue, use the handle's own (or else the global queue)
    if (callback_queue_)
    {
      ops.callback_queue = callback_queue_;
    }
    else
    {
      ops.callback_queue = getGlobalCallbackQueue();
    }
  }

  // ultimately the service is advertised through ServiceManager::advertiseService
  if (ServiceManager::instance()->advertiseService(ops))
  {
    ServiceServer srv(ops.service, *this);
    {
      boost::mutex::scoped_lock lock(collection_->mutex_);
      collection_->srvs_.push_back(srv.impl_);
    }
    return srv;
  }
  return ServiceServer();
}
bool ServiceManager::advertiseService(const AdvertiseServiceOptions &ops) {
  boost::recursive_mutex::scoped_lock shutdown_lock(shutting_down_mutex_);
  if (shutting_down_) {
    return false;
  }

  {
    boost::mutex::scoped_lock lock(service_publications_mutex_);

    // wrap the advertise options in a shared pointer and add it to the publication
    // table service_publications_, which is consulted later when the service is called
    ServicePublicationPtr pub(
      boost::make_shared<ServicePublication>(ops.service, ops.md5sum, ops.datatype, ops.req_datatype,
                                             ops.res_datatype, ops.helper, ops.callback_queue,
                                             ops.tracked_object));
    service_publications_.push_back(pub);
  }

  // call the Master API to register the service; the manager only prepares the arguments,
  // the actual registration is handled by the Master
  XmlRpcValue args, result, payload;
  args[0] = this_node::getName();
  args[1] = ops.service;
  char uri_buf[1024];
  std::snprintf(uri_buf, sizeof(uri_buf), "rosrpc://%s:%d",
                network::getHost().c_str(), connection_manager_->getTCPPort());
  args[2] = string(uri_buf);
  args[3] = xmlrpc_manager_->getServerURI();
    // XML-RPC service discovery: register the new service with the Master
  master::execute("registerService", args, result, payload, true);
  return true;
}

ServiceManager maintains service_publications_ to guarantee each service is unique. Beyond that, it hands the actual registration off to master::execute.

master::execute is used in many places in the source. In essence it sends an XML-RPC network request to the Master; it is not central to the discussion here, so its implementation is left to the appendix.
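
For context, here is a minimal sketch of the user-facing API that ends up in this code path, assuming the std_srvs package is available; the service name set_flag is an arbitrary choice for illustration:

#include <ros/ros.h>
#include <std_srvs/SetBool.h>

// illustrative handler; advertiseService wraps it in ops.helper
bool handle(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res)
{
  res.success = true;
  res.message = req.data ? "enabled" : "disabled";
  return true;
}

int main(int argc, char** argv)
{
  ros::init(argc, argv, "demo_server");
  ros::NodeHandle nh;
  // fills AdvertiseServiceOptions and lands in ServiceManager::advertiseService,
  // which calls master::execute("registerService", ...)
  ros::ServiceServer server = nh.advertiseService("set_flag", handle);
  ros::spin();
  return 0;
}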

Calling a service

Obtaining a client object through NodeHandle is straightforward:

ServiceClient NodeHandle::serviceClient(ServiceClientOptions& ops)
{
  ops.service = resolveName(ops.service);
  ServiceClient client(ops.service, ops.persistent, ops.header, ops.md5sum);
  if (client)
  {
    boost::mutex::scoped_lock lock(collection_->mutex_);
    collection_->srv_cs_.push_back(client.impl_);
  }
  return client;
}

Calling a service goes through the ServiceClient object, which provides the call method. The ServiceClient holds the connection to the server.

bool ServiceClient::call(const SerializedMessage& req, SerializedMessage& resp, const std::string& service_md5sum)
{
  // some validation logic omitted
  ServiceServerLinkPtr link;
  if (impl_->persistent_)
  {
    // to avoid repeated lookups, a persistent client only fetches a
    // connection to the server on its first call
    if (!impl_->server_link_)
    {
      // create a connection to the server: ServiceManager looks the service up in
      // the publication table mentioned above and creates a ServiceServerLink
      impl_->server_link_ = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
    }
    link = impl_->server_link_;
  }
  else
  {
    link = ServiceManager::instance()->createServiceServerLink(impl_->name_, impl_->persistent_, service_md5sum, service_md5sum, impl_->header_values_);
  }
  // perform the call over the returned link
  bool ret = link->call(req, resp);
  link.reset();
  // some error-handling code omitted
  return ret;
}

The call method is implemented by ServiceServerLink, which hides the networking details: it performs the request and hands the response back to the caller when the request completes.

bool ServiceServerLink::call(const SerializedMessage& req, SerializedMessage& resp)
{
  CallInfoPtr info(boost::make_shared<CallInfo>());
  info->req_ = req;
  info->resp_ = &resp;
  info->success_ = false;
  info->finished_ = false;
  info->call_finished_ = false;
  info->caller_thread_id_ = boost::this_thread::get_id();
  
  bool immediate = false;
  {
    // error-handling code omitted
    boost::mutex::scoped_lock lock(call_queue_mutex_);

    if (call_queue_.empty() && header_written_ && header_read_)
    {
      // if the queue is empty, process the call immediately; otherwise block on finished_ until the call completes asynchronously
      immediate = true;
    }
    call_queue_.push(info);
  }
  // a non-empty queue kicks off the next call by itself, so we only start one here when the queue was empty
  if (immediate) processNextCall();
    
  {
    boost::mutex::scoped_lock lock(info->finished_mutex_);

    while (!info->finished_)
    {
      // wait for completion; the condition variable wakes this thread when the call finishes
      info->finished_condition_.wait(lock);
    }
  }
  info->call_finished_ = true;
// error-handling code omitted
  return info->success_;
}
void ServiceServerLink::processNextCall()
{
    // a lot of error-handling code omitted
    SerializedMessage request;
    {
      boost::mutex::scoped_lock lock(call_queue_mutex_);
      request = current_call_->req_;
    }
    // essentially a network request: write it out, then onRequestWritten fires once the write completes
    connection_->write(request.buf, request.num_bytes, boost::bind(&ServiceServerLink::onRequestWritten, this, boost::placeholders::_1));
}
// first read the response header (to get the body length), then hand off to onResponseOkAndLength
void ServiceServerLink::onRequestWritten(const ConnectionPtr& conn)
{
  (void)conn;
  //ros::WallDuration(0.1).sleep();
  connection_->read(5, boost::bind(&ServiceServerLink::onResponseOkAndLength, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
}
// read the actual message body
void ServiceServerLink::onResponseOkAndLength(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
{
  // error handling omitted; it also validates the leading "ok" byte, buffer[0]
  uint32_t len = *((uint32_t*)(buffer.get() + 1)); // the 4-byte body length follows the ok byte
  if (len > 0)
  {
  // read the body from the connection, binding the data handler
    connection_->read(len, boost::bind(&ServiceServerLink::onResponse, this, boost::placeholders::_1, boost::placeholders::_2, boost::placeholders::_3, boost::placeholders::_4));
  }
  else
  {
    onResponse(conn, boost::shared_array<uint8_t>(), 0, true);
  }
}
// fill in the response and return it
void ServiceServerLink::onResponse(const ConnectionPtr& conn, const boost::shared_array<uint8_t>& buffer, uint32_t size, bool success)
{
  // error-handling code omitted
  {
    boost::mutex::scoped_lock queue_lock(call_queue_mutex_);
    if (current_call_->success_)
    {
      // fill the response from the returned buffer
      *current_call_->resp_ = SerializedMessage(buffer, size);
    }
  }
  // callFinished updates the flags and kicks off the next queued request; the
  // finished_condition_.notify_all() inside it wakes the waiting call(), which then returns
  callFinished();
}
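
As an aside, the 5 bytes read in onRequestWritten form the response header: one "ok" byte followed by a 4-byte body length. A standalone sketch of the decoding (the struct and function names here are mine, not roscpp's):

#include <cstdint>
#include <cstring>

struct RpcResponseHeader
{
  uint8_t ok;    // 1 on success, 0 on failure
  uint32_t len;  // length of the body that follows
};

RpcResponseHeader decodeHeader(const uint8_t* buf)
{
  RpcResponseHeader h;
  h.ok = buf[0];
  std::memcpy(&h.len, buf + 1, sizeof(h.len));  // assumes a little-endian host, as roscpp does
  return h;
}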

At this point, one service call is complete.
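
From the user's side, the whole round trip looks like this (reusing the hypothetical set_flag service from the earlier sketch):

#include <ros/ros.h>
#include <std_srvs/SetBool.h>

int main(int argc, char** argv)
{
  ros::init(argc, argv, "demo_client");
  ros::NodeHandle nh;
  // the second argument asks for a persistent connection, i.e. the
  // impl_->persistent_ branch of ServiceClient::call above
  ros::ServiceClient client = nh.serviceClient<std_srvs::SetBool>("set_flag", true);
  std_srvs::SetBool srv;
  srv.request.data = true;
  if (client.call(srv))
    ROS_INFO("response: %s", srv.response.message.c_str());
  return 0;
}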

The Master's role in a service call:

  • when a service is advertised, the node records its name and interface types in service_publications_ and registers the service with the Master
  • createServiceServerLink uses that registration to create the connection to the server
  • the Master is only involved the first time a new client resolves the service; once the connection to the server exists, the Master takes no further part in the communication (the sketch below shows this resolution step done by hand)
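
The registration done by registerService is what lets any node resolve a service name to its rosrpc URI. A sketch of that lookup over raw XML-RPC, assuming the default master at localhost:11311 (the header path follows the xmlrpcpp package layout in ros_comm; older trees include plain XmlRpc.h):

#include <xmlrpcpp/XmlRpc.h>
#include <iostream>

int main()
{
  XmlRpc::XmlRpcClient client("localhost", 11311, "/");
  XmlRpc::XmlRpcValue args, result;
  args[0] = "/demo_inspector";  // caller_id
  args[1] = "/set_flag";        // service to resolve
  // the Master answers with [code, statusMessage, serviceUri]
  if (client.execute("lookupService", args, result))
    std::cout << std::string(result[2]) << std::endl;  // e.g. rosrpc://host:port
  return 0;
}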

Publishing and subscribing to topics

Publishing a topic

To publish on a topic, first create a Publisher through NodeHandle::advertise:

Publisher NodeHandle::advertise(AdvertiseOptions& ops)
{
  ops.topic = resolveName(ops.topic);
  if (ops.callback_queue == 0)
  {
    if (callback_queue_)
    {
      ops.callback_queue = callback_queue_;
    }
    else
    {
      ops.callback_queue = getGlobalCallbackQueue();
    }
  }

  // wrap the (possibly user-supplied) connect/disconnect callbacks
  SubscriberCallbacksPtr callbacks(boost::make_shared<SubscriberCallbacks>(ops.connect_cb, ops.disconnect_cb, 
                                                                           ops.tracked_object, ops.callback_queue));

   // the Publisher object holds a callback pointer (what it is for becomes clear just below)
  Publisher pub(ops.topic, ops.md5sum, ops.datatype, ops.latch, *this, callbacks);

  // ROS convention: when the latch flag is true, a new subscriber gets the last message sent on the topic
  // getLastMessageCallback returns a function pointer to this implementation:
/**
void Publisher::Impl::pushLastMessage(const SubscriberLinkPtr &sub_link) {
  boost::mutex::scoped_lock lock(last_message_mutex_);
  if (last_message_.buf) {
    sub_link->enqueueMessage(last_message_, true, true);
  }
}
**/
    // when a new subscriber connects, Publication calls this to push the latched message to it
  if (ops.latch) {
    callbacks->push_latched_message_ = pub.getLastMessageCallback();
  }

  // this adds the topic to the TopicManager
  if (TopicManager::instance()->advertise(ops, callbacks))
  {
    {
      // record this advertisement in the node
      boost::mutex::scoped_lock lock(collection_->mutex_);
      collection_->pubs_.push_back(pub.impl_);
    }
    return pub;
  }
  return Publisher();
}

TopicManager::advertise:

bool TopicManager::advertise(const AdvertiseOptions& ops, const SubscriberCallbacksPtr& callbacks)
{
    // some validation omitted
  PublicationPtr pub;

  {
    boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);
	//……
    pub = lookupPublicationWithoutLock(ops.topic);
    if (pub && pub->getNumCallbacks() == 0)
    {
      pub.reset();
    }

    if (pub)
    {
        // if the topic already exists, compare the new and old message types; this is the multiple-publishers case
      if (pub->getMD5Sum() != ops.md5sum)
      {
        ROS_ERROR("Tried to advertise on topic [%s] with md5sum [%s] and datatype [%s], but the topic is already advertised as md5sum [%s] and datatype [%s]",
                  ops.topic.c_str(), ops.md5sum.c_str(), ops.datatype.c_str(), pub->getMD5Sum().c_str(), pub->getDataType().c_str());
        return false;
      }
      // for an existing topic, add these callbacks to the existing pub object rather than creating a new one
      pub->addCallbacks(callbacks);

      return true;
    }
// a new topic gets a new Publication
    pub = PublicationPtr(boost::make_shared<Publication>(ops.topic, ops.datatype, ops.md5sum, ops.message_definition, ops.queue_size, false, ops.has_header));
      // the callbacks include the Publisher::Impl::pushLastMessage method shown above
    pub->addCallbacks(callbacks);
      // add it to the advertised-topics table
    advertised_topics_.push_back(pub);
  }


  {
    boost::mutex::scoped_lock lock(advertised_topic_names_mutex_);
    advertised_topic_names_.push_back(ops.topic);
  }

  // Check whether we've already subscribed to this topic.  If so, we'll do
  // the self-subscription here, to avoid the deadlock that would happen if
  // the ROS thread later got the publisherUpdate with its own XMLRPC URI.
  // The assumption is that advertise() is called from somewhere other
  // than the ROS thread.
  bool found = false;
  SubscriptionPtr sub;
  {
    boost::mutex::scoped_lock lock(subs_mutex_);
// look for an existing subscriber in the subscription table
    for (L_Subscription::iterator s = subscriptions_.begin();
         s != subscriptions_.end() && !found; ++s)
    {
      if ((*s)->getName() == ops.topic && md5sumsMatch((*s)->md5sum(), ops.md5sum) && !(*s)->isDropped())
      {
        found = true;
        sub = *s;
        break;
      }
    }
  }

  if(found)
  {
      // if a subscription already exists, connect it to this publication now; otherwise connections are created when subscribers arrive
    sub->addLocalConnection(pub);
  }

  XmlRpcValue args, result, payload;
  args[0] = this_node::getName();
  args[1] = ops.topic;
  args[2] = ops.datatype;
  args[3] = xmlrpc_manager_->getServerURI();
    // XML-RPC service discovery: register the new publisher with the Master
  master::execute("registerPublisher", args, result, payload, true);

  return true;
}

That completes the creation of a publisher. Throughout the process, the node-side TopicManager keeps track of the publish/subscribe relationships for each topic.

To send a message through the publisher, use the publish method that Publisher provides.

Publisher::publish:

void Publisher::publish(const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m) const
{
    //……
  TopicManager::instance()->publish(impl_->topic_, serfunc, m);

  if (isLatched()) {
      // if the latch flag is set, keep the last sent message so it can be delivered to subscribers that join later
    boost::mutex::scoped_lock lock(impl_->last_message_mutex_);
    impl_->last_message_ = m;
  }
}
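
Putting the pieces together, a minimal latched talker; the topic name chatter is illustrative, and the final advertise argument is the latch flag that feeds isLatched() above:

#include <ros/ros.h>
#include <std_msgs/String.h>

int main(int argc, char** argv)
{
  ros::init(argc, argv, "demo_talker");
  ros::NodeHandle nh;
  // latch = true: last_message_ is kept and pushed to late joiners via pushLastMessage
  ros::Publisher pub = nh.advertise<std_msgs::String>("chatter", 10, true);
  std_msgs::String msg;
  msg.data = "hello";
  pub.publish(msg);  // ends up in TopicManager::publish below
  ros::spin();
  return 0;
}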

TopicManager::publish:

void TopicManager::publish(const std::string& topic, const boost::function<SerializedMessage(void)>& serfunc, SerializedMessage& m)
{
  boost::recursive_mutex::scoped_lock lock(advertised_topics_mutex_);

  if (isShuttingDown())
  {
    return;
  }

  // find the topic in the table that was populated when it was advertised
  PublicationPtr p = lookupPublicationWithoutLock(topic);
  if (p->hasSubscribers() || p->isLatching())
  {
    ROS_DEBUG_NAMED("superdebug", "Publishing message on topic [%s] with sequence number [%d]", p->getName().c_str(), p->getSequence());

    // Determine what kinds of subscribers we're publishing to.  If they're intraprocess with the same C++ type we can
    // do a no-copy publish.
    bool nocopy = false;
    bool serialize = false;

    // We can only do a no-copy publish if a shared_ptr to the message is provided, and we have type information for it
    if (m.type_info && m.message)
    {
      p->getPublishTypes(serialize, nocopy, *m.type_info);
    }
    else
    {
      serialize = true;
    }

    if (!nocopy)
    {
      m.message.reset();
      m.type_info = 0;
    }

    if (serialize || p->isLatching())
    {
        // serialize the message
      SerializedMessage m2 = serfunc();
      m.buf = m2.buf;
      m.num_bytes = m2.num_bytes;
      m.message_start = m2.message_start;
    }

      // hand the actual send off to Publication::publish
    p->publish(m);

    // If we're not doing a serialized publish we don't need to signal the pollset.  The write()
    // call inside signal() is actually relatively expensive when doing a nocopy publish.
    if (serialize)
    {
      poll_manager_->getPollSet().signal();
    }
  }
  else
  {
      // no subscribers and no latching: just increment the sequence number
    p->incrementSequence();
  }
}
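
The no-copy branch above fires when the message is published as a shared_ptr, because only then are m.message and m.type_info populated. A sketch of triggering it (same illustrative chatter topic as before):

#include <ros/ros.h>
#include <std_msgs/String.h>
#include <boost/make_shared.hpp>

int main(int argc, char** argv)
{
  ros::init(argc, argv, "demo_nocopy");
  ros::NodeHandle nh;
  ros::Publisher pub = nh.advertise<std_msgs::String>("chatter", 10);
  // publishing a shared_ptr lets same-process subscribers of the same C++ type
  // receive the pointer itself, skipping serialization entirely
  std_msgs::String::Ptr msg = boost::make_shared<std_msgs::String>();
  msg->data = "zero-copy within this process";
  pub.publish(msg);
  ros::spin();
  return 0;
}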

Publication::publish:

void Publication::publish(SerializedMessage& m)
{
  if (m.message)
  {
    boost::mutex::scoped_lock lock(subscriber_links_mutex_);
    V_SubscriberLink::const_iterator it = subscriber_links_.begin();
    V_SubscriberLink::const_iterator end = subscriber_links_.end();
    for (; it != end; ++it)
    {
      const SubscriberLinkPtr& sub = *it;
        // an intraprocess (same-node) subscriber
      if (sub->isIntraprocess())
      {
          // notify each subscriber in the topic's subscriber list in turn
        sub->enqueueMessage(m, false, true);
      }
    }
    m.message.reset();
  }
  if (m.buf)
  {
    boost::mutex::scoped_lock lock(publish_queue_mutex_);
    publish_queue_.push_back(m);
  }
}

enqueueMessage has two implementations: for a subscriber in the same process it invokes the local callback directly, while for a remote subscriber it sends the message through the wrapped network transport.

// implementation 1: the intraprocess (local) version
void IntraProcessSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
  boost::recursive_mutex::scoped_lock lock(drop_mutex_);
  if (dropped_)
  {
    return;
  }

  ROS_ASSERT(subscriber_);
    // invoke the local subscriber's callback directly
  subscriber_->handleMessage(m, ser, nocopy);
}
// implementation 2: the remote (network) version
void TransportSubscriberLink::enqueueMessage(const SerializedMessage& m, bool ser, bool nocopy)
{
  (void)nocopy;
  if (!ser)
  {
    return;
  }

  {
    boost::mutex::scoped_lock lock(outbox_mutex_);

    int max_queue = 0;
    if (PublicationPtr parent = parent_.lock())
    {
      max_queue = parent->getMaxQueue();
    }

    ROS_DEBUG_NAMED("superdebug", "TransportSubscriberLink on topic [%s] to caller [%s], queueing message (queue size [%d])", topic_.c_str(), destination_caller_id_.c_str(), (int)outbox_.size());

    if (max_queue > 0 && (int)outbox_.size() >= max_queue)
    {
      if (!queue_full_)
      {
        ROS_DEBUG("Outgoing queue full for topic [%s].  "
               "Discarding oldest message",
               topic_.c_str());
      }

      outbox_.pop(); // toss out the oldest thing in the queue to make room for us
      queue_full_ = true;
    }
    else
    {
      queue_full_ = false;
    }

    outbox_.push(m);
  }

    // this calls connection_->write(m.buf, m.num_bytes, boost::bind(&TransportSubscriberLink::onMessageWritten, this, boost::placeholders::_1), immediate_write) to send the message out
  startMessageWrite(false);

  stats_.messages_sent_++;
  stats_.bytes_sent_ += m.num_bytes;
  stats_.message_data_sent_ += m.num_bytes;
}
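
The drop-oldest policy of outbox_ above is a generally useful pattern; a minimal generic sketch (the class name BoundedOutbox is mine, not roscpp's):

#include <cstddef>
#include <queue>

template <typename T>
class BoundedOutbox
{
public:
  explicit BoundedOutbox(std::size_t max_queue) : max_queue_(max_queue) {}

  // returns true if the oldest element was discarded to make room,
  // mirroring the outbox_.pop() above
  bool push(const T& item)
  {
    bool dropped = false;
    if (max_queue_ > 0 && queue_.size() >= max_queue_)
    {
      queue_.pop();
      dropped = true;
    }
    queue_.push(item);
    return dropped;
  }

private:
  std::size_t max_queue_;
  std::queue<T> queue_;
};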

Subscribing to a topic

Subscribing to a topic is comparatively simple; again we first obtain a Subscriber from the NodeHandle:

Subscriber NodeHandle::subscribe(SubscribeOptions& ops)
{
  ops.topic = resolveName(ops.topic);
    // bind the handle's callback queue, or fall back to the global one
  if (ops.callback_queue == 0)
  {
    if (callback_queue_)
    {
      ops.callback_queue = callback_queue_;
    }
    else
    {
      ops.callback_queue = getGlobalCallbackQueue();
    }
  }

    // as on the publisher side, the TopicManager establishes the publisher-subscriber binding
  if (TopicManager::instance()->subscribe(ops))
  {
    Subscriber sub(ops.topic, *this, ops.helper);
    {
      boost::mutex::scoped_lock lock(collection_->mutex_);
        // record this subscription in the node's table
      collection_->subs_.push_back(sub.impl_);
    }
    return sub;
  }
    // on failure, the returned (empty) Subscriber is generally unused
  return Subscriber();
}

TopicManager::subscribe:

bool TopicManager::subscribe(const SubscribeOptions& ops)
{
  boost::mutex::scoped_lock lock(subs_mutex_);
// some validation omitted
  const std::string& md5sum = ops.md5sum;
  std::string datatype = ops.datatype;

  SubscriptionPtr s(boost::make_shared<Subscription>(ops.topic, md5sum, datatype, ops.transport_hints));
  s->addCallback(ops.helper, ops.md5sum, ops.callback_queue, ops.queue_size, ops.tracked_object, ops.allow_concurrent_callbacks);

  if (!registerSubscriber(s, ops.datatype))
  {
    ROS_WARN("couldn't register subscriber on topic [%s]", ops.topic.c_str());
    s->shutdown();
    return false;
  }
// add this subscription to the subscription table
  subscriptions_.push_back(s);
  return true;
}

This completes the subscription flow:

  • TopicManager maintains a subscription table; new subscribers are added to it
  • when a new publisher appears, a connection between publisher and subscriber is built
  • each time the publisher publishes a message, it is serialized (when necessary) and the subscribers' callbacks are invoked
  • an intraprocess subscriber's callback is called directly, with no copy; a remote subscriber receives the message over the network (a minimal listener is sketched below)
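
A matching minimal listener for the talker sketches above:

#include <ros/ros.h>
#include <std_msgs/String.h>

void onMessage(const std_msgs::String::ConstPtr& msg)
{
  ROS_INFO("got: %s", msg->data.c_str());
}

int main(int argc, char** argv)
{
  ros::init(argc, argv, "demo_listener");
  ros::NodeHandle nh;
  // ends up in TopicManager::subscribe / registerSubscriber as shown above
  ros::Subscriber sub = nh.subscribe("chatter", 10, onMessage);
  ros::spin();  // runs the callback queue chosen in NodeHandle::subscribe
  return 0;
}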

References

ROS原理学习第一天–ROS通信机制(Topic)笔记

Appendix

Implementation of master::execute

Essentially, it wraps a network request as a function call: it builds the request header and body, and when the reply comes back (via a readable event) it fills in the result structure, completing one request.

bool execute(const std::string& method, const XmlRpc::XmlRpcValue& request, XmlRpc::XmlRpcValue& response, XmlRpc::XmlRpcValue& payload, bool wait_for_master)
{
  ros::SteadyTime start_time = ros::SteadyTime::now();

  std::string master_host = getHost();
  uint32_t master_port = getPort();
   // beneath all the ceremony this simply obtains an XmlRpcClient, which does all the real work
  XmlRpc::XmlRpcClient *c = XMLRPCManager::instance()->getXMLRPCClient(master_host, master_port, "/");
  bool printed = false;
  bool slept = false;
  bool ok = true;
  bool b = false;
  // the key line: hand off to XmlRpcClient::execute and return whether it succeeded; since
  // request and response are passed by reference, execute can fill in response directly
  b = c->execute(method.c_str(), request, response);
  // code that releases the XmlRpcClient and handles errors omitted
  //……
  return b;
}
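
Any node can use this same entry point. For instance, a sketch that queries the Master's getSystemState method (a method name from the ROS Master API) through master::execute:

#include <ros/ros.h>
#include <ros/master.h>

int main(int argc, char** argv)
{
  ros::init(argc, argv, "demo_master_query");
  XmlRpc::XmlRpcValue args, result, payload;
  args[0] = ros::this_node::getName();
  // payload ends up holding [publishers, subscribers, services]
  if (ros::master::execute("getSystemState", args, result, payload, true))
    ROS_INFO("system state sections: %d", payload.size());
  return 0;
}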

XmlRpcClient::execute:

bool
XmlRpcClient::execute(const char* method, XmlRpcValue const& params, XmlRpcValue& result)
{
  XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s (_connectionState %s).", method, connectionStateStr(_connectionState));

  // This is not a thread-safe operation, if you want to do multithreading, use separate
  // clients for each thread. If you want to protect yourself from multiple threads
  // accessing the same client, replace this code with a real mutex.
  if (_executing)
    return false;

  _executing = true;
  ClearFlagOnExit cf(_executing);

  _sendAttempts = 0;
  _isFault = false;

  if ( ! setupConnection())
    return false;

  // format the method name and parameters into _request
  if ( ! generateRequest(method, params))
    return false;

  result.clear();
  double msTime = -1.0;   // Process until exit is called
  // trigger a writable event; its callback sends the prepared _request, and once a reply arrives
  // (or the call times out) a readable event writes the result into _response (this logic lives in handleEvent)
  _disp.work(msTime);

  if (_connectionState != IDLE || ! parseResponse(result)) {
    _header = "";
    return false;
  }

  // close() if server does not supports HTTP1.1
  // otherwise, reusing the socket to write leads to a SIGPIPE because
  // the remote server could shut down the corresponding socket.
  if (_header.find("HTTP/1.1 200 OK", 0, 15) != 0) {
    close();
  }

  XmlRpcUtil::log(1, "XmlRpcClient::execute: method %s completed.", method);
  _header = "";
  _response = "";
  return true;
}