 

Spark streaming JavaCustomReceiver

We are running a Spark Streaming Java custom receiver on an EMR cluster (YARN) and hit the same problem in client mode as well as in cluster mode.

The issue is as follows.

When we run the JavaCustomReceiver of Spark Streaming on the EMR cluster (YARN), it binds the listener to a randomly chosen executor. So all we can do is divert all incoming traffic to whichever IP it happens to select.

Things get even worse when that executor fails. After the restart, it binds the listener to another randomly chosen executor, and once again we have to divert incoming traffic to the newly selected node.

My questions are:

  1. Can we not bind the listener to a specific executor of our choice (i.e. fix the IP the listener binds to)? If not...
  2. Is there any way to find out programmatically which executor the listener has been restarted on (since it chooses a random node for the receiver)?
  3. Above all, can we not bind the listener to the node where the driver is running (when running in client mode)?

FYI

We already tried the preferredLocation of the Receiver superclass, but we didn't have any luck with it.
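
For reference, a minimal sketch of that attempt, assuming a socket-based receiver along the lines of the JavaCustomReceiver example; the class name, worker IP, and port here are hypothetical:

```java
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Option;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical socket-listening receiver that asks the scheduler to place
// it on a specific worker via preferredLocation().
public class PinnedReceiver extends Receiver<String> {

    private final String workerIp; // must match the node name YARN uses
    private final int port;

    public PinnedReceiver(String workerIp, int port) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        this.workerIp = workerIp;
        this.port = port;
    }

    // Placement hint; note that from Java this returns a scala.Option,
    // not a java.util.Optional.
    @Override
    public Option<String> preferredLocation() {
        return Option.apply(workerIp);
    }

    @Override
    public void onStart() {
        new Thread(this::receive, "pinned-receiver").start();
    }

    @Override
    public void onStop() {
        // The receive thread checks isStopped() and exits on its own.
    }

    private void receive() {
        try (ServerSocket server = new ServerSocket(port)) {
            // Re question 2: report where the listener actually landed,
            // e.g. to the logs or an external registry the traffic
            // router polls.
            System.out.println("Receiver bound on "
                + InetAddress.getLocalHost().getHostAddress() + ":" + port);
            while (!isStopped()) {
                try (Socket socket = server.accept();
                     BufferedReader in = new BufferedReader(new InputStreamReader(
                         socket.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while (!isStopped() && (line = in.readLine()) != null) {
                        store(line);
                    }
                }
            }
        } catch (Throwable t) {
            restart("Error receiving data", t);
        }
    }
}
```

The stream would then be created with something like jssc.receiverStream(new PinnedReceiver("10.0.0.5", 9999)).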

Any help on this is highly appreciated.

asked Oct 30 '22 by desaiankitb


1 Answer

I was struggling with the same issue using Flume. Even when I passed my preferred-location hostname to FlumeUtils.createStream, Spark tried to run the receiver on each node and failed on all of them except that exact node. If I passed 0.0.0.0 as the host, it behaved exactly as you describe: it selected a random node to start the receiver.

Problem: The resource manager could not decide the preferred worker node to run the receiver.

Solution:

  1. Make sure you override preferredLocation.
  2. Make sure you use the exact node reference as the preferred location, as indicated by the Spark+Flume integration guide: "Note that the hostname should be the same as the one used by the resource manager in the cluster (Mesos, YARN or Spark Standalone), so that resource allocation can match the names and launch the receiver in the right machine."

    In my case, neither the FQDN nor the hostname worked; the IP address was the solution (see the sketch after this list).

  3. Make sure the IP you pass belongs to a Spark worker node (a datanode). If you pass the IP address of a non-datanode machine, it also fails.

    I was also planning to run the receiver on the node where the driver runs in client mode, but Spark pushes the receiver to a worker node. If your client node is a worker node, that won't be a problem. If it is not (in my case the client is a queen (name) node), then you should pick a suitable worker node to run the receiver :)
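
Putting the pieces together, a minimal sketch of wiring up the stream with an explicit worker address, assuming the spark-streaming-flume artifact is on the classpath; the IP 10.0.0.5 and port 41414 are hypothetical placeholders for a real worker node:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumePinnedStream {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("FlumePinnedStream");
        JavaStreamingContext jssc =
            new JavaStreamingContext(conf, Durations.seconds(10));

        // Pass the worker's address exactly as the resource manager knows
        // it (for me: the raw IP), never 0.0.0.0.
        JavaReceiverInputDStream<SparkFlumeEvent> events =
            FlumeUtils.createStream(jssc, "10.0.0.5", 41414);

        events.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```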

I hope it helps.

answered Nov 18 '22 by m.ozcelik