 

Is there a generic way of converting PCollection to PTable in Apache Crunch?

I have these methods in a util class that convert specific PCollections to specific PTables:

public static PTable<IdDetails, CASegmentsForModification> getPTableForCASegments(PCollection<CASegmentsForModification> aggregatedPCollectionForCASegments) {
    return aggregatedPCollectionForCASegments.parallelDo(new CASegmentsPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(CASegmentsForModification.class)));
}

public static PTable<IdDetails, UserPrimaryIdMapping> getPTableForPrimaryIdMapping(PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping) {
    return pCollectionOfUserPrimaryIdMapping.parallelDo(new UserPrimaryIdMappingPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class)));
}

public static PTable<IdDetails, UserGroupSegments> getPTableForUserGroupSegments(PCollection<UserGroupSegments> pCollectionOfUserGroupSegments) {
    return pCollectionOfUserGroupSegments.parallelDo(new UserGroupSegmentsPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class)));
}

How can I implement a single generic method that replaces the methods above?

asked Oct 18 '22 at 07:10 by Vivek Rai


1 Answer

There is a better way: use the static asPTable method from the PTables utility class. Your PCollection has to contain elements of type Pair<K, V>, and the result will be a PTable<K, V>:

    public static <K,V> PTable<K,V> asPTable(PCollection<Pair<K,V>> pcollect)

Based on your example, you just need your DoFn (or a subclass such as MapFn) to emit pairs, with Avros.pairs(Avros.records(yourClass.class), Avros.records(yourOtherClass.class)) as the output type.
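As a sketch of the single generic method the question asks for: the question's converters are passed to parallelDo together with Avros.tableOf(...), so each one is presumably a DoFn<X, Pair<IdDetails, X>>. Under that assumption, the three methods can collapse into one helper that takes the converter plus the key and value PTypes. The class GenericPTableConverter, the method toPTable and the demo FirstLetterPair are hypothetical names; the demo uses MemPipeline and Avros.strings() only so the sketch runs without your Avro record classes.

```java
import java.util.Map;

import org.apache.crunch.DoFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.Avros;

public class GenericPTableConverter {

    // One generic replacement for the per-type getPTableFor... methods (hypothetical name).
    // The caller supplies the pair-emitting DoFn and the PTypes for key and value.
    public static <K, V> PTable<K, V> toPTable(PCollection<V> input,
                                               DoFn<V, Pair<K, V>> toPairFn,
                                               PType<K> keyType,
                                               PType<V> valueType) {
        // Emit Pair<K, V> elements, then view the collection as a PTable.
        PCollection<Pair<K, V>> pairs = input.parallelDo(toPairFn, Avros.pairs(keyType, valueType));
        return PTables.asPTable(pairs);
    }

    // Hypothetical demo converter: keys each word by its first letter.
    static class FirstLetterPair extends MapFn<String, Pair<String, String>> {
        @Override
        public Pair<String, String> map(String word) {
            return Pair.of(word.substring(0, 1), word);
        }
    }

    public static void main(String[] args) {
        // In-memory collection, so no cluster or Avro schema is needed for the demo.
        PCollection<String> words = MemPipeline.typedCollectionOf(Avros.strings(), "apple", "banana");
        PTable<String, String> table =
                toPTable(words, new FirstLetterPair(), Avros.strings(), Avros.strings());
        Map<String, String> asMap = table.materializeToMap();
        System.out.println(asMap);
    }
}
```

With that assumption, the first method from the question would become toPTable(aggregatedPCollectionForCASegments, new CASegmentsPTableConverter(), Avros.records(IdDetails.class), Avros.records(CASegmentsForModification.class)). Because the question's code already hands Avros.tableOf to parallelDo, the helper body could equally be input.parallelDo(toPairFn, Avros.tableOf(keyType, valueType)), which returns a PTable directly without PTables.asPTable.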

Another way is to use the predefined MapFn ExtractKeyFn and apply it over your collection. You supply a MapFn that extracts the key from each element, and ExtractKeyFn emits the (key, value) pair. It is essentially the same idea; afterwards you can convert the resulting PCollection<Pair<K, V>> into a PTable<K, V>.

It should save you a lot of boilerplate code.
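A minimal sketch of the ExtractKeyFn route, assuming Crunch's org.apache.crunch.fn.ExtractKeyFn<K, V>, which wraps a MapFn<V, K> key extractor and emits Pair<K, V> for each input. Only the small key extractor is written per type. LengthKey, byLength and the String/Integer types are hypothetical stand-ins for your IdDetails extraction and Avro record types.

```java
import java.util.Map;

import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.fn.ExtractKeyFn;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.avro.Avros;

public class ExtractKeyExample {

    // Hypothetical key extractor: the only per-type code you write.
    static class LengthKey extends MapFn<String, Integer> {
        @Override
        public Integer map(String input) {
            return input.length();
        }
    }

    public static PTable<Integer, String> byLength(PCollection<String> words) {
        // ExtractKeyFn runs LengthKey on each element and emits Pair(key, element).
        return PTables.asPTable(words.parallelDo(
                new ExtractKeyFn<Integer, String>(new LengthKey()),
                Avros.pairs(Avros.ints(), Avros.strings())));
    }

    public static void main(String[] args) {
        PCollection<String> words = MemPipeline.typedCollectionOf(Avros.strings(), "hi", "crunch");
        Map<Integer, String> asMap = byLength(words).materializeToMap();
        System.out.println(asMap);
    }
}
```

It may also be worth checking whether your Crunch version's PCollection offers by(MapFn, PType), which performs this keying and returns a PTable in a single call.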

There are other functions that could be useful, such as FilterFn; however, we found some bugs when using MemPipeline for unit testing. The first approach I suggested should be the safest.

EDIT:

A good compromise that saves some code is to extract the key by field name and reuse the same MapFn for every PCollection:

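import org.apache.avro.generic.GenericRecord;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;

// We are assuming the key is a top-level field of your record.
public class GenericRecordToPair<V extends GenericRecord, K extends GenericRecord> extends MapFn<V, Pair<K, V>> {
    private final String keyField;

    public GenericRecordToPair(String keyField) {
        this.keyField = keyField;
    }

    @Override
    public Pair<K, V> map(V input) {
        // GenericRecord.get(String) returns Object, so the key needs an unchecked cast to K
        @SuppressWarnings("unchecked")
        K key = (K) input.get(keyField);
        return Pair.of(key, input);
    }
}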

Using the classes from your example, you could then do something like:

PCollection<UserGroupSegments> pCollectionOfUserGroupSegments = ... // coming from somewhere
PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping = ... // coming from somewhere
PTable<IdDetails, UserGroupSegments> pTable1 = PTables.asPTable(pCollectionOfUserGroupSegments.parallelDo(
        new GenericRecordToPair<UserGroupSegments, IdDetails>("idDetails"),
        Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class))));
PTable<IdDetails, UserPrimaryIdMapping> pTable2 = PTables.asPTable(pCollectionOfUserPrimaryIdMapping.parallelDo(
        new GenericRecordToPair<UserPrimaryIdMapping, IdDetails>("idDetails"),
        Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class))));
answered Oct 21 '22 at 08:10 by hlagos