I am running a Flink graph using RocksDB as my state backend. For one of the join operators in my graph, I get the exception below (the actual key group numbers vary from run to run, of course).
java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.
My operator graph is as follows:
Source1 --+--> Map1A ---> KeyBy ---+
          |                        +--> Join1AB ---+
          +--> Map1B ---> KeyBy ---+               |
                                                   +--> Join2,1AB ---->
Source2 -----------------> KeyBy ------------------+
The error is thrown in the Join2,1AB operator, which joins the result of Join1AB with the (keyed) Source2.
Any ideas what could be causing this? The full stack trace is below. I understand this is still very vague, but any pointers in the right direction are much appreciated.
Caused by: java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeKVStateData(RocksDBKeyedStateBackend.java:664)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$RocksDBFullSnapshotOperation.writeDBSnapshot(RocksDBKeyedStateBackend.java:521)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:417)
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend$3.performOperation(RocksDBKeyedStateBackend.java:399)
at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
... 5 more
[CIRCULAR REFERENCE:java.lang.IllegalArgumentException: Key group 45 is not in KeyGroupRange{startKeyGroup=0, endKeyGroup=42}.]
EDIT: If I change my state backend to the file system one (i.e. FsStateBackend), I get the following stack trace instead. Something seems off with key group indexing.
java.lang.IllegalArgumentException: Key group index out of range of key group range [43, 86).
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.setMapForKeyGroup(NestedMapsStateTable.java:104)
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.putAndGetOld(NestedMapsStateTable.java:218)
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:207)
at org.apache.flink.runtime.state.heap.NestedMapsStateTable.put(NestedMapsStateTable.java:145)
at org.apache.flink.runtime.state.heap.HeapValueState.update(HeapValueState.java:72)
<snip user code stack trace>
at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:77)
at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:242)
at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
The problem was that my data objects (POJOs) had a mutable hash code, that is, one that is not stable from one JVM to the next. Specifically, the hash code was built from enums. For example, suppose I have a stream of Cars where the hash code is composed of the car year and the car type (an enum), as below.
class Car {
    private final CarType carType;
    private final int carYear;
    // constructor omitted

    @Override
    public int hashCode() {
        int result = 17;
        result = 31 * result + carYear;
        result = 31 * result + carType.hashCode(); // <---- This is mutable (differs between JVMs)!
        return result;
    }
}
The enum's hashCode() is essentially Object.hashCode(), an identity hash that is JVM-specific (often described as memory address dependent). Consequently, the hash code on one machine (or process) will not be the same as on another machine (or process). This also explains why I only ran into this problem when running in a distributed environment as opposed to running locally.
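To make the failure mode concrete, here is a minimal sketch of how a key's hash code ends up selecting a key group and a subtask. It is a simplified stand-in and not Flink's actual code: as far as I know, Flink also scrambles the hash (murmur) before taking the modulo, which is left out here. The values 128 (max parallelism) and 3 (parallelism) are assumptions, chosen because they are consistent with the ranges [0, 42] and [43, 86) in the stack traces above. The two hash values are hypothetical and picked only to reproduce the numbers in the first exception.

```java
// Simplified illustration of hash -> key group -> subtask routing.
// Not Flink's implementation; names and numbers here are assumptions.
class KeyGroupSketch {

    // Stand-in for the key group assignment (Flink additionally applies a murmur hash).
    public static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Which parallel subtask owns a given key group.
    public static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed
        int parallelism = 3;      // assumed

        int hashOnSender = 42;    // hypothetical hashCode() of a key in the sending JVM
        int hashOnReceiver = 45;  // hypothetical hashCode() of the same key in the receiving JVM

        int sentGroup = keyGroupFor(hashOnSender, maxParallelism);
        int targetSubtask = subtaskFor(sentGroup, maxParallelism, parallelism);
        int recomputedGroup = keyGroupFor(hashOnReceiver, maxParallelism);
        int owningSubtask = subtaskFor(recomputedGroup, maxParallelism, parallelism);

        System.out.println("sender routes key group " + sentGroup + " to subtask " + targetSubtask);
        System.out.println("receiver recomputes key group " + recomputedGroup
                + ", which belongs to subtask " + owningSubtask);
    }
}
```

When the receiving task recomputes the hash of the deserialized key and gets a different value, the resulting key group can fall outside the range that task owns, which is what both exceptions report.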
To resolve this, I changed my hashCode() to be immutable, i.e. to return the same value on every JVM. Calling String.hashCode() may perform worse, so I may need to optimize that. But the definition of Car below fixes the problem.
class Car {
    private final CarType carType;
    private final int carYear;
    // constructor omitted

    @Override
    public int hashCode() {
        int result = 17;
        result = 31 * result + carYear;
        result = 31 * result + carType.name().hashCode(); // <---- This is IMMUTABLE (same on every JVM)!
        return result;
    }
}
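For completeness, a self-contained version of the fixed class might look like the sketch below. The enum constants (SEDAN, TRUCK), the constructor, the equals() method, and the StableHashDemo class are assumptions added for illustration and are not from my real code. The point is that String.hashCode() is fully specified by the Java API, so the result depends only on the field values and not on the JVM instance.

```java
import java.util.Objects;

// Hypothetical enum constants, for illustration only.
enum CarType { SEDAN, TRUCK }

final class Car {
    private final CarType carType;
    private final int carYear;

    Car(CarType carType, int carYear) {
        this.carType = Objects.requireNonNull(carType);
        this.carYear = carYear;
    }

    // equals() kept consistent with hashCode(): same fields, same result.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Car)) return false;
        Car other = (Car) o;
        return carYear == other.carYear && carType == other.carType;
    }

    // Uses the enum's name, whose String hash is the same on every JVM.
    @Override
    public int hashCode() {
        int result = 17;
        result = 31 * result + carYear;
        result = 31 * result + carType.name().hashCode();
        return result;
    }
}

class StableHashDemo {
    public static void main(String[] args) {
        Car a = new Car(CarType.SEDAN, 2015);
        Car b = new Car(CarType.SEDAN, 2015);
        System.out.println("equal: " + a.equals(b));
        System.out.println("same hash: " + (a.hashCode() == b.hashCode()));
    }
}
```

Note that a hash based on name() changes if an enum constant is renamed, which is worth keeping in mind if hash values are ever persisted.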