I have these methods in a util class that convert specific PCollections to specific PTables:
```java
public static PTable<IdDetails, CASegmentsForModification> getPTableForCASegments(PCollection<CASegmentsForModification> aggregatedPCollectionForCASegments) {
    return aggregatedPCollectionForCASegments.parallelDo(new CASegmentsPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(CASegmentsForModification.class)));
}

public static PTable<IdDetails, UserPrimaryIdMapping> getPTableForPrimaryIdMapping(PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping) {
    return pCollectionOfUserPrimaryIdMapping.parallelDo(new UserPrimaryIdMappingPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class)));
}

public static PTable<IdDetails, UserGroupSegments> getPTableForUserGroupSegments(PCollection<UserGroupSegments> pCollectionOfUserGroupSegments) {
    return pCollectionOfUserGroupSegments.parallelDo(new UserGroupSegmentsPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class)));
}
```
How can I replace the methods above with one generic method?
There is a better way: use the static asPTable method from the PTables util class. Your PCollection has to be of type PCollection<Pair<K, V>>, and the result will be of type PTable<K, V>:

```java
public static <K, V> PTable<K, V> asPTable(PCollection<Pair<K, V>> pcollect)
```
Based on your example, you just need your DoFn (or a class extending it) to return pairs, using an output PType of Avros.pairs(Avros.records(yourClass.class), Avros.records(yourOtherClass.class)).
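What makes one generic method possible is that only the value type differs between the three methods, so it can become a type parameter while the per-type converter is passed in. Running Crunch needs its jars and a pipeline, so the block below is only a dependency-free plain-Java sketch of that shape: `List` stands in for `PCollection`, `Map.Entry` for Crunch's `Pair`, and `GenericTableSketch`/`toTable` are hypothetical names, not Crunch API. In Crunch the same shape would presumably take the converter plus a `Class<V>` and build the value PType with `Avros.records(valueClass)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java stand-in for the Crunch pattern (hypothetical names, not Crunch API):
// List plays the role of PCollection, Map.Entry plays the role of Pair.
final class GenericTableSketch {

    private GenericTableSketch() {}

    // One generic method instead of one method per value type: the caller supplies
    // the per-type converter (the analogue of each *PTableConverter DoFn).
    static <K, V> List<Map.Entry<K, V>> toTable(List<V> input,
                                                Function<? super V, Map.Entry<K, V>> converter) {
        List<Map.Entry<K, V>> table = new ArrayList<>(input.size());
        for (V value : input) {
            table.add(converter.apply(value));
        }
        // A list of entries (not a Map) keeps duplicate keys, like a PTable does.
        return table;
    }
}
```

Returning a list of entries rather than a `Map` is deliberate: a PTable is a multimap, so two records with the same key must both survive the conversion.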
Another way is to use a predefined MapFn, ExtractKeyFn, and apply it over your collection. You will need to implement the map method of the MapFn you pass to it so that it extracts the key; ExtractKeyFn then generates the (key, value) output. It is essentially the same idea: afterwards you can convert the PCollection<Pair<K, V>> into a PTable with PTables.asPTable.
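The wrapping step described above, turning a key extractor V to K into a pair-producing function V to (K, V), can be sketched in plain Java without Crunch on the classpath. `KeyedFns.extractKey` is a hypothetical helper, `Map.Entry` stands in for Crunch's `Pair`, and, as far as I know, this mirrors what ExtractKeyFn does with the MapFn given to its constructor. Note that `Map.entry` rejects null keys and values.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, not Crunch API: wraps a key extractor (V -> K)
// into a function producing (key, value) pairs (V -> (K, V)).
final class KeyedFns {

    private KeyedFns() {}

    static <K, V> Function<V, Map.Entry<K, V>> extractKey(Function<? super V, ? extends K> keyFn) {
        // Only the key is computed; the original value is kept whole as the pair's second element.
        // Map.entry throws NullPointerException if the key or value is null.
        return value -> Map.entry(keyFn.apply(value), value);
    }
}
```

In the question's setting the key function would return each record's `IdDetails`, so the only per-type code left is a one-line key extractor.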
It should save you a lot of boilerplate code.
Just in case: there are other functions that could be useful, like FilterFn; however, we found some bugs when using them with MemPipeline for unit testing. The first approach I suggested should be the safest.
EDIT:
A good balance for saving code would be to get the key by its field name and then reuse this one MapFn for each PCollection:
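```java
import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
// We are assuming the key is a top-level field of your record.
public class GenericRecordToPair<V extends GenericRecord, K extends GenericRecord> extends MapFn<V, Pair<K, V>> {
    // Name of the field that holds the key, e.g. "idDetails".
    private final String key;
    public GenericRecordToPair(String key) {
        this.key = key;
    }
    @Override
    @SuppressWarnings("unchecked")
    public Pair<K, V> map(V input) {
        // GenericRecord.get returns Object, so the key has to be cast to K.
        return Pair.of((K) input.get(key), input);
    }
}
```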
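Because GenericRecord.get(String) returns a plain Object, a cast to K is needed when building the pair. As a dependency-free sketch of the same field-name idea, the plain Java below models a record as a `Map<String, Object>` (an assumption standing in for Avro's GenericRecord) and takes the key's `Class` so it can use `Class.cast`; `FieldKeyedPairs` and `byField` are hypothetical names.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical analogue of a field-name keyed MapFn: a record is modelled as
// Map<String, Object>, standing in for Avro's GenericRecord.get(String).
final class FieldKeyedPairs {

    private FieldKeyedPairs() {}

    static <K, R extends Map<String, Object>> Function<R, Map.Entry<K, R>> byField(String field, Class<K> keyClass) {
        return record -> {
            Object raw = record.get(field);
            if (raw == null) {
                throw new IllegalArgumentException("missing key field: " + field);
            }
            // Class.cast throws ClassCastException right here if the field has the wrong type,
            // instead of an unchecked cast whose failure only shows up further downstream.
            return Map.entry(keyClass.cast(raw), record);
        };
    }
}
```

Passing `Class<K>` costs one extra argument but makes a wrong field name or type fail at the point where the key is read. The same constructor parameter could be added to a Crunch MapFn; that is a suggestion, not something the approach above requires.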
Applied to your example, you could do something like this:
```java
PCollection<UserGroupSegments> pCollectionOfUserGroupSegments = ...; // coming from somewhere
PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping = ...; // coming from somewhere
PTable<IdDetails, UserGroupSegments> pTable1 = PTables.asPTable(pCollectionOfUserGroupSegments.parallelDo(
        new GenericRecordToPair<UserGroupSegments, IdDetails>("idDetails"),
        Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class))));
PTable<IdDetails, UserPrimaryIdMapping> pTable2 = PTables.asPTable(pCollectionOfUserPrimaryIdMapping.parallelDo(
        new GenericRecordToPair<UserPrimaryIdMapping, IdDetails>("idDetails"),
        Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class))));
```