云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-1
@[TOC](云原生网关 APISIX 的核心流程以源码分析的方式剖析其工作原理-1)
✨博主介绍
🌊 作者主页:苏州程序大白
🌊 作者简介:🏆CSDN人工智能域优质创作者🥇,苏州市凯捷智能科技有限公司创始之一,目前合作公司富士康、歌尔等几家新能源公司
💬如果文章对你有帮助,欢迎关注、点赞、收藏
💅 有任何问题欢迎私信,看到会及时回复
💅关注苏州程序大白,分享粉丝福利
APISIX介绍:
Apache APISIX 是一个动态、实时、高性能的 API 网关, 提供负载均衡、动态上游、灰度发布、服务熔断、身份认证、可观测性等丰富的流量管理功能。
你可以使用 Apache APISIX 来处理传统的南北向流量,以及服务间的东西向流量, 也可以当做 k8s ingress controller 来使用。
Apache APISIX 的技术架构如下图所示:
特性
你可以把 Apache APISIX 当做流量入口,来处理所有的业务数据,包括动态路由、动态上游、动态证书、 A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵御恶意攻击、监控报警、服务可观测性、服务治理等。
- 全平台
- 云原生: 平台无关,没有供应商锁定,无论裸机还是 Kubernetes,APISIX 都可以运行。
- 运行环境: OpenResty 和 Tengine 都支持。
- 支持 ARM64: 不用担心底层技术的锁定。
- 多协议
- TCP/UDP 代理: 动态 TCP/UDP 代理。
- Dubbo 代理: 动态代理 HTTP 请求到 Dubbo 后端。
- 动态 MQTT 代理: 支持用
client_id
对 MQTT 进行负载均衡,同时支持 MQTT 3.1.* 和 5.0 两个协议标准。 - gRPC 代理:通过 APISIX 代理 gRPC 连接,并使用 APISIX 的大部分特性管理你的 gRPC 服务。
- gRPC 协议转换:支持协议的转换,这样客户端可以通过 HTTP/JSON 来访问你的 gRPC API。
- Websocket 代理
- Proxy Protocol
- Dubbo 代理:基于 Tengine,可以实现 Dubbo 请求的代理。
- HTTP(S) 反向代理
- SSL:动态加载 SSL 证书。
- 全动态能力
- 热更新和热插件: 无需重启服务,就可以持续更新配置和插件。
- 代理请求重写: 支持重写请求上游的
host
、uri
、schema
、enable_websocket
、headers
信息。 - 输出内容重写: 支持自定义修改返回内容的
status code
、body
、headers
。 - Serverless: 在 APISIX 的每一个阶段,你都可以添加并调用自己编写的函数。
- 动态负载均衡:动态支持有权重的 round-robin 负载平衡。
- 支持一致性 hash 的负载均衡:动态支持一致性 hash 的负载均衡。
- 健康检查:启用上游节点的健康检查,将在负载均衡期间自动过滤不健康的节点,以确保系统稳定性。
- 熔断器: 智能跟踪不健康上游服务。
- 代理镜像: 提供镜像客户端请求的能力。
- 流量拆分: 允许用户逐步控制各个上游之间的流量百分比。
- 精细化路由
- 支持全路径匹配和前缀匹配
- 支持使用 Nginx 所有内置变量做为路由的条件,所以你可以使用
cookie
,args
等做为路由的条件,来实现灰度发布、A/B 测试等功能 - 支持各类操作符做为路由的判断条件,比如
{"arg_age", ">", 24}
- 支持自定义路由匹配函数
- IPv6:支持使用 IPv6 格式匹配路由
- 支持路由的自动过期(TTL)
- 支持路由的优先级
- 支持批量 Http 请求
- 支持通过GraphQL属性过滤路由
- 安全防护
- 运维友好
- OpenTracing 可观测性: 支持 Apache Skywalking 和 Zipkin。
- 对接外部服务发现:除了内置的 etcd 外,还支持 Consul 和 Nacos,以及 Eureka。
- 监控和指标: Prometheus
- 集群:APISIX 节点是无状态的,创建配置中心集群请参考 etcd Clustering Guide。
- 高可用:支持配置同一个集群内的多个 etcd 地址。
- 控制台: 操作 APISIX 集群。
- 版本控制:支持操作的多次回滚。
- CLI: 使用命令行来启动、关闭和重启 APISIX。
- 单机模式: 支持从本地配置文件中加载路由规则,在 kubernetes(k8s) 等环境下更友好。
- 全局规则:允许对所有请求执行插件,比如黑白名单、限流限速等。
- 高性能:在单核上 QPS 可以达到 18k,同时延迟只有 0.2 毫秒。
- 故障注入
- REST Admin API: 使用 REST Admin API 来控制 Apache APISIX,默认只允许 127.0.0.1 访问,你可以修改
conf/config.yaml
中的allow_admin
字段,指定允许调用 Admin API 的 IP 列表。同时需要注意的是,Admin API 使用 key auth 来校验调用者身份,在部署前需要修改conf/config.yaml
中的admin_key
字段,来保证安全。 - 外部日志记录器:将访问日志导出到外部日志管理工具。(HTTP Logger, TCP Logger, Kafka Logger, UDP Logger)
- Helm charts
- 高度可扩展
- 自定义插件: 允许挂载常见阶段,例如
init
,rewrite
,access
,balancer
,header filter
,body filter
和log
阶段。 - 插件可以用 Java/Go 编写
- 自定义负载均衡算法:可以在
balancer
阶段使用自定义负载均衡算法。 - 自定义路由: 支持用户自己实现路由算法。
- 自定义插件: 允许挂载常见阶段,例如
项目概述
APISIX 是基于 OpenResty 开发的 API 网关,与 OpenResty 的请求生命周期一致,APISIX 利用 Lua Nginx Module 提供的 *_by_lua
添加 Hook。
APISIX 抽象了 Route、Service、Upstream、Plugin、Consumer 等数据模型,与 Kong 网关如出一辙。
基本上可以看作 APISIX 是 Kong 网关的重构——运用大量 LuaJIT、OpenResty 技巧优化性能、简化复杂的数据结构、替换储存引擎为 etcd 等。
值得一提的是,在 APISIX 的一个 issue 中,项目开发者说不确定是什么原因,我们看看 Kong 网关是怎么解决的吧。
“Kong是如何解决类似问题的?"
生态概述
Kong 网关开源生态有的,APISIX 基本都有或者正在做。包含:Kubernetes Ingress Controller、Mesh、Dashboard。
插件方面比 Kong 开源版本多了 Skywalking APM 数据上报、Traffit 流量拆分、Mirror 流量镜像等功能。
基本流程
本节概述 APISIX 的目录结构,以及其启动流程。
目录结构
$ tree -L 2
.
├── apisix
│ ├── admin # Admin API
│ ├── api_router.lua
│ ├── balancer # 负载均衡器
│ ├── balancer.lua
│ ├── cli # CLI, Lua 脚本
│ ├── constants.lua # 常量
│ ├── consumer.lua
│ ├── control
│ ├── core # 主要是封装的公共方法
│ ├── core.lua
│ ├── debug.lua
│ ├── discovery # 服务发现, 支持 consul, eruka, dns
│ ├── http
│ ├── init.lua # _by_lua 函数入口
│ ├── patch.lua
│ ├── plugin_config.lua
│ ├── plugin.lua # 插件
│ ├── plugins
│ ├── router.lua # Router
│ ├── schema_def.lua # jsonschema 定义
│ ├── script.lua
│ ├── ssl
│ ├── ssl.lua
│ ├── stream
│ ├── timers.lua # timer 封装
│ ├── upstream.lua
│ └── utils
├── bin
│ └── apisix # apisix CLI, shell 脚本
├── ci # CI 脚本
├── conf # 默认配置文件
├── deps
├── docs
├── Makefile # 快捷指令
├── rockspec # luarocks 包管理
├── t # Test::Nginx 测试
└── utils # Shell 脚本
启动流程
CLI 默认会用 LuaJIT 启动,若版本不够便退回到 Lua 5.1 解释器执行。
# 查找 APISIX LUA 包路径
# shell -s 判断文件是否存在且 size > 0
# ref: https://stackoverflow.com/questions/53319817/what-is-the-meaning-of-n-z-x-l-d-etc-in-shell-script
if [ -s './apisix/cli/apisix.lua' ]; then
...
fi
# shell -e 判断文件是否存在
if [[ -e $OR_EXEC && "$OR_VER" =~ "1.19" ]]; then
# use the luajit of openresty
echo "$LUAJIT_BIN $APISIX_LUA $*"
exec $LUAJIT_BIN $APISIX_LUA $*
elif [[ "$LUA_VERSION" =~ "Lua 5.1" ]]; then
# OpenResty version is not 1.19, use Lua 5.1 by default
# shell &* 传递所有 args
# ref: https://stackoverflow.com/questions/4824590/propagate-all-arguments-in-a-bash-shell-script
echo "lua $APISIX_LUA $*"
exec lua $APISIX_LUA $*
fi
启动过程中:
- 调用
popen
执行 CMD 命令; - 使用 luasocket 库发起 HTTP 请求(非 OpenResty 运行时);
- 使用 ltn12 sink 进行流处理;
- 创建 etcd prefix,value 为
init
;
基本类型操作
基本上为了追求极致性能,能用 FFI 调用实现的都用了。
字符串
使用 FFI 调用 libc 函数 memcmp
进行字符串比较内存地址的前 n 长度是否相同。
local ffi = require("ffi")
local C = ffi.C
-- ref: https://www.cplusplus.com/reference/cstring/memcmp/
-- ref: https://www.tutorialspoint.com/c_standard_library/c_function_memcmp.htm
ffi.cdef[[
int memcmp(const void *s1, const void *s2, size_t n);
]]
接收类型是 const void *
,不可变类型可以直接传入 Lua string 类型。
如果你的 C 函数接受
const char *
或者等价的const unsigned char/int8_t/... *
这样的参数类型, 可以直接传递 Lua string 进去,而无需另外准备一个ffi.new
申请的数组。
string 前缀比较,比较 s, prefix 内存地址的前 n (#prefix) 长度是否相同。
-- 用 ffi 扩展 string 方法
function _M.has_prefix(s, prefix)
if type(s) ~= "string" or type(prefix) ~= "string" then
error("unexpected type: s:" .. type(s) .. ", prefix:" .. type(prefix))
end
if #s < #prefix then
return false
end
-- 比较 s, prefix 内存地址的前 n (#prefix) 长度是否相同
local rc = C.memcmp(s, prefix, #prefix)
return rc == 0
end
同理比较后缀:
C.memcmp(ffi_cast("char *", s) + #s - #suffix, suffix, #suffix)
Table
Table 是 Lua 中最常用的类型了,与其他语言比较的话相当于 PHP 的 Array 一样实用。
Lua Table 需要注意的地方其一:
table.new(narray, nhash)
这个函数,会预先分配好指定的数组和哈希的空间大小,而不是在插入元素时自增长,这也是它的两个参数 narray 和 nhash 的含义。 如果不使用这个函数,自增长是一个代价比较高的操作,会涉及到空间分配、resize 和 rehash 等,我们应该尽量避免。
table.new 的文档并没有出现在 LuaJIT 的官网,而是深藏在 GitHub 项目的 扩展文档 里,用谷歌也很难找到,所以很多人并不知道这个函数的存在。
超出预设的空间大小,也可以正常使用,只不过性能会退化,也就失去了使用 table.new 的意义。
需要根据实际场景,来预设好 table.new 中数组和哈希空间的大小,这样才能在性能和内存占用上找到一个平衡点。3
Lua Table 需要注意的地方其二:
table.insert 虽然是一个很常见的操作,但性能并不乐观。 如果不是根据指定下标来插入元素,那么每次都需要调用 LuaJIT 的 lj_tab_len 来获取数组的长度,以便插入队尾。获取 table 长度的时间复杂度为 O(n) 。
参考 APISIX 作者给 ingress-nginx 项目提的 Table 操作优化 PR:used table functions of LuaJIT for better performance.
OpenResty Fork 的 LuaJIT 新增的 table 函数4:
- table.isempty
- table.isarray
- table.nkeys
- table.clone
回到 APISIX 封装的 Table 操作符:
-- 自行构建 index 插入 table, 比 table.insert 效率高
function _M.insert_tail(tab, ...)
local idx = #tab
-- 遍历输入的参数
for i = 1, select('#', ...) do
idx = idx + 1
tab[idx] = select(i, ...)
end
return idx
end
select('#', ...)
获取输入参数的数量,select(i, ...)
获取第 n 个参数,Table 的遍历中大量使用该结构。
try_read_attr
实现了 path.node.x
的 table 访问方式,便于读取多层级配置项。
function _M.try_read_attr(tab, ...)
for i = 1, select('#', ...) do
local attr = select(i, ...)
if type(tab) ~= "table" then
return nil
end
tab = tab[attr]
end
return tab
end
使用示例:
local size = core_tab.try_read_attr(local_conf, "graphql", "max_size")
if size then
max_size = size
end
工具类
APISIX 封装了许多工具类,这些工具共同组成了 APISIX 的 PDK(Plugin Development Kit),利用这些方法,插件开发能够增速许多。
JSON 操作
local delay_tab = setmetatable({data = "", force = false}, {
__tostring = function(self)
local res, err = encode(self.data, self.force)
if not res then
ngx.log(ngx.WARN, "failed to encode: ", err,
" force: ", self.force)
end
return res
end
})
-- this is a non-thread safe implementation
-- it works well with log, eg: log.info(..., json.delay_encode({...}))
function _M.delay_encode(data, force)
delay_tab.data = data
delay_tab.force = force
return delay_tab
end
设置了元表的 __tostring
方法,在字符串转换时才使用匿名函数调用 json.encode
,在日志打印时,被忽略的日志会不执行 JSON 压缩,避免额外的性能损耗。
LRU 缓存
lua-resty-lrucache 在写入时会清理 TTL 过期的缓存,读时如果数据过期了,会作为第二个参数返回:
function _M.get(self, key)
local hasht = self.hasht
local val = hasht[key]
if val == nil then
return nil
end
local node = self.key2node[key]
-- print(key, ": moving node ", tostring(node), " to cache queue head")
local cache_queue = self.cache_queue
queue_remove(node)
queue_insert_head(cache_queue, node)
if node.expire >= 0 and node.expire < ngx_now() then
-- print("expired: ", node.expire, " > ", ngx_now())
return nil, val, node.user_flags
end
return val, nil, node.user_flags
end
local function fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
item_release, key, version)
local obj, stale_obj = lru_obj:get(key)
if obj and obj.ver == version then
return obj
end
-- 如果 TTL 到期的数据版本号仍一致, 重新 set 该缓存
if not invalid_stale and stale_obj and stale_obj.ver == version then
lru_obj:set(key, stale_obj, item_ttl)
return stale_obj
end
-- release 回调
if item_release and obj then
item_release(obj.val)
end
return nil
end
-- 返回创建 LRU 的匿名函数
local function new_lru_fun(opts)
local item_count, item_ttl
if opts and opts.type == 'plugin' then
item_count = opts.count or PLUGIN_ITEMS_COUNT
item_ttl = opts.ttl or PLUGIN_TTL
else
item_count = opts and opts.count or GLOBAL_ITEMS_COUNT
item_ttl = opts and opts.ttl or GLOBAL_TTL
end
local item_release = opts and opts.release
local invalid_stale = opts and opts.invalid_stale
-- 是否使用并发锁
local serial_creating = opts and opts.serial_creating
-- 参数为 LRU size
local lru_obj = lru_new(item_count)
return function (key, version, create_obj_fun, ...)
-- 不支持的 yielding 的 Nginx phase 无法使用 resty.lock
if not serial_creating or not can_yield_phases[get_phase()] then
local cache_obj = fetch_valid_cache(lru_obj, invalid_stale,
item_ttl, item_release, key, version)
if cache_obj then
return cache_obj.val
end
local obj, err = create_obj_fun(...)
if obj ~= nil then
lru_obj:set(key, {val = obj, ver = version}, item_ttl)
end
return obj, err
end
local cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
item_release, key, version)
if cache_obj then
return cache_obj.val
end
-- 当缓存失效时获取锁
-- 创建共享内存 lock
local lock, err = resty_lock:new(lock_shdict_name)
if not lock then
return nil, "failed to create lock: " .. err
end
local key_s = tostring(key)
log.info("try to lock with key ", key_s)
-- 获取 lock
local elapsed, err = lock:lock(key_s)
if not elapsed then
return nil, "failed to acquire the lock: " .. err
end
-- 再次获取缓存
cache_obj = fetch_valid_cache(lru_obj, invalid_stale, item_ttl,
nil, key, version)
if cache_obj then
lock:unlock()
log.info("unlock with key ", key_s)
return cache_obj.val
end
local obj, err = create_obj_fun(...)
if obj ~= nil then
lru_obj:set(key, {val = obj, ver = version}, item_ttl)
end
lock:unlock()
log.info("unlock with key ", key_s)
return obj, err
end
end
这段代码关联到两个 PR:
- bugfix(lrucache): when creating cached objects, use resty-lock to avoid repeated creation.
- change: make lrucache lock optional
使用 lua-resty-lock 通过共享内存竞争锁,用在缓存中避免缓存击穿,当该 Lib 出于 Luajit 限制,无法在 init_by_lua
, init_worker_by_lua
, header_filter_by_lua
, body_filter_by_lua
, balancer_by_lua
, log_by_lua
阶段中使用。
引入的 serial_creating
属性用于判断插件是否需要启用锁。
Kong 使用的 lua-resty-mlcache 库内部也使用 resty.lock 防止缓存击穿(可选)。
后台任务
两个地方默认初始化了定时器(Nginx Timer)执行后台任务。
init_by_lua
阶段创建 OpenResty 特权进程,负责执行特定的后台任务,不会干扰其他 Worker 进程,权限相当于 root;init_by_worker
阶段创建 Background Timer,执行并发执行后台任务。
OpenResty 特权进程不能处理请求,只能由 Timer 触发,逻辑上编写 if type(ngx.process.type()) == "privileged agent"
只在特权进程中执行操作。5
Enables the privileged agent process in Nginx.
The privileged agent process does not listen on any virtual server ports like those worker processes. And it uses the same system account as the nginx master process, which is usually a privileged account like
root
.The
init_worker_by_lua*
directive handler still runs in the privileged agent process. And one can use the type function provided by this module to check if the current process is a privileged agent.6
-- worker 默认后台运行的 timer, 执行各种后台任务
local function background_timer()
if core.table.nkeys(timers) == 0 then
return
end
local threads = {}
for name, timer in pairs(timers) do
core.log.info("run timer[", name, "]")
-- 开启协程执行
local th, err = thread_spawn(timer)
if not th then
core.log.error("failed to spawn thread for timer [", name, "]: ", err)
goto continue
end
core.table.insert(threads, th)
::continue::
end
local ok, err = thread_wait(unpack(threads))
if not ok then
core.log.error("failed to wait threads: ", err)
end
end
function _M.init_worker()
local opts = {
each_ttl = 0,
sleep_succ = 0,
check_interval = check_interval, -- 默认间隔为 1 秒
}
local timer, err = core.timer.new("background", background_timer, opts)
if not timer then
core.log.error("failed to create background timer: ", err)
return
end
core.log.notice("succeed to create background timer")
end
APISIX 引入特权进程的一个目的在于实现 Log Rotate 插件功能。
请求生命周期
ctx
Use
ngx.ctx
wherever you can.ngx.var
is much more expensive and is also limited to string values. The latter should only be used to exchange data with other nginx C modules.7
APISIX 中使用缓存 ngx.var
获取的结果, 在不同生命周期中传递。使用 lua-var-nginx-module Nginx C 模块和 FFI 获取变量,在没有开启 Nginx C 模块的情况下回退到 ngx.var
方式获取。APISIX 默认没有在构建脚本中加载 C 模块,提交的 PR feat: add lua-var-nginx-module 在编译 OpenResty 时添加了该模块。
function _M.set_vars_meta(ctx)
-- 从 table 池中获取/创建一个 hash 长度为 32 的 table
local var = tablepool.fetch("ctx_var", 0, 32)
if not var._cache then
var._cache = {}
end
-- 通过 resty.core.base 获取原始 request C 指针 (?)
-- ref: https://github.com/openresty/lua-resty-core/blob/master/lib/resty/core/base.lua
var._request = get_request()
-- 绑定元表
setmetatable(var, mt)
-- 缓存到 ngx ctx 中
ctx.var = var
end
使用 tablepool 从 Lua table 池中获取 table,避免频繁分配内存。
do
-- 获取特殊 var 类型的方法
local var_methods = {
method = ngx.req.get_method,
-- ref: https://github.com/cloudflare/lua-resty-cookie
cookie = function () return ck:new() end
}
local ngx_var_names = {
upstream_scheme = true,
upstream_host = true,
...
var_x_forwarded_proto = true,
}
local mt = {
-- 重载 hash 元方法
-- t 是 self
__index = function(t, key)
-- 若 cache table 存在直接返回
local cached = t._cache[key]
if cached ~= nil then
return cached
end
if type(key) ~= "string" then
error("invalid argument, expect string value", 2)
end
local val
-- 如果是特殊类型, 使用特定方法获取
local method = var_methods[key]
if method then
val = method()
elseif core_str.has_prefix(key, "cookie_") then
-- 通过 var_methods 访问到 resty.cookie
local cookie = t.cookie
if cookie then
local err
val, err = cookie:get(sub_str(key, 8))
if not val then
log.warn("failed to fetch cookie value by key: ",
key, " error: ", err)
end
end
elseif core_str.has_prefix(key, "http_") then
key = key:lower()
key = re_gsub(key, "-", "_", "jo")
-- 最终通过 ngx.var 获取
val = get_var(key, t._request)
elseif core_str.has_prefix(key, "graphql_") then
-- trim the "graphql_" prefix
key = sub_str(key, 9)
val = get_parsed_graphql(t)[key]
elseif key == "route_id" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.route_id
elseif key == "service_id" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.service_id
elseif key == "consumer_name" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.consumer_name
elseif key == "route_name" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.route_name
elseif key == "service_name" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.service_name
elseif key == "balancer_ip" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.balancer_ip
elseif key == "balancer_port" then
val = ngx.ctx.api_ctx and ngx.ctx.api_ctx.balancer_port
else
val = get_var(key, t._request)
end
if val ~= nil then
t._cache[key] = val
end
-- 为空返回 nil
return val
end,
__newindex = function(t, key, val)
if ngx_var_names[key] then
ngx_var[key] = val
end
-- log.info("key: ", key, " new val: ", val)
t._cache[key] = val
end,
}
部分 APISIX 路由匹配的内部参数在其他阶段注入。
headers
-- 用 ngx.ctx table 缓存 headers, 避免再进行一次 ffi 调用
local function _headers(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
local headers = ctx.headers
if not headers then
headers = get_headers()
ctx.headers = headers
end
return headers
end
用到了上述的 ctx 库。
- 点赞
- 收藏
- 关注作者
评论(0)