如何为算法团队提供高效的工程化上云支持是云原生时代一个很重要的也很有意义的课题,现在开源社区比较完善的应该是 Kubeflow —— 一系列 ML 实验部署环境工具的集合,不过整体来看比较笨重,不适合小团队生产环境快速落地,这里基于 kubevela 和 kfserving 实现一个算法标准化模型的例子,供参考。
项目地址:https://github.com/shikanon/vela-example/tree/main/example/sklearnserver
通过 kubevela 提供了三种对象 mpserver, hpa, httproute。
由于使用到vela
,所以需要先下载vela
客户端
案例放在 exmaple/sklearnserver
下面。
# 编译
docker build -t swr.cn-north-4.myhuaweicloud.com/hw-zt-k8s-images/sklearnserver:demo-iris -f sklearn.Dockerfile .
docker login swr.cn-north-4.myhuaweicloud.com
docker push swr.cn-north-4.myhuaweicloud.com/hw-zt-k8s-images/sklearnserver:demo-iris
demo-iris-01.yaml
的应用文件name: demo-iris-01
services:
demo-iris:
type: mpserver
image: swr.cn-north-4.myhuaweicloud.com/hw-zt-k8s-images/sklearnserver:demo-iris
ports: [8080]
cpu: "200m"
memory: "250Mi"
httproute:
gateways: ["external-gateway"]
hosts: ["demo-iris.rcmd.testing.mpengine"]
servernamespace: rcmd
serverport: 8080
hpa:
min: 1
max: 1
cpuPercent: 60
因为这里使用的是rcmd
命名空间,在创建的时候需要切换,可以通过vela dashboard 通过可视化界面创建一个 rcmd
命名空间的环境:
vela dashboard
成功后可以通过vela env
查看:
$ vela env ls
NAME CURRENT NAMESPACE EMAIL DOMAIN
default default
rcmd * rcmd
$ vela up -f demo-iris-01.yaml
Parsing vela appfile ...
Load Template ...
Rendering configs for service (demo-iris)...
Writing deploy config to (.vela/deploy.yaml)
Applying application ...
Checking if app has been deployed...
App has not been deployed, creating a new deployment...
✅ App has been deployed ???
Port forward: vela port-forward demo-iris-01
SSH: vela exec demo-iris-01
Logging: vela logs demo-iris-01
App status: vela status demo-iris-01
Service status: vela status demo-iris-01 --svc demo-iris
部署好后可以测试:
$ curl -i -d '{"instances":[[5.1, 3.5, 1.4, 0.2]]}' -H "Content-Type: application/json" -X POST demo-iris.rcmd.testing.mpengine:8000/v1/models/model:predict
{"predictions": [0]}
kfserver 提供了多种常用框架的 server,比如 sklearn, lgb, xgb, pytorch 等多种服务的 server 框架, kfserver 基于 tornado 框架进行开发,其提供了 模型加载,接口健康检测,预测及 参考解释等多个抽象接口,详细见kfserving/kfserving/kfserver.py
:
...
def create_application(self):
return tornado.web.Application([
# Server Liveness API returns 200 if server is alive.
(r"/", LivenessHandler),
(r"/v2/health/live", LivenessHandler),
(r"/v1/models",
ListHandler, dict(models=self.registered_models)),
(r"/v2/models",
ListHandler, dict(models=self.registered_models)),
# Model Health API returns 200 if model is ready to serve.
(r"/v1/models/([a-zA-Z0-9_-]+)",
HealthHandler, dict(models=self.registered_models)),
(r"/v2/models/([a-zA-Z0-9_-]+)/status",
HealthHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+):predict",
PredictHandler, dict(models=self.registered_models)),
(r"/v2/models/([a-zA-Z0-9_-]+)/infer",
PredictHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+):explain",
ExplainHandler, dict(models=self.registered_models)),
(r"/v2/models/([a-zA-Z0-9_-]+)/explain",
ExplainHandler, dict(models=self.registered_models)),
(r"/v2/repository/models/([a-zA-Z0-9_-]+)/load",
LoadHandler, dict(models=self.registered_models)),
(r"/v2/repository/models/([a-zA-Z0-9_-]+)/unload",
UnloadHandler, dict(models=self.registered_models)),
])
...
这里我们使用的 sklearn server 的案例主要实现了 predict
接口:
import kfserving
import joblib
import numpy as np
import os
from typing import Dict
MODEL_BASENAME = "model"
MODEL_EXTENSIONS = [".joblib", ".pkl", ".pickle"]
class SKLearnModel(kfserving.KFModel): # pylint:disable=c-extension-no-member
def __init__(self, name: str, model_dir: str):
super().__init__(name)
self.name = name
self.model_dir = model_dir
self.ready = False
def load(self) -> bool:
model_path = kfserving.Storage.download(self.model_dir)
paths = [os.path.join(model_path, MODEL_BASENAME + model_extension)
for model_extension in MODEL_EXTENSIONS]
for path in paths:
if os.path.exists(path):
self._model = joblib.load(path)
self.ready = True
break
return self.ready
def predict(self, request: Dict) -> Dict:
instances = request["instances"]
try:
inputs = np.array(instances)
except Exception as e:
raise Exception(
"Failed to initialize NumPy array from inputs: %s, %s" % (e, instances))
try:
result = self._model.predict(inputs).tolist()
return {"predictions": result}
except Exception as e:
raise Exception("Failed to predict %s" % e)