 

Dask: Submit continuously, work on all submitted data

I have 500 continuously growing DataFrames and would like to submit operations on the data (independent for each DataFrame) to Dask. My main question is: can Dask hold the continuously submitted data, so that I can submit a function over all of the submitted data, not just the newly submitted part?

But let's explain it with an example:

Creating a dask_server.py:

from dask.distributed import Client, LocalCluster
HOST = '127.0.0.1'
SCHEDULER_PORT = 8711
DASHBOARD_PORT = ':8710'

def run_cluster():
    cluster = LocalCluster(dashboard_address=DASHBOARD_PORT, scheduler_port=SCHEDULER_PORT, n_workers=8)
    print("DASK Cluster Dashboard = http://%s%s/status" % (HOST, DASHBOARD_PORT))
    client = Client(cluster)
    print(client)
    print("Press Enter to quit ...")
    input()

if __name__ == '__main__':
    run_cluster()

Now I can connect from my my_stream.py and start to submit and gather data:

import threading
import time

import pandas as pd
from dask.distributed import Client

DASK_CLIENT_IP = '127.0.0.1'
DASK_CLIENT_PORT = 8711  # the SCHEDULER_PORT from dask_server.py
dask_con_string = 'tcp://%s:%s' % (DASK_CLIENT_IP, DASK_CLIENT_PORT)
dask_client = Client(dask_con_string)

def my_dask_function(lines):
    return lines['a'].mean() + lines['b'].mean()

def async_stream_redis_to_d(max_chunk_size=1000):
    while True:

        # This is a redis queue, but it can be any queueing/file-stream/syslog source
        lines = queue_IN.get(block=True, max_chunk_size=max_chunk_size)

        futures = []
        df = pd.DataFrame(data=lines, columns=['a', 'b', 'c'])
        futures.append(dask_client.submit(my_dask_function, df))

        result = dask_client.gather(futures)
        print(result)

        time.sleep(0.1)

if __name__ == '__main__':
    max_chunk_size = 1000
    thread_stream_data_from_redis = threading.Thread(target=async_stream_redis_to_d, args=[max_chunk_size])
    #thread_stream_data_from_redis.daemon = True
    thread_stream_data_from_redis.start()
    # Let's go

This works as expected and it is really quick!!!

But next, I would like to actually append the lines first, before the computation takes place, and I wonder whether this is possible. In the example above, I would like to calculate the mean over all lines that have ever been submitted, not only over the most recently submitted ones (e.g. if the first chunk had a = [1, 2] and the current chunk has a = [3, 4], the mean of column a should be 2.5, not 3.5).

Questions / Approaches:

  1. Is this cumulative calculation possible?
  2. Bad Alternative 1: I cache all lines locally and resubmit the entire data set to the cluster every time a new row arrives. The overhead grows roughly quadratically with the amount of data. I tried it, it works, but it is slow!
  3. Golden Option: Python program 1 pushes the data. Then it would be possible to connect with another client (from another Python program) to that accumulated data and move the analysis logic away from the inserting logic. I think published datasets are the way to go, but are they applicable for such high-speed appends? (A rough sketch of what I have in mind follows below.)
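
To make option 3 a bit more concrete, here is roughly what I imagine the pushing side to look like (just a sketch: the dataset name 'all_lines', the push_chunk helper and the use of scatter are my own assumptions, not something I have running):

import pandas as pd
from dask.distributed import Client

client = Client('tcp://127.0.0.1:8711')  # same scheduler as above

def push_chunk(lines):
    # Scatter the new chunk so the data lives on the workers, ...
    df = pd.DataFrame(data=lines, columns=['a', 'b', 'c'])
    future = client.scatter(df)

    # ... then keep the growing list of futures in a published dataset.
    # Only metadata is republished here, not the data itself.
    futures = list(client.datasets['all_lines']) if 'all_lines' in client.datasets else []
    futures.append(future)
    if 'all_lines' in client.datasets:
        client.unpublish_dataset('all_lines')
    client.datasets['all_lines'] = futures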

Maybe related: Distributed Variables, Actors, Workers

asked Nov 07 '22 by gies0r

1 Answer

Assigning a list of futures to a published dataset seems ideal to me. This is relatively cheap (everything is metadata) and you'll be up to date to within a few milliseconds.

from dask.distributed import get_client

client.datasets["x"] = list_of_futures

def worker_function(...):
    futures = get_client().datasets["x"]
    data = get_client().gather(futures)
    ...  # work with data
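
Tying this back to the question's my_dask_function, a separate consumer process could then compute over everything submitted so far roughly like this (a minimal sketch; it assumes the producer publishes one pandas DataFrame future per chunk under the name "x"):

import pandas as pd
from dask.distributed import Client

client = Client('tcp://127.0.0.1:8711')  # same scheduler as the producer

# Fetching the published futures is cheap: only metadata moves here.
futures = client.datasets["x"]

# Gather the per-chunk DataFrames and compute over all of them,
# not just the most recently submitted chunk.
dfs = client.gather(futures)
all_lines = pd.concat(dfs, ignore_index=True)
print(all_lines['a'].mean() + all_lines['b'].mean())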

As you mention, there are other systems like PubSub or Actors. From what you say, though, I suspect that futures + published datasets are the simpler and more pragmatic option.

answered Nov 22 '22 by MRocklin