
Is there a data architecture for efficient joins in Spark (a la RedShift)?

I have data that I would like to do a lot of analytic queries on and I'm trying to figure out if there is a mechanism I can use to store it so that Spark can efficiently do joins on it. I have a solution using RedShift, but would ideally prefer to have something that is based on files in S3 instead of having a whole RedShift cluster up 24/7.

Introduction to the data

This is a simplified example. We have 2 initial CSV files.

  • Person records
  • Event records

The two tables are linked via the person_id field. person_id is unique in the Person table. Events have a many-to-one relationship with person.
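
For concreteness, here is a minimal sketch (in Scala) of how the two files might be loaded; the S3 paths are hypothetical and the schemas are inferred for brevity:

val person = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3://my-bucket/person.csv")    // person_id, age, ...
val events = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3://my-bucket/events.csv")    // person_id, date, cost, ...

// Register views so the tables can be queried with spark.sql later
person.createOrReplaceTempView("person")
events.createOrReplaceTempView("events")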

The goal

I'd like to understand how to set up the data so I can efficiently perform the following query. I will need to run many queries like this (all of them evaluated on a per-person basis):

The query should produce a data frame with one row for every person and the following columns:

  • person_id - person_id for each person in the data set
  • age - "age" field from the person record
  • cost - The sum of the "cost" field for all event records for that person where "date" is during the month of 6/2013

All the solutions I currently have for this in Spark involve reshuffling all the data, which makes the process slow for large data sets (hundreds of millions of people). I am happy with a solution that requires me to reshuffle the data and write it out in a different format once, if that speeds up the later queries.

The solution using RedShift

I can accomplish this using RedShift in a fairly straightforward way:

Both files are loaded as RedShift tables with DISTKEY person_id and SORTKEY person_id. This distributes the data so that all the data for a person is on a single node. The following query produces the desired data frame:

select person_id, age, e.cost
from person
    left join (select person_id, sum(cost) as cost
               from events
               where date between '2013-06-01' and '2013-06-30'
               group by person_id) as e using (person_id)

The solution using Spark/Parquet

I have thought of several potential ways to handle this in Spark, but none accomplishes what I need. My ideas and the issues are listed below:

  • Spark Dataset write 'bucketBy' - Read the CSV files and then rewrite them as parquet files using "bucketBy". Queries on these parquet files could then be very fast. This would produce a data setup similar to RedShift, but the parquet file writer doesn't support bucketBy (it is only available through saveAsTable); see the bucketBy sketch after this list.
  • Spark parquet partitioning - Parquet does support partitioning. Because parquet creates a separate set of files for each partition key, you have to create a computed column to partition on, e.g. a hash of person_id. However, when you later join these tables in Spark on "partition_key" and "person_id", the query plan still does a full hash partition, so this approach is no better than just reading the CSVs and shuffling every time.
  • Storing the data in some other format besides parquet - I am open to this, but don't know of another data source that would work.
  • Using a compound record format - Parquet supports hierarchical data, so you can pre-join both tables into a hierarchical record (where a person record has an "events" field that is an array of struct elements) and then process that; see the nested-record sketch after this list. With a hierarchical record, there are two approaches to processing it:
    • **Use explode to create separate records** - With this approach you explode the array fields into full rows, use standard data frame operations to do the analytics, and then join the results back to the main table. Unfortunately, I've been unable to get this approach to compile queries efficiently.
    • **Use UDFs to perform operations on subrecords** - This preserves the structure and executes without shuffles, but it is an awkward and verbose way to program. It also requires lots of UDFs, which aren't great for performance (although they beat large-scale shuffling of data).
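
A minimal sketch of the bucketBy route, reusing the person/events DataFrames from the loading sketch above and assuming a metastore-backed table (the bucket count of 200 and the table names are arbitrary):

// bucketBy only works through saveAsTable, not through .parquet(path)
person.write
  .bucketBy(200, "person_id")
  .sortBy("person_id")
  .format("parquet")
  .saveAsTable("person_bucketed")

events.write
  .bucketBy(200, "person_id")
  .sortBy("person_id")
  .format("parquet")
  .saveAsTable("events_bucketed")

// With matching bucket counts on the join key, later joins on person_id
// can use the bucketing metadata and avoid a full shuffle.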
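
And a rough sketch of the nested-record (compound record) idea, again with the same person/events DataFrames; the nested column names and output path are illustrative:

import org.apache.spark.sql.functions.{collect_list, struct, explode_outer, col}

// Pre-join once: one row per person, with that person's events nested
// as an array of structs
val nested = person.join(
  events.groupBy("person_id")
        .agg(collect_list(struct("date", "cost")).as("events")),
  Seq("person_id"), "left")
nested.write.parquet("s3://my-bucket/person_nested.parquet")

// Later analysis flattens the array again; explode_outer keeps people
// with no events. This is the step I haven't been able to get Spark
// to compile efficiently.
val exploded = nested.select(col("person_id"), col("age"),
  explode_outer(col("events")).as("event"))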

For my use cases, Spark has advantages over RedShift which aren't obvious in this simple example, so I'd prefer to do this with Spark. Please let me know if I am missing something and there is a good approach to this.

Asked by Dave DeCaprio

1 Answer

Edited per comment.

Assumptions:

  • Using parquet

Here's what I would try:

val eventAgg = spark.sql("""select person_id, sum(cost) as cost
                            from events
                            where date between '2013-06-01' and '2013-06-30'
                            group by person_id""")
eventAgg.cache.count // materialize the aggregate; it is small once grouped
val personDF = spark.sql("""select person_id, age from person""")
personDF.cache.count // cache is less important here, so feel free to omit
// Seq("person_id") keeps this compatible across Spark versions;
// left-join from personDF instead if you need a row for every person,
// including those with no events in the month.
eventAgg.join(personDF, Seq("person_id"), "left")

I just did this with some of my data and here's how it went (9-node cluster, 140 vCPUs, ~600 GB RAM):

  • 27,000,000,000 "events" (aggregated down to 14,331,487 "people")
  • 64,000,000 "people" (~20 columns)
  • Building and caching the aggregated events took ~3 minutes
  • Caching the people took ~30 seconds (pulled over the network, not from parquet)
  • The left join took several seconds

Not caching the "people" made the join take a few seconds longer. Forcing Spark to broadcast the couple hundred MB of aggregated events then made the join take under 1 second; see the sketch below.
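
A sketch of that broadcast, reusing the names from the snippet above (the hint just marks the small side; joining from personDF also keeps one row per person):

import org.apache.spark.sql.functions.broadcast

// eventAgg is only a couple hundred MB after aggregation, so ship it
// to every executor instead of shuffling the large person table
personDF.join(broadcast(eventAgg), Seq("person_id"), "left")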

Answered by Garren S