Reading large file in Spark issue - python

I have Spark installed locally with Python, and when I run the following code:

data = sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
data.first()

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-fca93c6aedeb> in <module>()
----> 1 data.first()

C:\Spark\python\pyspark\rdd.pyc in first(self)
   1313         ValueError: RDD is empty
   1314         """
-> 1315         rs = self.take(1)
   1316         if rs:
   1317             return rs[0]

C:\Spark\python\pyspark\rdd.pyc in take(self, num)
   1295 
   1296             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1297             res = self.context.runJob(self, takeUpToNumLeft, p)
   1298 
   1299             items += res

C:\Spark\python\pyspark\context.pyc in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    937         # SparkContext#runJob.
    938         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 939         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    940         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    941 

C:\Anaconda2\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args)
   1024         answer = self.gateway_client.send_command(command)
   1025         return_value = get_return_value(
-> 1026             answer, self.gateway_client, self.target_id, self.name)
   1027 
   1028         for temp_arg in temp_args:

C:\Spark\python\pyspark\sql\utils.pyc in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

C:\Anaconda2\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    314                 raise Py4JJavaError(
    315                     "An error occurred while calling {0}{1}{2}.\n".
--> 316                     format(target_id, ".", name), value)
    317             else:
    318                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:393)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketException: Connection reset by peer: socket write error
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
    at java.io.BufferedOutputStream.write(Unknown Source)
    at java.io.DataOutputStream.write(Unknown Source)
    at java.io.FilterOutputStream.write(Unknown Source)
    at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:622)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:442)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1765)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

I am sure the path is correct, as I have tried with other files in the same folder. I think the issue is with the size of the file, which is around 3.4 gigabytes.

Any help, please?

asked Jun 29 '16 by Escachator


People also ask

Can we read 50 GB of data into Spark at a time? Is it possible?

The data does not need to fit in memory. Spark's operators spill data to disk if it does not fit in memory, allowing Spark to run well on data of any size. Likewise, cached datasets that do not fit in memory are either spilled to disk or recomputed on the fly when needed, as determined by the RDD's storage level.
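
As a minimal sketch of the storage-level point (assuming sc is the SparkContext from the question), persisting with MEMORY_AND_DISK tells Spark to spill cached partitions that do not fit in memory to local disk instead of recomputing them:

    from pyspark import StorageLevel

    data = sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')

    # Partitions that do not fit in memory are written to local disk
    # and read back from there instead of being recomputed.
    data.persist(StorageLevel.MEMORY_AND_DISK)
    data.count()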

How do I read large CSV files in PySpark?

Using csv("path") or format("csv").load("path") on a DataFrameReader, you can read a CSV file into a PySpark DataFrame. These methods take the file path to read from as an argument.
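
A rough sketch of both forms, assuming Spark 2.x or later where SparkSession is available (the path reuses the one from the question):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('read-large-csv').getOrCreate()

    # Shorthand form of the DataFrameReader
    df = spark.read.csv('C:\\Users\\xxxx\\Desktop\\train.csv',
                        header=True, inferSchema=True)

    # Equivalent long form:
    # df = spark.read.format('csv').option('header', 'true') \
    #          .load('C:\\Users\\xxxx\\Desktop\\train.csv')

    df.show(5)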


1 Answer

Whether you are running Spark in standalone mode or cluster mode, spark.driver.memory and spark.executor.memory default to 1 GB each. You can give both the driver and the executors more memory by changing this configuration when you start your Jupyter notebook, or in the Spark configuration file. With that you should be able to read the 3.4 GB CSV file, provided your machine has enough RAM.
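
For example, when launching PySpark from a Jupyter notebook in local mode, the driver JVM has to receive the memory setting before it starts, so one way (a sketch; the 4g values are only illustrative) is to pass it through PYSPARK_SUBMIT_ARGS before the SparkContext is created:

    import os

    # Must be set before the SparkContext (and its JVM) is created;
    # the trailing 'pyspark-shell' token is required by PySpark.
    os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-memory 4g --executor-memory 4g pyspark-shell'

    from pyspark import SparkContext

    sc = SparkContext('local[*]', 'read-large-csv')
    data = sc.textFile('C:\\Users\\xxxx\\Desktop\\train.csv')
    print(data.first())

Alternatively, the same values can go in conf/spark-defaults.conf (e.g. spark.driver.memory 4g), which corresponds to the "Spark configuration file" option mentioned above.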

answered Oct 09 '22 by KartikKannapur