I am writing code based on "Asynchronous iterators for large record sets" described at https://github.com/websudos/phantom#partial-select-queries
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import org.joda.time.format.DateTimeFormatter
import com.anomaly42.aml.dao.CassandraConnector
import com.websudos.phantom.CassandraTable
import com.websudos.phantom.Implicits._
object People extends People {
  def getPersonByUpdatedAt(from: String, to: String, start: Int, limit: Int) = {
    val dtf: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")
    val fromDateTime = dtf.parseDateTime(from)
    val toDateTime = dtf.parseDateTime(to)
    People.select(_.updated_at, _.firstName)
      .allowFiltering
      .where(_.updated_at gte fromDateTime)
      .and(_.updated_at lte toDateTime)
      .fetchEnumerator()
      .slice(start, limit)
      .collect
  }
}
I am using the following library dependencies:
scalaVersion := "2.11.6"
libraryDependencies ++= Seq(
  "com.websudos" %% "phantom-dsl" % "1.5.4",
  // many more...
)
But I get the following error during compilation:
value slice is not a member of play.api.libs.iteratee.Enumerator[(org.joda.time.DateTime, Option[String])]
What I am trying to do is write a query that brings back the next 'limit' number of results starting from 'start' each time the getPersonByUpdatedAt() method is called.
There are quite a few implementation details to address here. First of all, if you are after pagination, there may be an easier way to achieve it with simple range queries instead of filtered data. Have a look at using CLUSTERING ORDER; that call to ALLOW FILTERING shouldn't be there. Furthermore, without CLUSTERING ORDER the default Murmur3 partitioner is not ordered, so you have no guarantee of retrieving data in the same order you wrote it, which likely means your pagination is not really going to work at all.
Last but not least, using enumerators directly is probably not what you are after. They are asynchronous, so you have to map inside a Future to get a slice; that aside, enumerators are mostly useful when something like Spark loads up an entire table at once, i.e. very large result sets.
To sum it all up, inside the people table:
object id extends UUIDColumn(this) with PartitionKey[UUID] // doesn't have to be UUID
object start extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending
object end extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending
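For context, here is a minimal sketch of how columns like these could sit in a complete phantom 1.x table definition. The PeopleByUpdate name, the use of updated_at as the clustering column, the firstName column and the tuple record type are assumptions for illustration, not your schema, and the companion object assumes your CassandraConnector is a mixin trait that supplies the implicit session and keyspace:
import java.util.UUID
import org.joda.time.DateTime
import com.websudos.phantom.CassandraTable
import com.websudos.phantom.Implicits._
import com.anomaly42.aml.dao.CassandraConnector

// Hypothetical table: one partition per person id, rows ordered by updated_at ascending.
sealed class PeopleByUpdate extends CassandraTable[PeopleByUpdate, (UUID, DateTime, Option[String])] {
  object id extends UUIDColumn(this) with PartitionKey[UUID]
  object updated_at extends DateTimeColumn(this) with ClusteringOrder[DateTime] with Ascending
  object firstName extends OptionalStringColumn(this)

  def fromRow(row: Row): (UUID, DateTime, Option[String]) =
    (id(row), updated_at(row), firstName(row))
}

// Assumption: your CassandraConnector provides the implicit session/keyspace needed to run queries.
object PeopleByUpdate extends PeopleByUpdate with CassandraConnector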
And simply use fetch() and then Seq.slice from the Scala collections library. The above assumes you want to paginate in ascending order, e.g. retrieving the oldest records first.
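A hedged sketch of that fetch-then-slice approach, building on the hypothetical PeopleByUpdate table above (again assuming an implicit session is in scope via your connector):
import java.util.UUID
import org.joda.time.DateTime
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def pageByUpdatedAt(personId: UUID, from: DateTime, to: DateTime, start: Int, limit: Int): Future[Seq[(UUID, DateTime, Option[String])]] = {
  PeopleByUpdate.select
    .where(_.id eqs personId)            // hit a single partition
    .and(_.updated_at gte from)          // range over the clustering column, so no ALLOW FILTERING
    .and(_.updated_at lte to)
    .fetch()                             // Future[Seq[(UUID, DateTime, Option[String])]]
    .map(_.slice(start, start + limit))  // Seq.slice inside the Future
}
Note that the slice still happens client-side; the real win is that the range query itself is served in clustering order from a single partition.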
You also need to figure out what a realistic partition key could be. If two users are updated simultaneously, the worst case is that you lose data and wind up with a FIFO queue, i.e. the last update at a given time "wins". I used id above, but that's obviously not what you need.
And you may well need to have several tables where you store people so you can cover all the queries you need.
You should use Iteratee and Enumerator from the Play framework. In your case you need to:
import com.websudos.phantom.iteratee.Iteratee
val enumerator = People.select(_.updated_at, _.firstName)
  .allowFiltering
  .where(_.updated_at gte fromDateTime)
  .and(_.updated_at lte toDateTime)
  .fetchEnumerator
val iteratee = Iteratee.slice[(DateTime, Option[String])](start, limit)
enumerator.run(iteratee).map(_.foldLeft(List.empty[(DateTime, Option[String])])((acc, e) => e :: acc))
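For completeness, a sketch of how this could be wrapped back into the asker's getPersonByUpdatedAt, keeping the partial select from the question (so the element type is the (DateTime, Option[String]) tuple from the compiler error), using Iteratee.slice exactly as above, and reversing the folded list to preserve the enumerator's order:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import com.websudos.phantom.Implicits._
import com.websudos.phantom.iteratee.Iteratee

def getPersonByUpdatedAt(from: String, to: String, start: Int, limit: Int): Future[List[(DateTime, Option[String])]] = {
  val dtf = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ")
  val enumerator = People.select(_.updated_at, _.firstName)
    .allowFiltering
    .where(_.updated_at gte dtf.parseDateTime(from))
    .and(_.updated_at lte dtf.parseDateTime(to))
    .fetchEnumerator()
  val iteratee = Iteratee.slice[(DateTime, Option[String])](start, limit)
  // The fold prepends, so reverse at the end to keep rows in the order the enumerator produced them.
  enumerator.run(iteratee).map(_.foldLeft(List.empty[(DateTime, Option[String])])((acc, e) => e :: acc).reverse)
}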
Hope this helps.