I'm trying to convert a remote MySQL table to a Parquet file using Spark 1.6.2.
The process runs for 10 minutes, filling up memory, then starts emitting these messages:
WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
In the end it fails with this error:
ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded
I'm running it in a spark-shell with these commands:
spark-shell --packages mysql:mysql-connector-java:5.1.26 org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G
val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()
dataframe_mysql.saveAsParquetFile("name.parquet")
I have limited the max executor memory to 12G. Is there a way to force the Parquet file to be written in "small" chunks, freeing memory as it goes?
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data. When reading Parquet files, all columns are automatically converted to be nullable for compatibility reasons.
Because Parquet is a columnar format, queries that read only some of the columns typically run faster than on row-oriented formats such as Avro or JSON, and Parquet files usually take less disk space.
Schema evolution is supported by many frameworks and data serialization systems, such as Avro, ORC, Protocol Buffers and Parquet.
It seems the problem is that you had no partitioning defined when you read your data with the JDBC connector.
Reading from JDBC isn't distributed by default: without partitioning options, the whole table is fetched through a single connection into a single partition. To enable distribution you have to set manual partitioning. You need a column that is a good partitioning key, and you have to know its distribution up front.
Apparently, this is what your data looks like:
root
|-- id: long (nullable = false)
|-- order_year: string (nullable = false)
|-- order_number: string (nullable = false)
|-- row_number: integer (nullable = false)
|-- product_code: string (nullable = false)
|-- name: string (nullable = false)
|-- quantity: integer (nullable = false)
|-- price: double (nullable = false)
|-- price_vat: double (nullable = false)
|-- created_at: timestamp (nullable = true)
|-- updated_at: timestamp (nullable = true)
order_year seemed like a good candidate to me (you seem to have ~20 years according to your comments).
import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = ???
val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???
// Manual partitioning
val partitionColumn: String = "order_year"
val options: Map[String, String] = Map(
  "driver" -> driver,
  "url" -> connectionUrl,
  "dbtable" -> query,
  "user" -> userName,
  "password" -> password,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> "0",
  "upperBound" -> "3000",
  "numPartitions" -> "300"
)
val df = sqlContext.read.format("jdbc").options(options).load()
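To make the effect of lowerBound, upperBound and numPartitions in the options map above more concrete, here is a simplified sketch of how a numeric range can be cut into per-partition WHERE predicates. It is an illustration only, not Spark's actual source code, and the names JdbcPartitionSketch and partitionPredicates are hypothetical. It assumes what the Spark documentation describes: the bounds only decide the partition stride and do not filter rows, so the first and last partitions are open-ended.

```scala
// Simplified illustration (NOT Spark's source code) of range partitioning
// for a JDBC read. All names here are hypothetical.
object JdbcPartitionSketch {

  // Builds one WHERE predicate per partition for a numeric column.
  def partitionPredicates(
      column: String,
      lowerBound: Long,
      upperBound: Long,
      numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "numPartitions must be at least 1")
    require(lowerBound < upperBound, "lowerBound must be below upperBound")

    if (numPartitions == 1) {
      Seq("1 = 1") // a single partition reads everything
    } else {
      val stride = upperBound / numPartitions - lowerBound / numPartitions
      require(stride > 0, "range is too narrow for this many partitions")
      (0 until numPartitions).map { i =>
        val from = lowerBound + i * stride
        val to = from + stride
        if (i == 0) s"$column < $to"                         // open-ended below
        else if (i == numPartitions - 1) s"$column >= $from" // open-ended above
        else s"$column >= $from AND $column < $to"
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Values taken from the options map above.
    val predicates = partitionPredicates("order_year", 0L, 3000L, 300)
    println(predicates.head)  // order_year < 10
    println(predicates(201))  // order_year >= 2010 AND order_year < 2020
    println(predicates.last)  // order_year >= 2990
  }
}
```

With the values used above, the stride is 10, so a year such as 2016 would land in the single partition order_year >= 2010 AND order_year < 2020. If the years really span only about two decades, most of the 300 partitions would come back empty; bounds closer to the actual minimum and maximum year (an assumption about your data) would spread the rows more evenly.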
PS: partitionColumn, lowerBound, upperBound and numPartitions must all be specified if any of them is specified.
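The all-or-nothing rule for these four options can be checked before calling load(). The following is a minimal sketch in plain Scala; JdbcOptionsCheck and missingPartitionKeys are hypothetical names, not part of Spark.

```scala
// Hypothetical helper (not part of Spark): checks the all-or-nothing rule
// for the four JDBC partitioning options.
object JdbcOptionsCheck {
  val partitionKeys: Set[String] =
    Set("partitionColumn", "lowerBound", "upperBound", "numPartitions")

  // Returns the partitioning keys that are missing when only some are set.
  // An empty result means the map is consistent: all four present, or none.
  def missingPartitionKeys(options: Map[String, String]): Set[String] = {
    val present = partitionKeys.intersect(options.keySet)
    if (present.isEmpty) Set.empty[String] else partitionKeys.diff(present)
  }
}
```

For example, JdbcOptionsCheck.missingPartitionKeys(options) returns an empty set for the options map above, since all four keys are present.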
Now you can save your DataFrame to Parquet.
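For completeness, here is a minimal sketch of the save step. It assumes the DataFrameWriter API available since Spark 1.4 (df.write.parquet) rather than the older saveAsParquetFile used in the question; ParquetExport and writeParquet are hypothetical names, and the output path is a placeholder.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object ParquetExport {
  // Hypothetical helper: writes the DataFrame in Parquet format at `path`.
  // SaveMode.ErrorIfExists makes the write fail instead of overwriting
  // output that is already there.
  def writeParquet(df: DataFrame, path: String): Unit =
    df.write.mode(SaveMode.ErrorIfExists).parquet(path)
}

// In spark-shell, with `df` loaded through the partitioned JDBC read:
// ParquetExport.writeParquet(df, "name.parquet")
```

Each partition of the DataFrame is written by its own task, so with a partitioned read the whole table should not need to sit in memory at once.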