 

Spark: Reading big MySQL table into DataFrame fails

I'd like to say in advance that several related questions, like the following, DO NOT address my problem:

  • Spark query running very slow
  • Converting mysql table to dataset is very slow...
  • Spark Will Not Load Large MySql Table
  • Spark MySQL Error while Reading from Database

This one comes close, but the stack-trace is different and it is unresolved anyway. So rest assured that I'm posting this question after several days of (failed) solution-hunting.


I'm trying to write a job that moves data (once a day) from MySQL tables to Hive tables stored as Parquet / ORC files on Amazon S3. Some of the tables are quite big: ~300M records and 200 GB+ in size (as reported by phpMyAdmin).

Currently we are using Sqoop for this, but we want to move to Spark for the following reasons:

  • To leverage its capabilities with the DataFrame API (in future, we would be performing transformations while moving the data)
  • We already have a sizeable framework written in Scala for Spark jobs used elsewhere in the organization

I've been able to achieve this on small MySQL tables without any issue. But the Spark job (that reads data from MySQL into a DataFrame) fails if I try to fetch more than ~1.5-2M records at a time. I've shown the relevant portions of the stack-trace below; you can find the complete stack-trace here.

...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
    at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:>                                                       (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...

** This stack-trace was obtained upon failure while moving a 148 GB table containing 186M records

As is apparent from the (full) stack-trace, the Spark read job starts sulking with false None.get warnings, followed by SQLException: Incorrect key file for table... (which is related to MySQL's tmp table becoming full).


Now clearly this can't be a MySQL problem, because in that case Sqoop should fail as well. As far as Spark is concerned, I'm parallelizing the read operation by setting numPartitions = 32 (we use a parallelism of 40 with Sqoop).
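For context, here is a minimal sketch of how the partitioned JDBC read is wired up in our Scala job (the connection details, table name, partition column and bounds below are hypothetical placeholders, not our actual values):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("mysql-to-s3-copy")             // hypothetical app name
  .getOrCreate()

// Hypothetical connection details
val jdbcUrl = "jdbc:mysql://<mysql-host>:3306/<database>"

val df = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "big_table")          // hypothetical table name
  .option("user", "<user>")
  .option("password", "<password>")
  // Split the read into 32 parallel JDBC queries on a numeric column
  .option("partitionColumn", "id")         // assumed numeric primary key
  .option("lowerBound", "1")
  .option("upperBound", "186000000")       // roughly the row count of the table
  .option("numPartitions", "32")
  .load()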

From my limited knowledge of Spark and Big Data, 148 GB shouldn't be overwhelming for Spark by any measure. Moreover, since MySQL, Spark (EMR) and S3 all reside in the same region (AWS AP-SouthEast), latency shouldn't be the bottleneck.


My questions are:

  1. Is Spark a suitable tool for this?
  2. Could Spark's JDBC driver be blamed for this issue?
  3. If answer to above question is
    • Yes: How can I overcome it? (alternate driver, or some other workaround)?
    • No: What could be the possible cause?

Framework Configurations:

  • Hadoop distribution: Amazon 2.8.3
  • Spark 2.2.1
  • Hive 2.3.2
  • Scala 2.11.11

EMR Configurations:

  • EMR 5.12.0
  • 1 Master: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS storage: 32 GiB]
  • 1 Task: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS storage: none]
  • 1 Core: r3.xlarge [8 vCore, 30.5 GiB memory, 80 GB SSD storage, EBS storage: 32 GiB]

** These are the configurations of development cluster; production cluster would be better equipped

asked Mar 07 '18 by y2k-shubham




1 Answer

The Spark JDBC API seems to try to load all the data from the MySQL table into memory in one go. So when you try to load a big table, what you should do is use the Spark API to clone the data to HDFS first (JSON should be used to keep the schema structure), like this:

spark.read.jdbc(jdbcUrl, tableName, prop)
  .write
  .json("/fileName.json")

Then you can work on the data from HDFS normally:

spark.read.json("/fileName.json")
  .createOrReplaceTempView(tableName)
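
If the end goal is still Parquet on S3 (as in the question), you can then write the staged data out from that temp view. A rough sketch, where the S3 path is a hypothetical placeholder:

// Hypothetical S3 location -- adjust to your bucket/layout
spark.table(tableName)
  .write
  .mode("overwrite")
  .parquet("s3://my-bucket/warehouse/" + tableName)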
answered Oct 21 '22 by Phung Manh Cuong