源码分析——Envoy LDS

举报
popoman 发表于 2020/01/03 15:37:45 2020/01/03
【摘要】 监听器发现服务(LDS)是一个可选的 API,Envoy 将调用它来动态获取监听器。Envoy 将协调 API 响应,并根据需要添加、修改或删除已知的监听器。

LDS是什么?

Envoy的架构使其可以使用不同类型的配置管理方法。部署中采用的方法将取决于实现者的需求。完全静态的配置可以实现简单的部署。更复杂的部署可能会逐步增加更复杂的动态配置,前提是实现者必须提供一个或多个基于外部gRPC/REST的配置提供程序API。这些API统称为“xDS”(*发现服务)。xDS包括LDS、RDS、CDS、EDS。

其中监听器发现服务(LDS)是一个可选的API,Envoy将调用它来动态获取监听器。Envoy将协调API响应,并根据需要添加、修改或删除已知的监听器。

监听器更新的语义如下:

  • 每个监听器必须有一个独特的名字。如果没有提供名称,Envoy将创建一个UUID。要动态更新的监听器,管理服务必须提供监听器的唯一名称。

  • 当一个监听器被添加,在参与连接处理之前,会先进入“warming”阶段。例如,如果监听器引用RDS配置,那么在监听器迁移到“active”之前,将会解析并提取该配置。

  • 监听器一旦创建,实际上就会保持不变。因此,更新监听器时,会创建一个全新的监听器(使用相同的侦听套接字)。新增加的监听者都会通过上面所描述的相同“预热”过程。

  • 当更新或删除监听器时,旧的监听器将被置于“draining”状态,就像整个服务重新启动时一样。监听器移除之后,该监听器所拥有的连接,经过一段时间优雅地关闭(如果可能的话)剩余的连接。逐出时间通过--drain-time-s选项设置。

注意:任何在Envoy配置中静态定义的监听器都不能通过LDS API进行修改或删除。

整体结构

LDS通过一个ListenerManager管理n个Listener,每个Listener又通过一个FilterChainManager管理n个FilterChain。下图为LDS的相关类的UML图,多数类集中声明在lds_api.h、listener_manager_impl.h和filter_chain_manager_impl.h中。

1578384758710302.jpg

LDS相关类的UML图

Listener的三种状态

ListenerManagerImpl实例管理全部的监听器。它内部包含了三个list,它们分别代表处于三种状态的Listener。

活跃监听器(active_listeners):

    活跃监听器是当前正在接受工作节点上的新连接的监听器。

半活跃监听器(warming_listeners):

    半活跃监听器是可能需要通过监听器的init管理器进一步初始化的监听器。如RDS,未来KDS等。一旦一个监听器成为半活跃状态,它将转变为活跃的。

待回收监听器(draining_listeners):

    待回收监听器是正在被回收和移除的监听器。他们经历了两个阶段,首先工人停止接受新的连接,现有的连接被耗尽。之后,监听器从所有工作节点中移除,并关闭所有剩余的连接。

LDS初始化过程

在初始化bootstrap配置时,如果有Lds配置会进行初始化。

void InstanceImpl::initialize(const Options& options,
                              Network::Address::InstanceConstSharedPtr local_address,
                              ComponentFactory& component_factory, ListenerHooks& hooks) {
......
// Instruct the listener manager to create the LDS provider if needed. This must be done later
// because various items do not yet exist when the listener manager is created.
if (bootstrap_.dynamic_resources().has_lds_config()) {
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}

在ListenerManagerImpl类中通过ProdListenerComponentFactory配置类创建Lds。

class ProdListenerComponentFactory : public ListenerComponentFactory,
                                     Logger::Loggable<Logger::Id::config> {
public:
......
  // Server::ListenerComponentFactory
  LdsApiPtr createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) override {
    return std::make_unique<LdsApiImpl>(
        lds_config, server_.clusterManager(), server_.initManager(), server_.stats(),
        server_.listenerManager(), server_.messageValidationContext().dynamicValidationVisitor());
  }
......

新建Lds时,会创建一个subscription。

LdsApiImpl::LdsApiImpl(const envoy::api::v2::core::ConfigSource& lds_config,
                       Upstream::ClusterManager& cm, Init::Manager& init_manager,
                       Stats::Scope& scope, ListenerManager& lm,
                       ProtobufMessage::ValidationVisitor& validation_visitor)
    : listener_manager_(lm), scope_(scope.createScope("listener_manager.lds.")), cm_(cm),
      init_target_("LDS", [this]() { subscription_->start({}); }),
      validation_visitor_(validation_visitor) {
  subscription_ = cm.subscriptionFactory().subscriptionFromConfigSource(
      lds_config, Grpc::Common::typeUrl(envoy::api::v2::Listener().GetDescriptor()->full_name()),
      *scope_, *this);
  init_manager.add(init_target_);
}

LDS更新过程

有数据下发时,subscription会调用callback触发onConfigUpdate。

// Config::GrpcMuxCallbacks
void GrpcMuxSubscriptionImpl::onConfigUpdate(
    const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
    const std::string& version_info) {
  disableInitFetchTimeoutTimer();
  // TODO(mattklein123): In the future if we start tracking per-resource versions, we need to
  // supply those versions to onConfigUpdate() along with the xDS response ("system")
  // version_info. This way, both types of versions can be tracked and exposed for debugging by
  // the configuration update targets.
  callbacks_.onConfigUpdate(resources, version_info);
  stats_.update_success_.inc();
  stats_.update_attempt_.inc();
  stats_.version_.set(HashUtil::xxHash64(version_info));
  ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources with version {}", type_url_,
            resources.size(), version_info);
}

两参数的onConfigUpdate方法通过获取到的Listener列表和本地的Listener列表计算出要移除的Listener和要新增的Listener,最后再通过调用三参数的onConfigUpdate执行真正的更新操作。

  • 要移除的Listener = 当前的Listener - 获取到的Listener

  • 要新增的Listener = 获取到的Listener - 当前的Listener

void LdsApiImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
                                const std::string& version_info) {
  // We need to keep track of which listeners need to remove.
  // Specifically, it's [listeners we currently have] - [listeners found in the response].
  std::unordered_set<std::string> listeners_to_remove;
  for (const auto& listener : listener_manager_.listeners()) {
    listeners_to_remove.insert(listener.get().name());
  }

  Protobuf::RepeatedPtrField<envoy::api::v2::Resource> to_add_repeated;
  for (const auto& listener_blob : resources) {
    // Add this resource to our delta added/updated pile...
    envoy::api::v2::Resource* to_add = to_add_repeated.Add();
    const std::string listener_name =
        MessageUtil::anyConvert<envoy::api::v2::Listener>(listener_blob, validation_visitor_)
            .name();
    to_add->set_name(listener_name);
    to_add->set_version(version_info);
    to_add->mutable_resource()->MergeFrom(listener_blob);
    // ...and remove its name from our delta removed pile.
    listeners_to_remove.erase(listener_name);
  }

  // Copy our delta removed pile into the desired format.
  Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
  for (const auto& listener : listeners_to_remove) {
    *to_remove_repeated.Add() = listener;
  }
  onConfigUpdate(to_add_repeated, to_remove_repeated, version_info);
}

移除Listener

在添加或更新Listener之前,我们执行所有Listener删除。这允许添加与要移除的Listener具有相同地址的新Listener。

void LdsApiImpl::onConfigUpdate(
    const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
    const Protobuf::RepeatedPtrField<std::string>& removed_resources,
    const std::string& system_version_info) {
  cm_.adsMux().pause(Config::TypeUrl::get().RouteConfiguration);
  Cleanup rds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().RouteConfiguration); });

  bool any_applied = false;
  // We do all listener removals before adding the new listeners. This allows adding a new listener
  // with the same address as a listener that is to be removed. Do not change the order.
  for (const auto& removed_listener : removed_resources) {
    if (listener_manager_.removeListener(removed_listener)) {
      ENVOY_LOG(info, "lds: remove listener '{}'", removed_listener);
      any_applied = true;
    }
  }
......

移除Listener的动作的细节在ListenerManagerImpl类的removeListener方法下。如果是在“warming”阶段的Listener直接删除,如果是在活动的Listener将其置于”draining“中,过一段时间关闭。

bool ListenerManagerImpl::removeListener(const std::string& name) {
  ENVOY_LOG(debug, "begin remove listener: name={}", name);

  auto existing_active_listener = getListenerByName(active_listeners_, name);
  auto existing_warming_listener = getListenerByName(warming_listeners_, name);
  if ((existing_warming_listener == warming_listeners_.end() ||
       (*existing_warming_listener)->blockRemove()) &&
      (existing_active_listener == active_listeners_.end() ||
       (*existing_active_listener)->blockRemove())) {
    ENVOY_LOG(debug, "unknown/locked listener '{}'. no remove", name);
    return false;
  }

  // Destroy a warming listener directly.
  if (existing_warming_listener != warming_listeners_.end()) {
    (*existing_warming_listener)->debugLog("removing warming listener");
    warming_listeners_.erase(existing_warming_listener);
  }

  // If there is an active listener it needs to be moved to draining after workers have started, or
  // destroyed directly.
  if (existing_active_listener != active_listeners_.end()) {
    // Listeners in active_listeners_ are added to workers after workers start, so we drain
    // listeners only after this occurs.
    if (workers_started_) {
      drainListener(std::move(*existing_active_listener));
    }
    active_listeners_.erase(existing_active_listener);
  }

  stats_.listener_removed_.inc();
  updateWarmingActiveGauges();
  return true;
}

更新Listener

正如前面提到的,每个Listener必须有一个独一无二的名字,否则创建一个对应的uuid。

bool ListenerManagerImpl::addOrUpdateListener(const envoy::api::v2::Listener& config,
                                              const std::string& version_info, bool added_via_api) {
  std::string name;
  if (!config.name().empty()) {
    name = config.name();
  } else {
    name = server_.random().uuid();
  }
  const uint64_t hash = MessageUtil::hash(config);
  ENVOY_LOG(debug, "begin add/update listener: name={} hash={}", name, hash);

判断相同名称的Listener必须有相同的配置地址。

  if ((existing_warming_listener != warming_listeners_.end() &&
     (*existing_warming_listener)->blockUpdate(hash)) ||
    (existing_active_listener != active_listeners_.end() &&
     (*existing_active_listener)->blockUpdate(hash))) {
  ENVOY_LOG(debug, "duplicate/locked listener '{}'. no add/update", name);
  return false;
  }

不论是增加还是更新Listener都是直接创建,而旧的Listener将来会被“draining”。

  ListenerImplPtr new_listener(new ListenerImpl(
      config, version_info, *this, name, added_via_api, workers_started_, hash,
      added_via_api ? server_.messageValidationContext().dynamicValidationVisitor()
                    : server_.messageValidationContext().staticValidationVisitor()));
  ListenerImpl& new_listener_ref = *new_listener;

在新建Listener的时候,还要创建对应的FilterChainManager以及每个Filter的FilterFactory。这里是通过Filter的名字去FactoryRegistry获取,而每个对应的FilterFactory(如DubboProxyFilterConfigFactory)在之前已经被注册到FactoryRegistry了。

std::vector<Network::FilterFactoryCb> ProdListenerComponentFactory::createNetworkFilterFactoryList_(
    const Protobuf::RepeatedPtrField<envoy::api::v2::listener::Filter>& filters,
    Configuration::FactoryContext& context) {
  std::vector<Network::FilterFactoryCb> ret;
  for (ssize_t i = 0; i < filters.size(); i++) {
    const auto& proto_config = filters[i];
    const std::string& string_name = proto_config.name();
    ENVOY_LOG(debug, "  filter #{}:", i);
    ENVOY_LOG(debug, "    name: {}", string_name);
    const Json::ObjectSharedPtr filter_config =
        MessageUtil::getJsonObjectFromMessage(proto_config.config());
    ENVOY_LOG(debug, "  config: {}", filter_config->asJsonString());

    // Now see if there is a factory that will accept the config.
    auto& factory =
        Config::Utility::getAndCheckFactory<Configuration::NamedNetworkFilterConfigFactory>(
            string_name);

    Config::Utility::validateTerminalFilters(filters[i].name(), "network",
                                             factory.isTerminalFilter(), i == filters.size() - 1);

    Network::FilterFactoryCb callback;
    if (Config::Utility::allowDeprecatedV1Config(context.runtime(), *filter_config)) {
      callback = factory.createFilterFactory(*filter_config->getObject("value", true), context);
    } else {
      auto message = Config::Utility::translateToFactoryConfig(
          proto_config, context.messageValidationVisitor(), factory);
      callback = factory.createFilterFactoryFromProto(*message, context);
    }
    ret.push_back(callback);
  }
  return ret;
}

判断相同名称的Listener必须有相同的配置地址。

  if ((existing_warming_listener != warming_listeners_.end() &&
       *(*existing_warming_listener)->address() != *new_listener->address()) ||
      (existing_active_listener != active_listeners_.end() &&
       *(*existing_active_listener)->address() != *new_listener->address())) {
    const std::string message = fmt::format(
        "error updating listener: '{}' has a different address '{}' from existing listener", name,
        new_listener->address()->asString());
    ENVOY_LOG(warn, "{}", message);
    throw EnvoyException(message);
  }

那么接下来分为三种情况:

  1. 新创建的Listener在warming_listeners列表中

  2. 新创建的Listener在active_listeners列表中

  3. 新创建的Listener不在上述两个列表中

前两种情况我们进行相应的更新Listener操作,而第三种情况将进行新增Listener操作。

  bool added = false;
  if (existing_warming_listener != warming_listeners_.end()) {
    // 1.在warming_listeners列表中
  } else if (existing_active_listener != active_listeners_.end()) {
    // 2.在active_listeners列表中
  } else {
    // 3.新增Listener
    added = true;
  }

如果更新的Listener在“warming”阶段的Listener中,直接内部替换。

  if (existing_warming_listener != warming_listeners_.end()) {
    // In this case we can just replace inline.
    ASSERT(workers_started_);
    new_listener->debugLog("update warming listener");
    new_listener->setSocket((*existing_warming_listener)->getSocket());
    *existing_warming_listener = std::move(new_listener);

如果更新的Listener在活动中的Listener中。当前工作线程已经启动的话,加入“预热”阶段的Listener。当前工作线程未启动状态,直接内部替换。

此处打印日志:add warming listener
  } else if (existing_active_listener != active_listeners_.end()) {
    // In this case we have no warming listener, so what we do depends on whether workers
    // have been started or not. Either way we get the socket from the existing listener.
    new_listener->setSocket((*existing_active_listener)->getSocket());
    if (workers_started_) {
      new_listener->debugLog("add warming listener");
      warming_listeners_.emplace_back(std::move(new_listener));
    } else {
      new_listener->debugLog("update active listener");
      *existing_active_listener = std::move(new_listener);
    }
  }

1578453896298090.jpg

当前Listener处于active状态时的变化图1


在addOrUpdateListeneraddOrUpdateListener方法的最后才对新的Listener执行initialize操作。

......
  new_listener_ref.initialize();
  return true;
}

init操作实际上就是执行onListenerWarmed方法,在Listener被创建的时候提前把此方法注册到了Init_watcher内部。

ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::string& version_info,
                           ListenerManagerImpl& parent, const std::string& name, bool added_via_api,
                           bool workers_started, uint64_t hash,
                           ProtobufMessage::ValidationVisitor& validation_visitor)
    : parent_(parent), address_(Network::Address::resolveProtoAddress(config.address())),
......
      dynamic_init_manager_(fmt::format("Listener {}", name)),
      init_watcher_(std::make_unique<Init::WatcherImpl>(
          "ListenerImpl", [this] { parent_.onListenerWarmed(*this); })),
......

执行onListenerWarmed方法的时候把warming列表内的Listener更新到active状态,把active状态的对应Listener更新到draining状态。

void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
  // The warmed listener should be added first so that the worker will accept new connections
  // when it stops listening on the old listener.
  for (const auto& worker : workers_) {
    addListenerToWorker(*worker, listener);
  }

  auto existing_active_listener = getListenerByName(active_listeners_, listener.name());
  auto existing_warming_listener = getListenerByName(warming_listeners_, listener.name());
  (*existing_warming_listener)->debugLog("warm complete. updating active listener");
  if (existing_active_listener != active_listeners_.end()) {
    drainListener(std::move(*existing_active_listener));
    *existing_active_listener = std::move(*existing_warming_listener);
  } else {
    active_listeners_.emplace_back(std::move(*existing_warming_listener));
  }

  warming_listeners_.erase(existing_warming_listener);
  updateWarmingActiveGauges();
}

uint64_t ListenerManagerImpl::numConnections() {
  uint64_t num_connections = 0;
  for (const auto& worker : workers_) {
    num_connections += worker->numConnections();
  }

  return num_connections;
}

1578453873368518.jpg

当前Listener处于active状态时的变化图2

(removing listener部分未完待续)

1578447514262092.jpg

当前Listener处于active状态时的变化图3

新增Listener

新增Listener时,会先去“draining”状态的Listener中查找是否已有相同地址的Listener,有相同的重新拉回来,并根据当前工作线程是否已就绪加入对应的Listener。

    // Typically we catch address issues when we try to bind to the same address multiple times.
    // However, for listeners that do not bind we must check to make sure we are not duplicating.
    // This is an edge case and nothing will explicitly break, but there is no possibility that
    // two listeners that do not bind will ever be used. Only the first one will be used when
    // searched for by address. Thus we block it.
    if (!new_listener->bindToPort() &&
        (hasListenerWithAddress(warming_listeners_, *new_listener->address()) ||
         hasListenerWithAddress(active_listeners_, *new_listener->address()))) {
      const std::string message =
          fmt::format("error adding listener: '{}' has duplicate address '{}' as existing listener",
                      name, new_listener->address()->asString());
      ENVOY_LOG(warn, "{}", message);
      throw EnvoyException(message);
    }

    // We have no warming or active listener so we need to make a new one. What we do depends on
    // whether workers have been started or not. Additionally, search through draining listeners
    // to see if there is a listener that has a socket bound to the address we are configured for.
    // This is an edge case, but may happen if a listener is removed and then added back with a same
    // or different name and intended to listen on the same address. This should work and not fail.
    Network::SocketSharedPtr draining_listener_socket;
    auto existing_draining_listener = std::find_if(
        draining_listeners_.cbegin(), draining_listeners_.cend(),
        [&new_listener](const DrainingListener& listener) {
          return *new_listener->address() == *listener.listener_->socket().localAddress();
        });
    if (existing_draining_listener != draining_listeners_.cend()) {
      draining_listener_socket = existing_draining_listener->listener_->getSocket();
    }

    new_listener->setSocket(draining_listener_socket
                                ? draining_listener_socket
                                : factory_.createListenSocket(new_listener->address(),
                                                              new_listener->socketType(),
                                                              new_listener->listenSocketOptions(),
                                                              new_listener->bindToPort()));
    if (workers_started_) {
      new_listener->debugLog("add warming listener");
      warming_listeners_.emplace_back(std::move(new_listener));
    } else {
      new_listener->debugLog("add active listener");
      active_listeners_.emplace_back(std::move(new_listener));
    }

    added = true;

一些思考

  1. 在新旧Listener进行替换时,已经创建的tcp连接如何优雅地重新建立连接?

  2. 参数--drain-time-s的设置有什么意义?

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。