I have a timestamp dataset in the format shown below, and I have written a UDF in PySpark to process it and return a Map of key-value pairs. But I am getting the error message below.
Dataset: df_ts_list
+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+
PySpark UDF:
def on_time(ts_list):
    import sys
    import os
    sys.path.append('/usr/lib/python2.7/dist-packages')
    os.system("sudo apt-get install python-numpy -y")
    import numpy as np
    import datetime
    import time
    from datetime import timedelta
    ts = np.array(ts_list)
    if ts.size == 0:
        count = 0
        duration = 0
        st = time.mktime(datetime.now())
        ymd = str(datetime.fromtimestamp(st).date())
    else:
        ts.sort()
        one_tag = []
        start = float(ts[0])
        for i in range(len(ts)):
            if i == (len(ts)) - 1:
                end = float(ts[i])
                a_round = [start, end]
                one_tag.append(a_round)
            else:
                diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
                if abs(diff.total_seconds()) > 3600:
                    end = float(ts[i])
                    a_round = [start, end]
                    one_tag.append(a_round)
                    start = float(ts[i+1])
        one_tag = [u for u in one_tag if u[1] - u[0] > 300]
        count = int(len(one_tag))
        duration = int(np.diff(one_tag).sum())
        ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
    return {'count':count, 'duration':duration, 'ymd':ymd}
PySpark code:
on_time = udf(on_time, MapType(StringType(), StringType()))
df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()
Error:
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'
Any help would be appreciated!
Mariusz's answer didn't really help me. So if, like me, you found this because it's the only result on Google and you're new to PySpark (and Spark in general), here's what worked for me.
In my case I was getting that error because I was trying to execute PySpark code before the PySpark environment had been set up.

Making sure that PySpark was available and set up before making any calls that depend on pyspark.sql.functions fixed the issue for me; the sketch below shows what I mean.
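This is only a minimal sketch, not the original poster's code: the session-building step and names such as spark, on_time_udf and the placeholder function body are assumptions for illustration. The point is simply that the SparkSession (and with it the SparkContext and its JVM gateway) exists before anything from pyspark.sql.functions is used.

# Minimal sketch (assumed names: spark, on_time_udf, df_ts_list):
# initialize the PySpark environment before touching pyspark.sql.functions.
from pyspark.sql import SparkSession

# 1. Build the session first so a live SparkContext / JVM gateway exists.
spark = SparkSession.builder.appName("on_time_example").getOrCreate()

# 2. Only then import and use pyspark.sql.functions.
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

def on_time(ts_list):
    # placeholder for the processing logic from the question
    return {'count': '0', 'duration': '0', 'ymd': ''}

on_time_udf = udf(on_time, MapType(StringType(), StringType()))

# 3. Calls that depend on pyspark.sql.functions now have a JVM behind them, e.g.:
# df_ts_list.withColumn("one_tag", on_time_udf("ts_list")).select("one_tag").show()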