TupleTag Tag <taginfo> corresponds to a non-singleton result

In Google Cloud Dataflow, my join fails with "TupleTag Tag corresponds to a non-singleton result". From the error stack, it seems this happens in the overridden method, in CoGbkResult:

String Ad_ID = e.getKey();
String Ad_Info = "none";
Ad_Info = e.getValue().getOnly(AdInfoTag);

Following is my join method.

static PCollection<String> joinEvents(PCollection<TableRow> ImpressionTable,
      PCollection<TableRow> AdTable) throws Exception {

    final TupleTag<String> ImpressionInfoTag = new TupleTag<String>();
    final TupleTag<String> AdInfoTag = new TupleTag<String>();

    // transform both input collections to tuple collections, where the keys are Ad_ID
    PCollection<KV<String, String>> ImpressionInfo = ImpressionTable.apply(
        ParDo.of(new ExtractImpressionDataInfoFn()));
    PCollection<KV<String, String>> AdInfo = AdTable.apply(
        ParDo.of(new ExtractAdDataInfoFn()));

    // Ad_ID 'key' -> CGBKR (<ImpressionInfo>, <AdInfo>)
    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
        .of(ImpressionInfoTag, ImpressionInfo)
        .and(AdInfoTag, AdInfo)
        .apply(CoGroupByKey.<String>create());

    // Process the CoGbkResult elements generated by the CoGroupByKey transform.
    // Ad_ID 'key' -> string of <Impressioninfo>, <Adinfo>
    PCollection<KV<String, String>> finalResultCollection =
      kvpCollection.apply(ParDo.named("Process").of(
        new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
            private static final long serialVersionUID = 1L;

        @Override
          public void processElement(ProcessContext c) {
            KV<String, CoGbkResult> e = c.element();
            String Ad_ID = e.getKey();
            String Ad_Info = "none";
            Ad_Info = e.getValue().getOnly(AdInfoTag);
            for (String eventInfo : c.element().getValue().getAll(ImpressionInfoTag)) {
              // Generate a string that combines information from both collection values
              c.output(KV.of(Ad_ID, " " + Ad_Info
                      + " " + eventInfo));
            }
          }
      }));

     //write to GCS
    PCollection<String> formattedResults = finalResultCollection
        .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() {
          @Override
          public void processElement(ProcessContext c) {
            String outputstring = "AdUnitID: " + c.element().getKey()
                + ", " + c.element().getValue();
            c.output(outputstring);
          }
        }));
    return formattedResults;
  }

My ExtractImpressionDataInfoFn and ExtractAdDataInfoFn classes are below.

static class ExtractImpressionDataInfoFn extends DoFn<TableRow, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
        TableRow row = c.element();
        String Ad_ID = (String) row.get("AdUnitID");
        String User_ID = (String) row.get("UserID");
        String Client_ID = (String) row.get("ClientID");
        String Impr_Time = (String) row.get("GfpActivityAdEventTIme");
        String ImprInfo = "UserID: " + User_ID + ", ClientID: " + Client_ID + ", GfpActivityAdEventTIme: " + Impr_Time;
        c.output(KV.of(Ad_ID, ImprInfo));
    }
}


static class ExtractAdDataInfoFn extends DoFn<TableRow, KV<String, String>> {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
        TableRow row = c.element();
        String Ad_ID = (String) row.get("AdUnitID");
        String Content_ID = (String) row.get("ContentID");
        String Pub_ID = (String) row.get("Publisher");
        String Add_Info = "ContentID: " + Content_ID + ", Publisher: " + Pub_ID;
        c.output(KV.of(Ad_ID, Add_Info));
    }
}

The schemas for Impression and Ad are below.

Impression: AdUnitID UserID ClientID GfpActivityAdEventTIme

Ad: AdUnitID ClientID Publisher


KosiB asked Sep 03 '25 14:09


1 Answer

That error suggests that when you called getOnly, the CoGroupByKey result held more than one value for that tag under the same key. Specifically this line:

Ad_Info = e.getValue().getOnly(AdInfoTag);

If you change that to getAll(AdInfoTag) it should work.
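
Because getAll returns an Iterable<String> rather than a single String, the loop in processElement has to iterate over the ad values as well as the impressions. The sketch below is a plain-Java stand-in, not Dataflow SDK code: the class JoinSketch and the method joinForKey are hypothetical names, and the two arguments play the role of getAll(AdInfoTag) and getAll(ImpressionInfoTag) for one key. It assumes you want one output per (ad, impression) pair and want to keep the "none" fallback when a key has no ad row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinSketch {
    // Hypothetical helper mirroring the body of processElement for a single key.
    // adInfos stands in for e.getValue().getAll(AdInfoTag);
    // impressions stands in for e.getValue().getAll(ImpressionInfoTag).
    static List<String> joinForKey(Iterable<String> adInfos,
                                   Iterable<String> impressions) {
        // Copy the ad values so we can detect the "no ad row" case.
        List<String> ads = new ArrayList<>();
        for (String ad : adInfos) {
            ads.add(ad);
        }
        if (ads.isEmpty()) {
            ads.add("none"); // same fallback value the question uses
        }

        // Emit one combined string per (ad, impression) pair.
        List<String> out = new ArrayList<>();
        for (String ad : ads) {
            for (String impression : impressions) {
                out.add(ad + " " + impression);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Two ad rows share the same AdUnitID: the case where getOnly fails.
        List<String> joined = joinForKey(
            Arrays.asList("ad1", "ad2"),
            Arrays.asList("imp1", "imp2"));
        for (String line : joined) {
            System.out.println(line);
        }
    }
}
```

If duplicate ad rows for one AdUnitID are not expected, the other option is to deduplicate the Ad table before the join, so that getOnly stays valid.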

Ben Chambers answered Sep 05 '25 03:09


