diff --git a/search/constraint.go b/search/constraint.go index 1db078e..e976c2d 100644 --- a/search/constraint.go +++ b/search/constraint.go @@ -21,7 +21,6 @@ import ( "sort" "github.com/parquet-go/parquet-go" - "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus-community/parquet-common/schema" @@ -47,28 +46,48 @@ type Constraint interface { func MatchersToConstraints(matchers ...*labels.Matcher) ([]Constraint, error) { r := make([]Constraint, 0, len(matchers)) for _, matcher := range matchers { + var c Constraint + S: switch matcher.Type { case labels.MatchEqual: - r = append(r, Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))) + c = Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)) case labels.MatchNotEqual: - r = append(r, Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)))) + c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))) case labels.MatchRegexp: - res, err := labels.NewFastRegexMatcher(matcher.Value) + if matcher.GetRegexString() == ".+" { + c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(""))) + break S + } + if set := matcher.SetMatches(); len(set) == 1 { + c = Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(set[0])) + break S + } + rc, err := Regex(schema.LabelToColumn(matcher.Name), matcher) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to construct regex matcher: %w", err) } - r = append(r, Regex(schema.LabelToColumn(matcher.Name), res)) + c = rc case labels.MatchNotRegexp: - res, err := labels.NewFastRegexMatcher(matcher.Value) + inverted, err := matcher.Inverse() + if err != nil { + return nil, fmt.Errorf("unable to invert matcher: %w", err) + } + if set := inverted.SetMatches(); len(set) == 1 { + c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(set[0]))) + break S + } + rc, err := Regex(schema.LabelToColumn(matcher.Name), inverted) if err != nil { - return nil, err + return nil, fmt.Errorf("unable to construct regex matcher: %w", err) } - r = append(r, Not(Regex(schema.LabelToColumn(matcher.Name), res))) + c = Not(rc) default: return nil, fmt.Errorf("unsupported matcher type %s", matcher.Type) } + r = append(r, c) } return r, nil + } // Initialize prepares the given constraints for use with the specified parquet file. @@ -382,6 +401,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool, res = append(res, RowRange{pfrom + int64(off), int64(count)}) } } + parquet.Release(pg) } if len(res) == 0 { @@ -431,15 +451,26 @@ func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, erro return !ok, nil } -func Regex(path string, r *labels.FastRegexMatcher) Constraint { - return ®exConstraint{pth: path, cache: make(map[parquet.Value]bool), r: r} +func Regex(path string, r *labels.Matcher) (Constraint, error) { + if r.Type != labels.MatchRegexp { + return nil, fmt.Errorf("unsupported matcher type: %s", r.Type) + } + return ®exConstraint{pth: path, cache: make(map[parquet.Value]bool), r: r}, nil } type regexConstraint struct { + f storage.ParquetFileView pth string cache map[parquet.Value]bool - f storage.ParquetFileView - r *labels.FastRegexMatcher + + // if its a "set" or "prefix" regex + // for set, those are minv and maxv of the set, for prefix minv is the prefix, maxv is prefix+max(charset)*16 + minv parquet.Value + maxv parquet.Value + + r *labels.Matcher + + comp func(l, r parquet.Value) int } func (rc *regexConstraint) String() string { @@ -465,13 +496,6 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, } cc := rg.ColumnChunks()[col.ColumnIndex] - pgs, err := rc.f.GetPages(ctx, cc, 0, 0) - if err != nil { - return nil, errors.Wrap(err, "failed to get pages") - } - - defer func() { _ = pgs.Close() }() - oidx, err := cc.OffsetIndex() if err != nil { return nil, fmt.Errorf("unable to read offset index: %w", err) @@ -480,11 +504,13 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if err != nil { return nil, fmt.Errorf("unable to read column index: %w", err) } - var ( - symbols = new(symbolTable) - res = make([]RowRange, 0) - ) + res := make([]RowRange, 0) + + readPgs := make([]pageToRead, 0, 10) + for i := 0; i < cidx.NumPages(); i++ { + poff, pcsz := uint64(oidx.Offset(i)), oidx.CompressedPageSize(i) + // If page does not intersect from, to; we can immediately discard it pfrom := oidx.FirstRowIndex(i) pcount := rg.NumRows() - pfrom @@ -505,9 +531,56 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, } continue } - // TODO: use setmatches / prefix for statistics + // If we have a special regular expression that works with statistics, we can use them to skip. + // This works for i.e.: 'pod_name=~"thanos-.*"' or 'status_code=~"403|404"' + minv, maxv := cidx.MinValue(i), cidx.MaxValue(i) + if !rc.minv.IsNull() && !rc.maxv.IsNull() { + if !rc.matches(parquet.ValueOf("")) && !maxv.IsNull() && rc.comp(rc.minv, maxv) > 0 { + if cidx.IsDescending() { + break + } + continue + } + if !rc.matches(parquet.ValueOf("")) && !minv.IsNull() && rc.comp(rc.maxv, minv) < 0 { + if cidx.IsAscending() { + break + } + continue + } + } // We cannot discard the page through statistics but we might need to read it to see if it has the value + readPgs = append(readPgs, pageToRead{pfrom: pfrom, pto: pto, idx: i, off: int(poff), csz: int(pcsz)}) + } + + // Did not find any pages + if len(readPgs) == 0 { + return intersectRowRanges(simplify(res), rr), nil + } + + dictOff, dictSz := rc.f.DictionaryPageBounds(rgIdx, col.ColumnIndex) + + minOffset := uint64(readPgs[0].off) + maxOffset := readPgs[len(readPgs)-1].off + readPgs[len(readPgs)-1].csz + + // If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize, + // we include the dic to be read in the single read + if int(minOffset-(dictOff+dictSz)) < rc.f.PagePartitioningMaxGapSize() { + minOffset = dictOff + } + + pgs, err := rc.f.GetPages(ctx, cc, int64(minOffset), int64(maxOffset)) + if err != nil { + return nil, err + } + + defer func() { _ = pgs.Close() }() + + symbols := new(symbolTable) + for _, p := range readPgs { + pfrom := p.pfrom + pto := p.pto + if err := pgs.SeekToRow(pfrom); err != nil { return nil, fmt.Errorf("unable to seek to row: %w", err) } @@ -539,7 +612,9 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool, if count != 0 { res = append(res, RowRange{pfrom + int64(off), int64(count)}) } + parquet.Release(pg) } + if len(res) == 0 { return nil, nil } @@ -556,7 +631,26 @@ func (rc *regexConstraint) init(f storage.ParquetFileView) error { return fmt.Errorf("schema: cannot search value of kind %s in column of kind %s", stringKind, c.Node.Type().Kind()) } rc.cache = make(map[parquet.Value]bool) + rc.comp = c.Node.Type().Compare + + // if applicable compute the minv and maxv of the implied set of matches + rc.minv = parquet.NullValue() + rc.maxv = parquet.NullValue() + if len(rc.r.SetMatches()) > 0 { + sm := make([]parquet.Value, len(rc.r.SetMatches())) + for i, m := range rc.r.SetMatches() { + sm[i] = parquet.ValueOf(m) + } + rc.minv = slices.MinFunc(sm, rc.comp) + rc.maxv = slices.MaxFunc(sm, rc.comp) + } else if len(rc.r.Prefix()) > 0 { + rc.minv = parquet.ValueOf(rc.r.Prefix()) + // 16 is the default prefix length, maybe we should read the actual value from somewhere? + rc.maxv = parquet.ValueOf(append([]byte(rc.r.Prefix()), bytes.Repeat([]byte{0xff}, 16)...)) + } + return nil + } func (rc *regexConstraint) path() string { @@ -566,7 +660,7 @@ func (rc *regexConstraint) path() string { func (rc *regexConstraint) matches(v parquet.Value) bool { accept, seen := rc.cache[v] if !seen { - accept = rc.r.MatchString(util.YoloString(v.ByteArray())) + accept = rc.r.Matches(util.YoloString(v.ByteArray())) rc.cache[v] = accept } return accept diff --git a/search/constraint_test.go b/search/constraint_test.go index f60fe52..518b76b 100644 --- a/search/constraint_test.go +++ b/search/constraint_test.go @@ -62,14 +62,22 @@ func buildFile[T any](t testing.TB, rows []T) storage.ParquetShard { return shard } -func mustNewFastRegexMatcher(t testing.TB, s string) *labels.FastRegexMatcher { - res, err := labels.NewFastRegexMatcher(s) +func mustNewMatcher(t testing.TB, s string) *labels.Matcher { + res, err := labels.NewMatcher(labels.MatchRegexp, "doesntmatter", s) if err != nil { t.Fatalf("unable to build fast regex matcher: %s", err) } return res } +func mustRegexConstraint(t testing.TB, col string, m *labels.Matcher) Constraint { + res, err := Regex(col, m) + if err != nil { + t.Fatalf("unable to build regex constraint: %s", err) + } + return res +} + func BenchmarkConstraints(b *testing.B) { type s struct { A string `parquet:",optional,dict"` @@ -112,14 +120,14 @@ func BenchmarkConstraints(b *testing.B) { c: []Constraint{ Equal("A", parquet.ValueOf(rows[0].A)), Equal("B", parquet.ValueOf(rows[0].B)), - Regex("Random", mustNewFastRegexMatcher(b, rows[0].Random)), + mustRegexConstraint(b, "Random", mustNewMatcher(b, rows[0].Random)), }, }, { c: []Constraint{ Equal("A", parquet.ValueOf(rows[len(rows)-1].A)), Equal("B", parquet.ValueOf(rows[len(rows)-1].B)), - Regex("Random", mustNewFastRegexMatcher(b, rows[len(rows)-1].Random)), + mustRegexConstraint(b, "Random", mustNewMatcher(b, rows[len(rows)-1].Random)), }, }, } @@ -161,7 +169,7 @@ func TestContextCancelled(t *testing.T) { for _, c := range []Constraint{ Equal("A", parquet.ValueOf(rows[len(rows)-1].A)), - Regex("A", mustNewFastRegexMatcher(t, rows[len(rows)-1].A)), + mustRegexConstraint(t, "A", mustNewMatcher(t, rows[len(rows)-1].A)), Not(Equal("A", parquet.ValueOf(rows[len(rows)-1].A))), } { if err := Initialize(shard.LabelsFile(), c); err != nil { @@ -258,7 +266,7 @@ func TestFilter(t *testing.T) { }, { constraints: []Constraint{ - Regex("C", mustNewFastRegexMatcher(t, "a|c|d")), + mustRegexConstraint(t, "C", mustNewMatcher(t, "a|c|d")), }, expect: []RowRange{ {From: 0, Count: 1}, @@ -368,7 +376,7 @@ func TestFilter(t *testing.T) { expectations: []expectation{ { constraints: []Constraint{ - Regex("C", mustNewFastRegexMatcher(t, "f.*")), + mustRegexConstraint(t, "C", mustNewMatcher(t, "f.*")), }, expect: []RowRange{ {From: 0, Count: 1}, @@ -377,7 +385,7 @@ func TestFilter(t *testing.T) { }, { constraints: []Constraint{ - Regex("C", mustNewFastRegexMatcher(t, "b.*")), + mustRegexConstraint(t, "C", mustNewMatcher(t, "b.*")), }, expect: []RowRange{ {From: 1, Count: 1}, @@ -386,7 +394,7 @@ func TestFilter(t *testing.T) { }, { constraints: []Constraint{ - Regex("C", mustNewFastRegexMatcher(t, "f.*|b.*")), + mustRegexConstraint(t, "C", mustNewMatcher(t, "f.*|b.*")), }, expect: []RowRange{ {From: 0, Count: 4}, @@ -440,14 +448,14 @@ func TestFilter(t *testing.T) { { constraints: []Constraint{ Equal("A", parquet.ValueOf("1")), - Regex("None", mustNewFastRegexMatcher(t, "f.*|b.*")), + mustRegexConstraint(t, "None", mustNewMatcher(t, "f.*|b.*")), }, expect: []RowRange{}, }, { constraints: []Constraint{ Equal("A", parquet.ValueOf("1")), - Regex("None", mustNewFastRegexMatcher(t, "f.*|b.*|")), + mustRegexConstraint(t, "None", mustNewMatcher(t, "f.*|b.*|")), }, expect: []RowRange{ {From: 0, Count: 2},