Cassandra : timeout during SIMPLE write query at consistency LOCAL_QUORUM

I'm trying to ingest data (one partition = 1 MB BLOB) from Spark into Cassandra with these configuration parameters:

spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows 1
spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes 100
spark.sql.catalog.cassandra.spark.cassandra.output.batch.grouping.key none
spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec 1
spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level LOCAL_QUORUM
spark.sql.catalog.cassandra.spark.cassandra.output.metrics false
spark.sql.catalog.cassandra.spark.cassandra.connection.timeoutMS 90000
spark.sql.catalog.cassandra.spark.cassandra.query.retry.count 10
spark.sql.catalog.cassandra com.datastax.spark.connector.datasource.CassandraCatalog
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions
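
For context, a minimal sketch of how these catalog settings could equivalently be set programmatically when building the SparkSession (assuming Spark 3.x with the Spark Cassandra Connector on the classpath; the application name is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("cassandra-ingest") // placeholder name
  // Register the Cassandra catalog and SQL extensions
  .config("spark.sql.catalog.cassandra", "com.datastax.spark.connector.datasource.CassandraCatalog")
  .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
  // Write tuning taken from the settings above
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.batch.size.rows", "1")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.concurrent.writes", "100")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.consistency.level", "LOCAL_QUORUM")
  .config("spark.sql.catalog.cassandra.spark.cassandra.output.throughputMBPerSec", "1")
  .getOrCreate()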

I started with a Spark job using 16 cores in total, and went down to a job with just 1 core.

Either way, after some time the response is always as follows, and the driver ends up in the failed state:

21/09/19 19:03:50 ERROR QueryExecutor: Failed to execute: com.datastax.spark.connector.writer.RichBoundStatementWrapper@532adef2
com.datastax.oss.driver.api.core.servererrors.WriteTimeoutException: Cassandra timeout during SIMPLE write query at consistency LOCAL_QUORUM (2 replica were required but only 0 acknowledged the write)

It may be related to some nodes being overloaded... but how can I debug this? Which configuration should I adjust?

Thanks


1 Answer

Problem solved!

The problem was MY DATA, and NOT Cassandra.

Indeed, a few partitions (about 2,000 out of 60,000,000) were around 50 MB in size, instead of the 1 MB I expected.
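
For anyone hitting the same issue, a minimal sketch of how the BLOB size distribution could be inspected before writing (assuming the same parquet source and blob column as in the snippet below; the 10 MB threshold is an arbitrary example):

import org.apache.spark.sql.functions.{avg, col, length, max}

// Compute the BLOB size per row and summarise it to spot outliers
val sizes = spark.read.parquet("...")
  .withColumn("bytes_count", length(col("blob")))

sizes.agg(
  avg("bytes_count").as("avg_bytes"),
  max("bytes_count").as("max_bytes")
).show()

// Count rows far above the expected ~1 MB, e.g. larger than 10 MB
sizes.filter(col("bytes_count") > 10 * 1024 * 1024).count()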

I simply filtered out the large partitions while ingesting with Spark:

import org.apache.spark.sql.functions.{col, length}
...
spark.read.parquet("...")
  // EXCLUDE LARGE PARTITIONS: compute the BLOB size per row and keep only rows below the threshold
  .withColumn("bytes_count", length(col("blob")))
  .filter("bytes_count < " + argSkipPartitionLargerThan)
  // PROJECT: keep only the columns needed by the target table
  .select("data_key", "blob")
  // COMMIT: append through the Cassandra catalog
  .writeTo(DS + "." + argTargetKS + "." + argTargetTable).append()
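
Here DS is presumably the catalog name registered in the configuration ("cassandra"), so the append goes through the Spark Cassandra Connector with the LOCAL_QUORUM write settings shown in the question.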

Ingestion now completes successfully with Spark in just 10 minutes (500 GB of data).


