I am just starting out with Spark and am trying to use it for distributed processing in a deduplication application. The part I am working on takes an RDD of Pair<String[],String[]>, where each array holds the column values of a record. The process should be highly parallelisable, but for now I am only running it locally. When debugging, everything works as expected inside the map function, but as soon as the collect executes, everything breaks and I have no idea why; it is not even running on a cluster.
This is the relevant part of the code; let me know if you need to see more:
JavaRDD<Pair<String[], String[]>> rddData = javaSparkContext.parallelize(data.stream()
        .map(p -> Pair.of(p.getLeft().fields(), p.getRight().fields()))
        .collect(Collectors.toList()), 2);

final int featureVecSize = featureCombinationToFeatureIndex.size();

List<double[]> distributedArchFeatures = rddData.map(recordPair -> {
    String[] valuesA = recordPair.getLeft();
    String[] valuesB = recordPair.getRight();
    double[] scores = new double[featureVecSize];
    for (Map.Entry<Triple<Integer, ComparisonFeature, ComparisonModifier>, Integer> t : featureCombinationToFeatureIndex.entrySet()) {
        int index = t.getKey().getLeft();
        ComparisonFeature feature = t.getKey().getMiddle();
        ComparisonModifier modifier = t.getKey().getRight();
        double score = modifier.calculateScore(feature, valuesA[index], valuesB[index]);
        scores[t.getValue()] = score;
    }
    return scores;
}).collect();
And this is the stack trace I get; it seems to keep retrying to retrieve the data, but no dice:
[analyzerbeans-pool1-thread-27] INFO com.hi.identify7.execution.simple.SimpleMatchingExecutionContext - Scoring all matches...
[analyzerbeans-pool1-thread-27] INFO org.apache.spark.SparkContext - Starting job: collect at SimpleMatchingExecutionContext.java:177
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Got job 0 (collect at SimpleMatchingExecutionContext.java:177) with 2 output partitions
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Final stage: ResultStage 0 (collect at SimpleMatchingExecutionContext.java:177)
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Parents of final stage: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Missing parents: List()
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SimpleMatchingExecutionContext.java:165), which has no missing parents
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_0 stored as values in memory (estimated size 4.9 KB, free 2.5 GB)
[dag-scheduler-event-loop] INFO org.apache.spark.storage.memory.MemoryStore - Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.7 KB, free 2.5 GB)
[dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo - Added broadcast_0_piece0 in memory on 10.209.1.88:52600 (size: 2.7 KB, free: 2.5 GB)
[dag-scheduler-event-loop] INFO org.apache.spark.SparkContext - Created broadcast 0 from broadcast at DAGScheduler.scala:1006
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.DAGScheduler - Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SimpleMatchingExecutionContext.java:165) (first 15 tasks are for partitions Vector(0, 1))
[dag-scheduler-event-loop] INFO org.apache.spark.scheduler.TaskSchedulerImpl - Adding task set 0.0 with 2 tasks
[dispatcher-event-loop-1] WARN org.apache.spark.scheduler.TaskSetManager - Stage 0 contains a task of very large size (11664 KB). The maximum recommended task size is 100 KB.
[dispatcher-event-loop-1] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 11944196 bytes)
[Executor task launch worker for task 0] INFO org.apache.spark.executor.Executor - Running task 0.0 in stage 0.0 (TID 0)
[Executor task launch worker for task 0] INFO org.apache.spark.storage.memory.MemoryStore - Block taskresult_0 stored as bytes in memory (estimated size 185.3 MB, free 2.3 GB)
[dispatcher-event-loop-1] INFO org.apache.spark.storage.BlockManagerInfo - Added taskresult_0 in memory on 10.209.1.88:52600 (size: 185.3 MB, free: 2.3 GB)
[Executor task launch worker for task 0] INFO org.apache.spark.executor.Executor - Finished task 0.0 in stage 0.0 (TID 0). 194298473 bytes result sent via BlockManager)
[dispatcher-event-loop-2] INFO org.apache.spark.scheduler.TaskSetManager - Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 11932301 bytes)
[Executor task launch worker for task 1] INFO org.apache.spark.executor.Executor - Running task 1.0 in stage 0.0 (TID 1)
[task-result-getter-0] INFO org.apache.spark.network.client.TransportClientFactory - Successfully created connection to /10.209.1.88:52600 after 59 ms (0 ms spent in bootstraps)
[shuffle-client-4-1] ERROR org.apache.spark.network.client.TransportClient - Failed to send RPC 7173534709356817937 to /10.209.1.88:52600: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
[shuffle-client-4-1] ERROR org.apache.spark.network.shuffle.OneForOneBlockFetcher - Failed while starting block fetches
java.io.IOException: Failed to send RPC 7173534709356817937 to /10.209.1.88:52600: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
at io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
at io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:837)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:740)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
... 17 more
[shuffle-client-4-1] INFO org.apache.spark.network.shuffle.RetryingBlockFetcher - Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
[Block Fetch Retry-6-1] INFO org.apache.spark.network.client.TransportClientFactory - Found inactive connection to /10.209.1.88:52600, creating a new one.
[Block Fetch Retry-6-1] INFO org.apache.spark.network.client.TransportClientFactory - Successfully created connection to /10.209.1.88:52600 after 1 ms (0 ms spent in bootstraps)
[shuffle-client-4-1] ERROR org.apache.spark.network.client.TransportClient - Failed to send RPC 6680513752030028512 to /10.209.1.88:52600: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
[shuffle-client-4-1] ERROR org.apache.spark.network.shuffle.OneForOneBlockFetcher - Failed while starting block fetches
java.io.IOException: Failed to send RPC 6680513752030028512 to /10.209.1.88:52600: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
at io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
at io.netty.channel.AbstractChannelHandlerContext.notifyOutboundHandlerException(AbstractChannelHandlerContext.java:837)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:740)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1089)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1136)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1078)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError: org.apache.spark.network.protocol.MessageWithHeader.touch(Ljava/lang/Object;)Lio/netty/util/ReferenceCounted;
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:73)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:107)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:111)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
This is caused by a Netty dependency conflict: the AbstractMethodError on org.apache.spark.network.protocol.MessageWithHeader.touch means the Netty version on your classpath is not the one Spark's network module was compiled against. Check your project's dependency tree (e.g. with mvn dependency:tree) and force the Netty version that your Spark release ships with, excluding any conflicting transitive Netty dependencies.
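As a quick sanity check, you can print which jar actually supplies Netty at runtime and which Netty artifact versions are on the classpath. This is a minimal sketch using standard JDK and Netty APIs (ReferenceCountUtil, io.netty.util.Version.identify()); the NettyVersionCheck class name is just an illustrative placeholder:

import io.netty.util.ReferenceCountUtil;

// Minimal sketch: show which jar supplies Netty's ReferenceCountUtil at runtime,
// so a conflicting Netty version pulled in transitively becomes visible.
public class NettyVersionCheck {
    public static void main(String[] args) {
        Class<?> nettyClass = ReferenceCountUtil.class;
        System.out.println("Netty loaded from: "
                + nettyClass.getProtectionDomain().getCodeSource().getLocation());
        // Netty records the version of every io.netty artifact it finds on the classpath.
        io.netty.util.Version.identify()
                .forEach((artifact, version) -> System.out.println(artifact + " -> " + version));
    }
}

If the reported location or versions differ from the io.netty dependencies your Spark version declares, exclude the conflicting transitive dependency (or pin Netty) in your build so that only the version Spark needs is present.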