I am trying to use Apache Spark and Cassandra for data analysis, so I wrote a Java program to access a Cassandra instance running on a remote machine. I used the following Java code:
public class JavaDemo implements Serializable {
    private transient SparkConf conf;

    private JavaDemo(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        Session session = connector.openSession();
        // Prepare the schema
        session.execute("DROP KEYSPACE IF EXISTS java_api");
        session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
        session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
        session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
    }

    private void compute(JavaSparkContext sc) {
        System.out.println("IN compute");
    }

    private void showResults(JavaSparkContext sc) {
        System.out.println("IN showResults");
    }

    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster("local[1]");
        System.out.println("---------------------------------");
        conf.set("spark.cassandra.connection.host", "192.168.1.219");
        JavaDemo app = new JavaDemo(conf);
        app.run();
    }
}
Here 192.168.1.219 is the remote host where Cassandra is running, and the default port is 9160. When I run this program I get the following error:
15/01/29 10:14:26 INFO ui.SparkUI: Started Spark Web UI at http://Justin:4040
15/01/29 10:14:27 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
Exception in thread "main" com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /192.168.1.219:9042: Host /192.168.1.219:9042 requires authentication, but no authenticator found in Cluster configuration
at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:38)
at com.datastax.driver.core.Connection.initializeTransport(Connection.java:139)
at com.datastax.driver.core.Connection.<init>(Connection.java:111)
at com.datastax.driver.core.Connection$Factory.open(Connection.java:445)
at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:216)
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:172)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:80)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1145)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:313)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:166)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$4.apply(CassandraConnector.scala:151)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:72)
at com.datastax.spark.demo.JavaDemo.generateData(JavaDemo.java:42)
at com.datastax.spark.demo.JavaDemo.run(JavaDemo.java:34)
at com.datastax.spark.demo.JavaDemo.main(JavaDemo.java:73)
Is there anything I am missing? It is connecting directly to port 9042. How can I connect?
It looks like you have authentication configured on your Cassandra cluster, and since you are not providing credentials it is not allowing you to connect. (Connecting on 9042 is expected, by the way: that is the native-protocol port used by the Java driver, while 9160 is the legacy Thrift port.) You can pass in the credentials using the spark.cassandra.auth.username and spark.cassandra.auth.password properties, as described in the Spark Cassandra Connector documentation.
So you could add something like this to your code to make it work:

conf.set("spark.cassandra.auth.username", "cassandra");
conf.set("spark.cassandra.auth.password", "cassandra");
If you have authentication enabled and you haven't created or changed any users yet, you can get away with using 'cassandra' for both the username and the password. In production, though, you should create a separate account and use that instead, and change the cassandra user's password, since that account has access to everything.
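As a rough sketch of that setup, the CQL below creates a dedicated account and locks down the default one. The account name and passwords are placeholders, not from the original post, and the exact syntax depends on your Cassandra version (releases from 2.2 onward use CREATE ROLE instead of CREATE USER):

```cql
-- Run these as the 'cassandra' superuser, e.g. via cqlsh.
-- 'spark_app' and both passwords are hypothetical placeholders.
CREATE USER spark_app WITH PASSWORD 'change-me' NOSUPERUSER;
GRANT ALL PERMISSIONS ON KEYSPACE java_api TO spark_app;
ALTER USER cassandra WITH PASSWORD 'a-new-strong-password';
```

Then point spark.cassandra.auth.username and spark.cassandra.auth.password at the new account instead of the default one.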