I'm working on a project using rxjava1 and I have an Observable chain that occasionally will contain thousands of observables merged or concatted together. When this happens a StackOverflow exception will occur and we will get something like this:
java.lang.StackOverflowError
at java.util.HashMap.putVal(HashMap.java:631)
at java.util.HashMap.put(HashMap.java:612)
at rx.internal.operators.OnSubscribeToMap$ToMapSubscriber.onNext(OnSubscribeToMap.java:127)
at rx.internal.operators.OnSubscribeFilter$FilterSubscriber.onNext(OnSubscribeFilter.java:76)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
And the stacktrace will continue for hundreds of lines. The only related post I've seen about this is this issue in github: https://github.com/ReactiveX/RxJava/issues/3035. But the proposed solution of adding observables to a list is something we have used and doesn't work.
What can I do to prevent these StackOverflow exceptions? Do I need to do some sort of throttling or backpressuring?
Here is an example of what the current code looks like and is causing stackoverflows:
public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, Observable<JsonObject>> summaryGatherer) {
List<Observable<JsonObject>> summaryObservables = new LinkedList<>();
summaries.stream()
.map(JsonUtil::safeJsonObject)
.filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME))|| StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
.forEach(summary -> {
if (StringUtils.isNotEmpty(summary.getString(TEXT)))
summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".hidden",
summary.getString(VALUE), summaryGatherer));
if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
summaryObservables.add(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".title",
summary.getString(Summary.VALUE), summaryGatherer, true));
});
return Observable.merge(Observable.from(summaryObservables))
.filter(summaryResult -> summaryResult != null)
.toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
}
private Observable<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, Observable<JsonObject>> summaryGatherer, Set<String> visited, boolean isList) {
if (visited.contains(elementName))
return Observable.just(null);
visited.add(elementName);
Map<String, JsonObject> summariesMap = new HashMap<>();
summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
Set<String> variables = TextEngine.getVariables(summariesMap);
Observable<JsonObject> elementSummaryObservable = Observable.just(getSummaryEntry(elementName, form, parentType, isList));
if (variables != null && !variables.isEmpty()) {
elementSummaryObservable = elementSummaryObservable.mergeWith(Observable.from(variables).flatMap(variable -> {
if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0]))
return Observable.just(null);
else
return summaryGatherer.call(parentName, variable).flatMap(variableEntry -> {
if (variableEntry == null)
return Observable.just(null);
else
return gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
});
}));
}
return elementSummaryObservable;
}
I've tried running everything in the Schedulers.computation()
scheduler except for network requests, those are being run in Schedulers.io()
schedulers and I am still getting stackoverflows:
Exception in thread "pool-26-thread-2" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
at rx.internal.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.run(ExecutorScheduler.java:107)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.StackOverflowError
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
gatherSummariesFromElement
which perhaps go very deeplyObservable.just(null)
and then filtering null values you may use Observable.empty()
summaryObservables
looks like overkill. You may construct list of valid summaries instead of and then process them in the flatMapgatherSummariesFromElement
with the recursive list of elements creation and then create observable from that list_
public Observable<Map<String, JsonObject>> extractTopLevelSummariesFromForms(JsonArray summaries, Func2<String, String, JsonObject> summaryGatherer) {
List<JsonObject> validSummaries = new LinkedList<>();
summaries.stream()
.map(JsonUtil::safeJsonObject)
.filter(summary -> StringUtils.isNotEmpty(summary.getString(NAME)) || StringUtils.isNotEmpty(summary.getString(Form.TITLE)))
.forEach(validSummaries::add);
Set<String> visited = new HashSet<>();
return Observable.from(validSummaries)
.flatMap(summary -> {
if (StringUtils.isNotEmpty(summary.getString(TEXT)))
Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".hidden",
summary.getString(VALUE), visited, summaryGatherer)));
if (StringUtils.isNotEmpty(summary.getString(Form.TEXT)))
Observable.from(gatherSummariesFromElement(summary.getString(Summary.SHORT_NAME), Summary.SummaryValues.FORM,
summary.getString(Summary.SHORT_NAME) + ".title",
summary.getString(Summary.VALUE), summaryGatherer, visited,true)));
})
.toMap(summaryResult -> summaryResult.getString(KEY), summaryResult -> summaryResult.getJsonObject(TEXT));
}
private List<JsonObject> gatherSummariesFromElement(String parentName, String parentType, String elementName, String summaryValue, Func2<String, String, JsonObject> summaryGatherer, Set<String> visited, boolean isList) {
if (visited.contains(elementName))
return Collections.emptyList();
visited.add(elementName);
List<JsonObject> result = new ArrayList<>()
Map<String, JsonObject> summariesMap = new HashMap<>();
summariesMap.put(elementName, new JsonObject().put(Summary.SummaryValues.FORM, form).put(SUMMARY_TYPE, parentType));
Set<String> variables = TextEngine.getVariables(summariesMap);
result.add(getSummaryEntry(elementName, form, parentType, isList));
if (variables != null && !variables.isEmpty()) {
for (String variable : variables) {
if (StringUtils.contains(variable, ".") && StringUtils.equals(parentName, StringUtils.split(variable, ".")[0])) {
// do nothing
} else {
JsonObject variableEntry = summaryGatherer.call(parentName, variable)
if (variableEntry != null) {
result.addAll(gatherSummariesFromElement(parentName, variableEntry.getString(SOURCE_TYPE), variable, variableEntry.getString(FORM), summaryGatherer, visited, variableEntry.getBoolean(Summary.SummaryValues.IS_LIST, false));
}
}
}
}
return result;
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With