最近的主要工作是包装AI 算法,使之成为算法服务集群。说白了就是包装若干算法能力,提供远程调用接口,供各个调用方来调用。算法主要是媒体资源的处理,包括打标签、媒体资源质量提升(分辨率提升、画面质量提升) 算法模块比较多,大约20个左右。
因项目比较紧急,所以起初通过搬砖模式完成了3个模块的落地,落地也没什么架构可言,就像下面这样:
(PS:因项目比较紧急,所以前期并没有做细致的容量性能评估,只能后续优化了,当前情况下先上能力比什么都重要,如果是比较大的项目,或者要求服务稳定性、性能高的服务,必须要做容量性能评估,才好确认什么是好的架构,而且这个细致的过程需要经过技术团队的评审,并且需要预留充足的评审时间,否则加班是难免的)
详细解释:
客户端请求
-> 请求参数处理
-> 下载待分析资源
-> m_srv处理
-> 吐出结果
-> 回调通知调用方处理结果
在包装了3个模块之后,个人发现:不考虑服务架构上面的问题
在紧急上线了3个模块之后,我对上述无脑式的垒代码的工作方式产生了反思。如何解决上面2个问题,是我接下来最需要优先解决的。否则,大概率陷入到苦力模式,陷入泥潭。
总结出了上述2个问题,接下来的事情就是一一分析、找解决方案了:
if...else
来实现,实现起来也没问题,但是会带来如下问题:
if...else
代码块 4中的2个问题有没有解决方案呢?有的,那就是通过策略设计模式来实现。
class AbstractHandler:
def __init__(self, next_handler=None):
self.next_handler = next_handler
def _judge(self, request):
pass
def _handler(self):
# 处理请求,吐出结果
pass
def handler(self, request):
if judgement(request):
self._handler(request):
else:
if self.next:
return self.next.handler(request)
return None
class Msr1Handler(AbstractHandler):
def __init__(self):
super().__init__()
def _judge(self, request):
return True if request > 10 else False
def _handler(self):
print('Msr1Handler handled this request')
class Msr2Handler(AbstractHandler):
def __init__(self):
super().__init__()
def _judge(self, request):
return True if request < 10 else False
def _handler(self):
print('Msr2Handler handled this request')
... 简介下上述3个文件充当的角色:
AbstractHandler
-策略契约定义者,其中定义了处理过程,如果当前遵守契约的处理单元不能处理此请求,则交由下一个处理单元去处理。Msr1Handler
& Msr2Handler
- 遵守(extends)策略契约的处理单元,其中只定义了 _judge()
判定自己可处理请求、和 _handler()
处理请求的核心逻辑,但是其具有 handel()
方法,因为通过 classMsr2Handler(AbstractHandler):
继承而来。AbstractHandlernext属性
- next把遵守契约的处理单元串成链表,这样请求就可以在这条链上传递,直至被某个处理单元处理。
优化后的代码结构
简单解释下:
common: 封装了常用操作,如下载资源、生成输出文件存储目录、下载文件重命名等功能‘
exception: 自定义的业务逻辑异常
handler_engine: Ai 模块处理引擎仓库,可实现平铺式新增能力。
module_handler_list: 模块引擎初始化入口
workflow: 一个统一化接口,接收请求,并路由请求到处理引擎。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
# @Time : 2020/4/4 下午8:32
# @Author :
# @Site :
# @File : handler.py
# @Software: PyCharm
# 抽象处理逻辑为职责链,通过外部请求参数控制链节点进行请求处理
eg: xx?module_name=A -> moduleA.handler()
"""
import json
import functools
print = functools.partial(print, flush=True)
from concurrent.futures import ThreadPoolExecutor
from config.ai_module_config import all_modules
from common.common_handler_util import *
from exception.async_handler_exception import AsyncHandlerException
EXECUTOR = ThreadPoolExecutor(8)
class AbstractRequestEntity():
"""请求实例化为entity"""
class AbstractModuleHandler:
"""
抽象模型处理器
"""
def __init__(self):
self.fill_attr()
self.next_module_handler = None
def _fill_attr(self, attr_tuple):
"""
动态填充属性 辅助函数
:param attr_tuple: (key,val) = self.$key = $val
:return:
"""
if not isinstance(attr_tuple, tuple):
return False, 'dynamic attr not match format,please give attr like (key,val)'
attr_name, attr_val = attr_tuple
if getattr(self, attr_name, None):
return
setattr(self, attr_name, attr_val)
def fill_attr(self):
current_module = all_modules.get(self._module_name, {})
for module in current_module.items():
self._fill_attr(module)
return
def _check_request_param(self, request):
if self._module_name != request.get('module_name', ''):
''' judge request module name '''
return False, 'this module not the correct module handler'
for param in request.items():
param_name = param[0]
request_params = getattr(self, 'request_params', None)
if not request_params:
return False, 'this module don\'t need request params!'
if param_name not in request_params:
return False, 'this request is not legitimate'
return True, 'check request params success'
def check_request_params(self, request):
if not request:
return False, 'request params not be none!', ''
if not isinstance(request, dict):
return False, 'request params\'s type must be map', ''
return self._check_request_param(request), ''
def _async_prev_handler(self, request):
"""
前置处理器
:param request:
:return:
"""
local_file_path = ''
output_root_dir = ''
source_url = request.get('source_url', {})
download_flag = getattr(self, 'download_flag')
store_local_flag = getattr(self, 'store_local_flag')
allow_media_type = getattr(self, 'allow_media_type')
source_type = source_url.split('.')[-1]
origin_file_name = source_url.split('/')[-1]
# 验证资源
if source_type not in allow_media_type:
raise AsyncHandlerException(res_code=-1, res_msg='source type not allow')
# 下载资源
if download_flag:
try:
print(source_url)
ret, msg, local_file_path, gen_date = download_remote_source(remote_uri=source_url,
file_name=origin_file_name,
module_name=self._module_name)
except Exception as err:
raise AsyncHandlerException(res_code=-2,
res_msg='download remote source has occurred failed,detail=%s' % str(err))
# 创建输出文件夹
if download_flag and store_local_flag:
output_root_dir = gen_store_root_dir(gen_date, 'output', self._module_name)
return local_file_path, output_root_dir, request
def _async_core_ability(self, *args):
"""
核心能力
:return:
"""
def _async_post_handler(self, *args):
"""
核心处理器完成之后,进行后续处理 如 upload cdn
:return:
"""
def _async_after_completion(self, *args):
"""post handler 完成后,进行回调通知 or 资源销毁等动作"""
print(args)
ret_data, request = args
ret_data['data']['extra'] = request.get('extra')
call_back_url = request.get('call_back_url')
payload = "{\"bussiness_data\": %s\n}" % json.dumps(ret_data)
headers = {
'content-type': "application/json"
}
response = requests.request("POST", call_back_url, data=payload, headers=headers)
print(response.text)
def async_handler(self, request):
try:
# 异步前置处理
args = self._async_prev_handler(request)
# 异步核心能力调用
args = self._async_core_ability(args)
# 异步后置处理,如上传cdn
ret_data = self._async_post_handler(args)
except Exception as err:
if isinstance(err, AsyncHandlerException):
ret_data = err.res_data if err.res_data else {"res_code": err.res_code, "err_msg": err.res_msg}
else:
ret_data = {"res_code": -10,
"err_msg": "server has occurred error,detail=%s,please call bofengliu" % str(err)}
finally:
# 异步清理资源,回调处理,没想好怎么写
self._async_after_completion(ret_data, request)
def handle(self, request):
"""
核心处理器
:param request:
:return:
"""
ret, msg = self._check_request_param(request)
if not ret:
if self.next_module_handler:
return self.next_module_handler.handle(request)
return json.dumps(
{"res_code": -6, "err_msg": "request params not match %s required params" % self._module_name})
# 执行之前先返回,确认接受参数的响应
EXECUTOR.submit(self.async_handler, request)
return json.dumps({"res_code": 0, "err_msg": "received %s task success, running now!" % self._module_name})
最重要的是 handler()
& async_handler()
方法,在这个类中,我们定义了处理请求的核心接口,其他具体实现这个抽象契约类的子类,只需要关注 _async_core_ability()
和 _async_post_handler()
后置处理了。
@app.route('/module/v1/api', methods=['POST'])
def dispatcher_handler():
"""
统一网管
:return:
"""
if 'POST' != request.method:
return jsonify({
"res_code": 1,
"err_msg": "request method must be POST"
})
request_params = request.get_json()
return all_module_handlers.handle(request_params)
采用策略模式后,如果新上线一个模型,我们只需要新建一个继承策略类的module类,且实现 _async_core_ability()
和 _async_post_handler()
就可以了,接口完全不需要改变。工作效率提升 70% 没有问题,至此就已经逃离了苦力模式,美滋滋。