I have a CSV file with currency exchanges EUR-USD. The file was downloaded from the Bank of Canada. I downloaded the CSV with data since Oct 10th, 2013 onwards.
There are, nevertheless, gaps in the data, ie. days without the conversion rates.
I've been fighting (1st day with Spoon Kettle) to find out a simple (but general) way to fill the gaps, say, with the last non-null value. And the only way I've managed to accomplish this is by chaining 4 "Get previous row fields" and the using the NVL in a Calculator to take the first non-null value. But that only works if gaps are not bigger than 4 rows in a stream.
The image represents the transformation:
My first question reduces to: Is there a general way to do interpolation/extrapolation in a stream with gaps?
I tried to use the "Modified JavaScript Value" but the API still escapes me. Moreover, it seems that this step only have the Map part of a MapReduce combo, I'd probably need both.
So, my second question is: Is there a way to program a MapReduce combo in a language that is not Java (Scala, Clojure, Jython or JS)?
You can use a combination of the following three steps:
1) Analytical query - allows you to fetch the value of a field N rows before or after the current row; In your case, you will want to fetch the date 1 row ahead (the next available date)
2) Calculator - having determined the previous date for the row, use it to calculate Days between dates;
3) Calculate a field number_of_clones as dbd-1 (the number of days missing;
4) Use that field on the Clone Rows step to multiple a row as many times as necessary; Add a clone_number field
5) Add the clone_number as days to the date and you get the day it refers to.
Moreover, the Analytical query step allows you to specify a field as the "group by" field, so that if you have x-rates for USD and then you have x-rates for GBP, the final USD x-rate day will retrieve null as the next value.
Here's a sample KTR file:
The data grid step generates a few rows with some data gaps in there:
The Analytical query fetches the next date, for the same currency value
Then the calculator step calculates how many rows are missing. Note that the last day of each currency will have null
as value, so we need to tweak that and use 0 instead (NVL(A,B) returns B if A is null, A otherwise)
Clone rows: takes a row and creates copies.
The clone_number field allows us to calculate the actual date the row refers to
Finally, here's the data. The fields you want are the new_date, currency and exchange_rate. Use a select values to re-order the field list and get rid of those you don't need anymore.
As you can see, now we have data for 2014-01-03 and 2014-01-04, using the previous known value.
Although this is not exactly what you asked for you can achieve your goal by using the User Defined Java Class component with a generic functionality. Replace your steps Get previous row fields
through Non-values in row
by a single instance of this component. In the section Classes - Processor of this component insert the following code:
Object[] previousRow;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// First, get a row from the default input hop
Object[] r = getRow();
// If the row object is null, we are done processing.
if (r == null) {
setOutputDone();
return false;
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
// copy all input fields to the output fields
for (int i=0; i < getInputRowMeta().size(); i++) {
logBasic(data.inputRowMeta.getString(r, i));
if (data.inputRowMeta.getString(r, i) == null && (previousRow != null)) {
// if the current field is empty take it from the previous row
outputRow[i] = previousRow[i];
}
else {
// otherwise use the current row
outputRow[i] = r[i];
}
}
putRow(data.outputRowMeta, outputRow);
// store the current row as future previous row
previousRow = data.outputRowMeta.cloneRow(outputRow);
return true;
}
The Janino class always keeps a copy of the previous row to fill up the empty fields of the current row.
The following test setup demonstrates the use of the component. In the simplest case we process a stream read from a CSV file:
The input file is configured as follows:
and contains the following data
NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014;12,5
2;B;;13,5
;;03.12.2001;
4;;;
5;C;;
6;;20.03.2005;18,2
7;D;;
The configuration of the User Defined Java class component is as follows:
The output text file contains the enhanced rows "without gaps":
NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014; 012,50
2;B;01.02.2014; 013,50
2;B;03.12.2001; 013,50
4;B;03.12.2001; 013,50
5;C;03.12.2001; 013,50
6;C;20.03.2005; 018,20
7;D;20.03.2005; 018,20
Note:
null
. Strings just containing blanks might break it, so make sure that you trim all the strings before piping them into the component.The code was written by using http://wiki.pentaho.com/display/EAI/User+Defined+Java+Class as a tutorial.
ADDENDUM
The link provided by @manu contains the followng code. It contains a specific handling of the numeric formats. Note this it is not fully generic anymore.
Object[] previousRow;
RowMetaInterface outputMeta;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
// First, get a row from the default input hop
Object[] r = getRow();
// If the row object is null, we are done processing.
if (r == null) {
setOutputDone();
return false;
}
if (outputMeta == null) {
outputMeta = data.outputRowMeta.clone();
for(int i=0; i < outputMeta.size(); i++) {
ValueMetaInterface meta = outputMeta.getValueMeta(i);
if (meta.getType() == ValueMetaInterface.TYPE_NUMBER) {
meta.setPrecision(4);
meta.setConversionMask("#.####");
}
}
}
// It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
// enough to handle any new fields you are creating in this step.
Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());
// copy all input fields to the output fields
for (int i=0; i < getInputRowMeta().size(); i++) {
if ((r[i] == null) && (previousRow != null)) {
// if the current field is empty take it from the previous row
outputRow[i] = previousRow[i];
}
else {
// otherwise use the current row
outputRow[i] = r[i];
}
}
putRow(outputMeta, outputRow);
// store the current row as future previous row
previousRow = outputMeta.cloneRow(outputRow);
return true;
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With