I have the hard task of reading millions of rows from a Cassandra table. The table contains roughly 40 to 50 million rows. The data consists of internal URLs for our system, and we need to fire all of them. To fire them we are using Akka Streams, which has been working pretty well, applying back pressure as needed. But we still have not found a way to read everything effectively.
What we have tried so far:
Reading the data as a stream using Akka Streams. We are using phantom-dsl, which provides a publisher for a specific table. But it does not read everything, only a small portion: it stops reading after the first 1 million rows.
Reading using Spark by a specific date. Our table is modeled as a time series table, with year, month, day, minutes... columns. Right now we are selecting by day, so Spark does not fetch too much data to process at once, but selecting all those days one by one is a pain.
The code is the following:
val cassandraRdd =
  sc
    .cassandraTable("keyspace", "my_table")
    .select("id", "url")
    .where("year = ? and month = ? and day = ?", date.getYear, date.getMonthOfYear, date.getDayOfMonth)
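Selecting day by day could at least be driven by a generated range of dates instead of picking each day by hand. The following is only a minimal sketch: it assumes `java.time.LocalDate` rather than the date type used in the query above, and `DayRange`, `days` and `whereArgs` are hypothetical names that are not part of our code.

```scala
import java.time.LocalDate

object DayRange {
  // Every day from `start` to `end`, both inclusive, produced lazily.
  def days(start: LocalDate, end: LocalDate): Iterator[LocalDate] =
    Iterator.iterate(start)(_.plusDays(1)).takeWhile(d => !d.isAfter(end))

  // The three values that would be bound to "year = ? and month = ? and day = ?".
  def whereArgs(d: LocalDate): (Int, Int, Int) =
    (d.getYear, d.getMonthValue, d.getDayOfMonth)
}
```

Each tuple produced by `DayRange.days(start, end).map(DayRange.whereArgs)` could then feed one `.where(...)` call, so only one day of rows is selected at a time.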
Unfortunately I can't iterate over the partitions to handle less data at a time. I have to use collect, because otherwise Spark complains that the actor is not serializable.
val httpPool: Flow[(HttpRequest, String), (Try[HttpResponse], String), HostConnectionPool] = Http().cachedHostConnectionPool[String](host, port).async
val source =
  Source
    .actorRef[CassandraRow](10000000, OverflowStrategy.fail)
    .map(row => makeUrl(row.getString("id"), row.getString("url")))
    .map(url => HttpRequest(uri = url) -> url)

val ref = Flow[(HttpRequest, String)]
  .via(httpPool.withAttributes(ActorAttributes.supervisionStrategy(decider)))
  .to(Sink.actorRef(httpHandlerActor, IsDone))
  .runWith(source)

cassandraRdd.collect().foreach { row =>
  ref ! row
}
I would like to know if any of you has experience reading millions of rows for something other than aggregation.
I have also thought about reading everything and sending it to a Kafka topic, which I would then consume using streaming (Spark or Akka), but the problem would be the same: how do I load all that data effectively?
EDIT
For now, I'm running on a cluster with a reasonable amount of memory (100GB), doing a collect and iterating over the result.
Also, this is quite different from fetching big data with Spark and analyzing it with things like reduceByKey, aggregateByKey, etc.
I need to fetch and send everything over HTTP =/
So far it works the way I did it, but I'm afraid the data will keep growing to a point where fetching everything into memory makes no sense.
Streaming this data and fetching it in chunks would be the best solution, but I haven't found a good approach for this yet.
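To make the "fetching in chunks" idea concrete, here is a minimal sketch in plain Scala that walks an iterator in fixed-size batches, so only one batch is held in memory at a time. `Chunked`, `processInChunks` and `send` are hypothetical names. With Spark, such an iterator might come from `RDD.toLocalIterator`, which brings one partition at a time to the driver, but that wiring is an assumption and has not been tried here.

```scala
object Chunked {
  // Walks `rows` in batches of at most `chunkSize` and hands each batch to `send`.
  // Only the current batch is materialized. Returns the number of rows handled.
  def processInChunks[A](rows: Iterator[A], chunkSize: Int)(send: Seq[A] => Unit): Long = {
    var total = 0L
    rows.grouped(chunkSize).foreach { batch =>
      send(batch)
      total += batch.size
    }
    total
  }
}
```

In this sketch `send` would be the place where a batch of URLs is pushed into the HTTP flow, and `chunkSize` bounds how many rows sit in memory at once.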
In the end, I'm thinking of using Spark to fetch all the data, generate a CSV file, and use Akka Stream IO to process it. This way I would avoid keeping a lot of things in memory, since it takes hours to process every million rows.
Well, after spending some time reading, talking with other people and running tests, the result can be achieved with the following code sample:
val sc = new SparkContext(sparkConf)

val cassandraRdd = sc.cassandraTable(config.getString("myKeyspace"), "myTable")
  .select("key", "value")
  .as((key: String, value: String) => (key, value))
  .partitionBy(new HashPartitioner(2 * sc.defaultParallelism))
  .cache()

cassandraRdd
  .groupByKey()
  .foreachPartition { partition =>
    partition.foreach { row =>
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()

      val myActor = system.actorOf(Props(new MyActor(system)), name = "my-actor")

      val source = Source.fromIterator { () => row._2.toIterator }
      source
        .map { str =>
          myActor ! Count
          str
        }
        .to(Sink.actorRef(myActor, Finish))
        .run()
    }
  }

sc.stop()
class MyActor(system: ActorSystem) extends Actor {
  var count = 0

  def receive = {
    case Count =>
      count = count + 1
    case Finish =>
      println(s"total: $count")
      system.shutdown()
  }
}

case object Count
case object Finish
What I'm doing is the following:
Perhaps this code could be improved further, but so far I'm happy not to use .collect anymore and to work only inside Spark.