前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PlayScala 开发技巧 - 实时同步 MongoDB 高可用方案

PlayScala 开发技巧 - 实时同步 MongoDB 高可用方案

作者头像
joymufeng
发布2019-03-12 16:03:40
6570
发布2019-03-12 16:03:40
举报
文章被收录于专栏:Play & Scala 技术分享

1 如何实时同步MongoDB?

MongoDB 从 3.6 开始为开发者提供了 Change Streams 功能,利用 Change Streams 功能可以非常方便地监听指定 Collection 上的数据变化。例如在 mongo shell 中,我们可以通过如下方式监听 shopping 数据库 order 表上的变化:

代码语言:javascript
复制
watchCursor = db.getSiblingDB("shopping").order.watch()
while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

2 在Play中如何操作?

利用 Play Mongo 可以方便地实现数据监听功能,并且我们可以将 Change Stream 转换成 Akka Stream,然后以流的方式处理指定 Collection 上的数据变化,

代码语言:javascript
复制
mongo
  .collection[Order]
  .watch()
  .fullDocument
  .toSource
  .groupedWithin(10, 1000.millis)
  .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping)
  .runForeach{ seq => 
    // ...
  }

上面的代码实现了以下几个功能:

  • 将从 Change Stream 接收到的元素进行缓冲,以方便批处理,当满足下面任意一个条件时便结束缓冲向后传递:
    • 缓冲满10个元素
    • 缓冲时间超过了1000毫秒
  • 对缓冲后的元素进行流控,每秒只允许通过1个元素

3 如何实现高可用?

上面的代码并没有考虑可用性,如果在监听过程中发生了网络错误,如何从错误中恢复呢? 上面的实现代码底层是基于官方的 mongo-java-driver 实现的,关于可用性官方文档有如下描述:

Change streams provide a way to watch changes to documents in a collection. To improve the usability of this new stage, the MongoCollection API includes a new watch method. The ChangeStreamIterable sets up the change stream and automatically attempts to resume if it encounters a potentially recoverable error.

文档中提及程序可以自动从可恢复的错误中恢复。经测试验证,如果网络中断在 30 秒以内均属于可恢复错误;但是如果大于 30 秒,则会报连接超时错误并且无法从错误中自动恢复:

代码语言:javascript
复制
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches ReadPreferenceServerSelector{readPreference=primary}. Client view of cluster state is {type=UNKNOWN, servers=[{address=127.0.0.1:27117, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused}}]
    at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:401)
    at com.mongodb.internal.connection.BaseCluster.handleServerSelectionRequest(BaseCluster.java:309)
    at com.mongodb.internal.connection.BaseCluster.access$800(BaseCluster.java:65)
    at com.mongodb.internal.connection.BaseCluster$WaitQueueHandler.run(BaseCluster.java:482)
    at java.lang.Thread.run(Thread.java:748)

幸运的是,Akka Stream 的 RestartSource 可以帮我们解决这种不可恢复错误,解决方式就是通过指数规避(exponential back-off)方式不断重试。下面是一个通用的创建 RestartSource 的方法实现:

代码语言:javascript
复制
def restartSource(colName: String): Source[ChangeStreamDocument[JsObject], _] = {
  RestartSource.withBackoff(
    minBackoff = 3.seconds,
    maxBackoff = 10.seconds,
    randomFactor = 0.2, 
    maxRestarts = 1000000 
  ) { () ⇒
    Logger.warn(s"Creating source for watching ${colName}.")
    mongo.collection(colName).watch().fullDocument.toSource
  }
}

通过 Backoff 参数可以指定重试策略:

  • minBackoff 最小重试时间间隔
  • maxBackoff 最大重试时间间隔
  • randomFactor 设置一个随机的浮动因子,使得每次计算的间隔有些许差异
  • maxRestarts 最大重试次数

当发生错误时,RestartSource 会尝试重新创建一个 Source:

代码语言:javascript
复制
Logger.warn(s"Creating source for watching ${colName}.")
mongo.collection(colName).watch().fullDocument.toSource

完整代码如下:

代码语言:javascript
复制
val colName = "common-user"
restartSource(colName)
  .groupedWithin(10, 1000.millis)
  .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping)
  .runForeach{ seq =>
    try {
      Logger.info(seq.toString())    
    } catch { case t: Throwable =>
      Logger.error(s"Watch change stream of ${colName} error: ${t.getMessage}", t)
    }
  }

需要注意的是 runForeach 中需要显式捕获异常并处理,否则会导致 Source 结束并退出。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018/09/03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 如何实时同步MongoDB?
  • 2 在Play中如何操作?
  • 3 如何实现高可用?
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档