In Google Cloud Dataflow, my join fails with "TupleTag Tag corresponds to a non-singleton result". From the error stack, it seems this is happening in the overridden method that reads the CoGbkResult:
String Ad_ID = e.getKey();
String Ad_Info = "none";
Ad_Info = e.getValue().getOnly(AdInfoTag);
The following is my join method.
static PCollection<String> joinEvents(PCollection<TableRow> ImpressionTable,
    PCollection<TableRow> AdTable) throws Exception {
  final TupleTag<String> ImpressionInfoTag = new TupleTag<String>();
  final TupleTag<String> AdInfoTag = new TupleTag<String>();

  // transform both input collections to tuple collections, where the keys are Ad_ID
  PCollection<KV<String, String>> ImpressionInfo = ImpressionTable.apply(
      ParDo.of(new ExtractImpressionDataInfoFn()));
  PCollection<KV<String, String>> AdInfo = AdTable.apply(
      ParDo.of(new ExtractAdDataInfoFn()));

  // Ad_ID 'key' -> CGBKR (<ImpressionInfo>, <AdInfo>)
  PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
      .of(ImpressionInfoTag, ImpressionInfo)
      .and(AdInfoTag, AdInfo)
      .apply(CoGroupByKey.<String>create());

  // Process the CoGbkResult elements generated by the CoGroupByKey transform.
  // Ad_ID 'key' -> string of <Impressioninfo>, <Adinfo>
  PCollection<KV<String, String>> finalResultCollection =
      kvpCollection.apply(ParDo.named("Process").of(
          new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void processElement(ProcessContext c) {
              KV<String, CoGbkResult> e = c.element();
              String Ad_ID = e.getKey();
              String Ad_Info = "none";
              Ad_Info = e.getValue().getOnly(AdInfoTag);
              for (String eventInfo : c.element().getValue().getAll(ImpressionInfoTag)) {
                // Generate a string that combines information from both collection values
                c.output(KV.of(Ad_ID, " " + Ad_Info
                    + " " + eventInfo));
              }
            }
          }));

  // write to GCS
  PCollection<String> formattedResults = finalResultCollection
      .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
        @Override
        public void processElement(ProcessContext c) {
          String outputstring = "AdUnitID: " + c.element().getKey()
              + ", " + c.element().getValue();
          c.output(outputstring);
        }
      }));
  return formattedResults;
}
My ExtractImpressionDataInfoFn and ExtractAdDataInfoFn classes are below.
static class ExtractImpressionDataInfoFn extends DoFn<TableRow, KV<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String Ad_ID = (String) row.get("AdUnitID");
    String User_ID = (String) row.get("UserID");
    String Client_ID = (String) row.get("ClientID");
    String Impr_Time = (String) row.get("GfpActivityAdEventTIme");
    String ImprInfo = "UserID: " + User_ID + ", ClientID: " + Client_ID + ", GfpActivityAdEventTIme: " + Impr_Time;
    c.output(KV.of(Ad_ID, ImprInfo));
  }
}

static class ExtractAdDataInfoFn extends DoFn<TableRow, KV<String, String>> {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String Ad_ID = (String) row.get("AdUnitID");
    String Content_ID = (String) row.get("ContentID");
    String Pub_ID = (String) row.get("Publisher");
    String Add_Info = "ContentID: " + Content_ID + ", Publisher: " + Pub_ID;
    c.output(KV.of(Ad_ID, Add_Info));
  }
}
The schemas for Impression and Ad are below.
Impression:
AdUnitID
UserID
ClientID
GfpActivityAdEventTIme
Ad:
AdUnitID
ClientID
Publisher
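That error suggests that when you call getOnly, the CoGroupByKey result held more than one value for that tag under the same key. Specifically, this line:
Ad_Info = e.getValue().getOnly(AdInfoTag);
If you change that to getAll(AdInfoTag), it should work. Note that getAll returns an Iterable<String> rather than a single String, so the code that uses Ad_Info has to loop over the values instead of assigning one.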
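As a rough illustration of what the loop could look like after switching to getAll, here is a minimal plain-Java sketch of the per-key pairing logic. It deliberately uses no Dataflow classes so it can run on its own; JoinSketch and joinValues are hypothetical names, and the "none" fallback for keys without any ad row is an assumption taken from the default in the question. Inside processElement you would pass e.getValue().getAll(AdInfoTag) and e.getValue().getAll(ImpressionInfoTag) as the two arguments and call c.output(KV.of(Ad_ID, s)) for each returned string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinSketch {

    // Hypothetical helper: pairs every ad value with every impression value
    // that shares one key, so several ad rows per key no longer cause an error.
    static List<String> joinValues(Iterable<String> adInfos,
                                   Iterable<String> impressionInfos) {
        List<String> ads = new ArrayList<>();
        for (String ad : adInfos) {
            ads.add(ad);
        }
        // Assumption: keep the question's "none" placeholder when no ad row matched.
        if (ads.isEmpty()) {
            ads.add("none");
        }

        List<String> joined = new ArrayList<>();
        for (String ad : ads) {
            for (String impression : impressionInfos) {
                // Same string layout as the c.output(...) call in the question.
                joined.add(" " + ad + " " + impression);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        // Two ad rows for the same AdUnitID: the case where getOnly would throw.
        List<String> ads = Arrays.asList(
            "ContentID: c1, Publisher: p1",
            "ContentID: c2, Publisher: p2");
        List<String> impressions = Arrays.asList("UserID: u1, ClientID: x1");

        for (String s : joinValues(ads, impressions)) {
            System.out.println("AdUnitID: ad42," + s);
        }
    }
}
```

If several ad rows per AdUnitID are not expected at all, the duplicates are probably worth tracking down in the Ad table itself, since this pairing emits one output row per (ad, impression) combination.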