Commit 2141063

WIP: Split out apply pandas parallel function
1 parent b18ef75 commit 2141063
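
In brief: this pulls the process-pool plumbing that previously lived inline in
`DataFrameCTAccessor.to_timeseries` out into a reusable `apply_pandas_op_parallel`
function, then reuses it to add a (still WIP) `differentiate` method to the
Series accessor. The call pattern the tutorial exercises after this change
(names as in the diff below):

    # Serial, with a progress bar
    small_ts.ct.differentiate(progress=True)

    # Parallel over n_processes chunks, with nested per-chunk progress bars
    diff_ts.ct.differentiate(
        n_processes=n_processes,
        progress=True,
        progress_nested=True,
        mp_context=multiprocessing.get_context("fork"),
    )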

2 files changed (+196, -60 lines)

docs/tutorials/pandas_accessor_tutorial.py

Lines changed: 63 additions & 7 deletions
@@ -175,6 +175,9 @@ def create_df(
 )
 small_ts
 
+# %%
+small_ts.ct.differentiate(progress=True)
+
 # %% [markdown]
 # Then we can use standard Continuous timeseries APIs,
 # e.g. plotting.
@@ -201,6 +204,15 @@ def create_df(
 )
 ax.legend()
 
+# %%
+# TODO: move this to ops section
+ax = (
+    small_ts.loc[pix.isin(variable="variable_0", run=0)]
+    .ct.differentiate()
+    .ct.plot(label="scenario", continuous_plot_kwargs=dict(alpha=0.9))
+)
+ax.legend()
+
 # %% [markdown]
 # If we have a bigger `pd.DataFrame`, the conversion process can be much slower.
 
@@ -260,7 +272,7 @@ def create_df(
 # [here](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods)).
 
 # %%
-bigger_df.ct.to_timeseries(
+bigger_ts = bigger_df.ct.to_timeseries(
     time_units="yr",
     interpolation=ct.InterpolationOption.Linear,
     n_processes=n_processes,
@@ -272,6 +284,46 @@ def create_df(
     progress_nested=True,
     mp_context=multiprocessing.get_context("fork"),
 )
+bigger_ts
+
+# %% [markdown]
+# The same logic can be applied to other operations.
+
+# %%
+diff_ts = create_df(
+    n_scenarios=50,
+    n_variables=1,
+    n_runs=600,
+    timepoints=np.arange(75) + 2025.0,
+).ct.to_timeseries(
+    time_units="yr",
+    interpolation=ct.InterpolationOption.Linear,
+    n_processes=n_processes,
+    progress=True,
+    progress_nested=True,
+    mp_context=multiprocessing.get_context("fork"),
+)
+diff_ts
+
+# %%
+diff_ts.ct.differentiate(progress=True)
+
+# %%
+diff_ts.ct.differentiate(n_processes=n_processes)
+
+# %%
+diff_ts.ct.differentiate(n_processes=n_processes, progress=True)
+
+# %%
+diff_ts.ct.differentiate(
+    n_processes=n_processes,
+    progress=True,
+    progress_nested=True,
+    mp_context=multiprocessing.get_context("fork"),
+)
+
+# %% [markdown]
+# Demonstrate how to control parallel etc. with global config.
 
 # %% [markdown]
 # On big `pd.DataFrame`'s the combination with
@@ -306,7 +358,7 @@ def create_df(
 # %%
 sns_df = small_ts.loc[
     pix.isin(scenario=[f"scenario_{i}" for i in range(2)])
-# Rename to `to_tidy_df`
+    # Rename to `to_tidy_df`
 ].ct.to_sns_df(increase_resolution=100)
 sns_df
 
@@ -330,9 +382,15 @@ def create_df(
 )
 
 fig, ax = plt.subplots()
-for scenario, s_ts in small_ts.loc[pix.isin(variable="variable_0")].groupby("scenario", observed=True):
+for scenario, s_ts in small_ts.loc[pix.isin(variable="variable_0")].groupby(
+    "scenario", observed=True
+):
     for quantiles, alpha in quantiles_plumes:
-        s_quants = s_ts.ct.to_df(increase_resolution=increase_resolution).groupby(small_ts.index.names.difference(plumes_over), observed=True).quantile(quantiles)
+        s_quants = (
+            s_ts.ct.to_df(increase_resolution=increase_resolution)
+            .groupby(small_ts.index.names.difference(plumes_over), observed=True)
+            .quantile(quantiles)
+        )
         if isinstance(quantiles, tuple):
             ax.fill_between(
                 s_quants.columns.values.squeeze(),
@@ -355,14 +413,12 @@ def create_df(
 
 # %%
 (
-    small_ts
-    .ct.to_df(increase_resolution=5)
+    small_ts.ct.to_df(increase_resolution=5)
     .groupby(small_ts.index.names.difference(["run"]), observed=True)
     .quantile([0.05, 0.5, 0.95])
 )
 
 # %% [markdown]
-# - other operations, also with progress, parallel, parallel with progress
 # - plot with basic control over labels
 # - plot with grouping and plumes for ranges (basically reproduce scmdata API)
 # - convert with more fine-grained control over interpolation
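
Aside: the quantile pipelines reflowed above all follow the same pandas pattern:
group a wide frame (index = metadata levels, columns = time) over every index
level except the one being collapsed, then take quantiles. A minimal
self-contained sketch of that pattern, with made-up data standing in for the
tutorial's `small_ts.ct.to_df(...)` output (only numpy and pandas required):

    import numpy as np
    import pandas as pd

    # Wide frame: (scenario, run) index, time columns
    index = pd.MultiIndex.from_product(
        [["scenario_0", "scenario_1"], range(3)], names=["scenario", "run"]
    )
    df = pd.DataFrame(
        np.random.default_rng(0).normal(size=(6, 4)),
        index=index,
        columns=np.arange(2025.0, 2029.0),
    )

    # Collapse the "run" level to quantiles, keeping all other levels,
    # mirroring `groupby(small_ts.index.names.difference(["run"]), observed=True)`
    quantiles = df.groupby(
        df.index.names.difference(["run"]), observed=True
    ).quantile([0.05, 0.5, 0.95])
    print(quantiles)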

src/continuous_timeseries/pandas_accessors.py

Lines changed: 133 additions & 53 deletions
@@ -6,6 +6,7 @@
 
 import concurrent.futures
 from collections.abc import Iterator
+from functools import partial
 from multiprocessing.context import BaseContext
 from typing import TYPE_CHECKING, Any, TypeVar
 
@@ -23,6 +24,94 @@
 P = TypeVar("P", bound=pd.DataFrame | pd.Series[Any])
 
 
+def apply_pandas_op_parallel(
+    obj,
+    op,
+    n_processes: int,
+    progress: bool = False,
+    progress_nested: bool = False,
+    mp_context: BaseContext | None = None,
+):
+    iterator = get_chunks(obj, n_chunks=n_processes)
+    if progress:
+        try:
+            from tqdm.auto import tqdm
+        except ImportError as exc:
+            raise MissingOptionalDependencyError(  # noqa: TRY003
+                "apply_pandas_op_parallel(..., progress=True)", requirement="tqdm"
+            ) from exc
+
+        iterator = tqdm(iterator, desc="submitting to pool")
+
+    with concurrent.futures.ProcessPoolExecutor(
+        max_workers=n_processes, mp_context=mp_context
+    ) as pool:
+        futures = [
+            pool.submit(
+                op,
+                chunk,
+                progress=progress_nested,
+                progress_bar_position=i,
+            )
+            for i, chunk in enumerate(iterator)
+        ]
+
+        iterator_results = concurrent.futures.as_completed(futures)
+        if progress:
+            iterator_results = tqdm(
+                iterator_results,
+                desc="Retrieving parallel results",
+                total=len(futures),
+            )
+
+        res_l = [future.result() for future in iterator_results]
+
+    # Late import to avoid hard dependency on pandas
+    try:
+        import pandas as pd
+    except ImportError as exc:
+        raise MissingOptionalDependencyError(
+            "apply_pandas_op_parallel", requirement="pandas"
+        ) from exc
+
+    # This assumes that the index isn't mangled.
+    # Using pix.concat might be safer,
+    # or we make the concatenation injectable.
+    res = pd.concat(res_l)
+
+    return res
+
+
+def differentiate_parallel_helper(
+    series: pd.Series[Timeseries],
+    progress: bool = False,
+    progress_bar_position: int = 0,
+) -> pd.Series[Timeseries]:
+    if progress:
+        try:
+            from tqdm.auto import tqdm
+        except ImportError as exc:
+            raise MissingOptionalDependencyError(  # noqa: TRY003
+                "differentiate(..., progress=True)", requirement="tqdm"
+            ) from exc
+
+        tqdm_kwargs = dict(position=progress_bar_position)
+        tqdm.pandas(**tqdm_kwargs)
+        meth_to_call = "progress_map"
+        # No-one knows why this is needed, but it is in jupyter notebooks
+        print(end=" ")
+
+    else:
+        meth_to_call = "map"
+
+    res = getattr(series, meth_to_call)(
+        lambda x: x.differentiate(),
+        # name="injectable?",
+    )
+
+    return res
+
+
 class SeriesCTAccessor:
     """
     [`pd.Series`][pandas.Series] accessors
@@ -89,7 +178,7 @@ def to_df(self, increase_resolution: int | None = None) -> pd.DataFrame:
         return df
 
     # TODO: add this to DataFrame accessor to allow for time filtering in the middle
-    def to_sns_df(self, increase_resolution: int = 100):
+    def to_sns_df(self, increase_resolution: int = 100) -> pd.DataFrame:
         # TODO: progress bar and parallelisation
         # TODO: time_units and out_units passing
         return (
@@ -102,6 +191,33 @@ def to_sns_df(self, increase_resolution: int = 100):
             .reset_index()
         )
 
+    def differentiate(
+        self,
+        # res_name: str = "ts",
+        progress: bool = False,
+        progress_nested: bool = False,
+        n_processes: int = 1,
+        mp_context: BaseContext | None = None,
+    ) -> pd.Series[Timeseries]:  # type: ignore
+        if n_processes == 1:
+            res = differentiate_parallel_helper(
+                self._series,
+                progress=progress,
+            )
+
+            return res
+
+        res = apply_pandas_op_parallel(
+            self._series,
+            op=differentiate_parallel_helper,
+            n_processes=n_processes,
+            progress=progress,
+            progress_nested=progress_nested,
+            mp_context=mp_context,
+        )
+
+        return res
+
     def plot(
         self,
         label: str | tuple[str, ...] | None = None,
@@ -215,8 +331,7 @@ def get_timeseries_parallel_helper(
         tqdm_kwargs = dict(position=progress_bar_position)
         tqdm.pandas(**tqdm_kwargs)
         meth_to_call = "progress_apply"
-        # No-one knows why this is needed, but it is
-        # jupyter notebooks
+        # No-one knows why this is needed, but it is in jupyter notebooks
         print(end=" ")
 
     else:
@@ -288,56 +403,21 @@ def to_timeseries(  # noqa: PLR0913
 
             return res
 
-        # I think it should be possible to split out a
-        # `apply_pandas_op_parallel` or similar function.
-        iterator = get_chunks(self._df, n_chunks=n_processes)
-        if progress:
-            try:
-                from tqdm.auto import tqdm
-            except ImportError as exc:
-                raise MissingOptionalDependencyError(  # noqa: TRY003
-                    "to_timeseries(..., progress=True)", requirement="tdqm"
-                ) from exc
-
-            iterator = tqdm(iterator, desc="submitting to pool")
-
-        with concurrent.futures.ProcessPoolExecutor(
-            max_workers=n_processes, mp_context=mp_context
-        ) as pool:
-            futures = [
-                pool.submit(
-                    get_timeseries_parallel_helper,
-                    chunk,
-                    interpolation=interpolation,
-                    time_units=time_units,
-                    units_col=units_col,
-                    idx_separator=idx_separator,
-                    ur=ur,
-                    progress=progress_nested,
-                    progress_bar_position=i,
-                )
-                for i, chunk in enumerate(iterator)
-            ]
-
-            iterator_results = concurrent.futures.as_completed(futures)
-            if progress:
-                iterator_results = tqdm(
-                    iterator_results,
-                    desc="Retrieving parallel results",
-                    total=len(futures),
-                )
-
-            res_l = [future.result() for future in iterator_results]
-
-        # Late import to avoid hard dependency on pandas
-        try:
-            import pandas as pd
-        except ImportError as exc:
-            raise MissingOptionalDependencyError(
-                "interpolate", requirement="pandas"
-            ) from exc
-
-        res = pd.concat(res_l)
+        res = apply_pandas_op_parallel(
+            self._df,
+            op=partial(
+                get_timeseries_parallel_helper,
+                interpolation=interpolation,
+                time_units=time_units,
+                units_col=units_col,
+                idx_separator=idx_separator,
+                ur=ur,
+            ),
+            n_processes=n_processes,
+            progress=progress,
+            progress_nested=progress_nested,
+            mp_context=mp_context,
+        )
 
         return res
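
At its core, the new `apply_pandas_op_parallel` is a chunk/submit/collect loop
over a `concurrent.futures.ProcessPoolExecutor`. A stripped-down, runnable
sketch of the same technique (the `get_chunks` stand-in and the `square_chunk`
op here are hypothetical; the real function also threads tqdm progress bars
through, as in the diff above):

    import concurrent.futures

    import numpy as np
    import pandas as pd

    def get_chunks(obj, n_chunks):
        # Hypothetical stand-in for the library's get_chunks helper:
        # split the pandas object into roughly equal positional chunks.
        return np.array_split(obj, n_chunks)

    def square_chunk(chunk, progress=False, progress_bar_position=0):
        # Example per-chunk op; must be a picklable, top-level function.
        # The progress arguments mirror the signature the pool submission expects.
        return chunk.map(lambda x: x**2)

    def apply_pandas_op_parallel(obj, op, n_processes, mp_context=None):
        chunks = get_chunks(obj, n_chunks=n_processes)
        with concurrent.futures.ProcessPoolExecutor(
            max_workers=n_processes, mp_context=mp_context
        ) as pool:
            futures = [
                pool.submit(op, chunk, progress=False, progress_bar_position=i)
                for i, chunk in enumerate(chunks)
            ]
            # as_completed yields out of submission order, hence the commit's
            # note that concatenating assumes the index isn't mangled
            res_l = [f.result() for f in concurrent.futures.as_completed(futures)]

        return pd.concat(res_l)

    if __name__ == "__main__":
        print(apply_pandas_op_parallel(pd.Series(range(8)), square_chunk, n_processes=2))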