Spark Exception when converting a MySQL table to parquet

I'm trying to convert a remote MySQL table to a parquet file using Spark 1.6.2.

The process runs for 10 minutes, filling up memory, then starts logging these messages:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval

In the end it fails with this error:

ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded

I'm running it in a spark-shell with these commands:

spark-shell --packages mysql:mysql-connector-java:5.1.26,org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()


I'm limited to a maximum of 12G of executor memory. Is there a way to force the parquet file to be written in "small" chunks, freeing memory along the way?

Marco Fedele asked Oct 27 '16 17:10

1 Answer

The problem seems to be that you defined no partitioning when reading your data with the JDBC connector.

Reading from JDBC isn't distributed by default, so the whole table is pulled through a single partition. To enable distribution you have to set up manual partitioning. You need a column that makes a good partitioning key, and you have to know its distribution up front.

Apparently, this is what your data looks like:

|-- id: long (nullable = false) 
|-- order_year: string (nullable = false) 
|-- order_number: string (nullable = false) 
|-- row_number: integer (nullable = false) 
|-- product_code: string (nullable = false) 
|-- name: string (nullable = false) 
|-- quantity: integer (nullable = false) 
|-- price: double (nullable = false) 
|-- price_vat: double (nullable = false) 
|-- created_at: timestamp (nullable = true) 
|-- updated_at: timestamp (nullable = true)

order_year seems like a good candidate to me (according to your comments, you have about 20 years of data).

import org.apache.spark.sql.SQLContext

val sqlContext: SQLContext = ???

val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???

// Manual partitioning
val partitionColumn: String = "order_year"

val options: Map[String, String] = Map(
  "driver" -> driver,
  "url" -> connectionUrl,
  "dbtable" -> query,
  "user" -> userName,
  "password" -> password,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> "0",
  "upperBound" -> "3000",
  "numPartitions" -> "300"
)

val df = sqlContext.read.format("jdbc").options(options).load()

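PS: the options partitionColumn, lowerBound, upperBound and numPartitions must all be specified if any one of them is specified. lowerBound and upperBound only decide the partition stride; they do not filter rows out of the table.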
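
To see what those four options do, here is a simplified sketch of how a numeric range gets cut into one WHERE clause per partition. It only approximates what Spark's JDBC source does internally; the function partitionPredicates is a hypothetical name used for illustration, not part of Spark's API, and it covers only the case of two or more partitions.

```scala
// Simplified sketch, not Spark's actual code: builds one WHERE clause per
// partition from the column name, the two bounds and the partition count.
// partitionPredicates is a hypothetical helper name.
def partitionPredicates(column: String,
                        lowerBound: Long,
                        upperBound: Long,
                        numPartitions: Int): Seq[String] = {
  require(numPartitions >= 2, "this sketch only covers the multi-partition case")
  require(lowerBound < upperBound, "lowerBound must be smaller than upperBound")

  // Width of each range; the bounds are only used to compute this stride.
  val stride = upperBound / numPartitions - lowerBound / numPartitions

  (0 until numPartitions).map { i =>
    val from = lowerBound + i * stride
    val to = from + stride
    if (i == 0) s"$column < $to OR $column IS NULL"        // open below, also takes NULLs
    else if (i == numPartitions - 1) s"$column >= $from"   // open above
    else s"$column >= $from AND $column < $to"
  }
}

// Same values as in the options map above.
val predicates = partitionPredicates("order_year", 0L, 3000L, 300)
predicates.take(2).foreach(println)
println(predicates.last)
```

With the values above the stride is 10, so roughly 20 consecutive years would fall into only a handful of the 300 ranges. If that turns out to be the case, narrowing lowerBound and upperBound to the actual range of years should spread the rows more evenly.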

Now you can save your DataFrame to parquet.
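
A minimal sketch of that last step, assuming df is the partitioned DataFrame loaded above. The helper name saveAsParquet and the output path are hypothetical; only DataFrame.write and parquet(path) are Spark's own calls.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: writes the DataFrame to a Parquet directory.
// Each JDBC partition is read and written by its own task, so the table
// no longer has to pass through a single partition.
def saveAsParquet(df: DataFrame, outputPath: String): Unit =
  df.write.parquet(outputPath)

// Usage in spark-shell (the path is a placeholder):
// saveAsParquet(df, "/tmp/orders.parquet")
```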

eliasah answered Sep 20 '22 13:09
