Source Code Analysis: Envoy LDS
What is LDS?
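Envoy's architecture allows different kinds of configuration management. Which approach a deployment takes depends on the implementer's needs. A fully static configuration keeps deployments simple. More complex deployments can incrementally add more complex dynamic configuration, with the caveat that the implementer must provide one or more external gRPC/REST-based configuration provider APIs. These APIs are collectively known as "xDS" (* discovery service). xDS includes LDS, RDS, CDS, and EDS.
The Listener Discovery Service (LDS) is an optional API that Envoy calls to fetch listeners dynamically. Envoy reconciles the API response and adds, modifies, or removes known listeners as needed.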
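The semantics of listener updates are as follows:
Every listener must have a unique name. If no name is provided, Envoy generates a UUID. For a listener to be updated dynamically, the management server must provide its unique name.
When a listener is added, it enters a "warming" phase before it takes part in connection handling. For example, if the listener references an RDS configuration, that configuration is resolved and fetched before the listener is moved to "active".
Listeners are effectively immutable once created. Therefore, updating a listener creates an entirely new listener (using the same listen socket). Every newly added listener goes through the same "warming" process described above.
When a listener is updated or removed, the old listener is placed into the "draining" state, much like when the whole server is restarted. Connections owned by the listener are gracefully closed (if possible) for a period of time, after which the listener is removed and any remaining connections are closed. The drain time is set with the --drain-time-s option.
Note: listeners defined statically in the Envoy configuration cannot be modified or removed through the LDS API.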
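Overall structure
LDS manages N Listeners through a single ListenerManager, and each Listener in turn manages N FilterChains through a FilterChainManager. The UML diagram below shows the LDS-related classes; most of them are declared in lds_api.h, listener_manager_impl.h, and filter_chain_manager_impl.h.
(Figure: UML diagram of the LDS-related classes)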
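The three listener states
A ListenerManagerImpl instance manages all listeners. Internally it holds three lists, one for the listeners in each of the three states.
Active listeners (active_listeners_):
Active listeners are listeners that are currently accepting new connections on the workers.
Warming listeners (warming_listeners_):
Warming listeners are listeners that may need further initialization via the listener's init manager, for example RDS (or, in the future, KDS). Once a listener has finished warming, it transitions to active.
Draining listeners (draining_listeners_):
Draining listeners are listeners that are in the process of being drained and removed. They go through two phases: first the workers stop accepting new connections and existing connections are drained; then, after that period, the listener is removed from all workers and any remaining connections are closed.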
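To make the three states concrete, here is a minimal sketch of the lifecycle as a state-transition function. It is a hypothetical model: ListenerState, Event, and nextState are illustrative names, Envoy does not store an explicit state enum (it keeps listeners in the three lists above), and the sketch assumes the workers have already started.

```cpp
#include <cassert>
#include <optional>

// Hypothetical model of the listener lifecycle; not an Envoy type.
enum class ListenerState { Warming, Active, Draining };

enum class Event {
  InitDone,           // all init targets (e.g. RDS) are ready
  ReplacedOrRemoved,  // a newer version replaced it, or LDS removed it
  DrainTimeElapsed    // the --drain-time-s period has passed
};

// Returns the next state, or std::nullopt when the listener is destroyed.
std::optional<ListenerState> nextState(ListenerState s, Event e) {
  switch (s) {
    case ListenerState::Warming:
      if (e == Event::InitDone) return ListenerState::Active;
      // Warming listeners are destroyed (or replaced inline) directly.
      if (e == Event::ReplacedOrRemoved) return std::nullopt;
      return s;
    case ListenerState::Active:
      if (e == Event::ReplacedOrRemoved) return ListenerState::Draining;
      return s;
    case ListenerState::Draining:
      // Removed from all workers; remaining connections are closed.
      if (e == Event::DrainTimeElapsed) return std::nullopt;
      return s;
  }
  return s;
}
```

An active listener therefore never goes straight to destruction once workers are running: it always passes through draining first.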
LDS initialization
While the bootstrap configuration is being initialized, LDS is set up if an LDS config is present.
```cpp
void InstanceImpl::initialize(const Options& options,
                              Network::Address::InstanceConstSharedPtr local_address,
                              ComponentFactory& component_factory, ListenerHooks& hooks) {
  ......
  // Instruct the listener manager to create the LDS provider if needed. This must be done later
  // because various items do not yet exist when the listener manager is created.
  if (bootstrap_.dynamic_resources().has_lds_config()) {
    listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
  }
```
Inside ListenerManagerImpl, the LDS API object is created through the ProdListenerComponentFactory class.
```cpp
class ProdListenerComponentFactory : public ListenerComponentFactory,
                                     Logger::Loggable<Logger::Id::config> {
public:
  ......
  // Server::ListenerComponentFactory
  LdsApiPtr createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) override {
    return std::make_unique<LdsApiImpl>(
        lds_config, server_.clusterManager(), server_.initManager(), server_.stats(),
        server_.listenerManager(), server_.messageValidationContext().dynamicValidationVisitor());
  }
  ......
```
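Constructing LdsApiImpl creates a subscription. The subscription is not started in the constructor: the init target's callback starts it when the init manager initializes that target.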
```cpp
LdsApiImpl::LdsApiImpl(const envoy::api::v2::core::ConfigSource& lds_config,
                       Upstream::ClusterManager& cm, Init::Manager& init_manager,
                       Stats::Scope& scope, ListenerManager& lm,
                       ProtobufMessage::ValidationVisitor& validation_visitor)
    : listener_manager_(lm), scope_(scope.createScope("listener_manager.lds.")), cm_(cm),
      init_target_("LDS", [this]() { subscription_->start({}); }),
      validation_visitor_(validation_visitor) {
  subscription_ = cm.subscriptionFactory().subscriptionFromConfigSource(
      lds_config, Grpc::Common::typeUrl(envoy::api::v2::Listener().GetDescriptor()->full_name()),
      *scope_, *this);
  init_manager.add(init_target_);
}
```
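The key detail in this constructor is that adding init_target_ to the init manager does not start the subscription; the lambda only runs once the manager initializes its targets. A minimal sketch of that deferral follows. InitTarget, InitManager, and FakeLdsApi are hypothetical stand-ins, and the sketch is synchronous, whereas Envoy's real Init::Manager waits for each target to signal that it is ready.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an init target: it only stores a callback.
class InitTarget {
 public:
  InitTarget(std::string name, std::function<void()> fn)
      : name_(std::move(name)), fn_(std::move(fn)) {}
  void initialize() { fn_(); }
  const std::string& name() const { return name_; }

 private:
  std::string name_;
  std::function<void()> fn_;
};

// Hypothetical stand-in for an init manager: runs targets only on initialize().
class InitManager {
 public:
  void add(InitTarget& target) { targets_.push_back(&target); }
  void initialize() {
    for (InitTarget* t : targets_) t->initialize();  // registration order
  }

 private:
  std::vector<InitTarget*> targets_;
};

// Mirrors the constructor above: register the target, start nothing yet.
struct FakeLdsApi {
  bool subscription_started = false;
  InitTarget init_target{"LDS", [this] { subscription_started = true; }};
  explicit FakeLdsApi(InitManager& im) { im.add(init_target); }
};
```

Usage: construct `FakeLdsApi lds(manager);` and `lds.subscription_started` stays false until `manager.initialize()` is called.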
LDS update process
When the management server pushes data, the subscription invokes its callbacks, which triggers onConfigUpdate.
```cpp
// Config::GrpcMuxCallbacks
void GrpcMuxSubscriptionImpl::onConfigUpdate(
    const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
    const std::string& version_info) {
  disableInitFetchTimeoutTimer();
  // TODO(mattklein123): In the future if we start tracking per-resource versions, we need to
  // supply those versions to onConfigUpdate() along with the xDS response ("system")
  // version_info. This way, both types of versions can be tracked and exposed for debugging by
  // the configuration update targets.
  callbacks_.onConfigUpdate(resources, version_info);
  stats_.update_success_.inc();
  stats_.update_attempt_.inc();
  stats_.version_.set(HashUtil::xxHash64(version_info));
  ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources with version {}", type_url_,
            resources.size(), version_info);
}
```
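The two-argument onConfigUpdate compares the received listener list with the local listener list to work out which listeners to remove and which to add or update, then calls the three-argument onConfigUpdate to perform the actual update.
Listeners to remove = current listeners - received listeners
Listeners to add or update = all received listeners (listeners whose configuration is unchanged are skipped later, in addOrUpdateListener)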
```cpp
void LdsApiImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
                                const std::string& version_info) {
  // We need to keep track of which listeners need to remove.
  // Specifically, it's [listeners we currently have] - [listeners found in the response].
  std::unordered_set<std::string> listeners_to_remove;
  for (const auto& listener : listener_manager_.listeners()) {
    listeners_to_remove.insert(listener.get().name());
  }

  Protobuf::RepeatedPtrField<envoy::api::v2::Resource> to_add_repeated;
  for (const auto& listener_blob : resources) {
    // Add this resource to our delta added/updated pile...
    envoy::api::v2::Resource* to_add = to_add_repeated.Add();
    const std::string listener_name =
        MessageUtil::anyConvert<envoy::api::v2::Listener>(listener_blob, validation_visitor_)
            .name();
    to_add->set_name(listener_name);
    to_add->set_version(version_info);
    to_add->mutable_resource()->MergeFrom(listener_blob);
    // ...and remove its name from our delta removed pile.
    listeners_to_remove.erase(listener_name);
  }

  // Copy our delta removed pile into the desired format.
  Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
  for (const auto& listener : listeners_to_remove) {
    *to_remove_repeated.Add() = listener;
  }
  onConfigUpdate(to_add_repeated, to_remove_repeated, version_info);
}
```
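The set arithmetic above can be sketched using listener names only. computeDelta and LdsDelta are hypothetical helpers, not Envoy APIs; the sketch assumes a listener is identified by its name and mirrors the loop above, in which every received listener lands in the add/update pile.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

// Hypothetical result type for one LDS response.
struct LdsDelta {
  std::vector<std::string> to_add;     // every received listener (added or updated)
  std::vector<std::string> to_remove;  // current listeners missing from the response
};

LdsDelta computeDelta(const std::vector<std::string>& current,
                      const std::vector<std::string>& received) {
  // Start by assuming every current listener should be removed...
  std::unordered_set<std::string> remove(current.begin(), current.end());
  LdsDelta delta;
  for (const auto& name : received) {
    delta.to_add.push_back(name);  // unchanged ones are filtered later by config hash
    remove.erase(name);            // ...and keep the ones that are still present.
  }
  delta.to_remove.assign(remove.begin(), remove.end());
  return delta;
}
```

For current listeners {a, b, c} and a response {b, c, d}, the add/update pile holds b, c, and d, while only a is removed.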
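Removing listeners
All listener removals are performed before any listener is added or updated. This allows a new listener to be added with the same address as a listener that is being removed.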
```cpp
void LdsApiImpl::onConfigUpdate(
    const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
    const Protobuf::RepeatedPtrField<std::string>& removed_resources,
    const std::string& system_version_info) {
  cm_.adsMux().pause(Config::TypeUrl::get().RouteConfiguration);
  Cleanup rds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().RouteConfiguration); });

  bool any_applied = false;
  // We do all listener removals before adding the new listeners. This allows adding a new listener
  // with the same address as a listener that is to be removed. Do not change the order.
  for (const auto& removed_listener : removed_resources) {
    if (listener_manager_.removeListener(removed_listener)) {
      ENVOY_LOG(info, "lds: remove listener '{}'", removed_listener);
      any_applied = true;
    }
  }
  ......
```
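Why the order matters can be shown with a toy model in which a map from address to listener name stands in for bound sockets. AddressTable and applyUpdate are hypothetical; the sketch only illustrates the ordering constraint, not how Envoy itself detects an address conflict.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy model: one listener name per address.
class AddressTable {
 public:
  // Fails if the address is already taken by another listener.
  bool add(const std::string& name, const std::string& address) {
    return by_address_.emplace(address, name).second;
  }
  void remove(const std::string& name) {
    for (auto it = by_address_.begin(); it != by_address_.end(); ++it) {
      if (it->second == name) {
        by_address_.erase(it);
        return;
      }
    }
  }

 private:
  std::map<std::string, std::string> by_address_;
};

// Applies one update; removals_first selects the order. Returns false on a conflict.
bool applyUpdate(AddressTable& table, const std::vector<std::string>& removed,
                 const std::vector<std::pair<std::string, std::string>>& added,
                 bool removals_first) {
  bool ok = true;
  if (removals_first) {
    for (const auto& n : removed) table.remove(n);
  }
  for (const auto& [name, addr] : added) ok = table.add(name, addr) && ok;
  if (!removals_first) {
    for (const auto& n : removed) table.remove(n);
  }
  return ok;
}
```

Replacing listener "old" with "new" on the same address succeeds only when the removal runs first.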
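The details of removal live in ListenerManagerImpl::removeListener. A listener in the "warming" phase is destroyed directly. An active listener is moved to the "draining" state and closed after a period of time if the workers have already started; otherwise it is destroyed directly. Unknown listeners, and listeners whose blockRemove() returns true, are left untouched.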
```cpp
bool ListenerManagerImpl::removeListener(const std::string& name) {
  ENVOY_LOG(debug, "begin remove listener: name={}", name);

  auto existing_active_listener = getListenerByName(active_listeners_, name);
  auto existing_warming_listener = getListenerByName(warming_listeners_, name);
  if ((existing_warming_listener == warming_listeners_.end() ||
       (*existing_warming_listener)->blockRemove()) &&
      (existing_active_listener == active_listeners_.end() ||
       (*existing_active_listener)->blockRemove())) {
    ENVOY_LOG(debug, "unknown/locked listener '{}'. no remove", name);
    return false;
  }

  // Destroy a warming listener directly.
  if (existing_warming_listener != warming_listeners_.end()) {
    (*existing_warming_listener)->debugLog("removing warming listener");
    warming_listeners_.erase(existing_warming_listener);
  }

  // If there is an active listener it needs to be moved to draining after workers have started, or
  // destroyed directly.
  if (existing_active_listener != active_listeners_.end()) {
    // Listeners in active_listeners_ are added to workers after workers start, so we drain
    // listeners only after this occurs.
    if (workers_started_) {
      drainListener(std::move(*existing_active_listener));
    }
    active_listeners_.erase(existing_active_listener);
  }

  stats_.listener_removed_.inc();
  updateWarmingActiveGauges();
  return true;
}
```
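The removal rules can be condensed into a small model. Manager and Listener here are hypothetical simplifications, and the sketch assumes blockRemove() is true exactly for listeners that were not added via the API (i.e. static ones), consistent with the note that static listeners cannot be removed through LDS.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical simplified listener.
struct Listener {
  std::string name;
  bool added_via_api;  // false for static (bootstrap) listeners
};

struct Manager {
  std::vector<Listener> warming, active, draining;
  bool workers_started = false;

  bool removeListener(const std::string& name) {
    auto byName = [&](const Listener& l) { return l.name == name; };
    auto w = std::find_if(warming.begin(), warming.end(), byName);
    auto a = std::find_if(active.begin(), active.end(), byName);
    const bool w_blocked = w == warming.end() || !w->added_via_api;
    const bool a_blocked = a == active.end() || !a->added_via_api;
    if (w_blocked && a_blocked) return false;  // unknown or locked listener

    if (w != warming.end()) warming.erase(w);  // warming: destroyed directly
    if (a != active.end()) {
      if (workers_started) draining.push_back(*a);  // drain only once workers run it
      active.erase(a);
    }
    return true;
  }
};
```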
Updating listeners
As mentioned earlier, every listener must have a unique name; if none is given, a UUID is generated for it.
```cpp
bool ListenerManagerImpl::addOrUpdateListener(const envoy::api::v2::Listener& config,
                                              const std::string& version_info,
                                              bool added_via_api) {
  std::string name;
  if (!config.name().empty()) {
    name = config.name();
  } else {
    name = server_.random().uuid();
  }
  const uint64_t hash = MessageUtil::hash(config);
  ENVOY_LOG(debug, "begin add/update listener: name={} hash={}", name, hash);
```
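Next, if a listener with the same name already exists in the warming or active list and its blockUpdate(hash) returns true (for example because the configuration hash is unchanged, or because the listener is static and cannot be modified via LDS), the update is skipped.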
```cpp
if ((existing_warming_listener != warming_listeners_.end() &&
     (*existing_warming_listener)->blockUpdate(hash)) ||
    (existing_active_listener != active_listeners_.end() &&
     (*existing_active_listener)->blockUpdate(hash))) {
  ENVOY_LOG(debug, "duplicate/locked listener '{}'. no add/update", name);
  return false;
}
```
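A minimal sketch of this guard, assuming blockUpdate() rejects an update when the new config hash equals the current one or when the listener is static. std::hash over a serialized string stands in for MessageUtil::hash(), and ExistingListener and configHash are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <string>

// Hypothetical view of an existing listener for the update guard.
struct ExistingListener {
  uint64_t hash;       // hash of the config it was built from
  bool added_via_api;  // false for listeners defined in the bootstrap

  // Skip identical configs and never touch static listeners.
  bool blockUpdate(uint64_t new_hash) const { return new_hash == hash || !added_via_api; }
};

// Stand-in for hashing the whole Listener proto.
uint64_t configHash(const std::string& serialized_config) {
  return std::hash<std::string>{}(serialized_config);
}
```

Hashing the full config is what makes a re-sent but unchanged listener a cheap no-op instead of a full warm-up and drain cycle.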
Whether a listener is being added or updated, a brand-new listener is always created; the old listener, if any, will later be "drained".
```cpp
ListenerImplPtr new_listener(new ListenerImpl(
    config, version_info, *this, name, added_via_api, workers_started_, hash,
    added_via_api ? server_.messageValidationContext().dynamicValidationVisitor()
                  : server_.messageValidationContext().staticValidationVisitor()));
ListenerImpl& new_listener_ref = *new_listener;
```
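When a new listener is created, its FilterChainManager is created as well, together with a filter factory callback for each filter. Each factory is looked up by filter name in the FactoryRegistry; every concrete factory (such as DubboProxyFilterConfigFactory) has already been registered with the FactoryRegistry beforehand.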
```cpp
std::vector<Network::FilterFactoryCb> ProdListenerComponentFactory::createNetworkFilterFactoryList_(
    const Protobuf::RepeatedPtrField<envoy::api::v2::listener::Filter>& filters,
    Configuration::FactoryContext& context) {
  std::vector<Network::FilterFactoryCb> ret;
  for (ssize_t i = 0; i < filters.size(); i++) {
    const auto& proto_config = filters[i];
    const std::string& string_name = proto_config.name();
    ENVOY_LOG(debug, " filter #{}:", i);
    ENVOY_LOG(debug, " name: {}", string_name);
    const Json::ObjectSharedPtr filter_config =
        MessageUtil::getJsonObjectFromMessage(proto_config.config());
    ENVOY_LOG(debug, " config: {}", filter_config->asJsonString());

    // Now see if there is a factory that will accept the config.
    auto& factory =
        Config::Utility::getAndCheckFactory<Configuration::NamedNetworkFilterConfigFactory>(
            string_name);
    Config::Utility::validateTerminalFilters(filters[i].name(), "network",
                                             factory.isTerminalFilter(), i == filters.size() - 1);
    Network::FilterFactoryCb callback;
    if (Config::Utility::allowDeprecatedV1Config(context.runtime(), *filter_config)) {
      callback = factory.createFilterFactory(*filter_config->getObject("value", true), context);
    } else {
      auto message = Config::Utility::translateToFactoryConfig(
          proto_config, context.messageValidationVisitor(), factory);
      callback = factory.createFilterFactoryFromProto(*message, context);
    }
    ret.push_back(callback);
  }
  return ret;
}
```
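The name-based lookup performed by getAndCheckFactory() follows the usual registry pattern. The sketch below is a hypothetical miniature: NetworkFilterFactory, registry(), EchoFactory, and createFilterFactoryList are illustrative names, and the callback type is reduced to std::function<std::string()> instead of Network::FilterFactoryCb.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using FilterFactoryCb = std::function<std::string()>;  // simplified callback type

struct NetworkFilterFactory {
  virtual ~NetworkFilterFactory() = default;
  virtual FilterFactoryCb createFilterFactory(const std::string& config) = 0;
};

// Name -> factory map, filled before any listener is built.
std::map<std::string, std::unique_ptr<NetworkFilterFactory>>& registry() {
  static std::map<std::string, std::unique_ptr<NetworkFilterFactory>> r;
  return r;
}

// For each (name, config) pair, look up the factory by name and collect its callback.
std::vector<FilterFactoryCb> createFilterFactoryList(
    const std::vector<std::pair<std::string, std::string>>& filters) {
  std::vector<FilterFactoryCb> ret;
  for (const auto& [name, config] : filters) {
    auto it = registry().find(name);
    if (it == registry().end()) throw std::runtime_error("no factory registered for: " + name);
    ret.push_back(it->second->createFilterFactory(config));
  }
  return ret;
}

// Example factory; a real one would build a network filter from its config.
struct EchoFactory : NetworkFilterFactory {
  FilterFactoryCb createFilterFactory(const std::string& config) override {
    return [config] { return "echo(" + config + ")"; };
  }
};
```

Because the listener only knows filter names, adding a new filter type means registering a new factory; the listener code itself does not change.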
A listener with the same name must keep the same address as the existing listener; otherwise an exception is thrown.
```cpp
if ((existing_warming_listener != warming_listeners_.end() &&
     *(*existing_warming_listener)->address() != *new_listener->address()) ||
    (existing_active_listener != active_listeners_.end() &&
     *(*existing_active_listener)->address() != *new_listener->address())) {
  const std::string message = fmt::format(
      "error updating listener: '{}' has a different address '{}' from existing listener", name,
      new_listener->address()->asString());
  ENVOY_LOG(warn, "{}", message);
  throw EnvoyException(message);
}
```
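The address rule reduces to a simple comparison, which matters because an updated listener inherits the old listener's socket. This sketch is hypothetical (ListenerConfig and checkSameAddress are illustrative names, and addresses are plain strings); it reuses the error message format from the code above.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical minimal listener config.
struct ListenerConfig {
  std::string name;
  std::string address;  // e.g. "0.0.0.0:10000"
};

// Throws if an update tries to move a listener to another address.
void checkSameAddress(const ListenerConfig& existing, const ListenerConfig& updated) {
  if (existing.address != updated.address) {
    throw std::runtime_error("error updating listener: '" + updated.name +
                             "' has a different address '" + updated.address +
                             "' from existing listener");
  }
}
```

To move a listener to a new address, the management server has to remove it and add a listener under a different name.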
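From here there are three cases:
A listener with the same name is already in the warming_listeners_ list
A listener with the same name is already in the active_listeners_ list
No listener with that name exists in either list
In the first two cases the existing listener is updated; in the third case a new listener is added.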
```cpp
bool added = false;
if (existing_warming_listener != warming_listeners_.end()) {
  // 1. Found in warming_listeners_
} else if (existing_active_listener != active_listeners_.end()) {
  // 2. Found in active_listeners_
} else {
  // 3. Add a new listener
  added = true;
}
```
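If the listener being updated is still in the "warming" phase, it is simply replaced in place, and the new listener takes over the warming listener's socket.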
```cpp
if (existing_warming_listener != warming_listeners_.end()) {
  // In this case we can just replace inline.
  ASSERT(workers_started_);
  new_listener->debugLog("update warming listener");
  new_listener->setSocket((*existing_warming_listener)->getSocket());
  *existing_warming_listener = std::move(new_listener);
```
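If the listener being updated is in the active list, the new listener takes over the existing listener's socket. If the workers have already started, the new listener is added to the "warming" list (this is where the log line "add warming listener" is printed); if the workers have not started yet, it replaces the active listener in place.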
```cpp
} else if (existing_active_listener != active_listeners_.end()) {
  // In this case we have no warming listener, so what we do depends on whether workers
  // have been started or not. Either way we get the socket from the existing listener.
  new_listener->setSocket((*existing_active_listener)->getSocket());
  if (workers_started_) {
    new_listener->debugLog("add warming listener");
    warming_listeners_.emplace_back(std::move(new_listener));
  } else {
    new_listener->debugLog("update active listener");
    *existing_active_listener = std::move(new_listener);
  }
}
```
(Figure 1: how the lists change when the current listener is active)
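The placement decisions across the three cases can be summarized in one function. Placement and placeNewListener are hypothetical names; the function only reports where the new listener would go, following the branches shown above and the new-listener branch described in the "Adding listeners" part below.

```cpp
#include <cassert>

// Hypothetical labels for where addOrUpdateListener() puts the new listener.
enum class Placement {
  ReplaceWarmingInPlace,  // case 1: a warming listener with this name exists
  AddToWarming,           // cases 2 and 3 once workers have started
  ReplaceActiveInPlace,   // case 2 before workers start
  AddToActive             // case 3 before workers start
};

Placement placeNewListener(bool in_warming, bool in_active, bool workers_started) {
  // Warming is checked first, so a pending warming update is simply overwritten.
  if (in_warming) return Placement::ReplaceWarmingInPlace;
  if (in_active) {
    // The socket comes from the active listener; once workers run, the old
    // listener keeps serving until the new one finishes warming.
    return workers_started ? Placement::AddToWarming : Placement::ReplaceActiveInPlace;
  }
  return workers_started ? Placement::AddToWarming : Placement::AddToActive;
}
```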
Only at the very end of addOrUpdateListener is initialize() called on the new listener.
```cpp
  ......
  new_listener_ref.initialize();
  return true;
}
```
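Initialization eventually leads to onListenerWarmed: when the listener is constructed, a callback that calls onListenerWarmed is registered with init_watcher_ in advance, and it fires once the listener's initialization (for example fetching its RDS configuration) has completed.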
```cpp
ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::string& version_info,
                           ListenerManagerImpl& parent, const std::string& name, bool added_via_api,
                           bool workers_started, uint64_t hash,
                           ProtobufMessage::ValidationVisitor& validation_visitor)
    : parent_(parent), address_(Network::Address::resolveProtoAddress(config.address())),
      ......
      dynamic_init_manager_(fmt::format("Listener {}", name)),
      init_watcher_(std::make_unique<Init::WatcherImpl>(
          "ListenerImpl", [this] { parent_.onListenerWarmed(*this); })),
      ......
```
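onListenerWarmed first adds the warmed listener to every worker, then moves it from the warming list to the active list. If an active listener with the same name exists, that old listener is moved to the draining state and replaced in the active list.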
```cpp
void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
  // The warmed listener should be added first so that the worker will accept new connections
  // when it stops listening on the old listener.
  for (const auto& worker : workers_) {
    addListenerToWorker(*worker, listener);
  }

  auto existing_active_listener = getListenerByName(active_listeners_, listener.name());
  auto existing_warming_listener = getListenerByName(warming_listeners_, listener.name());
  (*existing_warming_listener)->debugLog("warm complete. updating active listener");
  if (existing_active_listener != active_listeners_.end()) {
    drainListener(std::move(*existing_active_listener));
    *existing_active_listener = std::move(*existing_warming_listener);
  } else {
    active_listeners_.emplace_back(std::move(*existing_warming_listener));
  }

  warming_listeners_.erase(existing_warming_listener);
  updateWarmingActiveGauges();
}

uint64_t ListenerManagerImpl::numConnections() {
  uint64_t num_connections = 0;
  for (const auto& worker : workers_) {
    num_connections += worker->numConnections();
  }
  return num_connections;
}
```
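(Figure 2: how the lists change when the current listener is active)
(The listener-removal part is to be continued.)
(Figure 3: how the lists change when the current listener is active)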
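The list manipulation in onListenerWarmed() can be modelled with plain vectors. This is a simplified sketch: Listener and Lists are hypothetical types, a listener is reduced to a name and a version string, and the step that adds the listener to every worker is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical simplified listener.
struct Listener {
  std::string name;
  std::string version;
};

struct Lists {
  std::vector<Listener> warming, active, draining;

  void onListenerWarmed(const std::string& name) {
    auto byName = [&](const Listener& l) { return l.name == name; };
    auto w = std::find_if(warming.begin(), warming.end(), byName);
    if (w == warming.end()) return;  // the real code assumes the listener is present
    auto a = std::find_if(active.begin(), active.end(), byName);
    if (a != active.end()) {
      draining.push_back(*a);  // the old version starts draining
      *a = *w;                 // the new version takes its slot in the active list
    } else {
      active.push_back(*w);    // brand-new listener becomes active
    }
    warming.erase(w);
  }
};
```

With v1 active and v2 warming for the same name, warming completes by leaving v2 active and v1 draining.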
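Adding listeners
When adding a new listener, Envoy first rejects a listener that does not bind to a port if its address duplicates that of an existing warming or active listener. It then searches the "draining" listeners for one whose socket is bound to the same address; if one is found, its socket is reused, otherwise a new listen socket is created. Finally, the new listener is added to the warming list if the workers have started, or directly to the active list if they have not.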
```cpp
// Typically we catch address issues when we try to bind to the same address multiple times.
// However, for listeners that do not bind we must check to make sure we are not duplicating.
// This is an edge case and nothing will explicitly break, but there is no possibility that
// two listeners that do not bind will ever be used. Only the first one will be used when
// searched for by address. Thus we block it.
if (!new_listener->bindToPort() &&
    (hasListenerWithAddress(warming_listeners_, *new_listener->address()) ||
     hasListenerWithAddress(active_listeners_, *new_listener->address()))) {
  const std::string message =
      fmt::format("error adding listener: '{}' has duplicate address '{}' as existing listener",
                  name, new_listener->address()->asString());
  ENVOY_LOG(warn, "{}", message);
  throw EnvoyException(message);
}

// We have no warming or active listener so we need to make a new one. What we do depends on
// whether workers have been started or not. Additionally, search through draining listeners
// to see if there is a listener that has a socket bound to the address we are configured for.
// This is an edge case, but may happen if a listener is removed and then added back with a same
// or different name and intended to listen on the same address. This should work and not fail.
Network::SocketSharedPtr draining_listener_socket;
auto existing_draining_listener = std::find_if(
    draining_listeners_.cbegin(), draining_listeners_.cend(),
    [&new_listener](const DrainingListener& listener) {
      return *new_listener->address() == *listener.listener_->socket().localAddress();
    });
if (existing_draining_listener != draining_listeners_.cend()) {
  draining_listener_socket = existing_draining_listener->listener_->getSocket();
}

new_listener->setSocket(draining_listener_socket
                            ? draining_listener_socket
                            : factory_.createListenSocket(new_listener->address(),
                                                          new_listener->socketType(),
                                                          new_listener->listenSocketOptions(),
                                                          new_listener->bindToPort()));
if (workers_started_) {
  new_listener->debugLog("add warming listener");
  warming_listeners_.emplace_back(std::move(new_listener));
} else {
  new_listener->debugLog("add active listener");
  active_listeners_.emplace_back(std::move(new_listener));
}

added = true;
```
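The draining-socket lookup can be sketched as follows. Socket, DrainingListener, and socketForNewListener are hypothetical stand-ins; creating a fresh std::shared_ptr<Socket> takes the place of factory_.createListenSocket(), and no real socket is opened.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for a listen socket and a draining listener.
struct Socket {
  std::string local_address;
};
using SocketSharedPtr = std::shared_ptr<Socket>;

struct DrainingListener {
  std::string name;
  SocketSharedPtr socket;
};

// Reuses the socket of a draining listener bound to `address`, or creates a new one.
SocketSharedPtr socketForNewListener(const std::vector<DrainingListener>& draining,
                                     const std::string& address) {
  auto it = std::find_if(draining.cbegin(), draining.cend(), [&](const DrainingListener& l) {
    return l.socket->local_address == address;
  });
  if (it != draining.cend()) return it->socket;      // shared with the draining listener
  return std::make_shared<Socket>(Socket{address});  // stands in for createListenSocket()
}
```

Sharing the socket through a shared_ptr lets the draining listener and the new listener hold the same bound address at once, which is why removing a listener and adding it back on the same address does not fail.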
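Some open questions
When an old listener is replaced by a new one, how can TCP connections that have already been established be re-established gracefully?
What is the purpose of the --drain-time-s setting?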