I am trying to execute a simple task(an instance method) using dask(async) framework but it fails with serialization error.
Can someone point me in right direction.
Here is the code that I am running:
from dask.distributed import Client, as_completed
import time
class DaskConnect:
def __init__(self):
print("Initialized:",self.__class__.__name__)
self.scheduler_host="192.168.0.4"
self.scheduler_port="8786"
def connect(self):
self.client = Client(self.scheduler_host+":"+self.scheduler_port)
# self.client = Client()
return self.client
def disconnect(self):
self.client.close()
class TestDask:
def __init__(self):
print("Initialized:",self.__class__.__name__)
self.dask_client=DaskConnect().connect()
def do_task(self,msg):
time.sleep(30)
return msg
def run(self):
tasks=[1]
# tasks = [1, 2, 3, 4, 5]
futures=[]
for task in tasks:
print("Submitting:",task)
future = self.dask_client.submit(self.do_task, "Task:"+str(task))
futures.append(future)
for future in as_completed(futures):
result = future.result()
print("Result",result)
TestDask().run()
Error:
distributed.protocol.pickle - INFO - Failed to serialize main.TestDask object at 0x101c408d0>>. Exception: can't pickle select.kqueue objects Traceback (most recent call last):
Dask Clients aren't currently serializable. Any object that contains a Dask Client will also not be serializable. Generally it is challenge to serialize anything that contains active network connections, locks, etc..
Perhaps there is another approach to your problem?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With