我希望能在Scala中处理MongoDB

首先我以前对 MongoDB 不太感兴趣,但最近尝试使用 Scala 时发现 Observable 不太符合 Scala 的特点,所以觉得用 Future 处理会更容易。

环境

    • Scala: 2.13

 

    • sbt: 1.3.4

 

    • mongo-scala-driver: 2.7.0

 

    • cats: 2.0.0

 

项目的产出

标准的使用方法我們將從標準的使用方式開始。

首先,我们可以使用case class来定义模式(schema),因此我们会利用它。我们定义一个似乎没有什么用途的模式A,它简单地作为子模式包含多个B。

package com.example

import org.mongodb.scala.bson.ObjectId

final case class A(_id: ObjectId, name: String, children: Seq[B])

final case class B(name: String)

如果在数据库初始化时或集合初始化时,将上述内容作为withCodecRegistry参数传递,它会自动将其解析为BSON。

现在,我们马上来对上述模式进行CRUD操作试试看。

据说mongo-scala-driver是一个公式库,它是对mongo-java-driver的封装。它将各种处理结果包装成了类似于熟悉Rx系的Observable的异步Java类型,并返回。它采用了以具有onNext、onError和onComplete的Observer类型回调作为参数的subscribe方法来处理这些结果(与RxJS不同,Observable似乎只有在调用subscribe方法时才会执行…异步很难啊)。

package com.example

import scala.concurrent.ExecutionContext

import org.bson.codecs.configuration.CodecRegistries
import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._

object Main extends App with LazyLogging {

  implicit val executionContext: ExecutionContext = ExecutionContext.global

  val mongoClient = MongoClient(
    "mongodb://root:1234@localhost:27017/?authSource=admin"
  )

  val mongoDb = mongoClient
    .getDatabase("test_db")
    .withCodecRegistry(
      CodecRegistries.fromRegistries(
        CodecRegistries.fromProviders(classOf[A], classOf[B]),
        DEFAULT_CODEC_REGISTRY
      )
    )

  val mongoCollection = mongoDb.getCollection[A]("test")

  val id1 = new ObjectId()

  val id2 = new ObjectId()

  val id3 = new ObjectId()

  logger.info("=========== start version of normal ==========")

  val f1 = for {
    _ <- mongoCollection.drop()
    _ <- mongoCollection.insertOne(
          A(id1, "a_1", Seq(B("b_1"), B("b_2")))
        )
    _ <- mongoCollection.insertOne(
          A(id2, "a_2", Seq.empty[B])
        )
    _ <- mongoCollection.insertOne(
          A(id3, "a_3", Seq.empty[B])
        )
    _ <- mongoCollection.updateOne(
          equal("_id", id2),
          combine(set("name", "a_4"), set("children", Seq(B("b_3"))))
        )
    _ <- mongoCollection.deleteOne(equal("_id", id3))
    found <- mongoCollection.find()
  } yield found

  f1.subscribe(new Observer[A] {
    override def onNext(result: A): Unit = logger.info(result.toString)

    override def onError(e: Throwable): Unit = logger.error("failed!!")

    override def onComplete(): Unit ={
      logger.info("=========== finish version of normal ==========")

      mongoClient.close()
    }
  })
}

う〜ん・・・。一応上記のようにfor構文を使えなくもない感じですが、それぞれのコールバック(onNext,onError,onComplete)がどう処理されてんのかよく分からりません。特に途中のonErrorが処理されない気配がプンプンする。

for構文が使えなかったらコールバック地獄行きだし、for構文使うならやっぱりMonadとして組みたいのでどうにかしたいなぁって感じです。

想办法
其实,Observable已经预先提供了一个名为toFuture的函数,它可以将Observable转换为Future,所以很容易实现任意转换。

因此,如往常一样,让我们将其拆分到仓库中。这次,我选择了一个简单的期望使用Monad的格式。

package com.example

import org.mongodb.scala.bson.ObjectId

trait Repository[F[_]] {

  def drop: F[Unit]

  def findAll: F[Seq[A]]

  def insert(entity: A): F[Unit]

  def update(entity: A): F[Unit]

  def delete(_id: ObjectId): F[Unit]
}

我将尝试使用Cats的Kleisli和IO来实现以上内容。

package com.example

import scala.concurrent.ExecutionContext

import cats.data.Kleisli
import cats.effect.{ContextShift, IO}
import org.mongodb.scala.MongoCollection
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._

object RepositoryImpl extends Repository[Kleisli[IO, MongoCollection[A], *]] {

  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  override def drop: Kleisli[IO, MongoCollection[A], Unit] =
    Kleisli(c => IO.fromFuture(IO(c.drop().toFuture())).map(_ => ()))

  override def findAll: Kleisli[IO, MongoCollection[A], Seq[A]] =
    Kleisli(c => IO.fromFuture(IO(c.find().toFuture())))

  override def insert(entity: A): Kleisli[IO, MongoCollection[A], Unit] =
    Kleisli(c => IO.fromFuture(IO(c.insertOne(entity).toFuture())).map(_ => ()))

  override def update(entity: A): Kleisli[IO, MongoCollection[A], Unit] =
    Kleisli(
      c =>
        IO.fromFuture(
            IO(
              c.updateOne(equal("_id", entity._id),
                           combine(set("name", entity.name),
                                   set("children", entity.children)))
                .toFuture()
            )
          )
          .map(_ => ())
    )

  override def delete(
      _id: ObjectId
  ): Kleisli[IO, MongoCollection[A], Unit] =
    Kleisli(
      c =>
        IO.fromFuture(IO(c.deleteOne(equal("_id", _id)).toFuture()))
          .map(_ => ())
    )
}

走り書き感が溢れてますが、とりあえず利用する側はObservableを意識せず、関数合成できるようになるはずです。

因此,我們回到了主要的部分,並進行了使用存儲庫的版本添加和部分修正。

package com.example

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

import cats.data.Kleisli
import cats.implicits._
import com.typesafe.scalalogging.LazyLogging
import org.bson.codecs.configuration.CodecRegistries
import org.mongodb.scala._
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
import org.mongodb.scala.bson.codecs.Macros._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._

object Main extends App with LazyLogging {

  implicit val executionContext: ExecutionContext = ExecutionContext.global

  val mongoClient = MongoClient(
    "mongodb://root:1234@localhost:27017/?authSource=admin"
  )

  val mongoDb = mongoClient
    .getDatabase("test_db")
    .withCodecRegistry(
      CodecRegistries.fromRegistries(
        CodecRegistries.fromProviders(classOf[A], classOf[B]),
        DEFAULT_CODEC_REGISTRY
      )
    )

  val mongoCollection = mongoDb.getCollection[A]("test")

  val id1 = new ObjectId()

  val id2 = new ObjectId()

  val id3 = new ObjectId()

  logger.info("=========== start version of normal ==========")

  // f1 = Observable[A]
  val f1 = for {
    _ <- mongoCollection.drop()
    _ <- mongoCollection.insertOne(
          A(id1, "a_1", Seq(B("b_1"), B("b_2")))
        )
    _ <- mongoCollection.insertOne(
          A(id2, "a_2", Seq.empty[B])
        )
    _ <- mongoCollection.insertOne(
          A(id3, "a_3", Seq.empty[B])
        )
    _ <- mongoCollection.updateOne(
          equal("_id", id2),
          combine(set("name", "a_4"), set("children", Seq(B("b_3"))))
        )
    _ <- mongoCollection.deleteOne(equal("_id", id3))
    found <- mongoCollection.find()
  } yield found

  f1.subscribe(new Observer[A] {
    override def onNext(result: A): Unit = logger.info(result.toString)

    override def onError(e: Throwable): Unit = logger.error("failed!!")

    override def onComplete(): Unit =
      logger.info("=========== finish version of normal ==========")
  })

  logger.info("=========== start version of repository ==========")

  // f2 = Klesli[IO, MongoCollection[A], Seq[A]]
  val f2 = for {
    _ <- RepositoryImpl.drop
    _ <- RepositoryImpl.insert(
          A(id1,
            "a_1",
            Seq(
              B("b_1"),
              B("b_2")
            ))
        )
    _ <- RepositoryImpl.insert(
          A(id2, "a_2", Seq.empty[B])
        )
    _ <- RepositoryImpl.insert(
          A(id3, "a_3", Seq.empty[B])
        )
    _ <- RepositoryImpl
          .update(A(id2, "a_4", Seq(B("b_3"))))
    _ <- RepositoryImpl.delete(id3)
    found <- RepositoryImpl.findAll
  } yield found

  f2.run(mongoCollection).unsafeRunAsync {
    case Right(v) =>
      v.foreach(a => logger.info(a.toString))
      logger.info("=========== finish version of repository ==========")

    case Left(_) =>
      logger.error("failed!!")
  }

  // 30秒もあれば終わるはず
  Await.result(Future(Thread.sleep(30000)), Duration(30, TimeUnit.SECONDS))

  mongoClient.close()
}

执行上述操作后,应该在控制台中显示如下(省略了驱动程序输出的日志)。

=========== start version of normal ==========
=========== start version of repository ==========
A(5ddcb68d27918964b23fddb4,a_1,List(B(b_1), B(b_2)))
A(5ddcb68d27918964b23fddb5,a_4,List(B(b_3)))
=========== finish version of normal ==========
A(5ddcb68d27918964b23fddb4,a_1,List(B(b_1), B(b_2)))
A(5ddcb68d27918964b23fddb5,a_4,List(B(b_3)))
=========== finish version of repository ==========

最后
ちなみに公式のチュートリアルはチュートリアル用のヘルパーが用意してあってあっさりFuture->Awaitにしてたりする。

听说还可以使用ReactiveStream之类的,但关于那方面的事情我们以后再说。

bannerAds