Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion api/storeapi/store_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ message ExplainEntry {
// SearchErrorCode enumerates the error conditions a search/aggregation
// request can report back to the caller.
enum SearchErrorCode {
  // No error occurred.
  NO_ERROR = 0;
  // The ingestor query requested data older than what is available.
  // NOTE(review): semantics inferred from the name — confirm against ingestor code.
  INGESTOR_QUERY_WANTS_OLD_DATA = 1;
  // Aggregation produced too many unique values (ErrTooManyUniqValues).
  TOO_MANY_UNIQ_VALUES = 2;
  // Too many fractions were hit by the query (ErrTooManyFractionsHit).
  TOO_MANY_FRACTIONS_HIT = 3;
  // Aggregation field has too many tokens (ErrTooManyFieldTokens).
  TOO_MANY_FIELD_TOKENS = 4;
  // Aggregation group-by has too many tokens (ErrTooManyGroupTokens).
  TOO_MANY_GROUP_TOKENS = 5;
  // A single fraction has too many tokens (ErrTooManyFractionTokens).
  TOO_MANY_FRACTION_TOKENS = 6;
}

message StartAsyncSearchRequest {
Expand Down
4 changes: 3 additions & 1 deletion consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var (
ErrRequestWasRateLimited = errors.New("request was rate limited")
ErrInvalidAggQuery = errors.New("invalid agg query")
ErrInvalidArgument = errors.New("invalid argument")
ErrTooManyUniqValues = errors.New("aggregation has too many unique values")
ErrTooManyFieldTokens = errors.New("aggregation has too many field tokens")
ErrTooManyGroupTokens = errors.New("aggregation has too many group tokens")
ErrTooManyFractionTokens = errors.New("aggregation has too many fraction tokens")
ErrTooManyFractionsHit = errors.New("too many fractions hit")
)
10 changes: 5 additions & 5 deletions frac/processor/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ type SourcedNodeIterator struct {

tokensCache map[uint32]string

uniqSourcesLimit int
uniqSourcesLimit iteratorLimit
countBySource map[uint32]int

lastID uint32
Expand All @@ -363,7 +363,7 @@ type SourcedNodeIterator struct {
less node.LessFn
}

func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, limit int, reverse bool) *SourcedNodeIterator {
func NewSourcedNodeIterator(sourced node.Sourced, ti tokenIndex, tids []uint32, limit iteratorLimit, reverse bool) *SourcedNodeIterator {
lastID, lastSource, has := sourced.NextSourced()
return &SourcedNodeIterator{
sourcedNode: sourced,
Expand All @@ -389,14 +389,14 @@ func (s *SourcedNodeIterator) ConsumeTokenSource(lid uint32) (uint32, bool, erro
return 0, false, nil
}

if s.uniqSourcesLimit <= 0 {
if s.uniqSourcesLimit.limit <= 0 {
return s.lastSource, true, nil
}

s.countBySource[s.lastSource]++

if len(s.countBySource) > s.uniqSourcesLimit {
return lid, true, fmt.Errorf("%w: iterator limit is exceeded", consts.ErrTooManyUniqValues)
if len(s.countBySource) > s.uniqSourcesLimit.limit {
return lid, true, fmt.Errorf("%w: iterator limit is exceeded", s.uniqSourcesLimit.err)
}

return s.lastSource, true, nil
Expand Down
49 changes: 42 additions & 7 deletions frac/processor/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ozontech/seq-db/consts"
"github.com/ozontech/seq-db/node"
"github.com/ozontech/seq-db/seq"
)
Expand All @@ -27,7 +28,7 @@ func TestSingleSourceCountAggregator(t *testing.T) {
}

source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false)
iter := NewSourcedNodeIterator(source, nil, nil, 0, false)
iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
agg := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
for _, id := range searchDocs {
if err := agg.Next(id); err != nil {
Expand Down Expand Up @@ -55,7 +56,7 @@ func TestSingleSourceCountAggregatorWithInterval(t *testing.T) {
}

source := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false)
iter := NewSourcedNodeIterator(source, nil, nil, 0, false)
iter := NewSourcedNodeIterator(source, nil, nil, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)

agg := NewSingleSourceCountAggregator(iter, func(l seq.LID) seq.MID {
return seq.MID(l) % 3
Expand Down Expand Up @@ -90,7 +91,7 @@ func Generate(n int) ([]uint32, uint32) {
func BenchmarkAggDeep(b *testing.B) {
v, _ := Generate(b.N)
src := node.NewSourcedNodeWrapper(node.NewStatic(v, false), 0)
iter := NewSourcedNodeIterator(src, nil, make([]uint32, 1), 0, false)
iter := NewSourcedNodeIterator(src, nil, make([]uint32, 1), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
n := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
vals, _ := Generate(b.N)
b.ResetTimer()
Expand All @@ -115,7 +116,7 @@ func BenchmarkAggWide(b *testing.B) {

source := node.BuildORTreeAgg(node.MakeStaticNodes(wide), false)

iter := NewSourcedNodeIterator(source, nil, make([]uint32, len(wide)), 0, false)
iter := NewSourcedNodeIterator(source, nil, make([]uint32, len(wide)), iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
n := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))
vals, _ := Generate(b.N)
b.ResetTimer()
Expand Down Expand Up @@ -177,8 +178,8 @@ func TestTwoSourceAggregator(t *testing.T) {

fieldTIDs := []uint32{42, 73}
groupByTIDs := []uint32{1, 2}
groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, 0, false)
fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, 0, false)
groupIterator := NewSourcedNodeIterator(groupBy, dp, groupByTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
fieldIterator := NewSourcedNodeIterator(field, dp, fieldTIDs, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
aggregator := NewGroupAndFieldAggregator(
fieldIterator, groupIterator, provideExtractTimeFunc(nil, nil, 0), true,
)
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestSingleTreeCountAggregator(t *testing.T) {
},
}

iter := NewSourcedNodeIterator(field, dp, []uint32{0}, 0, false)
iter := NewSourcedNodeIterator(field, dp, []uint32{0}, iteratorLimit{limit: 0, err: consts.ErrTooManyGroupTokens}, false)
aggregator := NewSingleSourceCountAggregator(iter, provideExtractTimeFunc(nil, nil, 0))

r.NoError(aggregator.Next(1))
Expand All @@ -251,3 +252,37 @@ func TestSingleTreeCountAggregator(t *testing.T) {
r.Equal(hist.Total, result.SamplesByBin[token].Total)
}
}

// TestAggregatorLimitExceeded checks that an iterator configured with a
// unique-source limit reports the configured sentinel error as soon as the
// limit is crossed, and that the failure happens on the expected iteration.
//
// For now input for this test is incorrect since we support
// aggregations only for `keyword` index type.
// Will be fixed in #310.
func TestAggregatorLimitExceeded(t *testing.T) {
	searchDocs := []uint32{2, 3, 5, 8, 10, 12, 15}
	sources := [][]uint32{
		{2, 3, 5, 8, 10, 12},
		{1, 4, 6, 9, 11, 13},
		{1, 2, 4, 5, 8, 11, 12},
	}

	const limit = 1

	// Both field- and group-token sentinel errors must be propagated verbatim.
	for _, wantErr := range []error{consts.ErrTooManyGroupTokens, consts.ErrTooManyFieldTokens} {
		tree := node.BuildORTreeAgg(node.MakeStaticNodes(sources), false)
		it := NewSourcedNodeIterator(tree, nil, nil, iteratorLimit{limit: limit, err: wantErr}, false)
		agg := NewSingleSourceCountAggregator(it, provideExtractTimeFunc(nil, nil, 0))

		var gotErr error
		var hitAt int
		for i, id := range searchDocs {
			err := agg.Next(id)
			if err == nil {
				continue
			}
			gotErr = err
			hitAt = i
			break
		}

		assert.Equal(t, limit, hitAt)
		assert.ErrorIs(t, gotErr, wantErr)
	}
}
38 changes: 32 additions & 6 deletions frac/processor/eval_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ func buildEvalTree(root *parser.ASTNode, minVal, maxVal uint32, stats *searchSta
}

// evalLeaf finds suitable matching fraction tokens and returns Node that generate corresponding tokens LIDs
func evalLeaf(ti tokenIndex, token parser.Token, sw *stopwatch.Stopwatch, stats *searchStats, minLID, maxLID uint32, order seq.DocsOrder) (node.Node, error) {
func evalLeaf(
ti tokenIndex,
token parser.Token,
sw *stopwatch.Stopwatch,
stats *searchStats,
minLID, maxLID uint32,
order seq.DocsOrder,
) (node.Node, error) {
m := sw.Start("get_tids_by_token_expr")
tids, err := ti.GetTIDsByTokenExpr(token)
m.Stop()
Expand Down Expand Up @@ -86,6 +93,13 @@ type AggLimits struct {
MaxTIDsPerFraction int
}

// iteratorLimit bundles a cap on the number of unique values an iterator may
// accumulate with the sentinel error to surface when that cap is exceeded.
// A limit of zero or less disables the check entirely.
type iteratorLimit struct {
	// limit is the maximum number of unique values; <= 0 means unlimited.
	limit int
	// err is the sentinel error wrapped and returned once limit is exceeded.
	err error
}

// evalAgg evaluates aggregation with given limits. Returns a suitable aggregator.
func evalAgg(
ti tokenIndex, query AggQuery, sw *stopwatch.Stopwatch,
Expand All @@ -96,7 +110,7 @@ func evalAgg(
case seq.AggFuncCount, seq.AggFuncUnique:
groupIterator, err := iteratorFromLiteral(
ti, query.GroupBy, sw, stats, minLID, maxLID,
limits.MaxTIDsPerFraction, limits.MaxGroupTokens, order,
limits.MaxTIDsPerFraction, iteratorLimit{limit: limits.MaxGroupTokens, err: consts.ErrTooManyGroupTokens}, order,
)
if err != nil {
return nil, err
Expand All @@ -111,7 +125,7 @@ func evalAgg(
case seq.AggFuncMin, seq.AggFuncMax, seq.AggFuncSum, seq.AggFuncAvg, seq.AggFuncQuantile:
fieldIterator, err := iteratorFromLiteral(
ti, query.Field, sw, stats, minLID, maxLID,
limits.MaxTIDsPerFraction, limits.MaxFieldTokens, order,
limits.MaxTIDsPerFraction, iteratorLimit{limit: limits.MaxFieldTokens, err: consts.ErrTooManyFieldTokens}, order,
)
if err != nil {
return nil, err
Expand All @@ -128,7 +142,7 @@ func evalAgg(

groupIterator, err := iteratorFromLiteral(
ti, query.GroupBy, sw, stats, minLID, maxLID,
limits.MaxTIDsPerFraction, limits.MaxGroupTokens, order,
limits.MaxTIDsPerFraction, iteratorLimit{limit: limits.MaxGroupTokens, err: consts.ErrTooManyGroupTokens}, order,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -157,7 +171,16 @@ func haveNotMinMaxQuantiles(quantiles []float64) bool {
return have
}

func iteratorFromLiteral(ti tokenIndex, literal *parser.Literal, sw *stopwatch.Stopwatch, stats *searchStats, minLID, maxLID uint32, maxTIDs, iteratorLimit int, order seq.DocsOrder) (*SourcedNodeIterator, error) {
func iteratorFromLiteral(
ti tokenIndex,
literal *parser.Literal,
sw *stopwatch.Stopwatch,
stats *searchStats,
minLID, maxLID uint32,
maxTIDs int,
iteratorLimit iteratorLimit,
order seq.DocsOrder,
) (*SourcedNodeIterator, error) {
m := sw.Start("get_tids_by_token_expr")
tids, err := ti.GetTIDsByTokenExpr(literal)
m.Stop()
Expand All @@ -166,7 +189,10 @@ func iteratorFromLiteral(ti tokenIndex, literal *parser.Literal, sw *stopwatch.S
}

if len(tids) > maxTIDs && maxTIDs > 0 {
return nil, fmt.Errorf("%w: tokens length (%d) of field %q more than %d", consts.ErrTooManyUniqValues, len(tids), literal.Field, maxTIDs)
return nil, fmt.Errorf(
"%w: tokens length (%d) of field %q more than %d",
consts.ErrTooManyFractionTokens, len(tids), literal.Field, maxTIDs,
)
}

m = sw.Start("get_lids_from_tids")
Expand Down
2 changes: 1 addition & 1 deletion frac/processor/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func IndexSearch(
return nil, err
}
if len(aggsResult[i].SamplesByBin) > aggLimits.MaxGroupTokens && aggLimits.MaxGroupTokens > 0 {
return nil, consts.ErrTooManyUniqValues
return nil, consts.ErrTooManyGroupTokens
}
}
m.Stop()
Expand Down
Loading
Loading