I have the following data frame:
+---+---+------+
| id| ts|days_r|
+---+---+------+
|123| T| 32|
|342| I| 3|
|349| L| 10|
+---+---+------+
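For reference, this sample frame can be rebuilt with something like the snippet below (assuming plain integer and string columns):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# recreate the sample data shown above
df = spark.createDataFrame(
    [(123, 'T', 32), (342, 'I', 3), (349, 'L', 10)],
    ['id', 'ts', 'days_r'],
)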
I want to create a new column and fill in the values depending on whether certain conditions are met on the "ts" and "days_r" columns.
This is my desired data frame:
+---+---+------+----------+
| id| ts|days_r|0to2_count|
+---+---+------+----------+
|123| T| 32| 1|
|342| I| 3| 0|
|349| L| 10| 0|
+---+---+------+----------+
I tried the following code in pyspark:
df = df.withColumn('0to2_count', F.when((F.col("ts") == 'I') & (F.col('days_r') >=0) & (F.col('days_r') <= 2), 1) \
.otherwise(F.when((F.col("ts") == 'T') & (F.col('days_r') >=0) & (F.col('days_r') <= 48), 1) \
.otherwise(F.when((F.col("ts") == 'L') & (F.col('days_r') >=0 & F.col('days_r') <= 7), 1) \
.otherwise(0))))
I get the error below:
Traceback (most recent call last):
File "perc_0to2", line 1, in <module>
File "perc_0to2", line 9, in perc_0to2
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/column.py", line 115, in _
njc = getattr(self._jc, name)(jc)
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling o826.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
at com.sun.proxy.$Proxy94.execute(Unknown Source)
at com.palantir.arrow.module.compute.DelegatedComputeService.lambda$execute$0(DelegatedComputeService.java:63)
at com.palantir.foundry.spark.api.SparkAuthorization.runAsUserInternal(SparkAuthorization.java:164)
at com.palantir.foundry.spark.api.SparkAuthorization.runAsUser(SparkAuthorization.java:105)
at com.palantir.arrow.module.compute.DelegatedComputeService.execute(DelegatedComputeService.java:62)
at com.palantir.arrow.module.ArrowSparkModuleResource.lambda$executeAsync$0(ArrowSparkModuleResource.java:106)
at com.palantir.remoting3.tracing.DeferredTracer.withTrace(DeferredTracer.java:43)
at com.palantir.remoting3.tracing.Tracers$TracingAwareCallable.call(Tracers.java:219)
at com.codahale.metrics.InstrumentedExecutorService$InstrumentedCallable.call(InstrumentedExecutorService.java:197)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Your code has a bug: you are missing a set of parentheses on the third line. Here is a way to fix your code, using chained when() statements instead of multiple otherwise() statements:
df = df.withColumn(
'0to2_count',
F.when((F.col("ts") == 'I') & (F.col("days_r") >=0) & (F.col("days_r") <= 2), 1)\
.when((F.col("ts") == 'T') & (F.col('days_r') >=0) & (F.col('days_r') <= 48), 1)\
.when((F.col("ts") == 'L') & (F.col('days_r') >=0) & (F.col('days_r') <= 7), 1)\
.otherwise(0)
)
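The missing parentheses matter because Python's bitwise & operator binds more tightly than comparison operators such as >= and <=. On your third line, F.col('days_r') >= 0 & F.col('days_r') <= 7 is parsed as F.col('days_r') >= (0 & F.col('days_r')) <= 7, so Python first evaluates 0 & F.col('days_r'). That asks py4j to call the JVM Column method and() with a plain integer, which is the "Method and([class java.lang.Integer]) does not exist" error in your traceback. A minimal sketch of the difference (assuming an active SparkSession and the usual functions import):
from pyspark.sql import functions as F

# parenthesized: each comparison produces a Column, then & combines them
ok = (F.col('days_r') >= 0) & (F.col('days_r') <= 7)

# unparenthesized: & is evaluated before >= and <=, so Python tries
# 0 & F.col('days_r') first and raises the Py4JError shown above
#bad = F.col('days_r') >= 0 & F.col('days_r') <= 7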
An even better way to write this logic is to use pyspark.sql.Column.between():
df = df.withColumn(
'0to2_count',
F.when((F.col("ts") == 'I') & F.col("days_r").between(0, 2), 1)\
.when((F.col("ts") == 'T') & F.col('days_r').between(0,48), 1)\
.when((F.col("ts") == 'L') & F.col('days_r').between(0,7), 1)\
.otherwise(0)
)
df.show()
#+---+---+------+----------+
#| id| ts|days_r|0to2_count|
#+---+---+------+----------+
#|123| T| 32| 1|
#|342| I| 3| 0|
#|349| L| 10| 0|
#+---+---+------+----------+
Of course, since all three conditions return the same value, you could simplify this further into a single Boolean condition.
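One way to write that (a sketch keeping the same thresholds as above):
df = df.withColumn(
    '0to2_count',
    F.when(
        ((F.col('ts') == 'I') & F.col('days_r').between(0, 2))
        | ((F.col('ts') == 'T') & F.col('days_r').between(0, 48))
        | ((F.col('ts') == 'L') & F.col('days_r').between(0, 7)),
        1
    ).otherwise(0)
)
This produces the same 0to2_count column as the chained when() version.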