Filling data gaps in a stream in Pentaho Data Integration, is it possible?

Tags:

pentaho

kettle

I have a CSV file with EUR-USD currency exchange rates, downloaded from the Bank of Canada, with data from Oct 10th, 2013 onwards.

There are, nevertheless, gaps in the data, i.e. days without conversion rates.

I've been fighting (1st day with Spoon/Kettle) to find a simple (but general) way to fill the gaps, say, with the last non-null value. The only way I've managed to accomplish this is by chaining four "Get previous row fields" steps and then using NVL in a Calculator to take the first non-null value. But that only works if gaps are no bigger than 4 rows in the stream.
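For reference, that chained workaround effectively computes the following nested expression (field names here are illustrative, not the actual step output names):

    rate_filled = NVL(rate, NVL(rate_prev1, NVL(rate_prev2, NVL(rate_prev3, rate_prev4))))

which is exactly why it stops working as soon as a gap spans more than four consecutive rows.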

The image represents the transformation:

Painfully filling the gaps

My first question reduces to: Is there a general way to do interpolation/extrapolation in a stream with gaps?

I tried to use the "Modified Java Script Value" step, but the API still escapes me. Moreover, it seems that this step only has the Map part of a MapReduce combo; I'd probably need both.

So, my second question is: Is there a way to program a MapReduce combo in a language that is not Java (Scala, Clojure, Jython or JS)?

asked by manu

2 Answers

You can use a combination of three steps (Analytical query, Calculator, and Clone rows):

1) Analytical query - lets you fetch the value of a field N rows before or after the current row; in your case, you want the date 1 row ahead (the next available date).

2) Calculator - having determined the next date for the row, use it to calculate "Days between dates" (dbd).

3) Calculate a field number_of_clones as dbd - 1 (the number of days missing).

4) Use that field in the Clone rows step to duplicate the row as many times as necessary; add a clone_number field.

5) Add clone_number as days to the date and you get the day each clone refers to.

Moreover, the Analytical query step allows you to specify a "group by" field, so that if you have x-rates for USD followed by x-rates for GBP, the final USD x-rate day will retrieve null as the next value.
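A quick worked example (with made-up rates): suppose the stream contains 2014-01-02 with rate 1.3500 and then 2014-01-05 with rate 1.3600. The Analytical query attaches next_date = 2014-01-05 to the first row; the Calculator gives dbd = 3 and number_of_clones = 2; Clone rows emits the original row plus two clones with clone_number 1 and 2; adding clone_number days then produces the missing dates 2014-01-03 and 2014-01-04, each carrying the last known rate, 1.3500.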

Here's a sample KTR file:

Transformation to fill in the gaps

The Data grid step generates a few rows with some data gaps:

some days are missing

The Analytical query fetches the next date for the same currency value.


Then the Calculator step calculates how many rows are missing. Note that the last day of each currency will have null as the value, so we need to tweak that and use 0 instead (NVL(A,B) returns B if A is null, A otherwise).
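In other words, the clone count is effectively NVL(number_of_clones, 0), so the last row of each currency simply produces no clones (the exact field names depend on how the Calculator is set up).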

Clone rows: takes a row and creates copies.

The clone_number field allows us to calculate the actual date the row refers to.

Finally, here's the data. The fields you want are new_date, currency and exchange_rate. Use a Select values step to re-order the field list and get rid of those you don't need anymore.

As you can see, now we have data for 2014-01-03 and 2014-01-04, using the previous known value.

answered by nsousa


Although this is not exactly what you asked for, you can achieve your goal by using the User Defined Java Class component with a generic piece of functionality. Replace your steps Get previous row fields through Non-values in row with a single instance of this component. In the section Classes - Processor of this component, insert the following code:

Object[] previousRow;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // First, get a row from the default input hop
    Object[] r = getRow();

    // If the row object is null, we are done processing.
    if (r == null) {
      setOutputDone();
      return false;
    }

    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
    // enough to handle any new fields you are creating in this step.
    Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());

    // copy all input fields to the output fields

    for (int i=0; i < getInputRowMeta().size(); i++) {
        // debug output of the current field value; can be removed
        logBasic(data.inputRowMeta.getString(r, i));
        if (data.inputRowMeta.getString(r, i) == null && (previousRow != null)) {
            // if the current field is empty take it from the previous row
            outputRow[i] = previousRow[i];
        }
        else {
            // otherwise use the current row
            outputRow[i] = r[i];
        }
    }

    putRow(data.outputRowMeta, outputRow);
    // store the current row as future previous row
    previousRow = data.outputRowMeta.cloneRow(outputRow);

    return true;
}

The Janino class always keeps a copy of the previous row to fill up the empty fields of the current row.

The following test setup demonstrates the use of the component. In the simplest case we process a stream read from a CSV file:

simple test transformation

The input file is configured as follows:

configuration of CSV input file

and contains the following data:

NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014;12,5
2;B;;13,5
;;03.12.2001;
4;;;
5;C;;
6;;20.03.2005;18,2
7;D;;

The configuration of the User Defined Java class component is as follows:

configuration of User Defined Java class component

The output text file contains the enhanced rows "without gaps":

NUMBER;STRING;DATE;CURRENCY
1;A;01.02.2014; 012,50
2;B;01.02.2014; 013,50
2;B;03.12.2001; 013,50
4;B;03.12.2001; 013,50
5;C;03.12.2001; 013,50
6;C;20.03.2005; 018,20
7;D;20.03.2005; 018,20

Note:

  • The component was tested for these four data types, but in principle it should work for all.
  • It's independent of the actual number of fields.
  • Once a field is filled it can never be "unfilled", which is fine for your setup (I guess) but may not be applicable to other setups.
  • The mechanism only works if a field is null. Strings containing just blanks might break it, so make sure you trim all strings before piping them into the component (see the sketch after this list for an in-step alternative).
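As an aside, a minimal variant of the inner loop in processRow could also treat whitespace-only strings as missing, removing the need for an upstream trim. This is just a sketch and has not been tested against the setup above:

    for (int i=0; i < getInputRowMeta().size(); i++) {
        // treat both null and whitespace-only strings as missing values
        Object current = r[i];
        boolean missing = (current == null)
            || (current instanceof String && ((String) current).trim().length() == 0);
        if (missing && previousRow != null) {
            // take the value from the previous row
            outputRow[i] = previousRow[i];
        } else {
            // otherwise keep the current value
            outputRow[i] = r[i];
        }
    }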

The code was written by using http://wiki.pentaho.com/display/EAI/User+Defined+Java+Class as a tutorial.

ADDENDUM

The link provided by @manu contains the following code. It adds specific handling for numeric formats; note that it is not fully generic anymore.

Object[] previousRow;
RowMetaInterface outputMeta;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // First, get a row from the default input hop
    Object[] r = getRow();

    // If the row object is null, we are done processing.
    if (r == null) {
        setOutputDone();
        return false;
    }

    // on the first row, clone the output metadata and set a fixed
    // precision and conversion mask for all numeric fields
    if (outputMeta == null) {
        outputMeta = data.outputRowMeta.clone();
        for (int i=0; i < outputMeta.size(); i++) {
            ValueMetaInterface meta = outputMeta.getValueMeta(i);
            if (meta.getType() == ValueMetaInterface.TYPE_NUMBER) {
                meta.setPrecision(4);
                meta.setConversionMask("#.####");
            }
        }
    }

    // It is always safest to call createOutputRow() to ensure that your output row's Object[] is large
    // enough to handle any new fields you are creating in this step.
    Object[] outputRow = createOutputRow(r, data.outputRowMeta.size());

    // copy all input fields to the output fields
    for (int i=0; i < getInputRowMeta().size(); i++) {
        if ((r[i] == null) && (previousRow != null)) {
            // if the current field is empty take it from the previous row
            outputRow[i] = previousRow[i];
        }
        else {
            // otherwise use the current row
            outputRow[i] = r[i];
        }
    }

    putRow(outputMeta, outputRow);
    // store the current row as future previous row
    previousRow = outputMeta.cloneRow(outputRow);

    return true;
}
answered by Marcus Rickert