I'd like to tell in advance that several related questions, like the following, DO NOT address my problems:
This one comes close but the stack-trace is different and it is unresolved anyways. So rest assured that I'm posting this question after several days of (failed) solution-hunting.
I'm trying to write a job that moves data (once a day) from MySQL
tables to Hive
tables stored as Parquet
/ ORC
files on Amazon S3
. Some of the tables are quite big: ~ 300M records with 200 GB+ size (as reported by phpMyAdmin
).
Currently we are using sqoop
for this but we want to move to Spark
for the following reasons:
DataFrame API
(in future, we would be performing transformations while moving data)Scala
for Spark
jobs used elsewhere in the organizationI've been able to achieve this on small MySQL
tables without any issue. But the Spark
job (that reads data from MySQL
into DataFrame
) fails if I try to fetch more than ~1.5-2M records at a time. I've shown the relevant portions of stack-trace below, you can find the complete stack-trace here.
...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:> (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...
** This stack-trace was obtained upon failure of moving a 148 GB table containing 186M records
As apparent from (full) stack-trace, the Spark
read job starts sulking with the false warnings of None.get
error followed by SQLException: Incorrect key for file..
(which is related to MySQL
's tmp table becoming full)
Now clearly this can't be a MySQL
problem because in that case sqoop
should fail as well. As far as Spark
is concerned, I'm parallelizing the read operation by setting numPartitions = 32
(we use parallelism of 40 with sqoop
).
From my limited knowledge of Spark
and BigData
, 148 GB shouldn't be overwhelming for Spark by any measure. Moreover since MySQL
, Spark
(EMR
) and S3
all reside in same region (AWS
AP-SouthEast
), so latency shouldn't be the bottleneck.
My questions are:
Spark
a suitable tool for this?Spark
's Jdbc
driver be blamed for this issue?Framework Configurations:
Hadoop
distribution: Amazon 2.8.3
Spark
2.2.1
Hive
2.3.2
Scala
2.11.11
EMR
Configurations:
EMR
5.12.0
1 Master
: r3.xlarge [8 vCore, 30.5 GiB memory, 80 SSD GB storage EBS Storage:32 GiB]1 Task
: r3.xlarge [8 vCore, 30.5 GiB memory, 80 SSD GB storage EBS Storage:none]1 Core
: r3.xlarge [8 vCore, 30.5 GiB memory, 80 SSD GB storage
EBS Storage:32 GiB]** These are the configurations of development cluster; production cluster would be better equipped
Spark uses SortMerge joins to join large table. It consists of hashing each row on both table and shuffle the rows with the same hash into the same partition. There the keys are sorted on both side and the sortMerge algorithm is applied. That's the best approach as far as I know.
Similar to Python Pandas you can get the Size and Shape of the PySpark (Spark with Python) DataFrame by running count() action to get the number of rows on DataFrame and len(df. columns()) to get the number of columns.
DataSet and DataFrame evolved where data is stored in row-based format. Spark 2. x: Support for Vectorized Parquet which is columnar in-memory data is added.
Spark DataFrame or Dataset cache() method by default saves it to storage level ` MEMORY_AND_DISK ` because recomputing the in-memory columnar representation of the underlying table is expensive. Note that this is different from the default cache level of ` RDD.
Spark JDBC API seem to fork to load all data from MySQL table to memory without. So when you try to load a big table, what you should do is use Spark API clone data to HDFS first (JSON should be used to keep schema structure), like this:
spark.read.jdbc(jdbcUrl, tableName, prop)
.write()
.json("/fileName.json");
Then you can working on HDFS instead normally.
spark.read().json("/fileName.json")
.createOrReplaceTempView(tableName);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With