为什么选择 MongoDB?
在 Reactive 越来越流行的今天,传统阻塞式的数据库驱动已经无法满足Reactive应用的需要了,为此我们将目光转向新诞生的数据库新星 MongoDB 。MongoDB 从诞生以来就争议不断,总结一下主要有以下几点:
其实Schemaless
和不支持事务
是技术选型时的决定,不应该受到吐槽,主要看是否满足业务需求以及团队的喜好,没什么可争议的。至于默认忽略错误
也是无稽之谈,对于那些非关键数据,MongoDB为你提供了一个Fire and Forget
模式,可以显著提高系统性能,并且几乎所有的MongoDB驱动都默认关闭了这个模式,如果需要你可以手动打开。默认关闭认证
并不是不支持认证
,只是为了方便快速原型,如果你敢在线上裸奔MongoDB,我只能默默地为你点根蜡烛。数据丢失
问题已经成为历史,曾经在网上广为流传的两篇关于MongoDB数据丢失问题(1, 2), 经过分布式系统安全性测试组织JEPSEN最新的测试分析表明,MongoDB 3.4.0已经解决了这些问题。
聊完争议,我们来看看MongoDB有哪些优点:
其中异步数据库驱动
最为吸引人,该技术是实现 Reactive 应用的基石。
如何进行 MongoDB 开发 ?
目前有如下三个基于 Scala 开发的 MongoDB 驱动可供选择:
Mongo Scala Driver 是 MongoDB 官方维护的 Scala 驱动,该驱动底层基于官方的 Java 驱动,在此基础上提供了一层很薄的 Scala 包装。Mongo Scala Driver 提供了一套基于 Java 的 Bson Api,无法与 Play Json 集成。另外 Mongo Scala Driver 并没有实现 Reactive Streams 规范,而是实现了一套与 Reactive Streams 类似的 Reactive Api,即 Observable, Subscription 和 Observer。另外 Mongo Scala Driver 的数据库操作默认返回 Observable 类型,如果你忘记了调用 toFuture 方法,或是没有消费返回数据,则数据库操作实际上并不会被执行,在开发中很容易引入一些Bug。
ReactiveMongo 是 Play Framework 团队成员私下维护的项目,似乎并没有得到官方的支持。该项目基于 Akka 和 Netty 重新实现了 MongoDB 通信协议,并且基于 Scala 实现了一套原生的 Bson Api。该项目提供了一个 Play 模块,实现了 Bson 和 Json 的自动转换。ReactiveMongo 主要有三个问题,一是版本更新不够及时,无法跟上 MongoDB 的更新节奏;二是可能存在安全隐患,容易造成生产事故,详情参考:issue#721。三是语法过于繁琐,向开发者暴露了太多细节,例如批量插入操作:
val docs = seq.map(c => implicitly[statChatCol.ImplicitlyDocumentProducer](c.toStatChat))
collection.bulkInsert(false)(docs: _*)
让开发者编写类似implicitly[statChatCol.ImplicitlyDocumentProducer]
这样的代码似乎不太合适。
由于 Reactive Mongo 的种种问题,最终诞生了 Play Mongo。Play Mongo 是由 PlayScala 社区为 Play Framework 开发的 MongoDB 模块, 该项目基于 MongoDB 官方的 Scala 驱动,并且提供了更多的实用功能,例如,
Play Mongo 基于官方驱动开发,可以为开发者提供最佳的稳定性,并能及时跟进 MongoDB 的版本升级。另外 Play Mongo 不会过多关注底层驱动的实现细节,而是将关注点放在与 Play Framework 的集成上,可以为开发者提供更舒适的开发体验。本文将采用 Play Mongo 讲述 MongoDB 的开发细节。
Play Mongo 开发入门
Play Mongo 只是为我们提供了数据访问层,我们还需要基于访问层构建模型层。关于模型层的设计,我们可以选择贫血模型、充血模型以及应对复杂业务的领域模型。关于模型层的设计,我们将会在“第四部分 Play 框架开发实战”中继续讨论。为了方便阐述,我们这里选择最简单的贫血模型,即模型层只包含数据,不包含任何的业务逻辑实现。
添加依赖
打开 Play 项目,编辑 build.sbt
,添加如下依赖,
libraryDependencies += "cn.playscala" % "play-mongo_2.12" % "0.3.0"
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
打开 conf/application.conf
, 添加数据库连接,
mongodb.uri = "mongodb://user:password@host:port/dbName?authMode=scram-sha1"
定义模型层
我们建议在定义 Model 类时要显式声明 _id 属性,该属性为 MongoDB 的默认主键,如果没有,在插入时会自动生成。下面代码定义了一个 Person 类:
package models
@Entity("common-person")
case class Person(_id: String, name: String, age: Int)
@Entity 注解参数用于指定关联的 mongodb collection 名称, 如果未指定,则默认为 Model 类名称。 作为约定,Model 类使用 _id 字段作为唯一标识, 该字段同时也是 mongodb collection 的默认主键。
模型层编解码
在应用启动时指定模型层(models)的包路径,编辑app/Module
类,
class Module extends AbstractModule {
override def configure() = {
Mongo.setModelsPackage("models")
}
}
Mongo.setModelsPackage方法将会查找指定包路径下的所有Case Class,自动生成驱动所需的编解码器。需要注意的是,这些编解码器是驱动私有的,外界无法共享。我们仍然需要定义全局共享的隐式 Format 对象:
import play.api.libs.json.Format
package object models {
implicit val personFormat = Json.format[Person]
}
如果有很多的 Case Class,则需要逐个定义,编写起来还是挺麻烦的。我们可以使用 @JsonFormat 宏注解,通过一行代码为所有 Case Class 生成相应的隐式 Format 对象:
import cn.playscala.mongo.codecs.macrocodecs.JsonFormat
package object models {
@JsonFormat("models")
implicit val formats = ???
}
由于这些隐式的 Format 对象是在模型层的包对象(package object)中创建的,所以使用时无需显式导入,编译器会自动加载。
依赖注入
至此,我们便可以将 Mongo 实例注入到任意需要的地方:
@Singleton
class Application @Inject()(cc: ControllerComponents, mongo: Mongo) extends AbstractController(cc) {}
模型类和Collection
模型类使用 @Entity 注解标注, 一个模型类实例表示 mongodb collection 中的一个文档, 一个 mongodb collection 在概念上类似于关系数据库的一张表。
@Entity("common-user")
case class User(_id: String, name: String, password: String, addTime: Instant)
@Entity 注解参数用于指定关联的 mongodb collection 名称, 如果未指定,则默认为 Model 类名称。 作为约定,模型类使用 _id 字段作为唯一标识, 该字段同时也是 mongodb collection 的默认主键。
我们可以通过两种方式访问 mongodb collection, 第一种方式是使用模型类,
mongo.find[User]().list().map{ users => ... }
这里的参数类型 User 不仅用于指定关联的 mongodb collection, 而且用于指明返回的结果类型。 这意味着查询操作将会在 common-user collection 上执行, 并且返回的结果类型是 User。 需要注意的是,在该方式下无法改变返回的结果类型。
第二种方式是使用 mongo.collection 方法,
mongo.collection("common-user").find[User]().list().map{ users => }
在这里, find 方法上的参数类型 User 仅仅用于指定返回的结果类型, 我们可以通过更改该参数类型设置不同的返回结果类型,
mongo.collection("common-user").find[JsObject]().list().map{ jsObjList => }
mongo.collection("common-user").find[User](Json.obj("userType" -> "common")).list().map{ commonUsers => }
当然,我们也可以使用 model 类指定关联的 mongodb collection,
mongo.collection[User].find[User]().list().map{ user => }
第1个参数类型 User 用于指定关联的 mongodb collection, 第2个参数类型 User 用于指定返回的结果类型。 我们仍然可以通过改变第2个参数类型从而改变返回的结果类型。
常见操作
以下示例代码默认执行了 import play.api.libs.json.Json._
导入, 所以 Json.obj()
可以被简写为 obj()
。
创建操作
// 插入 Model
mongo.insert[User](User("0", "joymufeng", "123456", Instant.now))
// 插入 Json
val jsObj = obj("_id" -> "0", "name" -> "joymufeng", "password" -> "123456", "addTime" -> Instant.now)
mongo.collection[User].insert(jsObj)
mongo.collection("common-user").insert(jsObj)
更新操作
mongo.updateById[User]("0", obj("$set" -> obj("password" -> "123321")))
mongo.updateOne[User](obj("_id" -> "0"), obj("$set" -> obj("password" -> "123321")))
mongo.collection[User].updateById("0", obj("$set" -> obj("password" -> "123321")))
mongo.collection[User].updateOne(obj("_id" -> "0"), obj("$set" -> obj("password" -> "123321")))
mongo.collection("common-user").updateById("0", obj("$set" -> obj("password" -> "123321")))
mongo.collection("common-user").updateOne(obj("_id" -> "0"), obj("$set" -> obj("password" -> "123321")))
查询操作
mongo.findById[User]("0") // Future[Option[User]]
mongo.find[User](obj("_id" -> "0")).first // Future[Option[User]]
mongo.collection[User].findById[User]("0") // Future[Option[User]]
mongo.collection[User].find[User](obj("_id" -> "0")).first // Future[Option[User]]
mongo.collection[User].findById[JsObject]("0") // Future[Option[JsObject]]
mongo.collection[User].find[JsObject](obj("_id" -> "0")).first // Future[Option[JsObject]]
mongo.collection("common-user").findById[User]("0") // Future[Option[User]]
mongo.collection("common-user").find[User](obj("_id" -> "0")).first // Future[Option[User]]
mongo.collection("common-user").findById[JsObject]("0") // Future[Option[JsObject]]
mongo.collection("common-user").find[JsObject](obj("_id" -> "0")).first // Future[Option[JsObject]]
删除操作
mongo.deleteById[User]("0")
mongo.deleteOne[User](obj("_id" -> "0"))
mongo.collection[User].deleteById("0")
mongo.collection[User].deleteOne(obj("_id" -> "0"))
mongo.collection("common-user").deleteById("0")
mongo.collection("common-user").deleteOne(obj("_id" -> "0"))
上传和下载文件
// Upload and get the fileId
mongo.gridFSBucket.uploadFromFile("image.jpg", "image/jpg", new File("./image.jpg")).map{ fileId =>
Ok(fileId)
}
// Download file by fileId
mongo.gridFSBucket.findById("5b1183fed3ba643a3826325f").map{
case Some(file) =>
Ok.chunked(file.stream.toSource)
.as(file.getContentType)
case None =>
NotFound
}
Change Stream
我们可以通过 toSource
方法将 Change Stream 转换成 Akka Source,之后便会有趣很多。例如下面的代码拥有如下几个功能:
mongo
.collection[User]
.watch()
.fullDocument
.toSource
.groupedWithin(10, 1000.millis)
.throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping)
.runForeach{ seq =>
// ...
}
关联查询操作
@Entity("common-article")
case class Article(_id: String, title: String, content: String, authorId: String)
@Entity("common-author")
case class Author(_id: String, name: String)
mongo.find[Article].fetch[Author]("authorId").list().map{ _.map{ t =>
val (article, author) = t
}
}
对于满足查询条件的每一个 article , 将会根据匹配条件article.authorId == author._id
拉取关联的 author。
小结
MongoDB自2009发布以来,产品和社区都已经非常成熟,已经有商业公司在云上提供MongoDB服务。除此之外,MongoDB不仅方便开发,而且容易维护,普通的开发人员利用自带的mongodump
和mongorestore
命令便可进行备份、恢复操作。当然更重要的是,利用MongoDB的异步驱动以及ChangeStreams,我们可以开发高性能的实时应用。