distribute max X columns using dask
asked 2 years ago by Benjamin Hansen · #34626
I have very large HDF files, each with a dataset X of, for example, shape (24000000, 8000) and dtype int16.
I need to run a function on a subset of each of these columns, say X[50000:-50000, :].
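For concreteness, this is roughly how the data is laid out (just a sketch; the file and dataset names below are placeholders):

import h5py

f = h5py.File("simulation_part01.h5", "r")   # placeholder file name
X = f["X"]                                   # h5py Dataset, shape (24000000, 8000), dtype int16
# Note: actually slicing X[50000:-50000, :] here would try to pull the whole
# region into RAM, which is exactly what I cannot afford to do.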
This is way too big for memory, so I need to do something like

result = []
for x in X[50000:-50000, :].T:
    result.append(client.submit(myfunc, x))  # block until a worker is ready

basically going from the first to the last column and handing out, say, 10 columns to workers at a time; whenever one is done, compute the next column, then the next, and so on. This way I will only ever have 10 columns in memory, which will fit nicely.
What is the right way to do this using Dask?
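What I was half hoping is that dask.array could do the chunking for me, something like the sketch below (the file name and chunk size are just guesses, and I don't know whether this is the idiomatic approach, hence the question):

import h5py
import dask.array as da

f = h5py.File("simulation_part01.h5", "r")            # placeholder file name
X = da.from_array(f["X"], chunks=(1_000_000, 10))     # a few columns per chunk
sub = X[50000:-50000, :]

# This only helps if myfunc can be expressed as a blockwise / column-wise
# operation; a plain per-column reduction like this one stays out of core:
col_sums = sub.sum(axis=0).compute()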
I have come up with this manual solution, which seems super clumsy:
from dask.distributed import Client
import time
from itertools import chain
import numpy as np

client = Client()

def slow_func(x):
    # burn an unpredictable amount of CPU to simulate real work
    wt = np.random.randint(100, 22000000)
    print(wt)
    for i in range(wt):
        2 + 2
    return x.sum()

def any_finished(jobs):
    return any(j.status == 'finished' for j in jobs)

def split(jobs):
    # separate finished futures from those still pending
    done, pending = [], []
    for j in jobs:
        if j.status == 'finished':
            done.append(j)
        else:
            pending.append(j)
    return done, pending

jobs = []
res = []
my_simulated_hdf = np.random.randn(20, 10)
max_r = 4  # never have more than 4 columns in flight

for i in range(my_simulated_hdf.shape[1]):
    # poll until a slot is free
    while len(jobs) >= max_r and not any_finished(jobs):
        time.sleep(.5)
    done, jobs = split(jobs)
    if done:
        res.append(client.gather(done))
    jobs.append(client.submit(slow_func, my_simulated_hdf[:, i]))

res.append(client.gather(jobs))
print(list(chain(*res)))
And furthermore, the ordering of the results is not even guaranteed by this clumsy solution...
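The closest I have gotten to something less clumsy is the pattern below built on distributed.as_completed, tracking the column index so the results can be put back in order at the end; but I am not sure this is the intended way either (the indexed wrapper around the work function is just something I made up):

from dask.distributed import Client, as_completed
import numpy as np

client = Client()
my_simulated_hdf = np.random.randn(20, 10)
max_in_flight = 4

def indexed_slow_func(i, x):
    # wrap the real work so each result carries its column index
    return i, x.sum()

ac = as_completed()   # starts empty; futures are added as we go
results = {}

for i in range(my_simulated_hdf.shape[1]):
    # block until a slot frees up, so at most max_in_flight columns are live
    while ac.count() >= max_in_flight:
        col, value = next(ac).result()
        results[col] = value
    ac.add(client.submit(indexed_slow_func, i, my_simulated_hdf[:, i]))

# drain whatever is still running
for future in ac:
    col, value = future.result()
    results[col] = value

print([results[i] for i in range(my_simulated_hdf.shape[1])])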
python-3.x
dask-distributed
dask-delayed
dask-dataframe