@@ -21,7 +21,6 @@ import (
21
21
"sort"
22
22
23
23
"github.com/parquet-go/parquet-go"
24
- "github.com/pkg/errors"
25
24
"github.com/prometheus/prometheus/model/labels"
26
25
27
26
"github.com/prometheus-community/parquet-common/schema"
@@ -47,28 +46,48 @@ type Constraint interface {
47
46
func MatchersToConstraints (matchers ... * labels.Matcher ) ([]Constraint , error ) {
48
47
r := make ([]Constraint , 0 , len (matchers ))
49
48
for _ , matcher := range matchers {
49
+ var c Constraint
50
+ S:
50
51
switch matcher .Type {
51
52
case labels .MatchEqual :
52
- r = append ( r , Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf (matcher .Value ) ))
53
+ c = Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf (matcher .Value ))
53
54
case labels .MatchNotEqual :
54
- r = append ( r , Not (Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf (matcher .Value ) )))
55
+ c = Not (Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf (matcher .Value )))
55
56
case labels .MatchRegexp :
56
- res , err := labels .NewFastRegexMatcher (matcher .Value )
57
+ if matcher .GetRegexString () == ".+" {
58
+ c = Not (Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf ("" )))
59
+ break S
60
+ }
61
+ if set := matcher .SetMatches (); len (set ) == 1 {
62
+ c = Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf (set [0 ]))
63
+ break S
64
+ }
65
+ rc , err := Regex (schema .LabelToColumn (matcher .Name ), matcher )
57
66
if err != nil {
58
- return nil , err
67
+ return nil , fmt . Errorf ( "unable to construct regex matcher: %w" , err )
59
68
}
60
- r = append ( r , Regex ( schema . LabelToColumn ( matcher . Name ), res ))
69
+ c = rc
61
70
case labels .MatchNotRegexp :
62
- res , err := labels .NewFastRegexMatcher (matcher .Value )
71
+ inverted , err := matcher .Inverse ()
72
+ if err != nil {
73
+ return nil , fmt .Errorf ("unable to invert matcher: %w" , err )
74
+ }
75
+ if set := inverted .SetMatches (); len (set ) == 1 {
76
+ c = Not (Equal (schema .LabelToColumn (matcher .Name ), parquet .ValueOf (set [0 ])))
77
+ break S
78
+ }
79
+ rc , err := Regex (schema .LabelToColumn (matcher .Name ), inverted )
63
80
if err != nil {
64
- return nil , err
81
+ return nil , fmt . Errorf ( "unable to construct regex matcher: %w" , err )
65
82
}
66
- r = append ( r , Not (Regex ( schema . LabelToColumn ( matcher . Name ), res )) )
83
+ c = Not (rc )
67
84
default :
68
85
return nil , fmt .Errorf ("unsupported matcher type %s" , matcher .Type )
69
86
}
87
+ r = append (r , c )
70
88
}
71
89
return r , nil
90
+
72
91
}
73
92
74
93
// Initialize prepares the given constraints for use with the specified parquet file.
@@ -382,6 +401,7 @@ func (ec *equalConstraint) filter(ctx context.Context, rgIdx int, primary bool,
382
401
res = append (res , RowRange {pfrom + int64 (off ), int64 (count )})
383
402
}
384
403
}
404
+ parquet .Release (pg )
385
405
}
386
406
387
407
if len (res ) == 0 {
@@ -431,15 +451,26 @@ func (ec *equalConstraint) skipByBloomfilter(cc parquet.ColumnChunk) (bool, erro
431
451
return ! ok , nil
432
452
}
433
453
434
- func Regex (path string , r * labels.FastRegexMatcher ) Constraint {
435
- return & regexConstraint {pth : path , cache : make (map [parquet.Value ]bool ), r : r }
454
+ func Regex (path string , r * labels.Matcher ) (Constraint , error ) {
455
+ if r .Type != labels .MatchRegexp {
456
+ return nil , fmt .Errorf ("unsupported matcher type: %s" , r .Type )
457
+ }
458
+ return & regexConstraint {pth : path , cache : make (map [parquet.Value ]bool ), r : r }, nil
436
459
}
437
460
438
461
type regexConstraint struct {
462
+ f storage.ParquetFileView
439
463
pth string
440
464
cache map [parquet.Value ]bool
441
- f storage.ParquetFileView
442
- r * labels.FastRegexMatcher
465
+
466
+ // if its a "set" or "prefix" regex
467
+ // for set, those are minv and maxv of the set, for prefix minv is the prefix, maxv is prefix+max(charset)*16
468
+ minv parquet.Value
469
+ maxv parquet.Value
470
+
471
+ r * labels.Matcher
472
+
473
+ comp func (l , r parquet.Value ) int
443
474
}
444
475
445
476
func (rc * regexConstraint ) String () string {
@@ -465,13 +496,6 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool,
465
496
}
466
497
cc := rg .ColumnChunks ()[col .ColumnIndex ]
467
498
468
- pgs , err := rc .f .GetPages (ctx , cc , 0 , 0 )
469
- if err != nil {
470
- return nil , errors .Wrap (err , "failed to get pages" )
471
- }
472
-
473
- defer func () { _ = pgs .Close () }()
474
-
475
499
oidx , err := cc .OffsetIndex ()
476
500
if err != nil {
477
501
return nil , fmt .Errorf ("unable to read offset index: %w" , err )
@@ -480,11 +504,13 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool,
480
504
if err != nil {
481
505
return nil , fmt .Errorf ("unable to read column index: %w" , err )
482
506
}
483
- var (
484
- symbols = new ( symbolTable )
485
- res = make ([]RowRange , 0 )
486
- )
507
+ res := make ([] RowRange , 0 )
508
+
509
+ readPgs : = make ([]pageToRead , 0 , 10 )
510
+
487
511
for i := 0 ; i < cidx .NumPages (); i ++ {
512
+ poff , pcsz := uint64 (oidx .Offset (i )), oidx .CompressedPageSize (i )
513
+
488
514
// If page does not intersect from, to; we can immediately discard it
489
515
pfrom := oidx .FirstRowIndex (i )
490
516
pcount := rg .NumRows () - pfrom
@@ -505,9 +531,56 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool,
505
531
}
506
532
continue
507
533
}
508
- // TODO: use setmatches / prefix for statistics
534
+ // If we have a special regular expression that works with statistics, we can use them to skip.
535
+ // This works for i.e.: 'pod_name=~"thanos-.*"' or 'status_code=~"403|404"'
536
+ minv , maxv := cidx .MinValue (i ), cidx .MaxValue (i )
537
+ if ! rc .minv .IsNull () && ! rc .maxv .IsNull () {
538
+ if ! rc .matches (parquet .ValueOf ("" )) && ! maxv .IsNull () && rc .comp (rc .minv , maxv ) > 0 {
539
+ if cidx .IsDescending () {
540
+ break
541
+ }
542
+ continue
543
+ }
544
+ if ! rc .matches (parquet .ValueOf ("" )) && ! minv .IsNull () && rc .comp (rc .maxv , minv ) < 0 {
545
+ if cidx .IsAscending () {
546
+ break
547
+ }
548
+ continue
549
+ }
550
+ }
509
551
510
552
// We cannot discard the page through statistics but we might need to read it to see if it has the value
553
+ readPgs = append (readPgs , pageToRead {pfrom : pfrom , pto : pto , idx : i , off : int (poff ), csz : int (pcsz )})
554
+ }
555
+
556
+ // Did not find any pages
557
+ if len (readPgs ) == 0 {
558
+ return intersectRowRanges (simplify (res ), rr ), nil
559
+ }
560
+
561
+ dictOff , dictSz := rc .f .DictionaryPageBounds (rgIdx , col .ColumnIndex )
562
+
563
+ minOffset := uint64 (readPgs [0 ].off )
564
+ maxOffset := readPgs [len (readPgs )- 1 ].off + readPgs [len (readPgs )- 1 ].csz
565
+
566
+ // If the gap between the first page and the dic page is less than PagePartitioningMaxGapSize,
567
+ // we include the dic to be read in the single read
568
+ if int (minOffset - (dictOff + dictSz )) < rc .f .PagePartitioningMaxGapSize () {
569
+ minOffset = dictOff
570
+ }
571
+
572
+ pgs , err := rc .f .GetPages (ctx , cc , int64 (minOffset ), int64 (maxOffset ))
573
+ if err != nil {
574
+ return nil , err
575
+ }
576
+
577
+ defer func () { _ = pgs .Close () }()
578
+
579
+ symbols := new (symbolTable )
580
+ for _ , p := range readPgs {
581
+ pfrom := p .pfrom
582
+ pto := p .pto
583
+
511
584
if err := pgs .SeekToRow (pfrom ); err != nil {
512
585
return nil , fmt .Errorf ("unable to seek to row: %w" , err )
513
586
}
@@ -539,7 +612,9 @@ func (rc *regexConstraint) filter(ctx context.Context, rgIdx int, primary bool,
539
612
if count != 0 {
540
613
res = append (res , RowRange {pfrom + int64 (off ), int64 (count )})
541
614
}
615
+ parquet .Release (pg )
542
616
}
617
+
543
618
if len (res ) == 0 {
544
619
return nil , nil
545
620
}
@@ -556,7 +631,26 @@ func (rc *regexConstraint) init(f storage.ParquetFileView) error {
556
631
return fmt .Errorf ("schema: cannot search value of kind %s in column of kind %s" , stringKind , c .Node .Type ().Kind ())
557
632
}
558
633
rc .cache = make (map [parquet.Value ]bool )
634
+ rc .comp = c .Node .Type ().Compare
635
+
636
+ // if applicable compute the minv and maxv of the implied set of matches
637
+ rc .minv = parquet .NullValue ()
638
+ rc .maxv = parquet .NullValue ()
639
+ if len (rc .r .SetMatches ()) > 0 {
640
+ sm := make ([]parquet.Value , len (rc .r .SetMatches ()))
641
+ for i , m := range rc .r .SetMatches () {
642
+ sm [i ] = parquet .ValueOf (m )
643
+ }
644
+ rc .minv = slices .MinFunc (sm , rc .comp )
645
+ rc .maxv = slices .MaxFunc (sm , rc .comp )
646
+ } else if len (rc .r .Prefix ()) > 0 {
647
+ rc .minv = parquet .ValueOf (rc .r .Prefix ())
648
+ // 16 is the default prefix length, maybe we should read the actual value from somewhere?
649
+ rc .maxv = parquet .ValueOf (append ([]byte (rc .r .Prefix ()), bytes .Repeat ([]byte {0xff }, 16 )... ))
650
+ }
651
+
559
652
return nil
653
+
560
654
}
561
655
562
656
func (rc * regexConstraint ) path () string {
@@ -566,7 +660,7 @@ func (rc *regexConstraint) path() string {
566
660
func (rc * regexConstraint ) matches (v parquet.Value ) bool {
567
661
accept , seen := rc .cache [v ]
568
662
if ! seen {
569
- accept = rc .r .MatchString (util .YoloString (v .ByteArray ()))
663
+ accept = rc .r .Matches (util .YoloString (v .ByteArray ()))
570
664
rc .cache [v ] = accept
571
665
}
572
666
return accept
0 commit comments