
Apache Beam Bigtable Iterable mutation

I'm migrating my Google Dataflow Java SDK 1.9 pipeline to Beam 2.0, and I'm trying to use BigtableIO.write():

    ....
    .apply("", BigtableIO.write()
        .withBigtableOptions(bigtableOptions)
        .withTableId("twoSecondVitals"));

In the ParDo before the BigtableIO step, I'm struggling to build the Iterable<Mutation> that the sink expects.

    try {
        Mutation mutation = Mutation.parseFrom(new ObjectMapper().writeValueAsBytes(v));
        Mutation mu[] = {mutation};
        Iterable<Mutation> imu = Arrays.asList(mu);
        log.severe("imu");
        c.output(KV.of(ByteString.copyFromUtf8(rowKey + "_" + v.getEpoch()), imu));
    } catch (Exception e) {
        log.severe(rowKey + "_" + v.getEpoch() + " error:" + e.getMessage());
    }

The code above throws the following exception: InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag

v is a list of objects (Vitals.class). The HBase API uses Put to create the mutation. How does one create a Bigtable mutation that works with the BigtableIO sink?

Mike asked Feb 15 '26 04:02


1 Answer

By looking through the SDK's tests, I was able to find my answer:

    Iterable<Mutation> mutations =
        ImmutableList.of(Mutation.newBuilder()
            .setSetCell(
                Mutation.SetCell.newBuilder()
                    .setValue(ByteString.copyFrom(new ObjectMapper().writeValueAsBytes(v)))
                    .setFamilyName("vitals"))
            .build());
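
For context on why the original Mutation.parseFrom call most likely failed: parseFrom expects bytes in the protobuf wire format, while ObjectMapper.writeValueAsBytes produces JSON text. The sketch below uses only the standard library (no Beam or Bigtable classes) and decodes the first byte of a JSON payload the way a protobuf parser would read a tag. The class name JsonAsProtobufTag and the sample payload are hypothetical and exist only for illustration.

```java
import java.nio.charset.StandardCharsets;

class JsonAsProtobufTag {
    // Wire type names from the protobuf encoding spec, indexed by wire type value.
    static final String[] WIRE_TYPES = {"VARINT", "I64", "LEN", "SGROUP", "EGROUP", "I32"};

    // A single-byte tag (value below 0x80) stores the field number in the upper bits...
    public static int fieldNumber(int tagByte) {
        return tagByte >>> 3;
    }

    // ...and the wire type in the lowest three bits.
    public static int wireType(int tagByte) {
        return tagByte & 0x7;
    }

    public static String describe(byte b) {
        int tag = b & 0xFF;
        int wt = wireType(tag);
        String name = wt < WIRE_TYPES.length ? WIRE_TYPES[wt] : "INVALID";
        return "field " + fieldNumber(tag) + ", wire type " + wt + " (" + name + ")";
    }

    public static void main(String[] args) {
        // Hypothetical JSON for a list of objects; only the first byte matters here.
        byte[] json = "[{\"epoch\":1}]".getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(json[0]));
    }
}
```

Both '[' and '{' decode to wire type 3, which opens a group, so a parser reading JSON as a protobuf message would go looking for a matching end-group tag that never arrives. That is consistent with the "end-group tag did not match expected tag" message, although the exact code path inside the protobuf library is an assumption here. Wrapping the JSON bytes as a cell value, as in the answer above, avoids parsing them as a message at all.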
Mike answered Feb 17 '26 20:02


