Elasticsearch-Hadoop library cannot connect to Docker container

I have a Spark job that reads from Cassandra, processes/transforms/filters the data, and writes the results to Elasticsearch. I use Docker for my integration tests, and I am running into trouble writing from Spark to Elasticsearch.

Dependencies:

"joda-time"              % "joda-time"          % "2.9.4",
"javax.servlet"          %  "javax.servlet-api" % "3.1.0",
"org.elasticsearch"      %  "elasticsearch"     % "2.3.2",
"org.scalatest"          %% "scalatest"         % "2.2.1",
"com.github.nscala-time" %% "nscala-time"       % "2.10.0",
"cascading"              %   "cascading-hadoop" % "2.6.3",
"cascading"              %   "cascading-local"  % "2.6.3",
"com.datastax.spark"     %% "spark-cassandra-connector" % "1.4.2",
"com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5",
"org.elasticsearch"      %  "elasticsearch-hadoop"      % "2.3.2" excludeAll(ExclusionRule("org.apache.storm")),
"org.apache.spark"       %% "spark-catalyst"            % "1.4.0" % "provided"

In my unit tests I can connect to Elasticsearch using a TransportClient to set up my template and index.

In other words, this works:

val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.input.split.size_in_mb", "67108864")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
  .set("http.publish_host", "")
sc = new SparkContext(conf)
esClient = TransportClient.builder().build()
esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(elasticsearchHostString), 9300))
esClient.admin().indices().preparePutTemplate(testTemplate).setSource(Source.fromInputStream(getClass.getResourceAsStream("/mytemplate.json")).mkString).execute().actionGet()
esClient.admin().indices().prepareCreate(esTestIndex).execute().actionGet()
esClient.admin().indices().prepareAliases().addAlias(esTestIndex, "hot").execute().actionGet()

However, when I try to run

EsSpark.saveToEs(
  myRDD,
  "hot/mytype",
  Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id")
)

I receive this stack trace

org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/08/08 12:30:46 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost): org.elasticsearch.hadoop.rest.EsHadoopNoNodesLeftException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[172.17.0.2:9200]] 
at org.elasticsearch.hadoop.rest.NetworkClient.execute(NetworkClient.java:142)
at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:434)
at org.elasticsearch.hadoop.rest.RestClient.executeNotFoundAllowed(RestClient.java:442)
at org.elasticsearch.hadoop.rest.RestClient.exists(RestClient.java:518)
at org.elasticsearch.hadoop.rest.RestClient.touch(RestClient.java:524)
at org.elasticsearch.hadoop.rest.RestRepository.touch(RestRepository.java:491)
at org.elasticsearch.hadoop.rest.RestService.initSingleIndex(RestService.java:412)
at org.elasticsearch.hadoop.rest.RestService.createWriter(RestService.java:400)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:40)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$saveToEs$1.apply(EsSpark.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I can verify, using 'docker network inspect bridge', that it is trying to connect to the correct IP address.

docker network inspect bridge
[
{
    "Name": "bridge",
    "Id": "ef184e3be3637be28f854c3278f1c8647be822a9413120a8957de6d2d5355de1",
    "Scope": "local",
    "Driver": "bridge",
    "EnableIPv6": false,
    "IPAM": {
        "Driver": "default",
        "Options": null,
        "Config": [
            {
                "Subnet": "172.17.0.0/16",
                "Gateway": "172.17.0.1"
            }
        ]
    },
    "Internal": false,
    "Containers": {
        "0c79680de8ef815bbe4bdd297a6f845cce97ef18bb2f2c12da7fe364906c3676": {
            "Name": "analytics_rabbitmq_1",
            "EndpointID": "3f03fdabd015fa1e2af802558aa59523f4a3c8c72f1231d07c47a6c8e60ae0d4",
            "MacAddress": "02:42:ac:11:00:04",
            "IPv4Address": "172.17.0.4/16",
            "IPv6Address": ""
        },
        "9b1f37c8df344c50e042c4b3c75fcb2774888f93fd7a77719fb286bb13f76f38": {
            "Name": "analytics_elasticsearch_1",
            "EndpointID": "fb083d27aaf8c0db1aac90c2a1ea2f752c46d8ac045e365f4b9b7d1651038a56",
            "MacAddress": "02:42:ac:11:00:02",
            "IPv4Address": "172.17.0.2/16",
            "IPv6Address": ""
        },
        "ed0cfad868dbac29bda66de6bee93e7c8caf04d623d9442737a00de0d43c372a": {
            "Name": "analytics_cassandra_1",
            "EndpointID": "2efa95980d681b3627a7c5e952e2f01980cf5ffd0fe4ba6185b2cab735784df6",
            "MacAddress": "02:42:ac:11:00:03",
            "IPv4Address": "172.17.0.3/16",
            "IPv6Address": ""
        }
    },
    "Options": {
        "com.docker.network.bridge.default_bridge": "true",
        "com.docker.network.bridge.enable_icc": "true",
        "com.docker.network.bridge.enable_ip_masquerade": "true",
        "com.docker.network.bridge.host_binding_ipv4": "0.0.0.0",
        "com.docker.network.bridge.name": "docker0",
        "com.docker.network.driver.mtu": "1500"
    },
    "Labels": {}
}
]

I am running everything locally on a MacBook (OS X). I am at a loss as to why I can connect to the Docker container using the TransportClient and through my browser, but the call to EsSpark.saveToEs(...) always fails.

asked by Needs Help

1 Answer

Setting

.set("es.nodes.wan.only", "true")

on the SparkConf (or .config("es.nodes.wan.only", "true") on a SparkSession builder) solves this issue.

es.nodes.wan.only

(default false) When enabled, elasticsearch-hadoop runs in "WAN only" mode: node discovery is disabled and the connector routes every request only through the nodes declared in es.nodes. That is what matters here: without it, the connector asks the cluster for its node addresses and gets back the container-internal IP (172.17.0.2), which is not reachable from the OS X host. The TransportClient and the browser work because they only ever use the address you gave them.
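For reference, here is a minimal sketch of the write path with the flag added. It reuses the names from the question (cassandraHostString, elasticsearchHostString, myRDD are assumed to be provided by the test harness), so it is illustrative rather than a drop-in implementation:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

// Same configuration as in the question, plus es.nodes.wan.only so the
// connector talks only to the declared es.nodes instead of discovering
// the container-internal 172.17.0.x addresses.
val conf = new SparkConf().setAppName("test_reindex").setMaster("local")
  .set("spark.cassandra.connection.host", cassandraHostString)
  .set("es.nodes", elasticsearchHostString)
  .set("es.port", "9200")
  .set("es.nodes.wan.only", "true")

val sc = new SparkContext(conf)

// Write the RDD to the "hot" alias created in the test setup.
EsSpark.saveToEs(
  myRDD,
  "hot/mytype",
  Map("es.mapping.id" -> "id", "es.mapping.parent" -> "parent_id")
)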

answered by Frank Cheng