
Benjamin hansen

Distribute at most X columns at a time using Dask

I have very large HDF files, each with a dataset X of, for example, shape (24000000, 8000) and dtype int16. I need to run a function on a subset of each of these columns, say X[50000:-50000, :]. This is far too big for memory, so I need to do something like:

result = []
for x in X[50000:-50000, :].T:
    result.append(client.submit(myfunc, x))  # ideally this would block until a worker is free

Basically, I want to go from the first to the last column and distribute, say, 10 columns to workers at a time; whenever one is done, compute the next column, and so on. This way I will only ever have 10 columns in memory, which will fit nicely.
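For concreteness, the per-column access I have in mind looks roughly like this (just a sketch: the file name "data.h5", the dataset name "X" and myfunc are placeholders for my real files and function):

import h5py

# placeholder file and dataset names; the real files are the large HDF files described above
with h5py.File("data.h5", "r") as f:
    X = f["X"]                    # shape (24000000, 8000), dtype int16, stays on disk
    for j in range(X.shape[1]):
        col = X[50000:-50000, j]  # only this single column is ever read into memory
        # here myfunc(col) would be handed to a Dask worker,
        # with at most ~10 columns in flight at any time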

What is the right way to do this with Dask?

I have come up with this manual solution, which seems really clumsy:

from dask.distributed import Client
import time
from itertools import chain
import numpy as np

client = Client()

def slow_func(x):
    # burn a random amount of CPU time to simulate real per-column work
    wt = np.random.randint(100, 22000000)
    print(wt)
    for i in range(wt):
        2 + 2
    return x.sum()

def any_finished(jobs):
    return any(j.status == 'finished' for j in jobs)

def split(jobs):
    # separate finished futures from those still pending
    done, pending = [], []
    for j in jobs:
        if j.status == 'finished':
            done.append(j)
        else:
            pending.append(j)
    return done, pending

jobs = []
res = []
my_simulated_hdf = np.random.randn(20, 10)
max_r = 4  # never keep more than max_r columns in flight

for i in range(my_simulated_hdf.shape[1]):
    # busy-wait until at least one of the outstanding jobs has finished
    while len(jobs) >= max_r and not any_finished(jobs):
        time.sleep(.5)
    done, jobs = split(jobs)
    if done:
        res.append(client.gather(done))
    jobs.append(client.submit(slow_func, my_simulated_hdf[:, i]))

res.append(client.gather(jobs))
print(list(chain(*res)))

Furthermore, the ordering of the results is not even guaranteed with this clumsy solution...
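For comparison, this is roughly what I imagine an as_completed-based version would look like (just a sketch, reusing slow_func, client, my_simulated_hdf and max_r from the snippet above; I am not sure this is the idiomatic way, hence the question). It keeps at most max_r futures in flight and remembers each column's index so the results can be put back in order:

from dask.distributed import as_completed

# submit the first max_r columns, remembering which column each future belongs to
index_of = {}
for i in range(max_r):
    fut = client.submit(slow_func, my_simulated_hdf[:, i])
    index_of[fut] = i

ac = as_completed(list(index_of))
next_col = max_r
results = {}
for future in ac:
    # store the finished column's result under its original column index
    results[index_of.pop(future)] = future.result()
    if next_col < my_simulated_hdf.shape[1]:
        # one slot is free again, so submit the next column
        fut = client.submit(slow_func, my_simulated_hdf[:, next_col])
        index_of[fut] = next_col
        ac.add(fut)
        next_col += 1

print([results[i] for i in sorted(results)])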

python-3.x

dask-distributed

dask-delayed

dask-dataframe

0 Answers
