云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-2
@[TOC](云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-2)
✨博主介绍
🌊 作者主页:苏州程序大白
🌊 作者简介:🏆CSDN人工智能域优质创作者🥇,苏州市凯捷智能科技有限公司创始之一,目前合作公司富士康、歌尔等几家新能源公司
💬如果文章对你有帮助,欢迎关注、点赞、收藏
💅 有任何问题欢迎私信,看到会及时回复
💅关注苏州程序大白,分享粉丝福利
etcd
etcd 在 APISIX 中作用相同与 PostgreSQL 之于 Kong,内部使用 lua-resty-etcd 作为客户端,使用 timer 定时执行和长轮询获取跟踪 etcd 中数据的变化。
这里的优化点与 Kong 一样,在 init_by_lua
阶段进行数据的 warm up,之后数据会 fork 到其他的进程中。
It does not really make much sense to use this library in the context of init_by_lua because the cache will not get shared by any of the worker processes (unless you just want to “warm up” the cache with predefined items which will get inherited by the workers via
fork()
).8
初始化
读取 etcd 数据到全局单例的 Lua table。
-- 初始化 etcd
function _M.init()
local local_conf, err = config_local.local_conf()
if not local_conf then
return nil, err
end
if table.try_read_attr(local_conf, "apisix", "disable_sync_configuration_during_start") then
return true
end
-- 获取 etcd cli
local etcd_cli, err = get_etcd()
if not etcd_cli then
return nil, "failed to start a etcd instance: " .. err
end
local etcd_conf = local_conf.etcd
local prefix = etcd_conf.prefix
-- 加载 etcd 所有数据到 lua table 中, 单例模式
local res, err = readdir(etcd_cli, prefix, create_formatter(prefix))
if not res then
return nil, err
end
return true
end
对数据进行格式化,存入 Lua table 中:
-- 创建格式化 formatter
local function create_formatter(prefix)
-- 返回闭包函数, 对 etcd 返回的结果进行格式化
-- 格式个毛, 这就是个 hook 函数
return function (res)
res.body.nodes = {}
local dirs
if is_http then
dirs = constants.HTTP_ETCD_DIRECTORY
else
dirs = constants.STREAM_ETCD_DIRECTORY
end
local curr_dir_data
local curr_key
for _, item in ipairs(res.body.kvs) do
if curr_dir_data then
-- 将匹配的内容插入 table
if core_str.has_prefix(item.key, curr_key) then
table.insert(curr_dir_data, etcd_apisix.kvs_to_node(item))
goto CONTINUE
end
curr_dir_data = nil
end
-- 截取 prefix 后的 key
local key = sub_str(item.key, #prefix + 1)
if dirs[key] then
-- single item
loaded_configuration[key] = {
body = etcd_apisix.kvs_to_node(item),
headers = res.headers,
}
else
-- 前缀一致
local key = sub_str(item.key, #prefix + 1, #item.key - 1) -- 去掉末尾的 /
-- ensure the same key hasn't been handled as single item
if dirs[key] and not loaded_configuration[key] then
loaded_configuration[key] = {
body = {
nodes = {},
},
headers = res.headers,
}
curr_dir_data = loaded_configuration[key].body.nodes
curr_key = item.key
end
end
::CONTINUE::
end
return res
end
end
这部分逻辑在 init_by_lua
执行,fork 到其他子进程。
数据校验
schema_def.lua
文件中定义了所有储存数据结构的 schema 校验规则,使用 jsonschema 库进行数据校验。
core/schema.lua
中使用 LRU 缓存校验器。
load_full_data
函数加载数据结构所需的 etcd kvs,并进行数据转换、校验、格式化、执行回调。
local function load_full_data(self, dir_res, headers)
local err
local changed = false
if self.single_item then
-- table size 为 1
...
-- 执行逻辑与下面数组格式类似
else
if not dir_res.nodes then
dir_res.nodes = {}
end
self.values = new_tab(#dir_res.nodes, 0)
self.values_hash = new_tab(0, #dir_res.nodes)
for _, item in ipairs(dir_res.nodes) do
local key = short_key(self, item.key)
local data_valid = true
-- 数据格式校验...
-- schema 校验...
-- 过滤器...
if data_valid then
changed = true
insert_tab(self.values, item)
self.values_hash[key] = #self.values
item.value.id = key
item.clean_handlers = {}
-- 执行回调
if self.filter then
self.filter(item)
end
end
-- 更新 mvcc 版本
self:upgrade_version(item.modifiedIndex)
end
end
...
self.need_reload = false
end
后台数据同步
利用 etcd watch 机制进行数据变更的同步。
-- 定时器自动同步 etcd 数据
local function _automatic_fetch(premature, self)
if premature then
return
end
local i = 0
while not exiting() and self.running and i <= 32 do
i = i + 1
local ok, err = xpcall(function()
if not self.etcd_cli then
local etcd_cli, err = get_etcd()
if not etcd_cli then
error("failed to create etcd instance for key ["
.. self.key .. "]: " .. (err or "unknown"))
end
self.etcd_cli = etcd_cli
end
-- 同步数据
local ok, err = sync_data(self)
if err then
if err ~= "timeout" and err ~= "Key not found"
and self.last_err ~= err then
log.error("failed to fetch data from etcd: ", err, ", ",
tostring(self))
end
if err ~= self.last_err then
self.last_err = err
self.last_err_time = ngx_time()
else
if ngx_time() - self.last_err_time >= 30 then
self.last_err = nil
end
end
ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
elseif not ok then
-- no error. reentry the sync with different state
ngx_sleep(0.05)
end
end, debug.traceback)
if not ok then
log.error("failed to fetch data from etcd: ", err, ", ",
tostring(self))
ngx_sleep(self.resync_delay + rand() * 0.5 * self.resync_delay)
break
end
end
-- 进行下一次循环
if not exiting() and self.running then
ngx_timer_at(0, _automatic_fetch, self)
end
end
配置同步
封装上述的逻辑提供给 routes
、plugins
、services
等数据结构使用,每个数据结构监听自己的 prefix,同步数据并执行回调,通常在回调逻辑上触发更新,例如重新构建 Router、重新构建 plugins table 等。
-- etcd 配置创建
function _M.new(key, opts)
local local_conf, err = config_local.local_conf()
if not local_conf then
return nil, err
end
-- etcd 重新同步事件 5 秒, 与 Kong 重新 poll db 数据一致
local etcd_conf = local_conf.etcd
local prefix = etcd_conf.prefix
local resync_delay = etcd_conf.resync_delay
if not resync_delay or resync_delay < 0 then
resync_delay = 5
end
local automatic = opts and opts.automatic
local item_schema = opts and opts.item_schema
local filter_fun = opts and opts.filter
local timeout = opts and opts.timeout
local single_item = opts and opts.single_item
local checker = opts and opts.checker
local obj = setmetatable({
etcd_cli = nil,
key = key and prefix .. key,
automatic = automatic,
item_schema = item_schema,
checker = checker,
sync_times = 0,
running = true,
conf_version = 0,
values = nil,
need_reload = true,
routes_hash = nil,
prev_index = 0,
last_err = nil,
last_err_time = nil,
resync_delay = resync_delay,
timeout = timeout,
single_item = single_item,
filter = filter_fun,
}, mt)
if automatic then
-- timer 定时获取数据
if not key then
return nil, "missing `key` argument"
end
-- 从单例 table 获取 etcd 数据, 进行处理
if loaded_configuration[key] then
local res = loaded_configuration[key]
-- 清空 table
loaded_configuration[key] = nil -- tried to load
log.notice("use loaded configuration ", key)
local dir_res, headers = res.body, res.headers
-- 加载数据并校验数据, 过滤数据
load_full_data(obj, dir_res, headers)
end
-- 创建定时器自动同步
ngx_timer_at(0, _automatic_fetch, obj)
else
local etcd_cli, err = get_etcd()
if not etcd_cli then
return nil, "failed to start a etcd instance: " .. err
end
obj.etcd_cli = etcd_cli
end
if key then
created_obj[key] = obj
end
return obj
end
Router
APISIX 的 Router 匹配基于压缩字典树(Radix Tree)实现,主要使用 lua-resty-radixtree 库。内置多种解析模式,这里只关注 HTTP 默认的 radixtree_uri
实现。
路由构建
core.config.new
调用的是 etcd 库(config_etcd.lua
)维护的配置同步方法,返回原表,可以访问从 etcd 同步的数据。core.schema.route
包含了 route 这个数据结构的 schema 及校验规则,check_route
内部检查 route 直接绑定 plugin 的数据结构。
APISIX 引入 route 直接绑定 plugin 的简化配置,不需要额外创建 plugin 对象。
-- 初始化 router
function _M.init_worker(filter)
local user_routes, err = core.config.new("/routes", {
automatic = true, -- 自动同步
item_schema = core.schema.route,
checker = check_route,
filter = filter,
})
if not user_routes then
error("failed to create etcd instance for fetching /routes : " .. err)
end
return user_routes
end
filter
是回调函数,下述的流程中会注入。
路由初始化
router.http_init_worker
中进行 Router 初始化。
-- attach common methods if the router doesn't provide its custom implementation
local function attach_http_router_common_methods(http_router)
...
if http_router.init_worker == nil then
http_router.init_worker = function (filter)
-- 添加路由
http_router.user_routes = http_route.init_worker(filter)
end
end
end
function _M.http_init_worker()
local conf = core.config.local_conf()
-- 默认的匹配模式
local router_http_name = "radixtree_uri"
local router_ssl_name = "radixtree_sni"
if conf and conf.apisix and conf.apisix.router then
router_http_name = conf.apisix.router.http or router_http_name
router_ssl_name = conf.apisix.router.ssl or router_ssl_name
end
-- 创建 router 实例
local router_http = require("apisix.http.router." .. router_http_name)
-- 修改 router 的 table
attach_http_router_common_methods(router_http)
-- 初始化路由
-- 调用 apisix.http.route.init_worker 方法
-- 从 etcd 获取数据并执行回调
-- filter 为格式化, 解析 upstream
router_http.init_worker(filter)
_M.router_http = router_http
local router_ssl = require("apisix.ssl.router." .. router_ssl_name)
router_ssl.init_worker()
_M.router_ssl = router_ssl
_M.api = require("apisix.api_router")
...
end
http_router.user_routes
储存在 router 的 table 中,会在路由匹配时用到(懒加载)。
路由匹配
access_by_lua
阶段中进行路由匹配,将匹配结果(route、service)传递到 ctx 中供 balancer 请求上游。
do
local uri_routes = {}
local uri_router
local match_opts = {}
function _M.match(api_ctx)
-- 从 module 的 user_routes 属性获取路由, 在 etcd route 变化时回调添加
local user_routes = _M.user_routes
if not cached_version or cached_version ~= user_routes.conf_version then
uri_router = base_router.create_radixtree_uri_router(user_routes.values,
uri_routes, false)
cached_version = user_routes.conf_version
end
if not uri_router then
core.log.error("failed to fetch valid `uri` router: ")
return true
end
return base_router.match_uri(uri_router, match_opts, api_ctx)
end
end
radixtree
路由匹配库提供了匹配成功回调 handler,匹配成功后传递到 ctx 中。
core.table.insert(uri_routes, {
...
handler = function (api_ctx, match_opts)
api_ctx.matched_params = nil
api_ctx.matched_route = route
api_ctx.curr_req_matched = match_opts.matched
end
})
Balancer
Balancer 部分与 Kong 逻辑一致,甚至代码里函数名都一样,主要逻辑是 Service/Upstream 节点解析、负载均衡策略、健康检查与失败重试。
APISIX 支持的一特性是外部服务发现,Kong 中默认支持通过 DNS 解析 Service host,根据 AAAA、A、SRV 记录添加 IP 与优先级,APISIX 支持了从 consul、eruka 和其他注册中心获取 IP 地址列表,并同步节点数据(长轮询)。
服务发现
如果 serivce host 是域名, 通过外部注册中心进行服务发现,获取上游 IP 列表。
function _M.set_by_route(route, api_ctx)
...
-- 如果 serivce host 是域名, 通过 discovery 发现, dns 解析
if up_conf.service_name then
...
-- 外部注册中心
local dis = discovery[up_conf.discovery_type]
if not dis then
return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
end
-- 从注册中心数据源(缓存本地 table)获取 IP
local new_nodes, err = dis.nodes(up_conf.service_name)
if not new_nodes then
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node: " .. (err or "nil")
end
...
end
-- 将 upstream 节点信息存入 ctx
set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
api_ctx.conf_version, up_conf)
local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 0 then
return HTTP_CODE_UPSTREAM_UNAVAILABLE, "no valid upstream node"
end
...
set_upstream_scheme(api_ctx, up_conf)
local ok, err = fill_node_info(up_conf, api_ctx.upstream_scheme, false)
if not ok then
return 503, err
end
...
local scheme = up_conf.scheme
if (scheme == "https" or scheme == "grpcs") and up_conf.tls then
...
end
return
end
负载均衡
不同于 Kong 使用自己封装的 lua-resty-dns-client/balancer 作为负载均衡器,APISIX 基于 lua-resty-balancer 封装了负载均衡策略,基于 lua-resty-healthcheck(fork 版本)实现节点健康检查。
API 网关的负载均衡策略(Kong/APISIX)都是基于 OpenResty lua-resty-core/balancer 提供的负载均衡函数实现,set_current_peer
设置当前请求上游地址,set_more_tries
设置请求失败重试次数,get_last_failure
获取上一次请求失败结果判断是否需要继续重试,set_timeouts
设置单个请求超时时间。
set_balancer_opts
设置 Nginx Balancer 参数。
-- set_balancer_opts will be called in balancer phase and before any tries
local function set_balancer_opts(route, ctx)
local up_conf = ctx.upstream_conf
-- If the matched route has timeout config, prefer to use the route config.
local timeout = nil
if route and route.value and route.value.timeout then
timeout = route.value.timeout
else
if up_conf.timeout then
timeout = up_conf.timeout
end
end
-- 设置 Nginx 请求超时时间
if timeout then
local ok, err = set_timeouts(timeout.connect, timeout.send,
timeout.read)
if not ok then
core.log.error("could not set upstream timeouts: ", err)
end
end
local retries = up_conf.retries
if not retries or retries < 0 then
retries = #up_conf.nodes - 1
end
-- 设置 Nginx 失败重试次数
if retries > 0 then
local ok, err = set_more_tries(retries)
...
end
end
在 access_by_lua
阶段中服务发现,调用 balancer 库获取 peer 节点,balancer_by_lua
中从 ctx 中获取 peer 节点信息,访问后端节点,若失败重试(该阶段再次被调用),重新获取 peer 节点,重新创建请求(recreate_request()
)再次访问后端节点。
Plugin
插件机制也与 Kong 类似,插件开发者可以定义 Schema 配置数据结构,以及 Handler 注入 Nginx 请求生命周期,API 网关提供核心的库供开发者使用(SDK)。
APISIX 相比 Kong,开源的插件较多,插件 Schema 便于编写,同时插件只需要单文件,而 Kong 的插件通常是单独一个仓库,不方便维护。但是考虑到插件需要单独的 Test::Nginx 单元测试,单独一个仓库也未尝不可(Kong 还说了以后会把 Github 项目主仓库的插件代码移到单独的仓库)。
具体各个阶段执行逻辑应该与 Kong 相同,即部分阶段插件开协程并发执行,部分阶段避免数据竞争,插件顺序执行。
值得注意的一点是 APISIX 生命周期里没有 rewrite_by_lua
阶段,插件实现的该阶段会在 access_by_lua
中优先于 access_by_lua
插件逻辑执行。
The apisix run both “.access” and “.rewrite” in the “access” phase.9
插件加载
插件列表从本地 yaml 文件获取,同时监听本地文件变化,同步配置;插件配置信息从 etcd 获取。
local init_plugins_syncer
do
local plugins_conf
function init_plugins_syncer()
local err
-- 储存插件的配置信息, 一条 kv
plugins_conf, err = core.config.new("/plugins", {
automatic = true, -- 后台创建 timer watch etcd 自动同步配置
item_schema = core.schema.plugins,
single_item = true,
-- filter 方法中访问到 etcd kv 的 item, 这里进行插件加载的回调
-- 每次 etcd 插件配置变动, 自动同步
filter = function(item)
-- we need to pass 'item' instead of plugins_conf because
-- the latter one is nil at the first run
_M.load(item)
end,
})
if not plugins_conf then
error("failed to create etcd instance for fetching /plugins : " .. err)
end
end
end
插件列表会储存到 Lua table 中:
-- 加载插件
local function load(plugin_names)
local processed = {}
for _, name in ipairs(plugin_names) do
if processed[name] == nil then
processed[name] = true
end
end
core.log.warn("new plugins: ", core.json.delay_encode(processed))
-- 移除已经存在的 module
for name in pairs(local_plugins_hash) do
unload_plugin(name)
end
core.table.clear(local_plugins)
core.table.clear(local_plugins_hash)
-- 加载插件
for name in pairs(processed) do
load_plugin(name, local_plugins)
end
-- 插件排序, priority 越高的插件越先执行, 与 Kong 同样
-- sort by plugin's priority
if #local_plugins > 1 then
sort_tab(local_plugins, sort_plugin)
end
-- 打印调试日志
for i, plugin in ipairs(local_plugins) do
...
end
return true
end
插件配置信息 plugin_meta
也加载到 Lua table 中,在插件匹配的时候会获取。
插件匹配
插件过滤,遍历插件列表,匹配开启的插件,O(n) 操作 plugin.filter(route)
:
-- 插件配置绑定
function _M.filter(user_route, plugins)
...
plugins = plugins or core.tablepool.fetch("plugins", 32, 0)
for _, plugin_obj in ipairs(local_plugins) do
local name = plugin_obj.name
local plugin_conf = user_plugin_conf[name]
-- 插件和插件配置存入
if type(plugin_conf) == "table" and not plugin_conf.disable then
core.table.insert(plugins, plugin_obj)
core.table.insert(plugins, plugin_conf)
end
end
trace_plugins_info_for_debug(plugins)
return plugins
end
插件执行
这里以 access_by_lua
阶段插件执行逻辑为例,根据 Route、Service 匹配插件,创建临时 Table 储存 plugin 和 plugin_conf,存入 ctx 中。
-- 插件过滤, 遍历插件列表, 匹配开启的插件, O(n)
local plugins = plugin.filter(route)
api_ctx.plugins = plugins
-- fake 执行 rewrite 阶段
plugin.run_plugin("rewrite", plugins, api_ctx)
if api_ctx.consumer then
local changed
route, changed = plugin.merge_consumer_route(
route,
api_ctx.consumer,
api_ctx
)
core.log.info("find consumer ", api_ctx.consumer.username,
", config changed: ", changed)
if changed then
core.table.clear(api_ctx.plugins)
api_ctx.plugins = plugin.filter(route, api_ctx.plugins)
end
end
-- 执行 access 阶段
plugin.run_plugin("access", plugins, api_ctx)
- 点赞
- 收藏
- 关注作者
评论(0)