资损通常来讲是指支付场景下的资金损失,这里可以从两个维度看
比如对于一个电商业务可能涉及到上图各种业务,业务之间都存在下发、回调、消息传递等各种逻辑或者状态同步,如果其中由于一些交互操作丢失导致库存异常、资金结算换算异常、单据流程未结束异常、逻辑触发重复请求,并发控制处理不当等等,最终导致资产或资金的损失。
对于情况预防手段除了事前的一些列严格测试,事后分析的优化补救,也可以通过事中进行监控,得物在这方面就是通过自研的 DCheck 进行防控。
此系统由“交易 &稳定性”团队主导的,主要是希望及时的发现数据的问题,保障数据稳定的运行,尤其是涉及到资损的场景,为了做到实时有效的监控,在此背景下搭建了准实时核对系统 DCheck,此平台基于 Mysql 的 binglog 监控,以及 MQ 的订阅的信息流技术手段,通过配置触发条件、规则和任务运行以及告警,来确保各个业务上下游业务间的状态一致性,进出扣款金额计算后的准确性。
主题:逻辑数据库 或 MQ 订阅
事件:Update/ Insert 类型操作 或 MQ 自定义消
子事件:事件所返回的数据的第一层过滤
脚本池:有 filter 和 check 两个继承方法,filter 处理二层数据过滤,check 业务上下游逻辑判断
规则:触发和检测核心执行部分
对于此系统的功能和使用简单演示如下
对于 binlog 而言,分成了 INSERT 和 UPDATE 两种,根据上主题配置自动生成无需创建
清洗的数据的过滤层,此处在规则配置中,返回‘TRUE’的才会进入下一层,如:
if( obj.status.toInteger() == 10000 && (obj.type.toInteger() ==101 || obj.type.toInteger() ==301) ) return 'TRUE';
如果想所有都返回,就直接 return 'TRUE'。
所有被执行脚本使用的是 groovy 进行编写的,主要是 BaseScript 实现 filter 和 check 两个方法,内部可以参考脚本库
代码例子:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import groovy.util.logging.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; /** * DCheck:冷静期内-平台客服取消订单-退买家支付金额 */ @Slf4j @Service class CheckRefundPayForLess30min implements BaseScript { @Resource private OrderDevOpsApi orderDevOpsApi; @Resource DCheckApi dCheckApi; @Resource private PayServiceFeignClient payServiceFeignClient String logTag = "TAG_CheckCrossAndOverSeaRefundPayForLess30min:{}" // 1.关单时间-支付时间<30分钟 @Override boolean filter(JSONObject doneCleanData) { // 查询支付时间 String unionId = doneCleanData.getString("order_no"); String payTime = getOrderData(unionId,"payTime", DevOpsSceneEnum.FORWARD_PAY); long modifyTime = doneCleanData.getDate("modify_time").getTime(); long diffTime = modifyTime - Long.valueOf(payTime) if (diffTime < 30 * 60 * 1000){ return true; } log.info(logTag,"===>有符合数据进入Check") return false; } @Override String check(JSONObject doneCleanData) { String subOrderNo = doneCleanData.getString("sub_order_no"); Result<List<String>> listResult = dCheckApi.queryPayNoBySubOrderNo(subOrderNo); if(listResult == null || listResult.getData() == null) { return "根据正向查询接口通过子订单号查询支付流水号数据为空"; } if(listResult.getData().size() > 1){ return "根据正向查询接口通过子订单号查询支付流水号多条数据,请查看是否需要优化逻辑"; } String outPayNo = listResult.getData().get(0); RefundQueryRequest refundQueryRequest = new RefundQueryRequest(); refundQueryRequest.setPayLogNum(outPayNo); Result<List<RefundBillDTO>> resp = payServiceFeignClient.queryRefundsByPayLogNum(refundQueryRequest); // 判断支付查询数据是否为空,如果为空直接报数据错误,以及是否查询到了多条数据 if (resp == null || resp.getData() == null) { return "上游数据为空:支付退款查询(根据支付流水号)"; } else if (resp.getData().size() != 1) { return "上游数据为多条请确认逻辑:支付退款查询(根据支付流水号)"; } // 检查点逻辑判断1: 状态为打款成功 if (resp.getData().get(0).getStatus() !=2 ){ return "校验支付打款状态非2"; } // 逻辑校验点2:交易退款和RPC查询的金额一致,否则告警 if (resp.getData().get(0).getAmount() != doneCleanData.getLong("amount")) { return "校验交易退款金额和支付打款金额不一致"; } return "SUCCESS"; } // 数据库查询对应字段值 String getOrderData(String unionNo,String key,DevOpsSceneEnum devOpsSceneEnum){ // 内部方法省略.... return value; } }
复制代码
规则配置为上述所有基础配置的组合以及真正的运行核心,主要两大块,基础信息和降级策略。
基础信息:子事件(支持搜索和多个选中)+ 脚本类(搜索选择) = 触发和执行逻辑
其他为辅助配置信息按照各自域和需求配置。
降级策略:
针对 check 异常的数据,一般首先会发到配置的告警飞书群和配置的个人,点击即可跳转到此页面,主要看错误的具体数据,经确认后是脚本或者部分数据问题的,优化后“重发”可以标记为处理,如果是确定是问题的,则定位“资损”问题。
由于本地脚本调用一些 RPC 接口,目前还没有好的办法能在本地进行 debug,所以就需要通过先配置规则后,使用 mock 进行调试,主要用到的是规则调试,选择指定的规则,搜索或者创造符合 dcheck 场景的 json 格式的请求参数,提交请求查看响应结果即可。
这里有个问题,因为 dcheck 内部逻辑对一些系统异常脚本做了统一处理,很多时候没法看到具体的原因,就是失败或者逻辑外通过,这就需要需要在脚本中多加些打印日志,然后通过日志平台去查看具体逻辑问题。
脚本数据处理有会有很多 list,key-value 的处理,可以充分利用 groovy 的闭包特性,大大简化 java 语言的复杂处理逻辑。
举例:如果如下数据格式的结果返回 objectList:
[{id=10086, refundNo=RE10086, orderNo=100888, userId=15206, bizType=110, payTool=0, payStatus=404,amount=100, feature=, isDel=0, createTime=2021-05-11 21:39:34.000, modifyTime=2021-05-11 21:39:34.000, moneyFrom=1, currency=, countryCode=},{id=10087, refundNo=RE10087, orderNo=100999, userId=15206, bizType=202, payTool=0, payStatus=404, amount=400, feature=, isDel=0, createTime=2021-05-11 21:39:34.000, modifyTime=2021-05-11 21:39:34.000, moneyFrom=1, countryCode=}]
复制代码
1)filter 进行条件判断,可以使用 any
def filterResult = objectList.any{it.bizType in [110,119] && it.payStatus == 404};
return filterResult
2)check 获取某个符合条件的值,可以使用 find
def mount = objectList.find{ it.type=5}.amount
更多 groovy 特性可以参考文章:https://www.jianshu.com/p/5d30f1443aa6
目前本地脚本库没有可以运行调试的环境,虽然有 Mock 工具,但也是需要先配置事件,上传脚本配置后才能进行调试,同时脚本逻辑问题也需要去日志不断改脚本的加日志的方式去查看,另外最后如果在测试环境了进行了调试通过,还要去线环境再重复配置一遍流程。
建议:线上在子事件、脚本、规则增加的页面就增加一个 Debug 按钮,可以直接通过给定参数或者抓取符合条件的一条数据进行调试给出结果,最好也能给出这部分日志。
在开发脚本和配置中发现其实好多 filter 逻辑相同,抑或 check 逻辑相同,但由于脚本中放在一起,就会产生交叉的逻辑编写,无法做到有效的公共剥离。
建议:Filter 和 check 拆分,并且作为公共的池,规则中单独配置,更可以选择模式为引用或者导入,导入的支持主要方便逻辑大部分相同,个别参数不相同下快速修改配置规则上线。
虽然目前整个平台的实际还待观察,但其中后续如果平台效果是很好的,真的发生大批量数据问题或者资金损失问题,平台的机制也只是发生问题的警告,然后技术介入,本质上还是后置的处理的方式,如一开始讲的这种后置的处理,并不能达到及时止损的目的。
建议:后续在平台更加准确时,应该考虑与关键域熔断联动,及时制止损失的产生。
目前规则的执行不执行,只能通过编辑控制流量 0,或是执行时间来关闭,也没有批量的,操作上有些不便。
建议:增加上下线开关,增加批量操作如:流量大小,开关,警告人等
由于好多 check 点是通过各个域的接口进行操作,在流量大的时候,尤其是关键的业务,可能对业务产品很大影响,或者在某接口兼容不好,系统异常导致大量非常规一场报错的时候,各方人会炸掉。
建议:规则分等级,核心业务可以做高级配置,遇到上述问题触发自动调节降流量,对于问题已恢复可自动增加流量。
文|大奇
关注得物技术,携手走向技术的云端
领取专属 10元无门槛券
私享最新 技术干货