How to do an aggregation transformation in a Kiba ETL script (kiba gem)?

Tags:

ruby

etl

kiba-etl

I want to write a Kiba ETL script that reads from a CSV source and writes to a CSV destination through a list of transformation rules. The second transformer is an aggregation performing an operation such as select name, sum(euro) group by name.

Kiba ETL Script file

source CsvSource, 'users.csv', col_sep: ';', headers: true, header_converters: :symbol

transform VerifyFieldsPresence, [:name, :euro]

transform AggregateFields, { sum: :euro, group_by: :name }

transform RenameField, from: :euro, to: :total_amount

destination CsvDestination, 'result.csv', [:name, :total_amount]

users.csv

date;euro;name
7/3/2015;10;Jack
7/3/2015;85;Jill
8/3/2015;6;Jack
8/3/2015;12;Jill
9/3/2015;99;Mack

result.csv (expected result)

name;total_amount
Jack;16
Jill;97
Mack;99

ETL transformers execute one after another, each on a single row at a time, but my second transformer's behavior depends on the entire collection of rows, which I can't access inside the class passed to the transform method.

transform AggregateFields, { sum: :euro, group_by: :name }

Is there any way this behavior can be achieved using the kiba gem?
Thank you in advance.

asked Jun 30 '15 by Umar Siddiqui

1 Answer

EDIT: it's 2020, and Kiba ETL v3 includes a much better way to do this. Check out this article https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 for all the relevant information.
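The v3 approach linked above builds on class transforms that can buffer rows in `process` and emit rows from `close`. The sketch below illustrates the idea with a plain Ruby class (the class name and the `:euro`/`:name` fields follow the question; the class has no dependency on the kiba gem itself, so treat the exact wiring into a Kiba script as an assumption and refer to the article for the canonical version):

```ruby
require 'bigdecimal'

# Sketch of an aggregating transform in the buffering style described in
# the Kiba v3 article: `process` swallows incoming rows while accumulating
# totals, and `close` yields one aggregated row per group.
class AggregateFields
  def initialize(sum:, group_by:)
    @sum = sum
    @group_by = group_by
    @totals = Hash.new(BigDecimal("0"))
  end

  def process(row)
    @totals[row[@group_by]] += BigDecimal(row[@sum].to_s)
    nil # swallow the row: nothing goes downstream until close
  end

  def close
    # emit the aggregated rows at the end of the stream
    @totals.each { |name, total| yield({ @group_by => name, @sum => total }) }
    nil
  end
end
```

In a Kiba v3 script this would be declared as `transform AggregateFields, sum: :euro, group_by: :name`; check the article for the exact yielding semantics supported by your Kiba version.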

Kiba author here! You can achieve that in many different ways, depending mainly on the data size and your actual needs. Here are a couple of possibilities.

Aggregating using a variable in your Kiba script

require 'awesome_print'
require 'bigdecimal'

transform do |r|
  # BigDecimal.new was removed in Ruby 2.7; use the Kernel#BigDecimal form
  r[:amount] = BigDecimal(r[:amount])
  r
end

total_amounts = Hash.new(0)

transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end

post_process do
  # pretty print here, but you could save to a CSV too
  ap total_amounts
end

This is the simplest way, yet this is quite flexible.

It will keep your aggregates in memory though, so this may be good enough or not, depending on your scenario. Note that currently Kiba is mono-threaded (but "Kiba Pro" will be multi-threaded), so there is no need to add a lock or use a thread-safe structure for the aggregate, for now.

Calling TextQL from post_process blocks

Another quick and easy way to aggregate is to generate a non-aggregated CSV file first, then leverage TextQL to actually do the aggregation, like this:

destination CsvDestination, 'non-aggregated-output.csv', [:name, :amount]

post_process do
  query = <<SQL
    select
      name,
      /* apparently sqlite has reduced precision, round to 2 for now */
      round(sum(amount), 2) as total_amount
    from tbl group by name
SQL

  textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end

With the following helpers defined:

def system!(command)
  raise "Failed to run command #{command}" unless system(command)
end

def textql(source_file, query, output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the table
  system! "cat #{output_file} | csvfix ascii_table"
end

Be careful with the precision though when doing computations.
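If textql is not installed, the same group-by can be done in a post_process block with nothing but Ruby's standard csv and bigdecimal libraries, which also sidesteps the sqlite precision caveat above. A minimal sketch (the helper name and file layout are illustrative, not part of the original answer):

```ruby
require 'csv'
require 'bigdecimal'

# Stdlib-only alternative to textql: group a semicolon-separated CSV by
# :name and sum :euro, using BigDecimal to avoid floating-point drift.
def aggregate_csv(input_io, output_io)
  totals = Hash.new(BigDecimal("0"))
  CSV.new(input_io, col_sep: ";", headers: true).each do |row|
    totals[row["name"]] += BigDecimal(row["euro"])
  end
  out = CSV.new(output_io, col_sep: ";")
  out << %w[name total_amount]
  totals.each { |name, total| out << [name, "%0.2f" % total] }
  output_io
end
```

Used as `aggregate_csv(File.open('users.csv'), File.open('result.csv', 'w'))`, this produces the same shape of output as the TextQL query, with exact decimal sums.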

Writing an in-memory aggregating destination

A useful trick that can work here is to wrap a given destination with a class that does the aggregation. Here is how it could look:

class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba, but not too much
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    # do not write, but count here instead
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    # use close to actually do the writing
    @aggregate.each do |k,v|
      # reformat BigDecimal additions here
      value = '%0.2f' % v
      @destination.write(@group_by => k, @sum => value)
    end
    @destination.close
  end
end

which you can use this way:

# convert your string into an actual number
transform do |r|
  r[:amount] = BigDecimal(r[:amount]) # BigDecimal.new was removed in Ruby 2.7
  r
end

destination CsvDestination, 'non-aggregated.csv', [:name, :amount]

destination InMemoryAggregate,
  sum: :amount, group_by: :name,
  destination: [
    CsvDestination, 'aggregated.csv', [:name, :amount]
  ]

post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end

The nice thing about this version is that you can reuse your aggregator with different destinations (like a database one, or anything else).

Note though that this will keep all the aggregates in memory, like the first version.

Inserting into a store with aggregating capabilities

Another way (especially useful if you have very large volumes) is to send the resulting data into something that will be able to aggregate the data for you. It could be a regular SQL database, Redis, or anything more fancy, which you would then be able to query as needed.
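The shape of that idea can be sketched with a destination pushing each row into a store exposing an atomic increment. Below, a plain Hash stands in for the real backend purely for illustration (with Redis the increment would be INCRBYFLOAT; with a SQL database, an upsert such as INSERT ... ON CONFLICT ... DO UPDATE); the class and method names are assumptions, not Kiba API:

```ruby
# Stand-in for an external store with an atomic increment operation.
# A real implementation would delegate to Redis or a SQL database.
class CounterStore
  def initialize
    @data = Hash.new(0)
  end

  # atomic in a real backend; plain addition is enough for this sketch
  def increment(key, by:)
    @data[key] += by
  end

  def fetch(key)
    @data[key]
  end
end

# Kiba-style destination: write each row's amount into the store,
# letting the store itself hold the running aggregates.
class StoreDestination
  def initialize(store:, sum:, group_by:)
    @store = store
    @sum = sum
    @group_by = group_by
  end

  def write(row)
    @store.increment(row[@group_by], by: row[@sum])
  end

  def close
    # nothing to flush: the aggregates live in the store
  end
end
```

Because the aggregation happens in the store, memory use in the ETL process stays flat regardless of the number of groups, and the totals can be queried later with ordinary store queries.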

So as I said, the implementation will largely depend on your actual needs. Hope you will find something that works for you here!

answered Sep 25 '22 by Thibaut Barrère