I have three Cascading pipes (one LHS pipe to join against the other two). Pseudocode follows; this example involves two joins.
IF F1DecidingFactor = YES THEN
    join LHSPipe with RHS Lookup#1 BY (LHSPipe.F1Input = RHS Lookup#1.Join#F1)
    SET LHSPipe.F1Output = Result#F1
ELSE
    SET LHSPipe.F1Output = N/A
The same logic applies for the F2 computation.
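In plain Java terms, the per-record logic amounts to a conditional lookup against an in-memory map, which is essentially what a map-side join does with the small dataset. A minimal standalone sketch (keys and values are made up for illustration; this is not Cascading code):

```java
import java.util.HashMap;
import java.util.Map;

public class ConditionalLookupSketch {

    // Simulates RHS Lookup#1 held in memory on the map task (hypothetical entries)
    static final Map<String, String> RHS_LOOKUP_1 = new HashMap<>();
    static {
        RHS_LOOKUP_1.put("K1", "V1");
        RHS_LOOKUP_1.put("K2", "V2");
    }

    // Computes F1Output for a single LHS record
    static String computeF1Output(String f1DecidingFactor, String f1Input) {
        if ("YES".equalsIgnoreCase(f1DecidingFactor)) {
            // Join LHSPipe.F1Input against RHS Lookup#1.Join#F1
            String result = RHS_LOOKUP_1.get(f1Input);
            return result != null ? result : "N/A";
        }
        // Deciding factor says: skip the join entirely
        return "N/A";
    }
}
```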
The expected output:
This scenario forced me to go with a custom join operation, since an IF-ELSE decides whether to join or not.
Given the above, I would like to go for a MAP-SIDE join (keeping RHSPipe in memory on the map task node). I was thinking of the possible solutions below, each with its pros and cons. Need your suggestions on these.
Option#1:
CoGroup - We can build custom join logic using CoGroup with a BufferJoiner followed by a custom join operation, but that wouldn't ensure a map-side join.
Option#2:
HashJoin - It ensures a map-side join, but as far as I can see, a custom join cannot be built with it.
Please correct my understanding and suggest how to approach this requirement.
Thanks in advance.
The best way to solve this problem (that I can think of) is to modify your smaller dataset. You can add a new field (F1DecidingFactor) to the smaller dataset. The value of F1Result should look like:
Pseudocode
if F1DecidingFactor == "Yes" then
    F1Result = ACTUAL_VALUE
else
    F1Result = "N/A"
Result Table
|F1#DecidingFactor|F1#Join|F1#Result|
|-----------------|-------|---------|
| Yes             | 0     | True    |
| Yes             | 1     | False   |
| No              | 0     | N/A     |
| No              | 1     | N/A     |
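Outside Cascading, deriving the new F1Result column is just a conditional; a standalone sketch of the same rule (names are illustrative):

```java
public class PrecomputeSketch {

    // Mirrors the pseudocode above: keep the actual value only when the
    // deciding factor is "Yes", otherwise default to "N/A"
    static String precomputeF1Result(String f1DecidingFactor, String actualValue) {
        return "Yes".equalsIgnoreCase(f1DecidingFactor) ? actualValue : "N/A";
    }
}
```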
You can do the above via Cascading as well.
After this, you can do your map-side join.
If modifying the smaller dataset is not possible, then I have two options to solve the problem.
Option 1
Add a new field to your small pipes that is equivalent to your deciding factor (i.e. F1DecidingFactor_RHS = Yes), then include it in your join criteria. Once the join is done, only the rows where this condition matches will have values; the rest will be null/blank. Sample code:
Main Class
import cascading.operation.Insert;
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.assembly.Discard;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;
public class StackHashJoinTestOption2 {

    public StackHashJoinTestOption2() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");
        Fields f1DecidingFactor = new Fields("F1DecidingFactor");
        Fields f2DecidingFactor = new Fields("F2DecidingFactor");
        Fields f1DecidingFactorRhs = new Fields("F1DecidingFactor_RHS");
        Fields f2DecidingFactorRhs = new Fields("F2DecidingFactor_RHS");

        Fields lhsJoinerOne = f1DecidingFactor.append(f1Input);
        Fields lhsJoinerTwo = f2DecidingFactor.append(f2Input);
        Fields rhsJoinerOne = f1DecidingFactorRhs.append(f1Join);
        Fields rhsJoinerTwo = f2DecidingFactorRhs.append(f2Join);

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // New field for small pipe 1. Expected fields:
        // F1Join F1Result F1DecidingFactor_RHS
        rhsOne = new Each(rhsOne, new Insert(f1DecidingFactorRhs, "Yes"), Fields.ALL);

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // New field for small pipe 2. Expected fields:
        // F2Join F2Result F2DecidingFactor_RHS
        rhsTwo = new Each(rhsTwo, new Insert(f2DecidingFactorRhs, "Yes"), Fields.ALL);

        // Joining first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS
        Pipe resultsOne = new HashJoin(largePipe, lhsJoinerOne, rhsOne, rhsJoinerOne, new LeftJoin());

        // Joining second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F1DecidingFactor_RHS F2Join F2Result F2DecidingFactor_RHS
        Pipe resultsTwo = new HashJoin(resultsOne, lhsJoinerTwo, rhsTwo, rhsJoinerTwo, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);
        result = new Discard(result, f1DecidingFactorRhs);
        result = new Discard(result, f2DecidingFactorRhs);

        // The result pipe now has the expected output
    }
}
Option 2
If you want a default value instead of null/blank, then I would suggest doing the HashJoin first with regular joiners, followed by a function that updates tuples with appropriate values. Something like:
Main Class
import cascading.pipe.Each;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.LeftJoin;
import cascading.tuple.Fields;
public class StackHashJoinTest {

    public StackHashJoinTest() {
        Fields f1Input = new Fields("F1Input");
        Fields f2Input = new Fields("F2Input");
        Fields f1Join = new Fields("F1Join");
        Fields f2Join = new Fields("F2Join");

        Fields functionFields = new Fields("F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output");

        // Large pipe fields:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input
        Pipe largePipe = new Pipe("large-pipe");

        // Small pipe 1 fields:
        // F1Join F1Result
        Pipe rhsOne = new Pipe("small-pipe-1");

        // Small pipe 2 fields:
        // F2Join F2Result
        Pipe rhsTwo = new Pipe("small-pipe-2");

        // Joining first small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result
        Pipe resultsOne = new HashJoin(largePipe, f1Input, rhsOne, f1Join, new LeftJoin());

        // Joining second small pipe. Expected fields after join:
        // F1DecidingFactor F1Input F2DecidingFactor F2Input F1Join F1Result F2Join F2Result
        Pipe resultsTwo = new HashJoin(resultsOne, f2Input, rhsTwo, f2Join, new LeftJoin());

        Pipe result = new Each(resultsTwo, functionFields, new TestFunction(), Fields.REPLACE);

        // The result pipe now has the expected output
    }
}
Update Function
import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
public class TestFunction extends BaseOperation<Void> implements Function<Void> {

    private static final long serialVersionUID = 1L;

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Expected fields: "F1DecidingFactor", "F1Output", "F2DecidingFactor", "F2Output"
    public TestFunction() {
        super(Fields.ARGS);
    }

    @Override
    public void operate(@SuppressWarnings("rawtypes") FlowProcess process, FunctionCall<Void> call) {
        TupleEntry arguments = call.getArguments();
        TupleEntry result = new TupleEntry(arguments);
        // Constant-first comparison avoids a NullPointerException on unmatched rows
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F1DecidingFactor"))) {
            result.setString("F1Output", DEFAULT_VALUE);
        }
        if (DECIDING_FACTOR.equalsIgnoreCase(result.getString("F2DecidingFactor"))) {
            result.setString("F2Output", DEFAULT_VALUE);
        }
        call.getOutputCollector().add(result);
    }
}
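The core rule inside TestFunction can be exercised on its own. This standalone sketch (a hypothetical helper, independent of the Cascading classes above) applies the same overwrite to a single value pair:

```java
public class UpdateRuleSketch {

    private static final String DECIDING_FACTOR = "No";
    private static final String DEFAULT_VALUE = "N/A";

    // Mirrors TestFunction.operate: replace the joined output with "N/A"
    // whenever the deciding factor is "No"; otherwise keep the joined value
    static String fixOutput(String decidingFactor, String joinedOutput) {
        if (DECIDING_FACTOR.equalsIgnoreCase(decidingFactor)) {
            return DEFAULT_VALUE;
        }
        return joinedOutput;
    }
}
```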
References
Insert Function
Custom Function
HashJoin
This should solve your problem. Let me know if this helps.