I'd like to tell in advance that several related questions, like the following, DO NOT address my problems:
This one comes close but the stack-trace is different and it is unresolved anyways. So rest assured that I'm posting this question after several days of (failed) solution-hunting.
I'm trying to write a job that moves data (once a day) from MySQL tables to Hive tables stored as Parquet / ORC files on Amazon S3. Some of the tables are quite big: ~ 300M records with 200 GB+ size (as reported by phpMyAdmin).
Currently we are using sqoop for this but we want to move to Spark for the following reasons:
DataFrame API (in future, we would be performing transformations while moving data)Scala for Spark jobs used elsewhere in the organizationI've been able to achieve this on small MySQL tables without any issue. But the Spark job (that reads data from MySQL into DataFrame) fails if I try to fetch more than ~1.5-2M records at a time. I've shown the relevant portions of stack-trace below, you can find the complete stack-trace here.
...
javax.servlet.ServletException: java.util.NoSuchElementException: None.get
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:489)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:427)
...
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
...
org.apache.spark.status.api.v1.OneStageResource.taskSummary(OneStageResource.scala:62)
    at sun.reflect.GeneratedMethodAccessor188.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
...
[Stage 27:>                                                       (0 + 30) / 32]18/03/01 01:29:09 WARN TaskSetManager: Lost task 3.0 in stage 27.0 (TID 92, ip-xxx-xx-xx-xxx.ap-southeast-1.compute.internal, executor 6): java.sql.SQLException: Incorrect key file for table '/rdsdbdata/tmp/#sql_14ae_5.MYI'; try to repair it
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:964)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3973)
...
** This stack-trace was obtained upon failure of moving a 148 GB table containing 186M records
As apparent from (full) stack-trace, the Spark read job starts sulking with the false warnings of None.get error followed by SQLException: Incorrect key for file.. (which is related to MySQL's tmp table becoming full)
Now clearly this can't be a MySQL problem because in that case sqoop should fail as well. As far as Spark is concerned, I'm parallelizing the read operation by setting numPartitions = 32 (we use parallelism of 40 with sqoop).
From my limited knowledge of Spark and BigData, 148 GB shouldn't be overwhelming for Spark by any measure. Moreover since MySQL, Spark (EMR) and S3 all reside in same region (AWS AP-SouthEast), so latency shouldn't be the bottleneck.
My questions are:
Spark a suitable tool for this?Spark's Jdbc driver be blamed for this issue?Framework Configurations:
Hadoop distribution: Amazon 2.8.3
Spark 2.2.1
Hive 2.3.2
Scala 2.11.11
EMR Configurations:
EMR 5.12.0
1 Master: r3.xlarge [8 vCore, 30.5 GiB memory, 80 SSD GB storage EBS Storage:32 GiB]1 Task: r3.xlarge [8 vCore, 30.5 GiB memory, 80 SSD GB storage EBS Storage:none]1 Core: r3.xlarge [8 vCore, 30.5 GiB memory, 80 SSD GB storage
EBS Storage:32 GiB]** These are the configurations of development cluster; production cluster would be better equipped
Spark uses SortMerge joins to join large table. It consists of hashing each row on both table and shuffle the rows with the same hash into the same partition. There the keys are sorted on both side and the sortMerge algorithm is applied. That's the best approach as far as I know.
Similar to Python Pandas you can get the Size and Shape of the PySpark (Spark with Python) DataFrame by running count() action to get the number of rows on DataFrame and len(df. columns()) to get the number of columns.
DataSet and DataFrame evolved where data is stored in row-based format. Spark 2. x: Support for Vectorized Parquet which is columnar in-memory data is added.
Spark DataFrame or Dataset cache() method by default saves it to storage level ` MEMORY_AND_DISK ` because recomputing the in-memory columnar representation of the underlying table is expensive. Note that this is different from the default cache level of ` RDD.
Spark JDBC API seem to fork to load all data from MySQL table to memory without. So when you try to load a big table, what you should do is use Spark API clone data to HDFS first (JSON should be used to keep schema structure), like this:
spark.read.jdbc(jdbcUrl, tableName, prop)
       .write()
       .json("/fileName.json");
Then you can working on HDFS instead normally.
spark.read().json("/fileName.json")
       .createOrReplaceTempView(tableName);
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With