I am building an app using Play Framework 2.5.0 and ReactiveMongo and I am spending a lot of time, stuck, on something that would be very easy to do in most web languages.
That thing is inserting many documents at once.
To do so, I must use the ReactiveMongo function bulkInsert.
I found this google group that had a very simple example, however it is from 2013 and now the signature changed
from
def bulkInsert[T](enumerator: Enumerator[T])
to
def bulkInsert(documents: Stream[P.Document], ordered: Boolean, writeConcern: WriteConcern)(implicit ec: ExecutionContext): Future[MultiBulkWriteResult]
So here I tried to take that example and find a way to convert Enumerator to a Stream (did not find any way to do so) :
val schemasDocs: Seq[JsObject] = {
jsonSchemas.fields.map {
case (field, value) => Json.obj(field -> value)
}
}
val enumerator = Enumerator.enumerate(schemasDocs)
val schemasStream = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator)) // my attempt to turn enumerator into a Stream
val schemasInsert = {
getCollection("schemas").flatMap(
_.bulkInsert(schemasStream, true)
)
}
Now I find myself diving in the Akka, ReactiveMongo and Play API to try and create a Stream of JsObjects from a Seq of JsObjects..
Then I tried a different approach: the example from the ReactiveMongo's website
val bulkDocs = schemasDocs.map(implicitly[collection.ImplicitlyDocumentProducer](_))
collection.bulkInsert(ordered=true)(bulkDocs: _*)
gives me an error that is as hard to debug :
type mismatch; found : Seq[reactivemongo.play.json.collection.JSONCollection#ImplicitlyDocumentProducer] required: Seq[x$48.ImplicitlyDocumentProducer]
I would rather not use Streams and use the second solution, as I don't like to have things I don't understand in my code..
I just found how to handle bulkInsert. There is an example
build.sbt
...
libraryDependencies ++= Seq(
"org.reactivemongo" %% "play2-reactivemongo" % "0.11.14"
)
...
plugins.sbt
addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.5.12")
CxTransactionsRepository.scala
package cx.repository
import cx.model.CxTransactionEntity
import play.modules.reactivemongo.ReactiveMongoApi
import reactivemongo.play.json.collection.JSONCollection
import scala.concurrent.{ExecutionContext, Future}
class CxTransactionsRepository @Inject()(val reactiveMongoApi: ReactiveMongoApi)(implicit ec: ExecutionContext){
private val cxTransactionsCollectionFuture: Future[JSONCollection] = reactiveMongoApi.database.map(_.collection[JSONCollection]("cxTransactions"))
def bulkInsert(seq: Seq[CxTransactionEntity]): Future[Int] = {
for {
transactions <- cxTransactionsCollectionFuture
writeResult <- transactions.bulkInsert(ordered = false)(seq.map(implicitly[transactions.ImplicitlyDocumentProducer](_)): _*)
} yield {
writeResult.n
}
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With