From c83de375e3b0c0e2098ccec10d7f917b485e7e94 Mon Sep 17 00:00:00 2001 From: Daniil Forshev Date: Fri, 17 Oct 2025 13:55:51 +0500 Subject: [PATCH] feat(seq-ql): add histogram pipe --- asyncsearcher/async_searcher.go | 16 +++++ docs/en/05-seq-ql.md | 14 ++++- docs/ru/05-seq-ql.md | 15 ++++- parser/seqql_filter_test.go | 4 +- parser/seqql_pipes.go | 65 ++++++++++++++++++++- parser/seqql_pipes_test.go | 59 ++++++++++--------- proxyapi/grpc_complex_search.go | 2 +- storeapi/grpc_search.go | 29 ++++++--- tests/integration_tests/integration_test.go | 12 ++++ 9 files changed, 177 insertions(+), 39 deletions(-) diff --git a/asyncsearcher/async_searcher.go b/asyncsearcher/async_searcher.go index c283608c..404b4022 100644 --- a/asyncsearcher/async_searcher.go +++ b/asyncsearcher/async_searcher.go @@ -237,6 +237,14 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis } r.Params.AST = ast.Root + // extract parameters from pipes + for _, pipe := range ast.Pipes { + p, ok := pipe.(*parser.PipeHistogram) + if ok && r.Params.HistInterval == 0 { + r.Params.HistInterval = uint64(p.Interval) + } + } + now := timeNow() if r.Retention < minRetention { return fmt.Errorf("retention time should be at least %s, got %s", minRetention, r.Retention) @@ -350,6 +358,14 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) { panic(fmt.Errorf("BUG: search query must be valid: %s", err)) } info.Request.Params.AST = ast.Root + + // extract parameters from pipes + for _, pipe := range ast.Pipes { + p, ok := pipe.(*parser.PipeHistogram) + if ok && info.Request.Params.HistInterval == 0 { + info.Request.Params.HistInterval = uint64(p.Interval) + } + } } fracsByName := make(map[string]frac.Fraction) diff --git a/docs/en/05-seq-ql.md b/docs/en/05-seq-ql.md index b95be949..a43cf977 100644 --- a/docs/en/05-seq-ql.md +++ b/docs/en/05-seq-ql.md @@ -123,7 +123,7 @@ trace_id:in(123e4567-e89b-12d3-a456-426655440000, '123e4567-e89b-12d3-a456-42665 Pipes in seq-ql are used to sequentially process data. Pipes enable data transformation, enrichment, filtering, aggregation, and formatting of results. -#### `fields` pipe +### `fields` pipe The `fields` pipe allows removing unnecessary fields from documents in the output. This is useful when only specific fields are needed, especially when exporting large datasets to reduce network load. @@ -147,6 +147,18 @@ source_type:access* | fields except payload, cookies In this example, the `payload` and `cookies` fields will be excluded from the result. +### `histogram` pipe + +The `histogram` pipe allows to get a histogram by query. + +The pipe takes `interval` parameter that specifies bucket size in time duration format: `10s` for ten seconds, `1h` for one hour etc. + +Example: + +```seq-ql +source_type:access* | histogram 30s +``` + ## Comments Comments are user-provided text that will be ignored when executing a query. diff --git a/docs/ru/05-seq-ql.md b/docs/ru/05-seq-ql.md index ed030b3c..f6c1db88 100644 --- a/docs/ru/05-seq-ql.md +++ b/docs/ru/05-seq-ql.md @@ -116,7 +116,7 @@ trace_id:in(123e4567-e89b-12d3-a456-426655440000, '123e4567-e89b-12d3-a456-42665 Pipes в seq-ql — это механизм для последовательной обработки данных. Pipes позволяют трансформировать, обогащать, фильтровать, агрегировать данные и форматировать результаты. -#### `fields` pipe +### `fields` pipe Pipe `fields` позволяет удалить лишние поля у документов в выдаче. Это может пригодиться, когда нужно оставить только нужные поля, @@ -142,6 +142,19 @@ source_type:access* | fields except payload, cookies В этом примере поля `payload`, `cookies` будут исключены из результата. +### `histogram` pipe + +Pipe `histogram` позволяет получить агрегацию по поисковому запросу. + +Pipe принимает параметр `interval`, который позволяет указать размер временного интервала для группировки данных: +`10s` для десяти секунды, `1h` для одного часа и так далее. + +Пример: + +```seq-ql +source_type:access* | histogram 30s +``` + ## Комментарии Комментарии – текст, предоставленный пользователем, который будет проигнорирован при выполнении запроса. diff --git a/parser/seqql_filter_test.go b/parser/seqql_filter_test.go index 58bd993b..110b0e5d 100644 --- a/parser/seqql_filter_test.go +++ b/parser/seqql_filter_test.go @@ -391,8 +391,10 @@ func TestParseSeqQLError(t *testing.T) { // Test pipes. test(`message:--||`, `unknown pipe: |`) test(`source_type:access* | fields message | fields except login:admin`, `parsing 'fields' pipe: unexpected symbol ":"`) - test(`source_type:access* | fields message | fields login`, `multiple field filters is not allowed`) + test(`source_type:access* | fields message | fields login`, `multiple field filters are not allowed`) test(`* | fields event, `, `parsing 'fields' pipe: trailing comma not allowed`) + test(`* | histogram 1s | histogram 1s`, `multiple histograms are not allowed`) + test(`* | histogram 123`, `parsing 'histogram' pipe: can't parse histogram interval`) } // edited from the original answer https://stackoverflow.com/a/30230552/11750924 diff --git a/parser/seqql_pipes.go b/parser/seqql_pipes.go index 6e005472..19de31ca 100644 --- a/parser/seqql_pipes.go +++ b/parser/seqql_pipes.go @@ -4,6 +4,9 @@ import ( "fmt" "strconv" "strings" + "time" + + "github.com/ozontech/seq-db/util" ) type Pipe interface { @@ -14,6 +17,8 @@ type Pipe interface { func parsePipes(lex *lexer) ([]Pipe, error) { // Counter of 'fields' pipes. fieldFilters := 0 + histograms := 0 + var pipes []Pipe for !lex.IsEnd() { if !lex.IsKeyword("|") { @@ -29,14 +34,25 @@ func parsePipes(lex *lexer) ([]Pipe, error) { } pipes = append(pipes, p) fieldFilters++ + case lex.IsKeyword("histogram"): + p, err := parsePipeHistogram(lex) + if err != nil { + return nil, fmt.Errorf("parsing 'histogram' pipe: %s", err) + } + pipes = append(pipes, p) + histograms++ default: return nil, fmt.Errorf("unknown pipe: %s", lex.Token) } if fieldFilters > 1 { - return nil, fmt.Errorf("multiple field filters is not allowed") + return nil, fmt.Errorf("multiple field filters are not allowed") + } + if histograms > 1 { + return nil, fmt.Errorf("multiple histograms are not allowed") } } + return pipes, nil } @@ -110,6 +126,51 @@ func parseFieldList(lex *lexer) ([]string, error) { return fields, nil } +type PipeHistogram struct { + Interval int64 +} + +func (h *PipeHistogram) Name() string { + return "histogram" +} + +func (h *PipeHistogram) DumpSeqQL(o *strings.Builder) { + o.WriteString("histogram ") + interval := time.Duration(h.Interval) * time.Millisecond + o.WriteString(quoteTokenIfNeeded(interval.String())) +} + +func parsePipeHistogram(lex *lexer) (*PipeHistogram, error) { + if !lex.IsKeyword("histogram") { + return nil, fmt.Errorf("missing 'histogram' keyword") + } + + lex.Next() + + interval, err := parseInterval(lex) + if err != nil { + return nil, fmt.Errorf("can't parse histogram interval") + } + + return &PipeHistogram{ + Interval: interval, + }, nil +} + +func parseInterval(lex *lexer) (int64, error) { + interval, err := parseCompositeTokenReplaceWildcards(lex) + if err != nil { + return 0, err + } + + parsed, err := util.ParseDuration(interval) + if err != nil { + return 0, err + } + + return parsed.Milliseconds(), nil +} + func quoteTokenIfNeeded(token string) string { if !needQuoteToken(token) { return token @@ -149,7 +210,7 @@ var reservedKeywords = uniqueTokens([]string{ "|", // Pipe specific keywords. - "fields", "except", + "fields", "except", "histogram", }) func needQuoteToken(s string) bool { diff --git a/parser/seqql_pipes_test.go b/parser/seqql_pipes_test.go index 2313b20d..eb6397d9 100644 --- a/parser/seqql_pipes_test.go +++ b/parser/seqql_pipes_test.go @@ -3,38 +3,45 @@ package parser import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestParsePipeFields(t *testing.T) { - test := func(q, expected string) { - t.Helper() - query, err := ParseSeqQL(q, nil) - require.NoError(t, err) - require.Equal(t, expected, query.SeqQLString()) - } - - test("* | fields message,error, level", "* | fields message, error, level") - test("* | fields level", "* | fields level") - test("* | fields level", "* | fields level") - test(`* | fields "_id"`, `* | fields _id`) - test(`* | fields "_\\message\\_"`, `* | fields "_\\message\\_"`) - test(`* | fields "_\\message*"`, `* | fields "_\\message\*"`) - test(`* | fields k8s_namespace`, `* | fields k8s_namespace`) + test(t, "* | fields message,error, level", "* | fields message, error, level") + test(t, "* | fields level", "* | fields level") + test(t, "* | fields level", "* | fields level") + test(t, `* | fields "_id"`, `* | fields _id`) + test(t, `* | fields "_\\message\\_"`, `* | fields "_\\message\\_"`) + test(t, `* | fields "_\\message*"`, `* | fields "_\\message\*"`) + test(t, `* | fields k8s_namespace`, `* | fields k8s_namespace`) } func TestParsePipeFieldsExcept(t *testing.T) { - test := func(q, expected string) { - t.Helper() - query, err := ParseSeqQL(q, nil) - require.NoError(t, err) - require.Equal(t, expected, query.SeqQLString()) - } + test(t, "* | fields except message,error, level", "* | fields except message, error, level") + test(t, "* | fields except level", "* | fields except level") + test(t, `* | fields except "_id"`, `* | fields except _id`) + test(t, `* | fields except "_\\message\\_"`, `* | fields except "_\\message\\_"`) + test(t, `* | fields except "_\\message*"`, `* | fields except "_\\message\*"`) + test(t, `* | fields except k8s_namespace`, `* | fields except k8s_namespace`) +} + +func TestParsePipeHistogram(t *testing.T) { + test(t, `* | histogram 1s`, `* | histogram 1s`) + test(t, `* | histogram 60s`, `* | histogram 1m0s`) + test(t, `* | histogram 1m`, `* | histogram 1m0s`) + test(t, `* | histogram 10m`, `* | histogram 10m0s`) + test(t, `* | histogram 2h`, `* | histogram 2h0m0s`) +} + +func TestPipesComposition(t *testing.T) { + test(t, `* | fields level | histogram 1s`, `* | fields level | histogram 1s`) + test(t, `* | histogram 1s | fields level`, `* | histogram 1s | fields level`) +} - test("* | fields except message,error, level", "* | fields except message, error, level") - test("* | fields except level", "* | fields except level") - test(`* | fields except "_id"`, `* | fields except _id`) - test(`* | fields except "_\\message\\_"`, `* | fields except "_\\message\\_"`) - test(`* | fields except "_\\message*"`, `* | fields except "_\\message\*"`) - test(`* | fields except k8s_namespace`, `* | fields except k8s_namespace`) +func test(t *testing.T, q, expected string) { + t.Helper() + query, err := ParseSeqQL(q, nil) + require.NoError(t, err) + assert.Equal(t, expected, query.SeqQLString()) } diff --git a/proxyapi/grpc_complex_search.go b/proxyapi/grpc_complex_search.go index 846a1b38..70e2a6f5 100644 --- a/proxyapi/grpc_complex_search.go +++ b/proxyapi/grpc_complex_search.go @@ -44,7 +44,7 @@ func (g *grpcV1) ComplexSearch( resp.Aggs = makeProtoAggregation(allAggregations) aggTr.Done() } - if req.Hist != nil { + if sResp.qpr.Histogram != nil { histTr := tr.NewChild("histogram") resp.Hist = makeProtoHistogram(sResp.qpr) histTr.Done() diff --git a/storeapi/grpc_search.go b/storeapi/grpc_search.go index 3a3a2baa..d01591e0 100644 --- a/storeapi/grpc_search.go +++ b/storeapi/grpc_search.go @@ -99,7 +99,7 @@ func (g *GrpcV1) doSearch( t := time.Now() parseQueryTr := tr.NewChild("parse query") - ast, err := g.parseQuery(req.Query) + ast, pipes, err := g.parseQuery(req.Query) if err != nil { parseQueryTr.Done() if code, ok := parseStoreError(err); ok { @@ -125,7 +125,7 @@ func (g *GrpcV1) doSearch( zap.String("query", req.Query), ) - parseQuery, err := g.parseQuery(g.config.Filter.Query) + parseQuery, _, err := g.parseQuery(g.config.Filter.Query) if err != nil { parseQueryTr.Done() if code, ok := parseStoreError(err); ok { @@ -152,10 +152,20 @@ func (g *GrpcV1) doSearch( const millisecondsInSecond = float64(time.Second / time.Millisecond) metric.SearchRangesSeconds.Observe(float64(to-from) / millisecondsInSecond) + histInterval := req.Interval + + // extract parameters from pipes + for _, pipe := range pipes { + p, ok := pipe.(*parser.PipeHistogram) + if ok && histInterval == 0 { + histInterval = p.Interval + } + } + searchParams := processor.SearchParams{ AST: ast, AggQ: aggQ, - HistInterval: uint64(req.Interval), + HistInterval: uint64(histInterval), From: from, To: to, Limit: limit, @@ -216,18 +226,23 @@ func (g *GrpcV1) doSearch( return buildSearchResponse(qpr), nil } -func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, error) { +func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, []parser.Pipe, error) { if query == "" { query = seq.TokenAll + ":*" } + var ast *parser.ASTNode + var pipes []parser.Pipe + seqql, err := parser.ParseSeqQL(query, g.mappingProvider.GetMapping()) if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "can't parse query %q: %v", query, err) + return nil, nil, status.Errorf(codes.InvalidArgument, "can't parse query %q: %v", query, err) } - ast := seqql.Root - return ast, nil + ast = seqql.Root + pipes = seqql.Pipes + + return ast, pipes, nil } func (g *GrpcV1) earlierThanOldestFrac(from uint64) bool { diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go index 131f5e2d..4e79383b 100644 --- a/tests/integration_tests/integration_test.go +++ b/tests/integration_tests/integration_test.go @@ -1307,6 +1307,18 @@ func (s *IntegrationTestSuite) TestAggNoTotal() { histSum += v } assert.Equal(t, uint64(allDocsNum), histSum, "the sum of the histogram should be equal to the number of all documents") + + // histogram from pipe + qpr, _, _, err = env.Search(`service:x* | histogram 60s`, size, setup.NoFetch()) + require.NoError(t, err, "should be no errors") + assert.Equal(t, uint64(allDocsNum), qpr.Total, "we must scann all docs in withTotal=true mode") + assert.Equal(t, size, len(qpr.IDs), "we must get only size ids") + assert.Equal(t, histCnt, len(qpr.Histogram)) + histSum = uint64(0) + for _, v := range qpr.Histogram { + histSum += v + } + assert.Equal(t, uint64(allDocsNum), histSum, "the sum of the histogram should be equal to the number of all documents") } s.T().Run("ActiveFraction", test)