I've been spending a fair amount of time reading through some questions with the pyspark and spark-dataframe tags and very often I find that posters don't provide enough information to truly understand their question. I usually comment asking them to post an MCVE but sometimes getting them to show some sample input/output data is like pulling teeth.
Perhaps part of the problem is that people just don't know how to easily create an MCVE for spark-dataframes. I think it would be useful to have a spark-dataframe version of this pandas question as a guide that can be linked.
So how does one go about creating a good, reproducible example?
At the very least, posters should provide a couple of rows and columns of their dataframe and code that can be used to easily create it. By easy, I mean cut and paste. Make it as small as possible to demonstrate your problem.
I have the following dataframe:
+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
|    3|  5|    A|2017-01-03|
|    4|  7|    B|2017-01-04|
+-----+---+-----+----------+
which can be created with this code:
df = sqlCtx.createDataFrame(
    [
        (1, 1, 'A', '2017-01-01'),
        (2, 3, 'B', '2017-01-02'),
        (3, 5, 'A', '2017-01-03'),
        (4, 7, 'B', '2017-01-04')
    ],
    ('index', 'X', 'label', 'date')
)
Ask your specific question and show us your desired output.
How can I create a new column 'is_divisible' that has the value 'yes' if the day of month of the 'date' plus 7 days is divisible by the value in column 'X', and 'no' otherwise?
Desired output:
+-----+---+-----+----------+------------+
|index|  X|label|      date|is_divisible|
+-----+---+-----+----------+------------+
|    1|  1|    A|2017-01-01|         yes|
|    2|  3|    B|2017-01-02|         yes|
|    3|  5|    A|2017-01-03|         yes|
|    4|  7|    B|2017-01-04|          no|
+-----+---+-----+----------+------------+
Explain, in great detail, how you get your desired output. It helps to show an example calculation.
For instance, in row 1, X = 1 and date = 2017-01-01. Adding 7 days to the date yields 2017-01-08. The day of the month is 8 and since 8 is divisible by 1, the answer is 'yes'.
Likewise, for the last row, X = 7 and date = 2017-01-04. Adding 7 days to the date yields 11 as the day of the month. Since 11 % 7 is not 0, the answer is 'no'.
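It can also help to double-check the desired output before posting. The following is a minimal plain-Python sketch (no Spark needed) that recomputes the example column from the sample rows above; the helper name is_divisible is only illustrative and is not part of any Spark API.

```python
from datetime import date, timedelta

# Sample rows copied from the example dataframe: (index, X, label, date)
rows = [
    (1, 1, "A", "2017-01-01"),
    (2, 3, "B", "2017-01-02"),
    (3, 5, "A", "2017-01-03"),
    (4, 7, "B", "2017-01-04"),
]


def is_divisible(x, date_string):
    """Return 'yes' if the day of month of (date + 7 days) is divisible by x."""
    shifted = date.fromisoformat(date_string) + timedelta(days=7)
    return "yes" if shifted.day % x == 0 else "no"


expected = [is_divisible(x, d) for _, x, _, d in rows]
print(expected)  # ['yes', 'yes', 'yes', 'no']
```

If the result disagrees with the table in your question, fix the table (or the explanation) before you post.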
Show us what you have done or tried, including all* of the code even if it does not work. Tell us where you are getting stuck and if you receive an error, please include the error message.
(*You can leave out the code to create the spark context, but you should include all imports.)
I know how to add a new column that is date plus 7 days, but I'm having trouble getting the day of the month as an integer.
from pyspark.sql import functions as f
df.withColumn("next_week", f.date_add("date", 7))
If the question is related to performance tuning, please include the following information.
It is best to include the extended execution plan. In Python:
df.explain(True)
In Scala:
df.explain(true)
or the extended execution plan with statistics. In Python:
print(df._jdf.queryExecution().stringWithStats())
In Scala:
df.queryExecution.stringWithStats
mode - local, client, cluster.
"Slow" is relative, especially when you port a non-distributed application or expect low latency. Exact timings for different tasks and stages can be retrieved from the Spark UI (sc.uiWebUrl) jobs page or the Spark REST API.
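If you also want to quote a rough end-to-end number in the question, a wall-clock measurement around a single action is one option. This is only a sketch with a hypothetical helper, time_action, and a stand-in workload; with Spark you would pass an action such as lambda: df.count(), because transformations are lazy and only actions trigger computation.

```python
import time


def time_action(action, repeats=3):
    """Run a zero-argument callable several times; return wall-clock seconds per run."""
    timings = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        timings.append(time.perf_counter() - start)
    return timings


# Stand-in workload; with Spark this would be e.g. lambda: df.count()
timings = time_action(lambda: sum(range(100000)))
print(["{:.4f}s".format(t) for t in timings])
```

Report every run rather than only the best one, since caching can make later runs much faster than the first.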
Using established names for each context allows us to quickly reproduce the problem.
sc - for SparkContext.
sqlContext - for SQLContext.
spark - for SparkSession.
Powerful type inference is one of the most useful features of Scala, but it makes it hard to analyze code taken out of context. Even if the type is obvious from the context, it is better to annotate the variables. Prefer
import org.apache.spark.rdd.RDD
val lines: RDD[String] = sc.textFile("path")
val words: RDD[String] = lines.flatMap(_.split(" "))
over
val lines = sc.textFile("path")
val words = lines.flatMap(_.split(" "))
Commonly used tools can assist you:
spark-shell / Scala shell: use :t
scala> val rdd = sc.textFile("README.md")
rdd: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
scala> :t rdd
org.apache.spark.rdd.RDD[String]
IntelliJ IDEA: use Alt + =
Some additional suggestions to what has been already offered:
Spark is still evolving, although not so rapidly as in the days of 1.x. It is always (but especially if you are using a somewhat older version) a good idea to include your working version. Personally, I always start my answers with:
spark.version
# u'2.2.0'
or
sc.version
# u'2.2.0'
Including your Python version, too, is never a bad idea.
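A small sketch for collecting both versions in one go; the helper name environment_report is made up for illustration, and the PySpark lookup is wrapped in a try block on the assumption that the snippet may also be run where PySpark is not installed.

```python
import platform


def environment_report():
    """Collect version details worth pasting into a question."""
    report = {"python": platform.python_version()}
    try:
        import pyspark  # optional: only present where PySpark is installed
        report["pyspark"] = pyspark.__version__
    except ImportError:
        report["pyspark"] = "not installed"
    return report


print(environment_report())
```

Inside a running session, spark.version (shown above) remains the authoritative value, since it reports the version of the Spark instance you are actually connected to.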
If your question is not strictly about Spark SQL & dataframes, e.g. if you intend to use your dataframe in some machine learning operation, be explicit about your imports - see this question, where the imports were added in the OP only after an extensive exchange in the (now removed) comments (and it turned out that these wrong imports were the root cause of the problem).
Why is this necessary? Because, for example, this LDA
from pyspark.mllib.clustering import LDA
is different from this LDA:
from pyspark.ml.clustering import LDA
the first coming from the old, RDD-based API (formerly Spark MLlib), while the second one from the new, dataframe-based API (Spark ML).
OK, I'll confess this is subjective: I believe that PySpark questions should not be tagged as python by default; the thing is, the python tag automatically gives code highlighting (and I believe this is a main reason for those who use it for PySpark questions). Anyway, if you happen to agree, and you would still like nicely highlighted code, simply include the relevant markdown directive:
<!-- language-all: lang-python -->
somewhere in your post, before your first code snippet.
[UPDATE: I have requested automatic syntax highlighting for the pyspark and sparkr tags, which has indeed been implemented]
This small helper function might help to parse Spark output files into a DataFrame:
PySpark:
from pyspark.sql.functions import col, when

def read_spark_output(file_path):
    # read the pipe-delimited table; the +---+ border lines are skipped as comments
    step1 = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .option("delimiter", "|") \
        .option("parserLib", "UNIVOCITY") \
        .option("ignoreLeadingWhiteSpace", "true") \
        .option("ignoreTrailingWhiteSpace", "true") \
        .option("comment", "+") \
        .csv(file_path)
    # drop the unnamed columns (_c0, ...) produced by the leading and trailing '|'
    step2 = step1.select([c for c in step1.columns if not c.startswith("_")])
    # replace the literal string 'null' with a real null
    return step2.select(*[
        when(~col(col_name).eqNullSafe("null"), col(col_name)).alias(col_name)
        for col_name in step2.columns
    ])
Scala:
import org.apache.spark.sql.functions.{col, when}

// read Spark output (fixed-width table):
def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "UNIVOCITY")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)
  // drop the unnamed columns (_c0, ...) produced by the leading and trailing '|'
  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)
  val columns = step2.columns
  // replace the literal string "null" with a real null
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
Usage:
df = read_spark_output("file:///tmp/spark.out")
PS: For pyspark, eqNullSafe is available from Spark 2.3.
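When the table was pasted into a post rather than saved to a file, a plain-Python parser can be enough. The sketch below is an alternative to the Spark-based readers above, not a replacement for them; the function name parse_show_output is hypothetical, every value comes back as a string, and it assumes that each table row starts and ends with '|' and that no value contains '|'.

```python
def parse_show_output(text):
    """Parse text printed by DataFrame.show() into (column_names, rows).

    All values are returned as strings; the literal text 'null' becomes None.
    """
    parsed = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line.startswith("|"):
            continue  # skip blank lines and the +-----+ border lines
        # drop the outer pipes, then split the remaining cells
        parsed.append([cell.strip() for cell in line[1:-1].split("|")])
    if not parsed:
        return [], []
    header, body = parsed[0], parsed[1:]
    rows = [tuple(None if cell == "null" else cell for cell in row) for row in body]
    return header, rows


sample = """
+-----+---+-----+----------+
|index|  X|label|      date|
+-----+---+-----+----------+
|    1|  1|    A|2017-01-01|
|    2|  3|    B|2017-01-02|
+-----+---+-----+----------+
"""

columns, rows = parse_show_output(sample)
print(columns)  # ['index', 'X', 'label', 'date']
print(rows)     # [('1', '1', 'A', '2017-01-01'), ('2', '3', 'B', '2017-01-02')]
```

The result can be passed to spark.createDataFrame(rows, columns); since everything is a string at that point, cast the columns to the types you need afterwards.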