
PySpark: Create New Column And Fill In Based on Conditions of Two Other Columns

I have the following data frame:

+---+---+------+
| id| ts|days_r|
+---+---+------+
|123|  T|    32|
|342|  I|     3|
|349|  L|    10|
+---+---+------+

I want to create a new column and fill in its values depending on whether certain conditions are met on the "ts" and "days_r" columns.

This is my desired data frame:

+---+---+------+----------+
| id| ts|days_r|0to2_count|
+---+---+------+----------+
|123|  T|    32|         1|
|342|  I|     3|         0|
|349|  L|    10|         0|
+---+---+------+----------+

I tried the following code in PySpark:

df = df.withColumn('0to2_count', F.when((F.col("ts") == 'I') & (F.col('days_r') >=0) & (F.col('days_r') <= 2), 1) \
    .otherwise(F.when((F.col("ts") == 'T') & (F.col('days_r') >=0) & (F.col('days_r') <= 48), 1) \
    .otherwise(F.when((F.col("ts") == 'L') & (F.col('days_r') >=0 & F.col('days_r') <= 7), 1) \
    .otherwise(0))))

I get the error below:

Traceback (most recent call last):
  File "perc_0to2", line 1, in <module>
  File "perc_0to2", line 9, in perc_0to2
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/column.py", line 115, in _
    njc = getattr(self._jc, name)(jc)
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/tmp/conda-4df0bea5-3a72-444c-b3c5-f1562d678953/real/envs/conda-env/lib/python2.7/site-packages/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
Py4JError: An error occurred while calling o826.and. Trace:
py4j.Py4JException: Method and([class java.lang.Integer]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.sendCommand(ClientServerConnection.java:244)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:384)
    at py4j.CallbackClient.sendCommand(CallbackClient.java:356)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:106)
    at com.sun.proxy.$Proxy94.execute(Unknown Source)
    at com.palantir.arrow.module.compute.DelegatedComputeService.lambda$execute$0(DelegatedComputeService.java:63)
    at com.palantir.foundry.spark.api.SparkAuthorization.runAsUserInternal(SparkAuthorization.java:164)
    at com.palantir.foundry.spark.api.SparkAuthorization.runAsUser(SparkAuthorization.java:105)
    at com.palantir.arrow.module.compute.DelegatedComputeService.execute(DelegatedComputeService.java:62)
    at com.palantir.arrow.module.ArrowSparkModuleResource.lambda$executeAsync$0(ArrowSparkModuleResource.java:106)
    at com.palantir.remoting3.tracing.DeferredTracer.withTrace(DeferredTracer.java:43)
    at com.palantir.remoting3.tracing.Tracers$TracingAwareCallable.call(Tracers.java:219)
    at com.codahale.metrics.InstrumentedExecutorService$InstrumentedCallable.call(InstrumentedExecutorService.java:197)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
asked Jul 27 '18 by PineNuts0

People also ask

How do you create a new column based on multiple conditions in PySpark?

You can add multiple columns to a PySpark DataFrame in several ways. If you want to add a known set of columns, the simplest approach is to chain withColumn() calls or to use select().
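
For instance, a minimal sketch of chaining withColumn(), reusing the question's df (the derived columns here are hypothetical):

from pyspark.sql import functions as F

# Hypothetical derived columns, added by chaining withColumn()
df2 = (
    df.withColumn('days_r_plus_one', F.col('days_r') + 1)
      .withColumn('is_type_T', (F.col('ts') == 'T').cast('int'))
)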

How do I match two columns in PySpark?

PySpark SQL's concat() function concatenates multiple DataFrame columns into a single column. It works on string, binary, and compatible array columns.
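
For instance, a minimal sketch using concat() on the question's df (the 'id_ts' output column is hypothetical; the integer 'id' is cast to string first):

from pyspark.sql import functions as F

# Hypothetical example: concatenate 'id' and 'ts' into one string column
df2 = df.withColumn(
    'id_ts',
    F.concat(F.col('id').cast('string'), F.lit('_'), F.col('ts'))
)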

How do you update a PySpark DataFrame with new values from another DataFrame?

You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Since DataFrames are immutable distributed collections, you can't change column values in place; when you change a value with withColumn() or any other approach, PySpark returns a new DataFrame with the updated values.
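
For instance, a minimal sketch (the capping rule is hypothetical) that "updates" days_r by producing a new DataFrame:

from pyspark.sql import functions as F

# Hypothetical example: cap 'days_r' at 30. The original df is unchanged;
# withColumn() returns a new DataFrame with the replaced column.
df2 = df.withColumn(
    'days_r',
    F.when(F.col('days_r') > 30, 30).otherwise(F.col('days_r'))
)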


1 Answer

Your code has a bug: you are missing a set of parentheses around the days_r comparisons on the third line. In Python, & binds more tightly than the comparison operators, so F.col('days_r') >= 0 & F.col('days_r') <= 7 is evaluated as F.col('days_r') >= (0 & F.col('days_r')) <= 7. The subexpression 0 & F.col('days_r') ends up calling the Java Column's and method with a plain integer, which is exactly the Py4JError "Method and([class java.lang.Integer]) does not exist" you see. Here is a way to fix your code, using chained when() calls instead of nested otherwise() calls:

from pyspark.sql import functions as F

df = df.withColumn(
    '0to2_count',
    F.when((F.col('ts') == 'I') & (F.col('days_r') >= 0) & (F.col('days_r') <= 2), 1)
    .when((F.col('ts') == 'T') & (F.col('days_r') >= 0) & (F.col('days_r') <= 48), 1)
    .when((F.col('ts') == 'L') & (F.col('days_r') >= 0) & (F.col('days_r') <= 7), 1)
    .otherwise(0)
)

An even better way to write this logic is to use pyspark.sql.Column.between():

df = df.withColumn(
    '0to2_count',
    F.when((F.col('ts') == 'I') & F.col('days_r').between(0, 2), 1)
    .when((F.col('ts') == 'T') & F.col('days_r').between(0, 48), 1)
    .when((F.col('ts') == 'L') & F.col('days_r').between(0, 7), 1)
    .otherwise(0)
)
df.show()
#+---+---+------+----------+
#| id| ts|days_r|0to2_count|
#+---+---+------+----------+
#|123|  T|    32|         1|
#|342|  I|     3|         0|
#|349|  L|    10|         0|
#+---+---+------+----------+

Of course, since all three conditions return the same value, you could simplify this further into a single Boolean condition, as sketched below.
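
For example, a sketch that ORs the three branches together and casts the resulting Boolean column to an integer (reusing the between() conditions above):

df = df.withColumn(
    '0to2_count',
    (
        ((F.col('ts') == 'I') & F.col('days_r').between(0, 2))
        | ((F.col('ts') == 'T') & F.col('days_r').between(0, 48))
        | ((F.col('ts') == 'L') & F.col('days_r').between(0, 7))
    ).cast('int')
)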

answered Sep 21 '22 by pault