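Source Code Analysis: Envoy LDS
[Abstract] The Listener Discovery Service (LDS) is an optional API that Envoy calls to fetch listeners dynamically. Envoy reconciles the API response and adds, modifies, or removes known listeners as needed.
What is LDS?
Envoy's architecture allows different kinds of configuration management, and which approach a deployment uses depends on the implementer's needs. A fully static configuration allows simple deployments. More complex deployments can incrementally add more complex dynamic configuration, which requires the implementer to provide one or more external gRPC/REST-based configuration provider APIs. These APIs are collectively called "xDS" (* discovery service) and include LDS, RDS, CDS, and EDS, among others.
LDS is one of them: an optional API that Envoy calls to fetch listeners dynamically. Envoy reconciles the API response and adds, modifies, or removes known listeners as needed.
Listener updates have the following semantics: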
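Every listener must have a unique name. If no name is provided, Envoy creates a UUID. For a listener to be updated dynamically, the management server must provide a unique name for it.
When a listener is added, it enters a "warming" phase before it takes part in connection handling. For example, if the listener references an RDS configuration, that configuration is resolved and fetched before the listener moves to "active".
Once created, a listener is effectively immutable. Updating a listener therefore creates a brand-new listener (using the same listen socket), and the new listener goes through the same warming process described above.
When a listener is updated or removed, the old listener is placed into a "draining" state, much as when the whole server is drained for a restart. Connections owned by the listener are closed gracefully (if possible) for a period of time, after which the listener is removed and any remaining connections are closed. The drain time is set with the --drain-time-s option.
Note: listeners defined statically in the Envoy configuration cannot be modified or removed through the LDS API.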
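To make the drain timeline concrete, here is a toy model (not Envoy code) of a listener in the "draining" state. It assumes the drain period is a fixed duration such as the one passed via --drain-time-s: connections may close on their own during the window, and whatever is still open when the deadline passes is closed together with the listener. `ToyDrainingListener`, `onConnectionClosed` and `tick` are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>

// Hypothetical model of a draining listener; Envoy's real logic lives in
// ListenerManagerImpl::drainListener and the drain manager.
struct ToyDrainingListener {
  std::chrono::seconds deadline;   // when remaining connections are force-closed
  uint64_t remaining_connections;  // connections still open on this listener
  bool removed = false;            // true once the listener is gone from all workers

  ToyDrainingListener(std::chrono::seconds now, std::chrono::seconds drain_time,
                      uint64_t open_connections)
      : deadline(now + drain_time), remaining_connections(open_connections) {}

  // A connection finished gracefully inside the drain window.
  void onConnectionClosed() {
    if (remaining_connections > 0) {
      --remaining_connections;
    }
  }

  // Advances the clock. Once the deadline has passed, the listener is removed
  // and the connections still open are closed. Returns how many were force-closed.
  uint64_t tick(std::chrono::seconds now) {
    if (removed || now < deadline) {
      return 0;
    }
    const uint64_t force_closed = remaining_connections;
    remaining_connections = 0;
    removed = true;
    return force_closed;
  }
};
```

A longer drain time gives clients more chance to finish and close on their own; a shorter one frees the old listener sooner at the cost of more forced closes.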
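Overall Structure
LDS manages n Listeners through one ListenerManager, and each Listener manages n FilterChains through a FilterChainManager. The figure below is a UML diagram of the LDS-related classes, most of which are declared in lds_api.h, listener_manager_impl.h, and filter_chain_manager_impl.h.
UML diagram of the LDS-related classes
The Three Listener States
A ListenerManagerImpl instance manages all listeners. It holds three lists, one for each of the three states a Listener can be in.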
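Active listeners (active_listeners):
Active listeners are the listeners currently accepting new connections on the workers.
Warming listeners (warming_listeners):
Warming listeners are listeners that may need further initialization through the listener's init manager, for example RDS (or, in the future, KDS). Once a listener has finished warming, it is moved to active.
Draining listeners (draining_listeners):
Draining listeners are listeners that are being drained and removed. They go through two phases: first, the workers stop accepting new connections on them and existing connections are drained; then the listener is removed from all workers and any remaining connections are closed.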
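The allowed moves between the three lists can be summarized as a small state machine. The sketch below is a hypothetical encoding (the enum `ListenerState` and the "Gone" state are not Envoy types) of the transitions this article describes: a warmed listener becomes active, a warming listener can be destroyed directly, and an active listener is drained only once the workers have started (before that it is replaced or destroyed directly).

```cpp
#include <cassert>

// Hypothetical states: the three lists plus a terminal "Gone" once destroyed.
enum class ListenerState { Warming, Active, Draining, Gone };

// Returns true if `from -> to` is a transition described in this article.
// `workers_started` mirrors ListenerManagerImpl::workers_started_.
bool isValidTransition(ListenerState from, ListenerState to, bool workers_started) {
  switch (from) {
    case ListenerState::Warming:
      // Finished warming -> active; removed or replaced while warming -> destroyed.
      return to == ListenerState::Active || to == ListenerState::Gone;
    case ListenerState::Active:
      // After workers start an old active listener must drain; before that it
      // is simply replaced in place or erased.
      return to == (workers_started ? ListenerState::Draining : ListenerState::Gone);
    case ListenerState::Draining:
      // After the drain period the listener is removed from all workers.
      return to == ListenerState::Gone;
    case ListenerState::Gone:
      return false;
  }
  return false;
}
```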
LDS Initialization
When the bootstrap configuration is initialized, LDS is set up if the bootstrap contains an LDS config.
void InstanceImpl::initialize(const Options& options,
Network::Address::InstanceConstSharedPtr local_address,
ComponentFactory& component_factory, ListenerHooks& hooks) {
......
// Instruct the listener manager to create the LDS provider if needed. This must be done later
// because various items do not yet exist when the listener manager is created.
if (bootstrap_.dynamic_resources().has_lds_config()) {
listener_manager_->createLdsApi(bootstrap_.dynamic_resources().lds_config());
}
ListenerManagerImpl creates the LDS API through the ProdListenerComponentFactory class:
class ProdListenerComponentFactory : public ListenerComponentFactory,
Logger::Loggable<Logger::Id::config> {
public:
......
// Server::ListenerComponentFactory
LdsApiPtr createLdsApi(const envoy::api::v2::core::ConfigSource& lds_config) override {
return std::make_unique<LdsApiImpl>(
lds_config, server_.clusterManager(), server_.initManager(), server_.stats(),
server_.listenerManager(), server_.messageValidationContext().dynamicValidationVisitor());
}
......
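Constructing LdsApiImpl creates a subscription and registers an init target with the init manager; the target's callback starts the subscription.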
LdsApiImpl::LdsApiImpl(const envoy::api::v2::core::ConfigSource& lds_config,
Upstream::ClusterManager& cm, Init::Manager& init_manager,
Stats::Scope& scope, ListenerManager& lm,
ProtobufMessage::ValidationVisitor& validation_visitor)
: listener_manager_(lm), scope_(scope.createScope("listener_manager.lds.")), cm_(cm),
init_target_("LDS", [this]() { subscription_->start({}); }),
validation_visitor_(validation_visitor) {
subscription_ = cm.subscriptionFactory().subscriptionFromConfigSource(
lds_config, Grpc::Common::typeUrl(envoy::api::v2::Listener().GetDescriptor()->full_name()),
*scope_, *this);
init_manager.add(init_target_);
}
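The init target above defers `subscription_->start({})` until the init manager is initialized, and server startup waits until every target reports ready. The following single-threaded sketch illustrates that pattern; `ToyInitManager` is a hypothetical stand-in, not Envoy's Init::Manager API, and it assumes (for illustration) that the LDS target reports ready after the first response is processed.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical stand-in for the Init::Manager / Init::Target pattern.
class ToyInitManager {
public:
  using StartFn = std::function<void()>;

  // Registers a target; its start function runs only when initialize() is called.
  void add(StartFn start) { starts_.push_back(std::move(start)); }

  // Starts every registered target; `on_done` fires once all of them are ready.
  void initialize(std::function<void()> on_done) {
    on_done_ = std::move(on_done);
    pending_ = starts_.size();
    if (pending_ == 0) {
      on_done_();
      return;
    }
    for (auto& start : starts_) {
      start();
    }
  }

  // A target calls this when its initial work is done (e.g. first LDS response).
  void ready() {
    assert(pending_ > 0);
    if (--pending_ == 0) {
      on_done_();
    }
  }

private:
  std::vector<StartFn> starts_;
  std::size_t pending_ = 0;
  std::function<void()> on_done_;
};
```

Usage: the target is registered in the constructor, but nothing is fetched until the manager is initialized.

```cpp
ToyInitManager init;
bool subscription_started = false;
bool server_initialized = false;
init.add([&] { subscription_started = true; });  // like init_target_ in LdsApiImpl
init.initialize([&] { server_initialized = true; });
init.ready();  // first LDS response processed; server_initialized is now true
```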
The LDS Update Process
When the management server pushes data, the subscription invokes its callbacks, which triggers onConfigUpdate:
// Config::GrpcMuxCallbacks
void GrpcMuxSubscriptionImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
disableInitFetchTimeoutTimer();
// TODO(mattklein123): In the future if we start tracking per-resource versions, we need to
// supply those versions to onConfigUpdate() along with the xDS response ("system")
// version_info. This way, both types of versions can be tracked and exposed for debugging by
// the configuration update targets.
callbacks_.onConfigUpdate(resources, version_info);
stats_.update_success_.inc();
stats_.update_attempt_.inc();
stats_.version_.set(HashUtil::xxHash64(version_info));
ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources with version {}", type_url_,
resources.size(), version_info);
}
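The two-argument onConfigUpdate compares the received Listener list with the local one to work out which Listeners to remove and which to add or update, then calls the three-argument onConfigUpdate to perform the actual update.
Listeners to remove = current Listeners - received Listeners
Listeners to add/update = all received Listeners (those whose config is unchanged are skipped later by a config-hash check)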
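The bookkeeping in the function below boils down to a set difference. This standalone sketch reproduces it on plain listener names; `ToyDelta` and `computeDelta` are hypothetical names, and the protobuf types are replaced with `std::string` for brevity.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>
#include <vector>

struct ToyDelta {
  std::vector<std::string> to_add_or_update;  // every listener in the response
  std::vector<std::string> to_remove;         // current listeners absent from it
};

ToyDelta computeDelta(const std::vector<std::string>& current,
                      const std::vector<std::string>& received) {
  // Start by assuming every current listener must go...
  std::unordered_set<std::string> remove(current.begin(), current.end());
  ToyDelta delta;
  for (const auto& name : received) {
    // ...every received listener is added/updated and spared from removal.
    delta.to_add_or_update.push_back(name);
    remove.erase(name);
  }
  // Emit removals in the order of `current` so the output is deterministic.
  for (const auto& name : current) {
    if (remove.count(name) > 0) {
      delta.to_remove.push_back(name);
    }
  }
  return delta;
}
```

For example, with current listeners {a, b, c} and a response {b, c, d}, the result is to_add_or_update = {b, c, d} and to_remove = {a}.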
void LdsApiImpl::onConfigUpdate(const Protobuf::RepeatedPtrField<ProtobufWkt::Any>& resources,
const std::string& version_info) {
// We need to keep track of which listeners need to remove.
// Specifically, it's [listeners we currently have] - [listeners found in the response].
std::unordered_set<std::string> listeners_to_remove;
for (const auto& listener : listener_manager_.listeners()) {
listeners_to_remove.insert(listener.get().name());
}
Protobuf::RepeatedPtrField<envoy::api::v2::Resource> to_add_repeated;
for (const auto& listener_blob : resources) {
// Add this resource to our delta added/updated pile...
envoy::api::v2::Resource* to_add = to_add_repeated.Add();
const std::string listener_name =
MessageUtil::anyConvert<envoy::api::v2::Listener>(listener_blob, validation_visitor_)
.name();
to_add->set_name(listener_name);
to_add->set_version(version_info);
to_add->mutable_resource()->MergeFrom(listener_blob);
// ...and remove its name from our delta removed pile.
listeners_to_remove.erase(listener_name);
}
// Copy our delta removed pile into the desired format.
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& listener : listeners_to_remove) {
*to_remove_repeated.Add() = listener;
}
onConfigUpdate(to_add_repeated, to_remove_repeated, version_info);
}
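Removing Listeners
All listener removals happen before any listener is added or updated. This allows a new Listener to be added with the same address as a Listener that is being removed.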
void LdsApiImpl::onConfigUpdate(
const Protobuf::RepeatedPtrField<envoy::api::v2::Resource>& added_resources,
const Protobuf::RepeatedPtrField<std::string>& removed_resources,
const std::string& system_version_info) {
cm_.adsMux().pause(Config::TypeUrl::get().RouteConfiguration);
Cleanup rds_resume([this] { cm_.adsMux().resume(Config::TypeUrl::get().RouteConfiguration); });
bool any_applied = false;
// We do all listener removals before adding the new listeners. This allows adding a new listener
// with the same address as a listener that is to be removed. Do not change the order.
for (const auto& removed_listener : removed_resources) {
if (listener_manager_.removeListener(removed_listener)) {
ENVOY_LOG(info, "lds: remove listener '{}'", removed_listener);
any_applied = true;
}
}
......
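The details of removal are in ListenerManagerImpl::removeListener. A Listener that is unknown or locked (blockRemove()) is left untouched. A Listener still in the "warming" phase is destroyed directly. An active Listener is moved to "draining" and closed after a while if the workers have already started; otherwise it is destroyed directly.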
bool ListenerManagerImpl::removeListener(const std::string& name) {
ENVOY_LOG(debug, "begin remove listener: name={}", name);
auto existing_active_listener = getListenerByName(active_listeners_, name);
auto existing_warming_listener = getListenerByName(warming_listeners_, name);
if ((existing_warming_listener == warming_listeners_.end() ||
(*existing_warming_listener)->blockRemove()) &&
(existing_active_listener == active_listeners_.end() ||
(*existing_active_listener)->blockRemove())) {
ENVOY_LOG(debug, "unknown/locked listener '{}'. no remove", name);
return false;
}
// Destroy a warming listener directly.
if (existing_warming_listener != warming_listeners_.end()) {
(*existing_warming_listener)->debugLog("removing warming listener");
warming_listeners_.erase(existing_warming_listener);
}
// If there is an active listener it needs to be moved to draining after workers have started, or
// destroyed directly.
if (existing_active_listener != active_listeners_.end()) {
// Listeners in active_listeners_ are added to workers after workers start, so we drain
// listeners only after this occurs.
if (workers_started_) {
drainListener(std::move(*existing_active_listener));
}
active_listeners_.erase(existing_active_listener);
}
stats_.listener_removed_.inc();
updateWarmingActiveGauges();
return true;
}
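The control flow of removeListener can be replayed on plain vectors. In this sketch, `ToyListener`, `ToyManager` and `findByName` are hypothetical, and blockRemove() is modeled as "the listener was defined statically" (an assumption consistent with the note that static listeners cannot be removed via LDS).

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

struct ToyListener {
  std::string name;
  bool added_via_api;  // false for static (bootstrap) listeners
};

struct ToyManager {
  std::vector<ToyListener> warming, active, draining;
  bool workers_started = false;

  static std::vector<ToyListener>::iterator findByName(std::vector<ToyListener>& v,
                                                       const std::string& name) {
    return std::find_if(v.begin(), v.end(),
                        [&](const ToyListener& l) { return l.name == name; });
  }

  // Mirrors the shape of ListenerManagerImpl::removeListener.
  bool removeListener(const std::string& name) {
    auto w = findByName(warming, name);
    auto a = findByName(active, name);
    const bool warming_blocked = w == warming.end() || !w->added_via_api;
    const bool active_blocked = a == active.end() || !a->added_via_api;
    if (warming_blocked && active_blocked) {
      return false;  // unknown or locked listener: no remove
    }
    if (w != warming.end()) {
      warming.erase(w);  // a warming listener is destroyed directly
    }
    if (a != active.end()) {
      if (workers_started) {
        draining.push_back(*a);  // drain only once workers are running
      }
      active.erase(a);
    }
    return true;
  }
};
```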
Updating Listeners
As mentioned earlier, every Listener must have a unique name; if none is given, a UUID is generated.
bool ListenerManagerImpl::addOrUpdateListener(const envoy::api::v2::Listener& config,
const std::string& version_info, bool added_via_api) {
std::string name;
if (!config.name().empty()) {
name = config.name();
} else {
name = server_.random().uuid();
}
const uint64_t hash = MessageUtil::hash(config);
ENVOY_LOG(debug, "begin add/update listener: name={} hash={}", name, hash);
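Next, if a warming or active Listener with the same name already exists and blocks the update (blockUpdate(hash) returns true, for example because the new config has the same hash, i.e. it is a duplicate, or because the listener is locked), nothing is added or updated. existing_warming_listener and existing_active_listener are looked up by name in code omitted from this excerpt.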
if ((existing_warming_listener != warming_listeners_.end() &&
(*existing_warming_listener)->blockUpdate(hash)) ||
(existing_active_listener != active_listeners_.end() &&
(*existing_active_listener)->blockUpdate(hash))) {
ENVOY_LOG(debug, "duplicate/locked listener '{}'. no add/update", name);
return false;
}
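The hash check is what makes re-sending an unchanged listener a no-op. Below is a minimal sketch of the predicate, assuming blockUpdate() rejects an update when the config hash is unchanged or the listener was not added via the API (the latter follows from static listeners being immutable under LDS); `ToyListenerInfo` is a hypothetical type, and the hash values are arbitrary.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical per-listener data needed for the decision.
struct ToyListenerInfo {
  uint64_t hash;       // hash of the full listener config (MessageUtil::hash above)
  bool added_via_api;  // false for listeners defined statically in the bootstrap
};

// True if the update must be skipped: nothing changed, or the listener is static.
bool blockUpdate(const ToyListenerInfo& existing, uint64_t new_hash) {
  return new_hash == existing.hash || !existing.added_via_api;
}
```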
Whether a Listener is being added or updated, a new Listener object is always created; the old Listener will later be drained.
ListenerImplPtr new_listener(new ListenerImpl(
config, version_info, *this, name, added_via_api, workers_started_, hash,
added_via_api ? server_.messageValidationContext().dynamicValidationVisitor()
: server_.messageValidationContext().staticValidationVisitor()));
ListenerImpl& new_listener_ref = *new_listener;
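Creating a new Listener also creates its FilterChainManager and a FilterFactory for each filter. The factory is looked up in the FactoryRegistry by the filter's name; each FilterFactory (for example DubboProxyFilterConfigFactory) has already been registered with the FactoryRegistry beforehand.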
std::vector<Network::FilterFactoryCb> ProdListenerComponentFactory::createNetworkFilterFactoryList_(
const Protobuf::RepeatedPtrField<envoy::api::v2::listener::Filter>& filters,
Configuration::FactoryContext& context) {
std::vector<Network::FilterFactoryCb> ret;
for (ssize_t i = 0; i < filters.size(); i++) {
const auto& proto_config = filters[i];
const std::string& string_name = proto_config.name();
ENVOY_LOG(debug, " filter #{}:", i);
ENVOY_LOG(debug, " name: {}", string_name);
const Json::ObjectSharedPtr filter_config =
MessageUtil::getJsonObjectFromMessage(proto_config.config());
ENVOY_LOG(debug, " config: {}", filter_config->asJsonString());
// Now see if there is a factory that will accept the config.
auto& factory =
Config::Utility::getAndCheckFactory<Configuration::NamedNetworkFilterConfigFactory>(
string_name);
Config::Utility::validateTerminalFilters(filters[i].name(), "network",
factory.isTerminalFilter(), i == filters.size() - 1);
Network::FilterFactoryCb callback;
if (Config::Utility::allowDeprecatedV1Config(context.runtime(), *filter_config)) {
callback = factory.createFilterFactory(*filter_config->getObject("value", true), context);
} else {
auto message = Config::Utility::translateToFactoryConfig(
proto_config, context.messageValidationVisitor(), factory);
callback = factory.createFilterFactoryFromProto(*message, context);
}
ret.push_back(callback);
}
return ret;
}
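The lookup-by-name pattern relies on factories registering themselves during static initialization. The sketch below shows that idea in isolation; every name in it (`ToyFilterFactory`, `toyRegistry`, `ToyRegister`, `buildChain`, and the filter names `toy.rate_limit` / `toy.dubbo_proxy`) is hypothetical. It also models the terminal-filter rule as "only the last filter in the chain may, and must, be terminal", which is how validateTerminalFilters is used above.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical factory interface (Envoy's NamedNetworkFilterConfigFactory is richer).
struct ToyFilterFactory {
  virtual ~ToyFilterFactory() = default;
  virtual std::string name() const = 0;
  virtual bool isTerminalFilter() const { return false; }
};

// Process-wide name -> factory map; a function-local static avoids init-order issues.
std::map<std::string, ToyFilterFactory*>& toyRegistry() {
  static std::map<std::string, ToyFilterFactory*> registry;
  return registry;
}

// Registers one factory instance at static-initialization time.
template <class T> struct ToyRegister {
  T instance;
  ToyRegister() { toyRegistry()[instance.name()] = &instance; }
};

ToyFilterFactory& getAndCheckFactory(const std::string& name) {
  auto it = toyRegistry().find(name);
  if (it == toyRegistry().end()) {
    throw std::runtime_error("no factory registered for filter '" + name + "'");
  }
  return *it->second;
}

// Resolves every filter by name and checks terminal-filter placement.
std::vector<ToyFilterFactory*> buildChain(const std::vector<std::string>& names) {
  std::vector<ToyFilterFactory*> chain;
  for (std::size_t i = 0; i < names.size(); ++i) {
    ToyFilterFactory& factory = getAndCheckFactory(names[i]);
    const bool is_last = i + 1 == names.size();
    if (factory.isTerminalFilter() != is_last) {
      throw std::runtime_error("filter '" + names[i] + "' is misplaced in the chain");
    }
    chain.push_back(&factory);
  }
  return chain;
}

struct ToyRateLimitFactory : ToyFilterFactory {
  std::string name() const override { return "toy.rate_limit"; }
};
struct ToyDubboProxyFactory : ToyFilterFactory {
  std::string name() const override { return "toy.dubbo_proxy"; }
  bool isTerminalFilter() const override { return true; }
};
static ToyRegister<ToyRateLimitFactory> register_rate_limit;    // runs before main()
static ToyRegister<ToyDubboProxyFactory> register_dubbo_proxy;  // runs before main()
```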
A Listener that replaces an existing one with the same name must use the same address; otherwise an exception is thrown:
if ((existing_warming_listener != warming_listeners_.end() &&
*(*existing_warming_listener)->address() != *new_listener->address()) ||
(existing_active_listener != active_listeners_.end() &&
*(*existing_active_listener)->address() != *new_listener->address())) {
const std::string message = fmt::format(
"error updating listener: '{}' has a different address '{}' from existing listener", name,
new_listener->address()->asString());
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
}
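The address check exists because an updated listener inherits the existing listener's socket (setSocket(getSocket()) below), so it cannot move to a new address in place. A minimal standalone version of the check, with the hypothetical helper `checkSameAddress` and addresses as plain strings; the message text follows the format string in the code above.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>

// Throws if an existing listener with the same name listens on a different address.
// `existing_address` is empty when no warming/active listener has this name.
void checkSameAddress(const std::string& name,
                      const std::optional<std::string>& existing_address,
                      const std::string& new_address) {
  if (existing_address && *existing_address != new_address) {
    throw std::runtime_error("error updating listener: '" + name +
                             "' has a different address '" + new_address +
                             "' from existing listener");
  }
}
```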
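From here there are three cases:
A Listener with this name is already in the warming_listeners list
A Listener with this name is already in the active_listeners list
A Listener with this name is in neither list
The first two cases update an existing Listener; the third adds a new one.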
bool added = false;
if (existing_warming_listener != warming_listeners_.end()) {
// 1. Found in warming_listeners
} else if (existing_active_listener != active_listeners_.end()) {
// 2. Found in active_listeners
} else {
// 3. Add a new Listener
added = true;
}
If the Listener being updated is still warming, it is simply replaced in place.
if (existing_warming_listener != warming_listeners_.end()) {
// In this case we can just replace inline.
ASSERT(workers_started_);
new_listener->debugLog("update warming listener");
new_listener->setSocket((*existing_warming_listener)->getSocket());
*existing_warming_listener = std::move(new_listener);
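If the Listener being updated is active, the new Listener takes over the existing listener's socket. If the workers have already started, the new Listener is added to the warming list (logging "add warming listener"); if they have not, the active Listener is replaced in place.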
} else if (existing_active_listener != active_listeners_.end()) {
// In this case we have no warming listener, so what we do depends on whether workers
// have been started or not. Either way we get the socket from the existing listener.
new_listener->setSocket((*existing_active_listener)->getSocket());
if (workers_started_) {
new_listener->debugLog("add warming listener");
warming_listeners_.emplace_back(std::move(new_listener));
} else {
new_listener->debugLog("update active listener");
*existing_active_listener = std::move(new_listener);
}
}
Figure 1: changes when the current Listener is active
Only at the end of addOrUpdateListener is initialize() called on the new Listener.
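The branching in addOrUpdateListener reduces to a decision on three booleans. This sketch encodes it as an enum; `UpdateAction` and `decideUpdate` are hypothetical names, and the assert mirrors ASSERT(workers_started_) in the warming branch.

```cpp
#include <cassert>

enum class UpdateAction {
  ReplaceWarmingInline,  // case 1: swap the warming listener in place
  AddWarming,            // case 2, workers running: warm the new one next to the old
  ReplaceActiveInline,   // case 2, workers not started: swap the active listener
  AddNew                 // case 3: no listener with this name yet
};

UpdateAction decideUpdate(bool has_warming, bool has_active, bool workers_started) {
  if (has_warming) {
    // Listeners only warm after workers start (ASSERT(workers_started_) above).
    assert(workers_started);
    return UpdateAction::ReplaceWarmingInline;
  }
  if (has_active) {
    return workers_started ? UpdateAction::AddWarming : UpdateAction::ReplaceActiveInline;
  }
  return UpdateAction::AddNew;
}
```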
......
new_listener_ref.initialize();
return true;
}
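Initialization ultimately leads to onListenerWarmed: when the Listener is constructed, a callback to this method is registered with init_watcher_, which invokes it once the listener's initialization (for example, fetching its RDS config) has completed.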
ListenerImpl::ListenerImpl(const envoy::api::v2::Listener& config, const std::string& version_info,
ListenerManagerImpl& parent, const std::string& name, bool added_via_api,
bool workers_started, uint64_t hash,
ProtobufMessage::ValidationVisitor& validation_visitor)
: parent_(parent), address_(Network::Address::resolveProtoAddress(config.address())),
......
dynamic_init_manager_(fmt::format("Listener {}", name)),
init_watcher_(std::make_unique<Init::WatcherImpl>(
"ListenerImpl", [this] { parent_.onListenerWarmed(*this); })),
......
onListenerWarmed first adds the warmed Listener to every worker, then moves it from the warming list to active and moves the previously active Listener with the same name to draining.
void ListenerManagerImpl::onListenerWarmed(ListenerImpl& listener) {
// The warmed listener should be added first so that the worker will accept new connections
// when it stops listening on the old listener.
for (const auto& worker : workers_) {
addListenerToWorker(*worker, listener);
}
auto existing_active_listener = getListenerByName(active_listeners_, listener.name());
auto existing_warming_listener = getListenerByName(warming_listeners_, listener.name());
(*existing_warming_listener)->debugLog("warm complete. updating active listener");
if (existing_active_listener != active_listeners_.end()) {
drainListener(std::move(*existing_active_listener));
*existing_active_listener = std::move(*existing_warming_listener);
} else {
active_listeners_.emplace_back(std::move(*existing_warming_listener));
}
warming_listeners_.erase(existing_warming_listener);
updateWarmingActiveGauges();
}
uint64_t ListenerManagerImpl::numConnections() {
uint64_t num_connections = 0;
for (const auto& worker : workers_) {
num_connections += worker->numConnections();
}
return num_connections;
}
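The key detail in onListenerWarmed is ordering: workers start accepting on the new listener before the old one is drained, so there is no window without a listener. The sketch below replays that sequence on plain vectors and records the order in a log; `ToyVersionedListener`, `ToyState`, `onWarmed` and the log strings are all hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

struct ToyVersionedListener {
  std::string name;
  int version;
};

struct ToyState {
  std::vector<ToyVersionedListener> warming, active, draining;
  std::vector<std::string> worker_log;  // what the workers were told, in order
};

void onWarmed(ToyState& s, const std::string& name) {
  auto by_name = [&](const ToyVersionedListener& l) { return l.name == name; };
  auto w = std::find_if(s.warming.begin(), s.warming.end(), by_name);
  assert(w != s.warming.end());
  // 1. Workers begin accepting on the warmed listener first...
  s.worker_log.push_back("add " + name + " v" + std::to_string(w->version));
  auto a = std::find_if(s.active.begin(), s.active.end(), by_name);
  if (a != s.active.end()) {
    // 2. ...then the old active listener is drained and replaced in place.
    s.draining.push_back(*a);
    s.worker_log.push_back("drain " + name + " v" + std::to_string(a->version));
    *a = *w;
  } else {
    s.active.push_back(*w);
  }
  s.warming.erase(w);
}
```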
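Figure 2: changes when the current Listener is active
(The removing-listener part is to be continued.)
Figure 3: changes when the current Listener is active
Adding a New Listener
When a brand-new Listener is added, it is first rejected if it does not bind to a port and a warming or active Listener already uses its address. Envoy then searches the "draining" Listeners for one whose socket is bound to the same address; if it finds one, the new Listener reuses that socket, otherwise a new listen socket is created. Finally, depending on whether the workers have started, the Listener is added to the warming list or the active list.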
// Typically we catch address issues when we try to bind to the same address multiple times.
// However, for listeners that do not bind we must check to make sure we are not duplicating.
// This is an edge case and nothing will explicitly break, but there is no possibility that
// two listeners that do not bind will ever be used. Only the first one will be used when
// searched for by address. Thus we block it.
if (!new_listener->bindToPort() &&
(hasListenerWithAddress(warming_listeners_, *new_listener->address()) ||
hasListenerWithAddress(active_listeners_, *new_listener->address()))) {
const std::string message =
fmt::format("error adding listener: '{}' has duplicate address '{}' as existing listener",
name, new_listener->address()->asString());
ENVOY_LOG(warn, "{}", message);
throw EnvoyException(message);
}
// We have no warming or active listener so we need to make a new one. What we do depends on
// whether workers have been started or not. Additionally, search through draining listeners
// to see if there is a listener that has a socket bound to the address we are configured for.
// This is an edge case, but may happen if a listener is removed and then added back with a same
// or different name and intended to listen on the same address. This should work and not fail.
Network::SocketSharedPtr draining_listener_socket;
auto existing_draining_listener = std::find_if(
draining_listeners_.cbegin(), draining_listeners_.cend(),
[&new_listener](const DrainingListener& listener) {
return *new_listener->address() == *listener.listener_->socket().localAddress();
});
if (existing_draining_listener != draining_listeners_.cend()) {
draining_listener_socket = existing_draining_listener->listener_->getSocket();
}
new_listener->setSocket(draining_listener_socket
? draining_listener_socket
: factory_.createListenSocket(new_listener->address(),
new_listener->socketType(),
new_listener->listenSocketOptions(),
new_listener->bindToPort()));
if (workers_started_) {
new_listener->debugLog("add warming listener");
warming_listeners_.emplace_back(std::move(new_listener));
} else {
new_listener->debugLog("add active listener");
active_listeners_.emplace_back(std::move(new_listener));
}
added = true;
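The two address rules for a brand-new listener can be tried out in isolation. In this sketch, sockets are reduced to a shared address string; `ToySocket`, `ToyDraining`, `checkNoBindDuplicate` and `pickSocket` are hypothetical. Sharing the `shared_ptr` stands in for reusing the already-bound socket of a draining listener.

```cpp
#include <algorithm>
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct ToySocket {
  std::string address;
};
using ToySocketPtr = std::shared_ptr<ToySocket>;

struct ToyDraining {
  std::string name;
  ToySocketPtr socket;
};

// A listener that does not bind may not share an address with a warming or
// active listener; binding listeners are left for bind() itself to reject.
void checkNoBindDuplicate(bool bind_to_port, const std::vector<std::string>& in_use,
                          const std::string& address) {
  if (!bind_to_port && std::find(in_use.begin(), in_use.end(), address) != in_use.end()) {
    throw std::runtime_error("duplicate address '" + address + "'");
  }
}

// Reuses the socket of a draining listener bound to the same address, if any;
// otherwise "creates" a fresh socket.
ToySocketPtr pickSocket(const std::vector<ToyDraining>& draining, const std::string& address) {
  auto it = std::find_if(draining.cbegin(), draining.cend(),
                         [&](const ToyDraining& d) { return d.socket->address == address; });
  if (it != draining.cend()) {
    return it->socket;
  }
  return std::make_shared<ToySocket>(ToySocket{address});
}
```

Reusing the socket is what lets a listener be removed and then added back on the same address (even under a different name) without a bind failure.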
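Some Thoughts
When the old and new Listeners are swapped, how are already-established TCP connections gracefully re-established?
What is the significance of the --drain-time-s setting?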
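[Disclaimer] This content comes from a blogger on the Huawei Cloud developer community and does not represent the views or positions of Huawei Cloud or the Huawei Cloud developer community. When reposting, you must credit the source (Huawei Cloud Community), the article link, the author, and other basic information; otherwise the author and the community reserve the right to pursue liability. If you find suspected plagiarism in this community, please report it by email with supporting evidence; once verified, the community will promptly remove the infringing content. Report email:
cloudbbs@huaweicloud.com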