Skip to content

Commit 5573b1e

Browse files
committed
fix: add distinct aggregation limit error messages
1 parent 36016e6 commit 5573b1e

File tree

9 files changed

+183
-93
lines changed

9 files changed

+183
-93
lines changed

api/storeapi/store_api.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,10 @@ message ExplainEntry {
135135
enum SearchErrorCode {
136136
NO_ERROR = 0;
137137
INGESTOR_QUERY_WANTS_OLD_DATA = 1;
138-
TOO_MANY_UNIQ_VALUES = 2;
139138
TOO_MANY_FRACTIONS_HIT = 3;
139+
TOO_MANY_FIELD_TOKENS = 4;
140+
TOO_MANY_GROUP_TOKENS = 5;
141+
TOO_MANY_FRACTION_TOKENS = 6;
140142
}
141143

142144
message StartAsyncSearchRequest {

consts/consts.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ var (
8080
ErrRequestWasRateLimited = errors.New("request was rate limited")
8181
ErrInvalidAggQuery = errors.New("invalid agg query")
8282
ErrInvalidArgument = errors.New("invalid argument")
83-
ErrTooManyUniqValues = errors.New("aggregation has too many unique values")
83+
ErrTooManyFieldTokens = errors.New("aggregation has too many field tokens")
84+
ErrTooManyGroupTokens = errors.New("aggregation has too many group tokens")
85+
ErrTooManyFractionTokens = errors.New("aggregation has too many fraction tokens")
8486
ErrTooManyFractionsHit = errors.New("too many fractions hit")
8587
)

frac/processor/aggregator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,7 @@ type SourcedNodeIterator struct {
353353

354354
tokensCache map[uint32]string
355355

356-
uniqSourcesLimit int
356+
uniqSourcesLimit iteratorLimit
357357
countBySource map[uint32]int
358358

359359
lastID uint32
@@ -363,7 +363,7 @@ type SourcedNodeIterator struct {
363363
less node.LessFn
364364
}
365365

366-
func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, limit int, reverse bool) *SourcedNodeIterator {
366+
func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, limit iteratorLimit, reverse bool) *SourcedNodeIterator {
367367
lastID, lastSource, has := sourced.NextSourced()
368368
return &SourcedNodeIterator{
369369
sourcedNode: sourced,
@@ -389,14 +389,14 @@ func (s *SourcedNodeIterator) ConsumeTokenSource(lid uint32) (uint32, bool, erro
389389
return 0, false, nil
390390
}
391391

392-
if s.uniqSourcesLimit <= 0 {
392+
if s.uniqSourcesLimit.limit <= 0 {
393393
return s.lastSource, true, nil
394394
}
395395

396396
s.countBySource[s.lastSource]++
397397

398-
if len(s.countBySource) > s.uniqSourcesLimit {
399-
return lid, true, fmt.Errorf("%w: iterator limit is exceeded", consts.ErrTooManyUniqValues)
398+
if len(s.countBySource) > s.uniqSourcesLimit.limit {
399+
return lid, true, fmt.Errorf("%w: iterator limit is exceeded", s.uniqSourcesLimit.err)
400400
}
401401

402402
return s.lastSource, true, nil

frac/processor/aggregator_test.go

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/assert"
1212
"github.com/stretchr/testify/require"
1313

14+
"github.com/ozontech/seq-db/consts"
1415
"github.com/ozontech/seq-db/node"
1516
"github.com/ozontech/seq-db/seq"
1617
)
@@ -27,7 +28,7 @@ func TestSingleSourceCountAggregator(t *testing.T) {
2728
}
2829

2930
source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false)
30-
iter := NewSourcedNodeIterator(source, nil, nil, 0, false)
31+
iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
3132
agg := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
3233
for _, id := range searchDocs {
3334
if err := agg.Next(id); err != nil {
@@ -55,7 +56,7 @@ func TestSingleSourceCountAggregatorWithInterval(t *testing.T) {
5556
}
5657

5758
source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false)
58-
iter := NewSourcedNodeIterator(source, nil, nil, 0, false)
59+
iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
5960

6061
agg := NewSingleSourceCountAggregator(iter, func(l seq.LID) seq.MID {
6162
return seq.MID(l) % 3
@@ -90,7 +91,7 @@ func Generate(n int) ([]uint32, uint32) {
9091
func BenchmarkAggDeep(b *testing.B) {
9192
v, _ := Generate(b.N)
9293
src := node.NewSourcedNodeWrapper(node.NewStatic(v, false), 0)
93-
iter := NewSourcedNodeIterator(src, nil, make([]uint32, 1), 0, false)
94+
iter := NewSourcedNodeIterator(src, nil, make([]uint32, 1), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
9495
n := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
9596
vals, _ := Generate(b.N)
9697
b.ResetTimer()
@@ -115,7 +116,7 @@ func BenchmarkAggWide(b *testing.B) {
115116

116117
source := node.BuildORTreeAgg(node.MakeStaticNodes(wide), false)
117118

118-
iter := NewSourcedNodeIterator(source, nil, make([]uint32, len(wide)), 0, false)
119+
iter := NewSourcedNodeIterator(source, nil, make([]uint32, len(wide)), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
119120
n := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
120121
vals, _ := Generate(b.N)
121122
b.ResetTimer()
@@ -177,8 +178,8 @@ func TestTwoSourceAggregator(t *testing.T) {
177178

178179
fieldTIDs := []uint32{42, 73}
179180
groupByTIDs := []uint32{1, 2}
180-
groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, 0, false)
181-
fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, 0, false)
181+
groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
182+
fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
182183
aggregator := NewGroupAndFieldAggregator(
183184
fieldIterator, groupIterator, provideExtractTimeFunc(nil, nil, 0), true,
184185
)
@@ -229,7 +230,7 @@ func TestSingleTreeCountAggregator(t *testing.T) {
229230
},
230231
}
231232

232-
iter := NewSourcedNodeIterator(field, dp, []uint32{0}, 0, false)
233+
iter := NewSourcedNodeIterator(field, dp, []uint32{0}, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
233234
aggregator := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
234235

235236
r.NoError(aggregator.Next(1))
@@ -251,3 +252,37 @@ func TestSingleTreeCountAggregator(t *testing.T) {
251252
r.Equal(hist.Total, result.SamplesByBin[token].Total)
252253
}
253254
}
255+
256+
func TestAggregatorLimitExceeded(t *testing.T) {
257+
// For now input for this test is incorrect since we support
258+
// aggregations only for `keyword` index type.
259+
// Will be fixed in #310.
260+
searchDocs := []uint32{2, 3, 5, 8, 10, 12, 15}
261+
sources := [][]uint32{
262+
{2, 3, 5, 8, 10, 12},
263+
{1, 4, 6, 9, 11, 13},
264+
{1, 2, 4, 5, 8, 11, 12},
265+
}
266+
267+
const limit = 1
268+
269+
for _, expectedErr := range []error{consts.ErrTooManyGroupTokens, consts.ErrTooManyFieldTokens} {
270+
source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false)
271+
iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: limit, err: expectedErr}, false)
272+
agg := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
273+
274+
var limitErr error
275+
var limitIteration int
276+
277+
for i, id := range searchDocs {
278+
if err := agg.Next(id); err != nil {
279+
limitErr = err
280+
limitIteration = i
281+
break
282+
}
283+
}
284+
285+
assert.Equal(t, limit, limitIteration)
286+
assert.ErrorIs(t, limitErr, expectedErr)
287+
}
288+
}

frac/processor/eval_tree.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,14 @@ func buildEvalTree(root *parser.ASTNode, minVal, maxVal uint32, stats *searchSta
5050
}
5151

5252
// evalLeaf finds suitable matching fraction tokens and returns Node that generate corresponding tokens LIDs
53-
func evalLeaf(ti tokenIndex, token parser.Token, sw *stopwatch.Stopwatch, stats *searchStats, minLID, maxLID uint32, order seq.DocsOrder) (node.Node, error) {
53+
func evalLeaf(
54+
ti tokenIndex,
55+
token parser.Token,
56+
sw *stopwatch.Stopwatch,
57+
stats *searchStats,
58+
minLID, maxLID uint32,
59+
order seq.DocsOrder,
60+
) (node.Node, error) {
5461
m := sw.Start("get_tids_by_token_expr")
5562
tids, err := ti.GetTIDsByTokenExpr(token)
5663
m.Stop()
@@ -86,6 +93,13 @@ type AggLimits struct {
8693
MaxTIDsPerFraction int
8794
}
8895

96+
type iteratorLimit struct {
97+
// limit value
98+
limit int
99+
// error to return if limit is exceeded
100+
err error
101+
}
102+
89103
// evalAgg evaluates aggregation with given limits. Returns a suitable aggregator.
90104
func evalAgg(
91105
ti tokenIndex, query AggQuery, sw *stopwatch.Stopwatch,
@@ -96,7 +110,7 @@ func evalAgg(
96110
case seq.AggFuncCount, seq.AggFuncUnique:
97111
groupIterator, err := iteratorFromLiteral(
98112
ti, query.GroupBy, sw, stats, minLID, maxLID,
99-
limits.MaxTIDsPerFraction, limits.MaxGroupTokens, order,
113+
limits.MaxTIDsPerFraction, iteratorLimit{limit: limits.MaxGroupTokens, err: consts.ErrTooManyGroupTokens}, order,
100114
)
101115
if err != nil {
102116
return nil, err
@@ -111,7 +125,7 @@ func evalAgg(
111125
case seq.AggFuncMin, seq.AggFuncMax, seq.AggFuncSum, seq.AggFuncAvg, seq.AggFuncQuantile:
112126
fieldIterator, err := iteratorFromLiteral(
113127
ti, query.Field, sw, stats, minLID, maxLID,
114-
limits.MaxTIDsPerFraction, limits.MaxFieldTokens, order,
128+
limits.MaxTIDsPerFraction, iteratorLimit{limit: limits.MaxFieldTokens, err: consts.ErrTooManyFieldTokens}, order,
115129
)
116130
if err != nil {
117131
return nil, err
@@ -128,7 +142,7 @@ func evalAgg(
128142

129143
groupIterator, err := iteratorFromLiteral(
130144
ti, query.GroupBy, sw, stats, minLID, maxLID,
131-
limits.MaxTIDsPerFraction, limits.MaxGroupTokens, order,
145+
limits.MaxTIDsPerFraction, iteratorLimit{limit: limits.MaxGroupTokens, err: consts.ErrTooManyGroupTokens}, order,
132146
)
133147
if err != nil {
134148
return nil, err
@@ -157,7 +171,16 @@ func haveNotMinMaxQuantiles(quantiles []float64) bool {
157171
return have
158172
}
159173

160-
func iteratorFromLiteral(ti tokenIndex, literal *parser.Literal, sw *stopwatch.Stopwatch, stats *searchStats, minLID, maxLID uint32, maxTIDs, iteratorLimit int, order seq.DocsOrder) (*SourcedNodeIterator, error) {
174+
func iteratorFromLiteral(
175+
ti tokenIndex,
176+
literal *parser.Literal,
177+
sw *stopwatch.Stopwatch,
178+
stats *searchStats,
179+
minLID, maxLID uint32,
180+
maxTIDs int,
181+
iteratorLimit iteratorLimit,
182+
order seq.DocsOrder,
183+
) (*SourcedNodeIterator, error) {
161184
m := sw.Start("get_tids_by_token_expr")
162185
tids, err := ti.GetTIDsByTokenExpr(literal)
163186
m.Stop()
@@ -166,7 +189,10 @@ func iteratorFromLiteral(ti tokenIndex, literal *parser.Literal, sw *stopwatch.S
166189
}
167190

168191
if len(tids) > maxTIDs && maxTIDs > 0 {
169-
return nil, fmt.Errorf("%w: tokens length (%d) of field %q more than %d", consts.ErrTooManyUniqValues, len(tids), literal.Field, maxTIDs)
192+
return nil, fmt.Errorf(
193+
"%w: tokens length (%d) of field %q more than %d",
194+
consts.ErrTooManyFractionTokens, len(tids), literal.Field, maxTIDs,
195+
)
170196
}
171197

172198
m = sw.Start("get_lids_from_tids")

frac/processor/search.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func IndexSearch(
108108
return nil, err
109109
}
110110
if len(aggsResult[i].SamplesByBin) > aggLimits.MaxGroupTokens && aggLimits.MaxGroupTokens > 0 {
111-
return nil, consts.ErrTooManyUniqValues
111+
return nil, consts.ErrTooManyGroupTokens
112112
}
113113
}
114114
m.Stop()

0 commit comments

Comments (0)