
Scala - Cassandra: cluster read fails with error "Can't use this Cluster instance because it was previously closed"

I'm getting this error when reading from a table in a 5-node cluster using the DataStax driver.

2015-02-19 03:24:09,908 ERROR [akka.actor.default-dispatcher-9] OneForOneStrategy akka://user/HealthServiceChecker-49e686b9-e189-48e3-9aeb-a574c875a8ab Can't use this Cluster instance because it was previously closed
java.lang.IllegalStateException: Can't use this Cluster instance because it was previously closed
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1128) ~[cassandra-driver-core-2.0.4.jar:na]
    at com.datastax.driver.core.Cluster.init(Cluster.java:149) ~[cassandra-driver-core-2.0.4.jar:na]
    at com.datastax.driver.core.Cluster.connect(Cluster.java:225) ~[cassandra-driver-core-2.0.4.jar:na]
    at com.datastax.driver.core.Cluster.connect(Cluster.java:258) ~[cassandra-driver-core-2.0.4.jar:na]

I am able to connect using cqlsh and perform read operations.

Any clue what could be the problem here?

Settings:

  • Consistency level: ONE
  • Keyspace replication strategy: 'class': 'NetworkTopologyStrategy', 'DC2': '1', 'DC1': '1'
  • Cassandra version: 2.0.6

The code managing Cassandra sessions is centralized; it is:

trait ConfigCassandraCluster
  extends CassandraCluster
{
  def cassandraConf: CassandraConfig
  lazy val port = cassandraConf.port
  lazy val host = cassandraConf.host
  lazy val cluster: Cluster =
    Cluster.builder()
      .addContactPoints(host)
      .withReconnectionPolicy(new ExponentialReconnectionPolicy(100, 30000))
      .withPort(port)
      .withSocketOptions(new SocketOptions().setKeepAlive(true))
      .build()

  lazy val keyspace = cassandraConf.keyspace
  private lazy val casSession = cluster.connect(keyspace)
  val session = new SessionProvider(casSession)
}

class SessionProvider(casSession: => Session) extends Logging {
  var lastSuccessful: Long = 0
  var firstSuccessful: Long = -1
  def apply[T](fn: Session => T): T = {
    val result = retry(fn, 15)
    if(firstSuccessful < 0)
      firstSuccessful = System.currentTimeMillis()
    lastSuccessful = System.currentTimeMillis()
    result
  }

  private def retry[T](fn: Session => T, remainingAttempts: Int): T = {
    //retry logic
  }
}
asked Feb 19 '15 by Kasun Kumara

1 Answer

The problem is that cluster.connect(keyspace) closes the Cluster instance itself when it hits a NoHostAvailableException. Because of that, your retry logic keeps reusing a Cluster that has already been closed, which is why you see the IllegalStateException.

Have a look at the Cluster.init() method and you will see why.

The solution is to rebuild the connection inside the retry logic: call Cluster.builder().addContactPoint(node).build().connect(keyspace) on each attempt. That way every retry works against a fresh Cluster object instead of the closed one.
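The retry shape described above can be sketched in plain Scala. This is a minimal, self-contained illustration, not the real driver API: Session, buildSession, and the failure simulation are stand-ins (in real code, buildSession would be Cluster.builder().addContactPoints(host).withPort(port).build().connect(keyspace)).

```scala
object RetryWithFreshSession {
  // Stand-in for com.datastax.driver.core.Session (hypothetical, for illustration)
  final case class Session(id: Int)

  private var sessionsBuilt = 0
  // Stand-in factory: builds a brand-new session on every call, mimicking
  // Cluster.builder().addContactPoint(node).build().connect(keyspace)
  def buildSession(): Session = { sessionsBuilt += 1; Session(sessionsBuilt) }

  @annotation.tailrec
  def retry[T](fn: Session => T, remainingAttempts: Int): T = {
    val session = buildSession() // fresh Cluster + Session on every attempt
    scala.util.Try(fn(session)) match {
      case scala.util.Success(value) => value
      case scala.util.Failure(_) if remainingAttempts > 1 =>
        retry(fn, remainingAttempts - 1) // never reuse the possibly-closed Cluster
      case scala.util.Failure(error) => throw error
    }
  }
}
```

The key design point is that the possibly-closed Cluster is never reused: each attempt rebuilds from scratch, so a NoHostAvailableException on one attempt cannot poison the next.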

answered Sep 30 '22 by happysathya