Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to efficiently update Impala tables whose files are modified very frequently

We have a Hadoop-based solution (CDH 5.15) where we are getting new files in HDFS in some directories. On top os those directories we have 4-5 Impala (2.1) tables. The process writing those files in HDFS is Spark Structured Streaming (2.3.1)

Right now, we are running some DDL queries as soon as we get the files written to HDFS:

  • ALTER TABLE table1 RECOVER PARTITONS to detect new partitions (and their HDFS directories and files) added to the table.

  • REFRESH table1 PARTITIONS (partition1=X, partition2=Y), using all the keys for each partition.

Right now, this DDL is taking a bit too long and they are getting queued in our system, damaging the data availability of the system.

So, my question is: Is there a way to do this data incorporation more efficiently?

We have considered:

  • Using the ALTER TABLE .. RECOVER PARTITONS but as per the documentation, it only refreshes new partitions.

  • Tried to use REFRESH .. PARTITON ... with multiple partitions at once, but the statement syntaxis does not allow to do that.

  • Tried batching the queries but the Hive JDBC drives does not support batching queries.

  • Shall we try to do those updates in parallel given that the system is already busy?

  • Any other way you are aware of?

Thanks!

Victor

Note: The way in which we know what partitions need refreshed is by using HDFS events as with Spark Structured Streaming we don´t know exactly when the files are written.

Note #2: Also, the files written in HDFS are sometimes small, so it would be great if it could be possible to merge those files at the same time.

like image 563
Victor Avatar asked Feb 06 '20 08:02

Victor


People also ask

How do I copy data from one table to another in Impala?

Impala currently supports: Copy data from another table using SELECT query. In Impala 1.2. 1 and higher, you can combine CREATE TABLE and INSERT operations into a single step with the CREATE TABLE AS SELECT syntax, which bypasses the actual INSERT keyword.

How do you drop the partition on an Impala table?

If you want to use \ as the escape character, specify the clause in impala-shell as ESCAPED BY '\\' . To add or drop partitions for a table, the table must already be partitioned (that is, created with a PARTITIONED BY clause).

Can Impala query Hive tables?

Thus, Impala can access tables defined or loaded by Hive, as long as all columns use Impala-supported data types, file formats, and compression codecs. The initial focus on query features and performance means that Impala can read more types of data with the SELECT statement than it can write with the INSERT statement.


1 Answers

Since nobody seems to have the answer for my problem, I would like to share the approach we took to make this processing more efficient, comments are very welcome.

We discovered (doc. is not very clear on this) that some of the information stored in the Spark "checkpoints" in HDFS is a number of metadata files describing when each Parquet file was written and how big was it:

$hdfs dfs -ls -h hdfs://...../my_spark_job/_spark_metadata

w-r--r--   3 hdfs 68K   2020-02-26 20:49 hdfs://...../my_spark_job/_spark_metadata/3248
rw-r--r--  3 hdfs 33.3M 2020-02-26 20:53 hdfs://...../my_spark_job/_spark_metadata/3249.compact
w-r--r--   3 hdfs 68K   2020-02-26 20:54 hdfs://...../my_spark_job/_spark_metadata/3250
...

$hdfs dfs -cat hdfs://...../my_spark_job/_spark_metadata/3250
v1
{"path":"hdfs://.../my_spark_job/../part-00004.c000.snappy.parquet","size":9866555,"isDir":false,"modificationTime":1582750862638,"blockReplication":3,"blockSize":134217728,"action":"add"}
{"path":"hdfs://.../my_spark_job/../part-00004.c001.snappy.parquet","size":526513,"isDir":false,"modificationTime":1582750862834,"blockReplication":3,"blockSize":134217728,"action":"add"}
...

So, what we did was:

  • Build a Spark Streaming Job polling that _spark_metadata folder.
    • We use a fileStream since it allow us to define the file filter to use.
    • Each entry in that stream is one of those JSON lines, which is parsed to extract the file path and size.
  • Group the files by the parent folder (which maps to each Impala partition) they belong to.
  • For each folder:
    • Read a dataframe loading only the targeted Parquet files (to avoid race conditions with the other job writing the files)
    • Calculate how many blocks to write (using the size field in the JSON and a target block size)
    • Coalesce the dataframe to the desired number of partitions and write it back to HDFS
    • Execute the DDL REFRESH TABLE myTable PARTITION ([partition keys derived from the new folder]
  • Finally, delete the source files

What we achieved is:

  • Limit the DDLs, by doing one refresh per partition and batch.

  • By having batch time and block size configurable, we are able to adapt our product to different deployment scenarios with bigger or smaller datasets.

  • The solution is quite flexible, since we can assign more or less resources to the Spark Streaming job (executors, cores, memory, etc.) and also we can start/stop it (using its own checkpointing system).

  • We are also studying the possibily of applying some data repartitioning, while doing this process, to have partitions as close as possible to the most optimum size.

like image 84
Victor Avatar answered Sep 27 '22 15:09

Victor