Hi xarray people,
My question is a bit complicated, and I'm not sure whether the "bug" is in xarray, pandas, dask or numpy. It's quite an intricate example, so I apologize in advance.
I have a very heavy computation involving `groupby` that I need to run on huge datasets (it's bias adjustment for climate data). Luckily, the whole thing is "vectorizable": it only acts on the "time" dimension. However, it is so heavy that when using it with dask arrays the number of tasks becomes unbearable and dask simply hangs. With "operational-sized" datasets I had runs with over 5 billion tasks: it never began computing.
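To make the fan-out concrete, here is a toy sketch (not my real computation; the sizes and the cheap `mean` reduction are made up for illustration) that counts the tasks a plain groupby reduction generates on a dask-backed array:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Toy stand-in for the real data: small spatial grid, a few years of daily data.
time = pd.date_range('2000-01-01', periods=365 * 4, freq='D')
da = xr.DataArray(
    np.random.rand(20, 20, len(time)),
    dims=('lat', 'lon', 'time'),
    coords={'time': time},
).chunk({'lat': 5, 'lon': 5})  # 16 spatial chunks, time unchunked

# Even a cheap reduction fans out into roughly
# (number of groups) x (number of chunks) tasks,
# since groupby slices out each group on each chunk.
res = da.groupby('time.dayofyear').mean()
print(len(res.__dask_graph__()))  # on the order of thousands for this toy case
```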
A solution is to use `apply_ufunc` with `dask='parallelized'`, so that dask sees the whole computation as a single operation per chunk. It now comes down to ~5000 tasks (most of which are `apply_ufunc` overhead).
Problem
The more chunks I feed it, the slower they get. Through the dashboard, I can see that my "func" takes ~7-8 s to run when there is only 1 chunk, and ~18-22 s when 3 chunks (of the same size) run in parallel. This is using a local Client with only one worker and ~20 threads.
After a lot of debugging I believe I can narrow it down to `groupby`. When using it, the execution time of each `apply_ufunc` task grows roughly in proportion to the number of tasks running in parallel. This is not true when using many workers with 1 thread each. Therefore I am beginning to believe that something in `groupby.map` does not release the GIL.
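A rough way I tried to probe this outside of dask (a sketch, with made-up sizes and a `mean` standing in for the real computation): run the same groupby workload serially and in a thread pool, and compare per-call wall time. If the work were GIL-free, per-call time should barely change with threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
import xarray as xr

times = pd.date_range('2000-01-01', periods=3650, freq='D')

def workload(seed):
    # Same shape of work as func_1d, without dask: a groupby over dayofyear.
    rng = np.random.default_rng(seed)
    da = xr.DataArray(
        rng.random((10, 10, len(times))),
        dims=('lat', 'lon', 'time'),
        coords={'time': times},
    )
    t0 = time.perf_counter()
    da.groupby('time.dayofyear').mean()
    return time.perf_counter() - t0

serial = [workload(i) for i in range(4)]
with ThreadPoolExecutor(max_workers=4) as ex:
    threaded = list(ex.map(workload, range(4)))

print(f'serial   mean per call: {np.mean(serial):.2f} s')
print(f'threaded mean per call: {np.mean(threaded):.2f} s')
# If the threaded per-call time is ~4x the serial one, the work is
# effectively serialized, i.e. something is holding the GIL.
```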
Minimal (lol) example
```python
import numpy as np
import xarray as xr


def func_group(da):
    # This is some computation that takes time
    # I've had the same problem when taking "quantiles" instead of polyfit
    return da.polyfit('time', deg=3).polyfit_coefficients.mean('degree')


def func_1d(arr, time, dims):
    # Need to recreate DataArrays, we receive numpy arrays
    da = xr.DataArray(arr, dims=(*dims, 'time'), coords={'time': time})
    out = da.groupby('time.dayofyear').map(func_group)
    return out.transpose(..., 'dayofyear').values


def func(da):
    dims = list(da.dims)
    dims.remove('time')
    out = xr.apply_ufunc(
        func_1d,
        da,
        input_core_dims=[['time']],
        output_core_dims=[['dayofyear']],
        kwargs={'time': da.time.values, 'dims': dims},
        dask='parallelized',
        output_dtypes=[float],
        dask_gufunc_kwargs={
            'meta': np.ndarray((), dtype=float),
            'output_sizes': {'dayofyear': 365},
        },
    )
    return out


# My test case is (lat, lon, time) = (240, 360, 32000) with chunks (40, 40, 32000)
ds = xr.open_dataset('some huge dataset that has small spatial chunks')

nlon = 40  # 1 chunk
for i in range(0, 240, 40):
    for j in range(0, 360, nlon):
        da = ds.isel(lat=slice(i, i + 40), lon=slice(j, j + nlon)).tasmin
        func(da).to_netcdf(f'test_{i}_{j}.nc')
        print(f'Done {i}x{j}')
```
My observations:
- When using `nlon = 40` and a client with 1 worker of 20 threads, the `func_1d` task runs for ~7 s in the dashboard.
- When using `nlon = 120` and a client with 1 worker of 20 threads, it runs for ~18 s.
- When using `nlon = 40` and a client with 20 workers of 1 thread, it runs for ~8 s.
- When using `nlon = 120` and a client with 20 workers of 1 thread, it runs for ~12 s.
- When using `nlon = 360` and a client with 20 workers of 1 thread, it runs for ~14 s.
Questions
- It seems that somewhere in `groupby.map` something does not release the GIL. Is this true? Is it a bug?
- Are there other ways to wrap a huge computation into a single task for dask? (One idea I toyed with is sketched below.)
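On that second question, the one alternative I can sketch is `xr.map_blocks`, which also runs one task per chunk but hands the function a real DataArray, so there is no manual reconstruction from numpy arrays. This is untested on the real data, and the template details (a 365-day calendar matching `output_sizes` above, `dayofyear` ending up as the leading dimension) are my assumptions:

```python
import numpy as np
import xarray as xr

def per_chunk(block):
    # Runs once per (lat, lon) chunk, receiving a real DataArray.
    return block.groupby('time.dayofyear').map(func_group)

# Template describing the output: one value per dayofyear at each grid point.
# groupby.map concatenates along the new dayofyear dimension, which should
# land first; lat/lon keep the input chunking.
template = (
    da.isel(time=0, drop=True)
    .expand_dims(dayofyear=np.arange(1, 366))
    .chunk({'lat': 40, 'lon': 40})
)
out = xr.map_blocks(per_chunk, da, template=template)
```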
I'm quite sorry for the long post. I'll hold eternal gratitude if someone can help me out on this.