我希望能在Scala中处理MongoDB
首先我以前对 MongoDB 不太感兴趣,但最近尝试使用 Scala 时发现 Observable 不太符合 Scala 的特点,所以觉得用 Future 处理会更容易。
环境
-
Scala: 2.13
-
- Scala: 2.13
-
- sbt: 1.3.4
-
- mongo-scala-driver: 2.7.0
-
- cats: 2.0.0
- 他
项目的产出
标准的使用方法我們將從標準的使用方式開始。
首先,我们可以使用case class来定义模式(schema),因此我们会利用它。我们定义一个似乎没有什么用途的模式A,它简单地作为子模式包含多个B。
package com.example
import org.mongodb.scala.bson.ObjectId
final case class A(_id: ObjectId, name: String, children: Seq[B])
final case class B(name: String)
如果在数据库初始化时或集合初始化时,将上述内容作为withCodecRegistry参数传递,它会自动将其解析为BSON。
现在,我们马上来对上述模式进行CRUD操作试试看。
据说mongo-scala-driver是一个公式库,它是对mongo-java-driver的封装。它将各种处理结果包装成了类似于熟悉Rx系的Observable的异步Java类型,并返回。它采用了以具有onNext、onError和onComplete的Observer类型回调作为参数的subscribe方法来处理这些结果(与RxJS不同,Observable似乎只有在调用subscribe方法时才会执行…异步很难啊)。
package com.example
import scala.concurrent.ExecutionContext
import org.bson.codecs.configuration.CodecRegistries
import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._
object Main extends App with LazyLogging {
implicit val executionContext: ExecutionContext = ExecutionContext.global
val mongoClient = MongoClient(
"mongodb://root:1234@localhost:27017/?authSource=admin"
)
val mongoDb = mongoClient
.getDatabase("test_db")
.withCodecRegistry(
CodecRegistries.fromRegistries(
CodecRegistries.fromProviders(classOf[A], classOf[B]),
DEFAULT_CODEC_REGISTRY
)
)
val mongoCollection = mongoDb.getCollection[A]("test")
val id1 = new ObjectId()
val id2 = new ObjectId()
val id3 = new ObjectId()
logger.info("=========== start version of normal ==========")
val f1 = for {
_ <- mongoCollection.drop()
_ <- mongoCollection.insertOne(
A(id1, "a_1", Seq(B("b_1"), B("b_2")))
)
_ <- mongoCollection.insertOne(
A(id2, "a_2", Seq.empty[B])
)
_ <- mongoCollection.insertOne(
A(id3, "a_3", Seq.empty[B])
)
_ <- mongoCollection.updateOne(
equal("_id", id2),
combine(set("name", "a_4"), set("children", Seq(B("b_3"))))
)
_ <- mongoCollection.deleteOne(equal("_id", id3))
found <- mongoCollection.find()
} yield found
f1.subscribe(new Observer[A] {
override def onNext(result: A): Unit = logger.info(result.toString)
override def onError(e: Throwable): Unit = logger.error("failed!!")
override def onComplete(): Unit ={
logger.info("=========== finish version of normal ==========")
mongoClient.close()
}
})
}
う〜ん・・・。一応上記のようにfor構文を使えなくもない感じですが、それぞれのコールバック(onNext,onError,onComplete)がどう処理されてんのかよく分からりません。特に途中のonErrorが処理されない気配がプンプンする。
for構文が使えなかったらコールバック地獄行きだし、for構文使うならやっぱりMonadとして組みたいのでどうにかしたいなぁって感じです。
想办法
其实,Observable已经预先提供了一个名为toFuture的函数,它可以将Observable转换为Future,所以很容易实现任意转换。
因此,如往常一样,让我们将其拆分到仓库中。这次,我选择了一个简单的期望使用Monad的格式。
package com.example
import org.mongodb.scala.bson.ObjectId
trait Repository[F[_]] {
def drop: F[Unit]
def findAll: F[Seq[A]]
def insert(entity: A): F[Unit]
def update(entity: A): F[Unit]
def delete(_id: ObjectId): F[Unit]
}
我将尝试使用Cats的Kleisli和IO来实现以上内容。
package com.example
import scala.concurrent.ExecutionContext
import cats.data.Kleisli
import cats.effect.{ContextShift, IO}
import org.mongodb.scala.MongoCollection
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._
object RepositoryImpl extends Repository[Kleisli[IO, MongoCollection[A], *]] {
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
override def drop: Kleisli[IO, MongoCollection[A], Unit] =
Kleisli(c => IO.fromFuture(IO(c.drop().toFuture())).map(_ => ()))
override def findAll: Kleisli[IO, MongoCollection[A], Seq[A]] =
Kleisli(c => IO.fromFuture(IO(c.find().toFuture())))
override def insert(entity: A): Kleisli[IO, MongoCollection[A], Unit] =
Kleisli(c => IO.fromFuture(IO(c.insertOne(entity).toFuture())).map(_ => ()))
override def update(entity: A): Kleisli[IO, MongoCollection[A], Unit] =
Kleisli(
c =>
IO.fromFuture(
IO(
c.updateOne(equal("_id", entity._id),
combine(set("name", entity.name),
set("children", entity.children)))
.toFuture()
)
)
.map(_ => ())
)
override def delete(
_id: ObjectId
): Kleisli[IO, MongoCollection[A], Unit] =
Kleisli(
c =>
IO.fromFuture(IO(c.deleteOne(equal("_id", _id)).toFuture()))
.map(_ => ())
)
}
走り書き感が溢れてますが、とりあえず利用する側はObservableを意識せず、関数合成できるようになるはずです。
因此,我們回到了主要的部分,並進行了使用存儲庫的版本添加和部分修正。
package com.example
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import cats.data.Kleisli
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.bson.codecs.configuration.CodecRegistries
import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._
object Main extends App with LazyLogging {
implicit val executionContext: ExecutionContext = ExecutionContext.global
val mongoClient = MongoClient(
"mongodb://root:1234@localhost:27017/?authSource=admin"
)
val mongoDb = mongoClient
.getDatabase("test_db")
.withCodecRegistry(
CodecRegistries.fromRegistries(
CodecRegistries.fromProviders(classOf[A], classOf[B]),
DEFAULT_CODEC_REGISTRY
)
)
val mongoCollection = mongoDb.getCollection[A]("test")
val id1 = new ObjectId()
val id2 = new ObjectId()
val id3 = new ObjectId()
logger.info("=========== start version of normal ==========")
// f1 = Observable[A]
val f1 = for {
_ <- mongoCollection.drop()
_ <- mongoCollection.insertOne(
A(id1, "a_1", Seq(B("b_1"), B("b_2")))
)
_ <- mongoCollection.insertOne(
A(id2, "a_2", Seq.empty[B])
)
_ <- mongoCollection.insertOne(
A(id3, "a_3", Seq.empty[B])
)
_ <- mongoCollection.updateOne(
equal("_id", id2),
combine(set("name", "a_4"), set("children", Seq(B("b_3"))))
)
_ <- mongoCollection.deleteOne(equal("_id", id3))
found <- mongoCollection.find()
} yield found
f1.subscribe(new Observer[A] {
override def onNext(result: A): Unit = logger.info(result.toString)
override def onError(e: Throwable): Unit = logger.error("failed!!")
override def onComplete(): Unit =
logger.info("=========== finish version of normal ==========")
})
logger.info("=========== start version of repository ==========")
// f2 = Klesli[IO, MongoCollection[A], Seq[A]]
val f2 = for {
_ <- RepositoryImpl.drop
_ <- RepositoryImpl.insert(
A(id1,
"a_1",
Seq(
B("b_1"),
B("b_2")
))
)
_ <- RepositoryImpl.insert(
A(id2, "a_2", Seq.empty[B])
)
_ <- RepositoryImpl.insert(
A(id3, "a_3", Seq.empty[B])
)
_ <- RepositoryImpl
.update(A(id2, "a_4", Seq(B("b_3"))))
_ <- RepositoryImpl.delete(id3)
found <- RepositoryImpl.findAll
} yield found
f2.run(mongoCollection).unsafeRunAsync {
case Right(v) =>
v.foreach(a => logger.info(a.toString))
logger.info("=========== finish version of repository ==========")
case Left(_) =>
logger.error("failed!!")
}
// 30秒もあれば終わるはず
Await.result(Future(Thread.sleep(30000)), Duration(30, TimeUnit.SECONDS))
mongoClient.close()
}
执行上述操作后,应该在控制台中显示如下(省略了驱动程序输出的日志)。
=========== start version of normal ==========
=========== start version of repository ==========
A(5ddcb68d27918964b23fddb4,a_1,List(B(b_1), B(b_2)))
A(5ddcb68d27918964b23fddb5,a_4,List(B(b_3)))
=========== finish version of normal ==========
A(5ddcb68d27918964b23fddb4,a_1,List(B(b_1), B(b_2)))
A(5ddcb68d27918964b23fddb5,a_4,List(B(b_3)))
=========== finish version of repository ==========
最后
ちなみに公式のチュートリアルはチュートリアル用のヘルパーが用意してあってあっさりFuture->Awaitにしてたりする。
听说还可以使用ReactiveStream之类的,但关于那方面的事情我们以后再说。