8
8
import boost_histogram as bh
9
9
import dask .array as da
10
10
import numpy as np
11
- from dask .bag .core import empty_safe_aggregate , partition_all
11
+ from dask .bag .core import empty_safe_aggregate
12
12
from dask .base import DaskMethodsMixin , is_dask_collection , tokenize
13
13
from dask .dataframe .core import partitionwise_graph as partitionwise
14
14
from dask .delayed import Delayed
15
15
from dask .highlevelgraph import HighLevelGraph
16
16
from dask .threaded import get as tget
17
17
from dask .utils import is_dataframe_like , key_split
18
+ from tlz import partition_all
18
19
19
20
if TYPE_CHECKING :
20
21
from numpy .typing import NDArray
21
22
22
- from .typing import DaskCollection
23
+ from dask_histogram .typing import DaskCollection
23
24
24
25
__all__ = (
25
26
"AggHistogram" ,
29
30
)
30
31
31
32
32
- def clone (histref : bh .Histogram = None ) -> bh .Histogram :
33
+ def clone (histref : bh .Histogram | None = None ) -> bh .Histogram :
33
34
"""Create a Histogram object based on another.
34
35
35
36
The axes and storage of the `histref` will be used to create a new
@@ -54,7 +55,7 @@ def clone(histref: bh.Histogram = None) -> bh.Histogram:
54
55
def _blocked_sa (
55
56
data : Any ,
56
57
* ,
57
- histref : bh .Histogram = None ,
58
+ histref : bh .Histogram | None = None ,
58
59
) -> bh .Histogram :
59
60
"""Blocked calculation; single argument; unweighted; no sample."""
60
61
if data .ndim == 1 :
@@ -69,7 +70,7 @@ def _blocked_sa_s(
69
70
data : Any ,
70
71
sample : Any ,
71
72
* ,
72
- histref : bh .Histogram = None ,
73
+ histref : bh .Histogram | None = None ,
73
74
) -> bh .Histogram :
74
75
"""Blocked calculation; single argument; unweighted; with sample."""
75
76
if data .ndim == 1 :
@@ -84,7 +85,7 @@ def _blocked_sa_w(
84
85
data : Any ,
85
86
weights : Any ,
86
87
* ,
87
- histref : bh .Histogram = None ,
88
+ histref : bh .Histogram | None = None ,
88
89
) -> bh .Histogram :
89
90
"""Blocked calculation; single argument; weighted; no sample."""
90
91
if data .ndim == 1 :
@@ -100,7 +101,7 @@ def _blocked_sa_w_s(
100
101
weights : Any ,
101
102
sample : Any ,
102
103
* ,
103
- histref : bh .Histogram = None ,
104
+ histref : bh .Histogram | None = None ,
104
105
) -> bh .Histogram :
105
106
"""Blocked calculation; single argument; weighted; with sample."""
106
107
if data .ndim == 1 :
@@ -113,15 +114,15 @@ def _blocked_sa_w_s(
113
114
114
115
def _blocked_ma(
    *data: Any,
    histref: bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; multiargument; unweighted; no sample.

    Parameters
    ----------
    *data : Any
        Positional fill arguments, forwarded unchanged to ``fill``
        (presumably one array-like per histogram axis; confirm against
        the callers).
    histref : bh.Histogram, optional
        Reference histogram passed to ``clone``; the histogram that gets
        filled is created from it.

    Returns
    -------
    bh.Histogram
        The result of filling ``clone(histref)`` with ``data``.

    """
    # Fill a clone created from the reference rather than the reference
    # object itself.
    return clone(histref).fill(*data)
120
121
121
122
122
123
def _blocked_ma_s (
123
124
* data : Any ,
124
- histref : bh .Histogram = None ,
125
+ histref : bh .Histogram | None = None ,
125
126
) -> bh .Histogram :
126
127
"""Blocked calculation; multiargument; unweighted; with sample."""
127
128
sample = data [- 1 ]
@@ -131,7 +132,7 @@ def _blocked_ma_s(
131
132
132
133
def _blocked_ma_w (
133
134
* data : Any ,
134
- histref : bh .Histogram = None ,
135
+ histref : bh .Histogram | None = None ,
135
136
) -> bh .Histogram :
136
137
"""Blocked calculation; multiargument; weighted; no sample."""
137
138
weights = data [- 1 ]
@@ -141,7 +142,7 @@ def _blocked_ma_w(
141
142
142
143
def _blocked_ma_w_s (
143
144
* data : Any ,
144
- histref : bh .Histogram = None ,
145
+ histref : bh .Histogram | None = None ,
145
146
) -> bh .Histogram :
146
147
"""Blocked calculation; multiargument; weighted; with sample."""
147
148
weights = data [- 2 ]
@@ -153,7 +154,7 @@ def _blocked_ma_w_s(
153
154
def _blocked_df(
    data: Any,
    *,
    histref: bh.Histogram | None = None,
) -> bh.Histogram:
    """Blocked calculation; dataframe argument; unweighted; no sample.

    Parameters
    ----------
    data : Any
        Object exposing ``columns`` and column lookup via ``data[c]``
        (presumably one dataframe partition; confirm against the
        callers). Every column is passed to ``fill`` as a separate
        positional argument, in ``data.columns`` order.
    histref : bh.Histogram, optional
        Reference histogram passed to ``clone``; the histogram that gets
        filled is created from it.

    Returns
    -------
    bh.Histogram
        The result of filling ``clone(histref)`` with the columns of
        ``data``.

    """
    # ``weight=None`` is spelled out to mirror the weighted dataframe
    # variants, which pass their weights through the same keyword.
    return clone(histref).fill(*(data[c] for c in data.columns), weight=None)
159
160
@@ -162,7 +163,7 @@ def _blocked_df_s(
162
163
data : Any ,
163
164
sample : Any ,
164
165
* ,
165
- histref : bh .Histogram = None ,
166
+ histref : bh .Histogram | None = None ,
166
167
) -> bh .Histogram :
167
168
return clone (histref ).fill (* (data [c ] for c in data .columns ), sample = sample )
168
169
@@ -171,7 +172,7 @@ def _blocked_df_w(
171
172
data : Any ,
172
173
weights : Any ,
173
174
* ,
174
- histref : bh .Histogram = None ,
175
+ histref : bh .Histogram | None = None ,
175
176
) -> bh .Histogram :
176
177
"""Blocked calculation; single argument; weighted; no sample."""
177
178
return clone (histref ).fill (* (data [c ] for c in data .columns ), weight = weights )
@@ -182,14 +183,18 @@ def _blocked_df_w_s(
182
183
weights : Any ,
183
184
sample : Any ,
184
185
* ,
185
- histref : bh .Histogram = None ,
186
+ histref : bh .Histogram | None = None ,
186
187
) -> bh .Histogram :
187
188
"""Blocked calculation; single argument; weighted; with sample."""
188
189
return clone (histref ).fill (
189
190
* (data [c ] for c in data .columns ), weight = weights , sample = sample
190
191
)
191
192
192
193
194
def _blocked_dak(data: Any, *, histref: bh.Histogram | None = None) -> bh.Histogram:
    """Blocked calculation; single dask-awkward argument; unweighted; no sample.

    ``_partitioned_histogram`` applies this function through
    ``dask_awkward.core.partitionwise_layer`` when it receives a single
    collection exposing ``_typetracer`` and neither weights nor a sample.

    Parameters
    ----------
    data : Any
        The object handed to ``fill`` as its only positional argument
        (presumably one concrete partition of the collection; confirm
        against dask-awkward).
    histref : bh.Histogram, optional
        Reference histogram passed to ``clone``; the histogram that gets
        filled is created from it.

    Returns
    -------
    bh.Histogram
        The result of filling ``clone(histref)`` with ``data``.

    """
    return clone(histref).fill(data)
196
+
197
+
193
198
class AggHistogram (DaskMethodsMixin ):
194
199
"""Aggregated Histogram collection.
195
200
@@ -346,13 +351,13 @@ def to_delayed(self) -> Delayed:
346
351
return Delayed (self .name , dsk , layer = self ._layer )
347
352
348
353
def values(self, flow: bool = False) -> NDArray[Any]:
    """Return the values of the histogram obtained from ``to_boost()``.

    Parameters
    ----------
    flow : bool
        Forwarded as the ``flow`` keyword of the underlying ``values``
        call (in boost-histogram this selects whether under/overflow
        bins are included).

    Returns
    -------
    NDArray[Any]
        Whatever ``self.to_boost().values(flow=flow)`` returns.

    """
    # ``flow`` must be forwarded explicitly; when it is dropped the caller
    # always gets the flow-less result regardless of what was requested.
    return self.to_boost().values(flow=flow)
350
355
351
356
def variances(self, flow: bool = False) -> NDArray[Any] | None:
    """Return the variances of the histogram obtained from ``to_boost()``.

    Parameters
    ----------
    flow : bool
        Forwarded as the ``flow`` keyword of the underlying ``variances``
        call (in boost-histogram this selects whether under/overflow
        bins are included).

    Returns
    -------
    NDArray[Any] | None
        Whatever ``self.to_boost().variances(flow=flow)`` returns; a
        ``None`` result is passed through unchanged.

    """
    # ``flow`` must be forwarded explicitly; when it is dropped the caller
    # always gets the flow-less result regardless of what was requested.
    return self.to_boost().variances(flow=flow)
353
358
354
359
def counts(self, flow: bool = False) -> NDArray[Any]:
    """Return the counts of the histogram obtained from ``to_boost()``.

    Parameters
    ----------
    flow : bool
        Forwarded as the ``flow`` keyword of the underlying ``counts``
        call (in boost-histogram this selects whether under/overflow
        bins are included).

    Returns
    -------
    NDArray[Any]
        Whatever ``self.to_boost().counts(flow=flow)`` returns.

    """
    # ``flow`` must be forwarded explicitly; when it is dropped the caller
    # always gets the flow-less result regardless of what was requested.
    return self.to_boost().counts(flow=flow)
356
361
357
362
def __array__ (self ) -> NDArray [Any ]:
358
363
return self .compute ().__array__ ()
@@ -483,12 +488,15 @@ def histref(self) -> bh.Histogram:
483
488
"""boost_histogram.Histogram: reference histogram."""
484
489
return self ._histref
485
490
486
def to_agg(self, split_every: int | None = None) -> AggHistogram:
    """Translate into a reduced aggregated histogram.

    Parameters
    ----------
    split_every : int, optional
        Passed straight to ``_reduction``, which substitutes ``4`` when
        it receives ``None``.

    Returns
    -------
    AggHistogram
        The result of ``_reduction(self, split_every=split_every)``.

    """
    return _reduction(self, split_every=split_every)
489
494
490
495
491
- def _reduction (ph : PartitionedHistogram , split_every : int = None ) -> AggHistogram :
496
+ def _reduction (
497
+ ph : PartitionedHistogram ,
498
+ split_every : int | None = None ,
499
+ ) -> AggHistogram :
492
500
if split_every is None :
493
501
split_every = 4
494
502
if split_every is False :
@@ -568,7 +576,19 @@ def _partitioned_histogram(
568
576
name = f"hist-on-block-{ tokenize (data , histref , weights , sample )} "
569
577
data_is_df = is_dataframe_like (data [0 ])
570
578
_weight_sample_check (* data , weights = weights )
571
- if len (data ) == 1 and not data_is_df :
579
+ if len (data ) == 1 and hasattr (data [0 ], "_typetracer" ):
580
+ from dask_awkward .core import partitionwise_layer as pwlayer
581
+
582
+ x = data [0 ]
583
+ if weights is not None and sample is not None :
584
+ raise NotImplementedError ()
585
+ elif weights is not None and sample is None :
586
+ raise NotImplementedError ()
587
+ elif weights is None and sample is not None :
588
+ raise NotImplementedError ()
589
+ else :
590
+ g = pwlayer (_blocked_dak , name , x , histref = histref )
591
+ elif len (data ) == 1 and not data_is_df :
572
592
x = data [0 ]
573
593
if weights is not None and sample is not None :
574
594
g = partitionwise (
@@ -621,7 +641,6 @@ def _reduced_histogram(
621
641
histref = histref ,
622
642
weights = weights ,
623
643
sample = sample ,
624
- split_every = split_every ,
625
644
)
626
645
return ph .to_agg (split_every = split_every )
627
646
@@ -683,7 +702,11 @@ def to_dask_array(
683
702
684
703
685
704
class BinaryOpAgg :
686
- def __init__ (self , func : Callable [[Any , Any ], Any ], name : str = None ) -> None :
705
+ def __init__ (
706
+ self ,
707
+ func : Callable [[Any , Any ], Any ],
708
+ name : str | None = None ,
709
+ ) -> None :
687
710
self .func = func
688
711
self .__name__ = func .__name__ if name is None else name
689
712
0 commit comments