Skip to content

search: statistic skips for regex constraint #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 120 additions & 26 deletions search/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
"sort"

"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"

"github.com/prometheus-community/parquet-common/schema"
Expand All @@ -47,28 +46,48 @@
func MatchersToConstraints(matchers ...*labels.Matcher) ([]Constraint, error) {
r := make([]Constraint, 0, len(matchers))
for _, matcher := range matchers {
var c Constraint
S:
switch matcher.Type {
case labels.MatchEqual:
r = append(r, Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)))
c = Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))
case labels.MatchNotEqual:
r = append(r, Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value))))
c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(matcher.Value)))
case labels.MatchRegexp:
res, err := labels.NewFastRegexMatcher(matcher.Value)
if matcher.GetRegexString() == ".+" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can also handle .* to skip it

c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf("")))
break S
}
if set := matcher.SetMatches(); len(set) == 1 {
c = Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(set[0]))
break S
}
rc, err := Regex(schema.LabelToColumn(matcher.Name), matcher)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to construct regex matcher: %w", err)
}
r = append(r, Regex(schema.LabelToColumn(matcher.Name), res))
c = rc
case labels.MatchNotRegexp:
res, err := labels.NewFastRegexMatcher(matcher.Value)
inverted, err := matcher.Inverse()
if err != nil {
return nil, fmt.Errorf("unable to invert matcher: %w", err)
}
if set := inverted.SetMatches(); len(set) == 1 {
c = Not(Equal(schema.LabelToColumn(matcher.Name), parquet.ValueOf(set[0])))
break S
}
rc, err := Regex(schema.LabelToColumn(matcher.Name), inverted)
if err != nil {
return nil, err
return nil, fmt.Errorf("unable to construct regex matcher: %w", err)
}
r = append(r, Not(Regex(schema.LabelToColumn(matcher.Name), res)))
c = Not(rc)
default:
return nil, fmt.Errorf("unsupported matcher type %s", matcher.Type)
}
r = append(r, c)
}
return r, nil

Check failure on line 90 in search/constraint.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofumpt)

Check failure on line 90 in search/constraint.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofumpt)
}

// Initialize prepares the given constraints for use with the specified parquet file.
Expand Down Expand Up @@ -382,6 +401,7 @@
res = append(res, RowRange{pfrom + int64(off), int64(count)})
}
}
parquet.Release(pg)
}

if len(res) == 0 {
Expand Down Expand Up @@ -431,15 +451,26 @@
return !ok, nil
}

func Regex(path string, r *labels.FastRegexMatcher) Constraint {
return &regexConstraint{pth: path, cache: make(map[parquet.Value]bool), r: r}
func Regex(path string, r *labels.Matcher) (Constraint, error) {
if r.Type != labels.MatchRegexp {
return nil, fmt.Errorf("unsupported matcher type: %s", r.Type)
}
return &regexConstraint{pth: path, cache: make(map[parquet.Value]bool), r: r}, nil
}

type regexConstraint struct {
f storage.ParquetFileView
pth string
cache map[parquet.Value]bool
f storage.ParquetFileView
r *labels.FastRegexMatcher

// if its a "set" or "prefix" regex
// for set, those are minv and maxv of the set, for prefix minv is the prefix, maxv is prefix+max(charset)*16
minv parquet.Value
maxv parquet.Value

r *labels.Matcher

comp func(l, r parquet.Value) int
}

func (rc *regexConstraint) String() string {
Expand All @@ -465,13 +496,6 @@
}
cc := rg.ColumnChunks()[col.ColumnIndex]

pgs, err := rc.f.GetPages(ctx, cc, 0, 0)
if err != nil {
return nil, errors.Wrap(err, "failed to get pages")
}

defer func() { _ = pgs.Close() }()

oidx, err := cc.OffsetIndex()
if err != nil {
return nil, fmt.Errorf("unable to read offset index: %w", err)
Expand All @@ -480,11 +504,13 @@
if err != nil {
return nil, fmt.Errorf("unable to read column index: %w", err)
}
var (
symbols = new(symbolTable)
res = make([]RowRange, 0)
)
res := make([]RowRange, 0)

readPgs := make([]pageToRead, 0, 10)

for i := 0; i < cidx.NumPages(); i++ {
poff, pcsz := uint64(oidx.Offset(i)), oidx.CompressedPageSize(i)

// If page does not intersect from, to; we can immediately discard it
pfrom := oidx.FirstRowIndex(i)
pcount := rg.NumRows() - pfrom
Expand All @@ -505,9 +531,56 @@
}
continue
}
// TODO: use setmatches / prefix for statistics
// If we have a special regular expression that works with statistics, we can use them to skip.
// This works for i.e.: 'pod_name=~"thanos-.*"' or 'status_code=~"403|404"'
minv, maxv := cidx.MinValue(i), cidx.MaxValue(i)
if !rc.minv.IsNull() && !rc.maxv.IsNull() {
if !rc.matches(parquet.ValueOf("")) && !maxv.IsNull() && rc.comp(rc.minv, maxv) > 0 {
if cidx.IsDescending() {
break
}
continue
}
if !rc.matches(parquet.ValueOf("")) && !minv.IsNull() && rc.comp(rc.maxv, minv) < 0 {
if cidx.IsAscending() {
break
}
continue
}
}

// We cannot discard the page through statistics but we might need to read it to see if it has the value
readPgs = append(readPgs, pageToRead{pfrom: pfrom, pto: pto, idx: i, off: int(poff), csz: int(pcsz)})
}

// Did not find any pages
if len(readPgs) == 0 {
return intersectRowRanges(simplify(res), rr), nil
}

dictOff, dictSz := rc.f.DictionaryPageBounds(rgIdx, col.ColumnIndex)

minOffset := uint64(readPgs[0].off)
maxOffset := readPgs[len(readPgs)-1].off + readPgs[len(readPgs)-1].csz

// If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize,
// we include the dic to be read in the single read
if int(minOffset-(dictOff+dictSz)) < rc.f.PagePartitioningMaxGapSize() {
minOffset = dictOff
}

pgs, err := rc.f.GetPages(ctx, cc, int64(minOffset), int64(maxOffset))
if err != nil {
return nil, err
}

defer func() { _ = pgs.Close() }()

symbols := new(symbolTable)
for _, p := range readPgs {
pfrom := p.pfrom
pto := p.pto

if err := pgs.SeekToRow(pfrom); err != nil {
return nil, fmt.Errorf("unable to seek to row: %w", err)
}
Expand Down Expand Up @@ -539,7 +612,9 @@
if count != 0 {
res = append(res, RowRange{pfrom + int64(off), int64(count)})
}
parquet.Release(pg)
}

if len(res) == 0 {
return nil, nil
}
Expand All @@ -556,7 +631,26 @@
return fmt.Errorf("schema: cannot search value of kind %s in column of kind %s", stringKind, c.Node.Type().Kind())
}
rc.cache = make(map[parquet.Value]bool)
rc.comp = c.Node.Type().Compare

// if applicable compute the minv and maxv of the implied set of matches
rc.minv = parquet.NullValue()
rc.maxv = parquet.NullValue()
if len(rc.r.SetMatches()) > 0 {
sm := make([]parquet.Value, len(rc.r.SetMatches()))
for i, m := range rc.r.SetMatches() {
sm[i] = parquet.ValueOf(m)
}
rc.minv = slices.MinFunc(sm, rc.comp)
rc.maxv = slices.MaxFunc(sm, rc.comp)
} else if len(rc.r.Prefix()) > 0 {
rc.minv = parquet.ValueOf(rc.r.Prefix())
// 16 is the default prefix length, maybe we should read the actual value from somewhere?
rc.maxv = parquet.ValueOf(append([]byte(rc.r.Prefix()), bytes.Repeat([]byte{0xff}, 16)...))
}

return nil

}

func (rc *regexConstraint) path() string {
Expand All @@ -566,7 +660,7 @@
func (rc *regexConstraint) matches(v parquet.Value) bool {
accept, seen := rc.cache[v]
if !seen {
accept = rc.r.MatchString(util.YoloString(v.ByteArray()))
accept = rc.r.Matches(util.YoloString(v.ByteArray()))
rc.cache[v] = accept
}
return accept
Expand Down
30 changes: 19 additions & 11 deletions search/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,22 @@ func buildFile[T any](t testing.TB, rows []T) storage.ParquetShard {
return shard
}

func mustNewFastRegexMatcher(t testing.TB, s string) *labels.FastRegexMatcher {
res, err := labels.NewFastRegexMatcher(s)
func mustNewMatcher(t testing.TB, s string) *labels.Matcher {
res, err := labels.NewMatcher(labels.MatchRegexp, "doesntmatter", s)
if err != nil {
t.Fatalf("unable to build fast regex matcher: %s", err)
}
return res
}

func mustRegexConstraint(t testing.TB, col string, m *labels.Matcher) Constraint {
res, err := Regex(col, m)
if err != nil {
t.Fatalf("unable to build regex constraint: %s", err)
}
return res
}

func BenchmarkConstraints(b *testing.B) {
type s struct {
A string `parquet:",optional,dict"`
Expand Down Expand Up @@ -112,14 +120,14 @@ func BenchmarkConstraints(b *testing.B) {
c: []Constraint{
Equal("A", parquet.ValueOf(rows[0].A)),
Equal("B", parquet.ValueOf(rows[0].B)),
Regex("Random", mustNewFastRegexMatcher(b, rows[0].Random)),
mustRegexConstraint(b, "Random", mustNewMatcher(b, rows[0].Random)),
},
},
{
c: []Constraint{
Equal("A", parquet.ValueOf(rows[len(rows)-1].A)),
Equal("B", parquet.ValueOf(rows[len(rows)-1].B)),
Regex("Random", mustNewFastRegexMatcher(b, rows[len(rows)-1].Random)),
mustRegexConstraint(b, "Random", mustNewMatcher(b, rows[len(rows)-1].Random)),
},
},
}
Expand Down Expand Up @@ -161,7 +169,7 @@ func TestContextCancelled(t *testing.T) {

for _, c := range []Constraint{
Equal("A", parquet.ValueOf(rows[len(rows)-1].A)),
Regex("A", mustNewFastRegexMatcher(t, rows[len(rows)-1].A)),
mustRegexConstraint(t, "A", mustNewMatcher(t, rows[len(rows)-1].A)),
Not(Equal("A", parquet.ValueOf(rows[len(rows)-1].A))),
} {
if err := Initialize(shard.LabelsFile(), c); err != nil {
Expand Down Expand Up @@ -258,7 +266,7 @@ func TestFilter(t *testing.T) {
},
{
constraints: []Constraint{
Regex("C", mustNewFastRegexMatcher(t, "a|c|d")),
mustRegexConstraint(t, "C", mustNewMatcher(t, "a|c|d")),
},
expect: []RowRange{
{From: 0, Count: 1},
Expand Down Expand Up @@ -368,7 +376,7 @@ func TestFilter(t *testing.T) {
expectations: []expectation{
{
constraints: []Constraint{
Regex("C", mustNewFastRegexMatcher(t, "f.*")),
mustRegexConstraint(t, "C", mustNewMatcher(t, "f.*")),
},
expect: []RowRange{
{From: 0, Count: 1},
Expand All @@ -377,7 +385,7 @@ func TestFilter(t *testing.T) {
},
{
constraints: []Constraint{
Regex("C", mustNewFastRegexMatcher(t, "b.*")),
mustRegexConstraint(t, "C", mustNewMatcher(t, "b.*")),
},
expect: []RowRange{
{From: 1, Count: 1},
Expand All @@ -386,7 +394,7 @@ func TestFilter(t *testing.T) {
},
{
constraints: []Constraint{
Regex("C", mustNewFastRegexMatcher(t, "f.*|b.*")),
mustRegexConstraint(t, "C", mustNewMatcher(t, "f.*|b.*")),
},
expect: []RowRange{
{From: 0, Count: 4},
Expand Down Expand Up @@ -440,14 +448,14 @@ func TestFilter(t *testing.T) {
{
constraints: []Constraint{
Equal("A", parquet.ValueOf("1")),
Regex("None", mustNewFastRegexMatcher(t, "f.*|b.*")),
mustRegexConstraint(t, "None", mustNewMatcher(t, "f.*|b.*")),
},
expect: []RowRange{},
},
{
constraints: []Constraint{
Equal("A", parquet.ValueOf("1")),
Regex("None", mustNewFastRegexMatcher(t, "f.*|b.*|")),
mustRegexConstraint(t, "None", mustNewMatcher(t, "f.*|b.*|")),
},
expect: []RowRange{
{From: 0, Count: 2},
Expand Down
Loading