随着微服务场景的广泛应用,前端经常需要访问多个后端微服务,这时候往往需要一个API网关对请求做一些通用处理。通用处理指的是由网关层去实现一些非业务类的功能,比如负载均衡、权限校验、频率限制、协议转换、日志监控、缓存管理、熔断降级等,将这些通用功能交给网关层统一实现,比起各个业务自己分别实现会更合适。
API网关用于提供 API的完整生命周期管理,目前市面上流行的API网关有Kong、Tyk、Traefik、Zuul、APISIX、Ambassador等,从成熟度、性能和扩展性的角度来看,Kong都是一个较好的选择。Kong本身基于Nginx内核,异步性能好,支持集群部署,社区版免费插件多,同时支持开发者自己开发插件注入请求生命周期以完成想要的功能。
Kong的架构图:
从Kong的架构图中,可以看到Nginx和OpenResty的存在。OpenResty是以 Nginx 为核心的 Web 开发平台,内部包含lua-nginx-module,集成了大量精良的 Lua 库,开发人员可以使用 Lua 脚本调动各类C和Lua 模块。OpenResty目标是让Web服务直接跑在 Nginx 服务内部,利用 Nginx 的非阻塞I/O模型实现高性能响应。而Kong 是OpenResty的一个应用程序,具有路由转发和API管理功能,扩展性好(支持实例横向扩展,单机和集群部署均可),灵活性好(支持公有云/私有云,多种操作系统等),模块性好(已有插件任意选,提供插件开发套件支持开发自定义插件),生态也比较好(全球5000+公司和组织在使用,官方文档内容详细,社区版功能比较完善,也有企业版可以提供更多升级支持等)。
关于Kong的安装方式和基本概念,推荐直接去看Kong官方文档,介绍和示例非常清晰,可以很快上手。本文假定读者对Kong的service、route、consumer等概念有所了解,实际运行过Kong并配置过http/grpc服务的路由转发(如果没有欢迎先根据官方文档动手试试看),在此之上介绍如何开发自定义插件,这里将介绍如何开发一个配合官方频率限制插件使用的token鉴权插件。
Kong的核心是实现数据库抽象,路由和插件管理。插件由Lua模块组成,这些模块通过插件开发套件(Plugin Development Kit,简称PDK)与请求/响应对象或流进行交互,以实现任意逻辑。 PDK是一组Lua函数,可以使用它来实现插件与Kong的核心组件之间的交互。 插件可以存在于单独的代码库中,并且可以通过几行代码注入到请求生命周期的任何位置。
在Kong源码的插件目录中,可以看到有一个base_plugin.lua的文件,该文件里定义了一个基类BasePlugin,以及该基类所拥有的一些方法。所有插件都从基类BasePlugin继承而来,开发者可以根据插件自身的需求选择重写某些方法,这些方法实际上对应了OpenResty 的不同执行阶段。
对于HTTP/HTTPS 请求,有以下可以利用的请求生命周期上下文阶段及处理函数:
函数名 | 对应阶段 | 描述 |
---|---|---|
:init_worker() | init_worker | 在每次Nginx worker进程启动时执行 |
:certificate() | ssl_certificate | 在SSL握手的SSL证书服务阶段执行 |
:rewrite() | rewrite | 在每个请求的重写阶段执行 |
:access() | access | 在每个请求被代理到上游服务之前执行 |
:header_filter() | header_filter | 当已从上游服务接收到所有响应头字节时执行 |
:body_filter() | body_filter | 对从上游服务接收到的响应主体的每个块执行 |
:log() | log | 当最后一个响应字节已发送到客户端时执行 |
对于TCP stream连接,有以下可以利用的请求生命周期上下文阶段及处理函数:
函数名 | 对应阶段 | 描述 |
---|---|---|
:init_worker() | init_worker | 在每次Nginx worker进程启动时执行 |
:preread() | preread | 每个连接执行一次 |
:log() | log | 关闭每个连接后执行一次 |
一个完整的插件目录结构如下:
complete-plugin
├── api.lua
├── daos.lua
├── handler.lua
├── migrations
│ ├── cassandra.lua
│ └── postgres.lua
└── schema.lua
各模块的功能如下:
模块名 | 是否必须 | 描述 |
---|---|---|
api.lua | 否 | 定义可在Admin API中使用的插件endpoints列表 |
daos.lua | 否 | 数据层相关,当插件需要访问数据库时配置,这些dao是插件所需的自定义实体的抽象 |
handler.lua | 是 | 插件的主要逻辑,每个功能都应由Kong在请求/连接生命周期的所需时刻运行 |
migrations/*.lua | 否 | 插件依赖的数据表结构,启用了 daos.lua 时需要定义 |
schema.lua | 是 | 插件的配置参数定义,主要用于 Kong 参数验证 |
其中handler.lua和schema.lua是必须的,一个简单的插件只需要包含这两个lua文件即可。handler.lua负责实现BasePlugin的子类及对应方法,完成插件主逻辑。schema.lua定义了插件所需的用户自定义参数。
插件开发的流程可以简述为:
-v /data/kong_test/custom-plugin:/etc/kong/plugins/custom-plugin
3. kong提供了一个默认的配置文件,位于/etc/kong/kong.conf.default。kong在开始时,会查找可能包含配置文件的几个默认位置:
/etc/kong/kong.conf/etc/kong.conf
这里将主机上自己的kong.conf挂载到容器里。首先修改kong.conf这两项内容:
> plugins = bundled,custom-plugin
> lua\_package\_path = ./?.lua;./?/init.lua;/etc/?.lua;
这里/etc/?.lua
就是我的插件路径 (Kong requires kong.plugins.custom-plugin.handler, which translates to /etc/kong/plugins/custom-plugin/handler.lua)
然后挂载配置文件:
-v /data/kong_test/kong.conf:/etc/kong/kong.conf
4. 如果用到了custom_nginx.template,运行命令中使用--nginx-conf带上custom_nginx.template。
在tke中部署也类似,把插件文件和配置文件加到ConfigMap,再添加挂载点映射和启动环境变量。
这是一个实际项目中的场景,基于Kong开发一个token鉴权插件,从请求的query中取出token,带token向后端服务请求校验,将校验后的身份参数设置到header,同时针对身份信息进行接口调用次数的频率控制。
鉴权部分的实现步骤如下:
以检查企业token为例,后端token验证的接口示例如下:
请求包体:
{
"access_token":"********"
}
返回包体:
{
"errcode":0 ,
"errmsg":"ok" ,
"corpid":"xxxxxx",
"suite_id":"xxxxxx",
"expire_time": 7200 // token剩余时间, 网关可缓存token和相关的corpid, suitie_id
}
为了减小后端服务的压力,我们需要对token进行缓存。Kong基于OpenResty,OpenResty有两类缓存:Lua LRU cache和Lua shared dict。Lua shared dict(简称shm)使用共享内存,每次操作都是全局锁,高并发环境下不同 worker 之间容易引起竞争, 单个Lua shared dict不宜过大。Lua LRU cache在worker 内使用,不会触发锁机制,效率上有优势,但不同 worker 之间数据不同享,同一数据可能被冗余缓存。因为希望缓存数据可以在nginx所有worker之间共享,这里选择了Lua shared dict。
Kong源码中预留了一部分shm,比如给频控插件使用的kong_rate_limiting_counters,给全局使用的kong_db_cache等shm。由于不想和其他插件或者模块抢shm,这里单独设置token所需的shm。单独设置shm需要在custom_nginx.template完成:
custom_nginx.template用于设置kong.conf所不能满足的nginx配置,一般的启动方式是:
kong start -c kong.conf --nginx-conf custom\_nginx.template
如果是tke部署,可以将该文件映射到容器某个目录,然后在运行参数中使用。
因为验证token的步骤是在请求达到后端服务之前完成,所以这里我们会重写access()函数:
function TokenAuthHandler:access(conf)
-- prevent requests set suite_id or corpid by themselves
kong.service.request.clear_header(SUITE_ID)
kong.service.request.clear_header(CORPID)
-- get query parameter
local access_token = kong.request.get_query_arg(ACCESS_TOKEN)
local suite_access_token = kong.request.get_query_arg(SUITE_ACCESS_TOKEN)
-- check access_token
local username
if access_token then
username = verify_token(conf, access_token, false)
elseif suite_access_token then
username = verify_token(conf, suite_access_token, true)
else
-- if the request does not carry a token, non-whitelist requests will return 403 in the future
return
end
-- rate limit logic
...
end
其中verify_token
的实现如下:
local function verify_token(conf, token, is_suite)
local username, cache
local data_obj, data_json, ttl
local succ, err
if is_suite then
cache = shm_suite
else
cache = shm
end
data_json, err = cache:get(token)
if err then
kong.log.err("shm get token err:", err)
end
if data_json then
kong.log.info("Hit cache:", data_json)
data_obj = cjson.decode(data_json)
else
kong.log.info("No hit cache, start sending http request to verify token...")
data_obj, ttl = http_verify(conf, token, is_suite)
data_json = cjson.encode(data_obj)
succ, err = cache:set(token, data_json, ttl)
if not succ then
kong.log.err("shm set token err:", err)
end
end
username = data_obj.suite_id
kong.service.request.add_header(SUITE_ID, data_obj.suite_id)
if not is_suite then
username = username .. "-" .. data_obj.corpid
kong.service.request.add_header(CORPID, data_obj.corpid)
end
return username
end
如果没有命中cache,则会调用http_verify
向后端请求token验证,后端验证的接口路径参数在schema.lua中定义:
local typedefs = require "kong.db.schema.typedefs"
return {
name = "token-auth",
fields = {
{ protocols = typedefs.protocols_http },
{ config = {
type = "record",
fields = {
{ access_token_endpoint = typedefs.url({ required = true }) },
{ suite_access_token_endpoint = typedefs.url({ required = true }) },
{ timeout = { type = "number", default = 5000 }, },
},
},
},
},
}
http_verify
就是简单的http请求过程:
local function http_verify(conf, token, is_suite)
-- determine the parameters of different types of tokens
local url, body, err_msg
if is_suite then
url = conf.suite_access_token_endpoint
body = cjson.encode({ suite_access_token = token, })
err_msg = ERRORS_MASSAGE_SUITE
else
url = conf.access_token_endpoint
body = cjson.encode({ access_token = token, })
err_msg = ERRORS_MASSAGE
end
-- send http request
local httpc = http.new()
httpc:set_timeout(conf.timeout)
-- httpc:request_uri(uri, params) will manage connection pool inside, and you can also set keepalive/keepalive_timeout/keepalive_pool in params
local res, err = httpc:request_uri(url, {
method = "POST",
headers = { ["Content-Type"] = "application/json" },
body = body
})
-- handle err results
local err_resp = {}
if err ~= nil then
kong.log.err("send request err:", err)
err_resp.errcode = ERRORS.INTERNAL_ERROR
err_resp.errmsg = err_msg[err_resp.errcode]
return kong.response.exit(403, err_resp, { ["Content-Type"] = "application/json" })
end
if res.status ~= 200 then
kong.log.err("response http status err:", res.status)
err_resp.errcode = ERRORS.HTTP_STATUS_ERROR
err_resp.errmsg = err_msg[err_resp.errcode]
return kong.response.exit(403, err_resp, { ["Content-Type"] = "application/json" })
end
local json = cjson.decode(res.body)
if json.errcode ~= 0 then
kong.log.err("token invalid, errcode:", json.errcode, " errmsg:", json.errmsg)
err_resp.errcode = ERRORS.INVALID_TOKEN
err_resp.errmsg = err_msg[err_resp.errcode]
return kong.response.exit(403, err_resp, { ["Content-Type"] = "application/json" })
end
-- success result
local data = {}
data.suite_id = json.suite_id
if not is_suite then
data.corpid = json.corpid
end
return data, json.expires_in
end
两类token错误码收拢:
local ERRORS = {
INVALID_TOKEN = 1,
INTERNAL_ERROR = 2,
HTTP_STATUS_ERROR = 3,
MISSING_TOKEN_PARAMETER = 4,
}
local ERRORS_MASSAGE = {
[ERRORS.INVALID_TOKEN] = "Invalid access token",
[ERRORS.INTERNAL_ERROR] = "Check access token internal error",
[ERRORS.HTTP_STATUS_ERROR] = "Check access token not 200",
}
local ERRORS_MASSAGE_SUITE = {
[ERRORS.INVALID_TOKEN] = "Invalid suite access token",
[ERRORS.INTERNAL_ERROR] = "Check suite access token internal error",
[ERRORS.HTTP_STATUS_ERROR] = "Check suite access token not 200",
}
这部分的需求是实现对请求的频率限制,限制的维度是suiteid或corpid+suiteid。如果请求携带access_token,则在corpid+suiteid维度进行频控,如果请求携带suite_access_token,则在suiteid维度进行频控。
由于看到Kong社区版已经有成熟的频率控制插件rate-limiting,因此这里考虑如何把现有的插件利用起来,同时满足我们的频控条件。rate-limiting的频控维度是service/route/consumer这三者之一,我们可以利用consumer这个维度,把频控参数1 + 频控参数2 + ... + 频控参数n
这些参数的组合当做一个特定的consumer,设置为consumer的username(该字段为unique key)。简单来说就是**结合Kong社区版频控插件,写一个插件运行在官方频控插件之前,根据我们的频控需求设置所需参数。** 由于鉴权和频控密不可分,因此这里的插件和上面的鉴权插件是同一个插件,只是加上了设置频控参数的逻辑。
官网文档上说使用consumer时必须先使用认证类插件(如basic auth插件,hmac auth插件),在阅读rate-limiting插件和认证类插件的源码后发现,认证类插件会在认证consumer身份后,对nginx上下文中的authenticated_consumer进行设置,说明请求是哪个consumer发出的,之后的频控插件才能根据具体的consumer进行频率限制。因此我们要做的是鉴权通过后将鉴权信息组合起来作为consumer object的username(username字段可由用户自己设置,但必须是唯一的,consumer的具体定义见 这里 )。用username先去cache和db查找改consumer是否存在,如果存在就直接设置在ctx里,如果不存在,这里选择了默认创建的方式将consumer存入db,同时设置到ctx(期望即使没有先在konga页面上主动创建consumer也能够正常调用接口,属于静默创建用户模式)。
这里有几个问题需要注意:
UNIQUE\_VIOLATION
,代表插入冲突,这时会进行二次查询,保证流程正常执行。 kong.cache:get(key, opts, cb, ...)
会先在L1和L2 cache中查找,如果找不到会调用传入的回调函数cb
进行查询(我们这里的回调函数实现去db查询)。这里需要注意,kong.cache:get
如果在缓存中没有找到,如果回调函数不在第二个回参返回错误,则会把在db查到的值存入缓存。那么当第一次consumer还未创建时,缓存没有值,db也没有值,就会把一个value为空table的键值对存入缓存。如果缓存失效时间较长(默认失效时间是永不过期),就会导致按照key去cache查时永远可以查到该consumer,但是其value是空。这种设计是本身是合理的,在db确实没有数据时用缓存的空值以挡住对db的无效请求。但是在我们这种会静默创建用户的情形下,如果cache和db查询失败,则会在db创建consumer,因此不期望在第一次请求时将空值存入缓存。我们可以在查询db的时候判断查询到的值是否为空,为空就主动返回错误,避免kong.cache:get
把negative results设置到cache。如果查询的结果为空但不想返回错误,也可以在kong.cache:get
后判断查询的结果是否为空,为空则执行kong.cache:invalidate
让该negative results失效。这里采用第一种做法。官方文档上说回调函数只能有一个返回值被捕获,但阅读lua-resty-mlcache 源码发现回调函数cb
是使用xpcall
函数去执行的,如下:
local pok, perr, err, new_ttl = xpcall(cb, traceback, ...)
if not pok then
return unlock_and_ret(lock, nil, "callback threw an error: " ..
tostring(perr))
end
xpcall
的返回值有4个,除了第一个pok
代表回调函数的执行结果(true/false,如果抛出error就是false),剩余的3个参数 perr, err, new_ttl 也都会被处理。因此我们完全可以在回调函数的第2个参数返回错误,来阻止negative results被设置到cache。这样在第一次insert consumer后,第二次查询时cache里就不会有空值的缓存,会执行回调函数从db加载新值并设置到缓存,这样第三次就可以从缓存读到值。
access()中频控相关的主逻辑如下:
function TokenAuthHandler:access(conf)
-- check token logic
...
-- select consumer
local cache_key = kong.db.consumers:cache_key(username)
local consumer = select_consumer(username, cache_key)
-- insert consumer
if consumer == nil then
consumer = insert_consumer(username, cache_key)
end
-- set authenticated consumer for rate limiting plugin
kong.log.info("authenticated_consumer:", consumer.username)
ngx.ctx.authenticated_consumer = consumer
查询consumer逻辑如下,先查缓存再查db:
-- Select consumer from db
local function load_consumer_from_db(username)
local consumer, err = kong.db.consumers:select_by_username(username)
if err then
error(err, 2)
end
if not consumer then
return nil, DB_NOT_FOUND
end
return consumer, nil
end
-- Select consumer from cache and db
-- Order: lru -> shm -> callback
local function select_consumer(username, cache_key)
-- if it is not retrieved in the cache, cache:get performs the callback function and sets the result to the cache (if the callback function does not return an error)
-- to avoid caching negative results(nil), if not found in db, an error should be returned
local consumer, err = kong.cache:get(cache_key, nil, load_consumer_from_db, username)
if err then
kong.log.err(err)
end
return consumer
end
查询失败则在db创建consumer:
-- Handle concurrent write conflicts
local function handle_error(err_t, username, cache_key)
if type(err_t) ~= "table" then
kong.log.err(err_t)
return nil
end
if err_t.code == Errors.codes.UNIQUE_VIOLATION then
return select_consumer(username, cache_key)
end
return nil
end
-- Insert consumer
local function insert_consumer(username, cache_key)
local consumer, err, err_t
local dao = kong.db["consumers"]
local args = { username = username }
consumer, err, err_t = dao["insert"](dao, args, nil)
if consumer == nil and err_t ~= nil then
kong.log.err(err_t)
consumer = handle_error(err_t, username, cache_key)
end
return consumer
end
最后将consumer.username设置到ngx.ctx.authenticated_consumer供官方频控插件以consumer模式生效。
另外,插件的执行顺序也是一个需要注意的问题。一些插件可能依赖于其他插件的执行来执行某些操作, 例如依赖于使用者身份的插件必须在身份验证插件之后运行。 考虑到这一点,Kong定义了插件执行之间的优先级,以确保遵守顺序。插件执行顺序可以通过handler table里的PRIORITY属性去定义,PRIORITY的值越大执行顺序越靠前。因为官方rate-limiting插件的优先级是901,因此我们的鉴权插件优先级可以设置大一点,比如1000,保证在rate-limiting之前运行。
Kong本身是Lua实现的,因为想要尝试下新语言,于是这次插件开发前朴素地学习了下Lua,使用的PDK也是Lua版本。但其实Kong 2.0也推出了go版本的PDK,Go技术栈的同学也可以直接编写Go插件。另外这次插件开发使用的ide是IDEA+EmmyLua,在用EmmyLua的过程中遇到过不能正确跳转到函数定义的现象,后来发现可以通过写注解增加插件的提示性,遇到类似问题的同学可以试试看。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。