Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions asyncsearcher/async_searcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ func (as *AsyncSearcher) StartSearch(r AsyncSearchRequest, fracs fracmanager.Lis
}
r.Params.AST = ast.Root

// extract parameters from pipes
for _, pipe := range ast.Pipes {
p, ok := pipe.(*parser.PipeHistogram)
if ok && r.Params.HistInterval == 0 {
r.Params.HistInterval = uint64(p.Interval)
}
}

now := timeNow()
if r.Retention < minRetention {
return fmt.Errorf("retention time should be at least %s, got %s", minRetention, r.Retention)
Expand Down Expand Up @@ -350,6 +358,14 @@ func (as *AsyncSearcher) doSearch(id string, fracs fracmanager.List) {
panic(fmt.Errorf("BUG: search query must be valid: %s", err))
}
info.Request.Params.AST = ast.Root

// extract parameters from pipes
for _, pipe := range ast.Pipes {
p, ok := pipe.(*parser.PipeHistogram)
if ok && info.Request.Params.HistInterval == 0 {
info.Request.Params.HistInterval = uint64(p.Interval)
}
}
}

fracsByName := make(map[string]frac.Fraction)
Expand Down
14 changes: 13 additions & 1 deletion docs/en/05-seq-ql.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ trace_id:in(123e4567-e89b-12d3-a456-426655440000, '123e4567-e89b-12d3-a456-42665
Pipes in seq-ql are used to sequentially process data.
Pipes enable data transformation, enrichment, filtering, aggregation, and formatting of results.

#### `fields` pipe
### `fields` pipe

The `fields` pipe allows removing unnecessary fields from documents in the output.
This is useful when only specific fields are needed, especially when exporting large datasets to reduce network load.
Expand All @@ -147,6 +147,18 @@ source_type:access* | fields except payload, cookies

In this example, the `payload` and `cookies` fields will be excluded from the result.

### `histogram` pipe

The `histogram` pipe allows to get a histogram by query.

The pipe takes `interval` parameter that specifies bucket size in time duration format: `10s` for ten seconds, `1h` for one hour etc.

Example:

```seq-ql
source_type:access* | histogram 30s
```

## Comments

Comments are user-provided text that will be ignored when executing a query.
Expand Down
15 changes: 14 additions & 1 deletion docs/ru/05-seq-ql.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ trace_id:in(123e4567-e89b-12d3-a456-426655440000, '123e4567-e89b-12d3-a456-42665
Pipes в seq-ql — это механизм для последовательной обработки данных.
Pipes позволяют трансформировать, обогащать, фильтровать, агрегировать данные и форматировать результаты.

#### `fields` pipe
### `fields` pipe

Pipe `fields` позволяет удалить лишние поля у документов в выдаче.
Это может пригодиться, когда нужно оставить только нужные поля,
Expand All @@ -142,6 +142,19 @@ source_type:access* | fields except payload, cookies

В этом примере поля `payload`, `cookies` будут исключены из результата.

### `histogram` pipe

Pipe `histogram` позволяет получить агрегацию по поисковому запросу.

Pipe принимает параметр `interval`, который позволяет указать размер временного интервала для группировки данных:
`10s` для десяти секунды, `1h` для одного часа и так далее.

Пример:

```seq-ql
source_type:access* | histogram 30s
```

## Комментарии

Комментарии – текст, предоставленный пользователем, который будет проигнорирован при выполнении запроса.
Expand Down
4 changes: 3 additions & 1 deletion parser/seqql_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,10 @@ func TestParseSeqQLError(t *testing.T) {
// Test pipes.
test(`message:--||`, `unknown pipe: |`)
test(`source_type:access* | fields message | fields except login:admin`, `parsing 'fields' pipe: unexpected symbol ":"`)
test(`source_type:access* | fields message | fields login`, `multiple field filters is not allowed`)
test(`source_type:access* | fields message | fields login`, `multiple field filters are not allowed`)
test(`* | fields event, `, `parsing 'fields' pipe: trailing comma not allowed`)
test(`* | histogram 1s | histogram 1s`, `multiple histograms are not allowed`)
test(`* | histogram 123`, `parsing 'histogram' pipe: can't parse histogram interval`)
}

// edited from the original answer https://stackoverflow.com/a/30230552/11750924
Expand Down
65 changes: 63 additions & 2 deletions parser/seqql_pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"fmt"
"strconv"
"strings"
"time"

"github.com/ozontech/seq-db/util"
)

type Pipe interface {
Expand All @@ -14,6 +17,8 @@ type Pipe interface {
func parsePipes(lex *lexer) ([]Pipe, error) {
// Counter of 'fields' pipes.
fieldFilters := 0
histograms := 0

var pipes []Pipe
for !lex.IsEnd() {
if !lex.IsKeyword("|") {
Expand All @@ -29,14 +34,25 @@ func parsePipes(lex *lexer) ([]Pipe, error) {
}
pipes = append(pipes, p)
fieldFilters++
case lex.IsKeyword("histogram"):
p, err := parsePipeHistogram(lex)
if err != nil {
return nil, fmt.Errorf("parsing 'histogram' pipe: %s", err)
}
pipes = append(pipes, p)
histograms++
default:
return nil, fmt.Errorf("unknown pipe: %s", lex.Token)
}

if fieldFilters > 1 {
return nil, fmt.Errorf("multiple field filters is not allowed")
return nil, fmt.Errorf("multiple field filters are not allowed")
}
if histograms > 1 {
return nil, fmt.Errorf("multiple histograms are not allowed")
}
}

return pipes, nil
}

Expand Down Expand Up @@ -110,6 +126,51 @@ func parseFieldList(lex *lexer) ([]string, error) {
return fields, nil
}

type PipeHistogram struct {
Interval int64
}

func (h *PipeHistogram) Name() string {
return "histogram"
}

func (h *PipeHistogram) DumpSeqQL(o *strings.Builder) {
o.WriteString("histogram ")
interval := time.Duration(h.Interval) * time.Millisecond
o.WriteString(quoteTokenIfNeeded(interval.String()))
}

func parsePipeHistogram(lex *lexer) (*PipeHistogram, error) {
if !lex.IsKeyword("histogram") {
return nil, fmt.Errorf("missing 'histogram' keyword")
}

lex.Next()

interval, err := parseInterval(lex)
if err != nil {
return nil, fmt.Errorf("can't parse histogram interval")
}

return &PipeHistogram{
Interval: interval,
}, nil
}

func parseInterval(lex *lexer) (int64, error) {
interval, err := parseCompositeTokenReplaceWildcards(lex)
if err != nil {
return 0, err
}

parsed, err := util.ParseDuration(interval)
if err != nil {
return 0, err
}

return parsed.Milliseconds(), nil
}

func quoteTokenIfNeeded(token string) string {
if !needQuoteToken(token) {
return token
Expand Down Expand Up @@ -149,7 +210,7 @@ var reservedKeywords = uniqueTokens([]string{
"|",

// Pipe specific keywords.
"fields", "except",
"fields", "except", "histogram",
})

func needQuoteToken(s string) bool {
Expand Down
59 changes: 33 additions & 26 deletions parser/seqql_pipes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,45 @@ package parser
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParsePipeFields(t *testing.T) {
test := func(q, expected string) {
t.Helper()
query, err := ParseSeqQL(q, nil)
require.NoError(t, err)
require.Equal(t, expected, query.SeqQLString())
}

test("* | fields message,error, level", "* | fields message, error, level")
test("* | fields level", "* | fields level")
test("* | fields level", "* | fields level")
test(`* | fields "_id"`, `* | fields _id`)
test(`* | fields "_\\message\\_"`, `* | fields "_\\message\\_"`)
test(`* | fields "_\\message*"`, `* | fields "_\\message\*"`)
test(`* | fields k8s_namespace`, `* | fields k8s_namespace`)
test(t, "* | fields message,error, level", "* | fields message, error, level")
test(t, "* | fields level", "* | fields level")
test(t, "* | fields level", "* | fields level")
test(t, `* | fields "_id"`, `* | fields _id`)
test(t, `* | fields "_\\message\\_"`, `* | fields "_\\message\\_"`)
test(t, `* | fields "_\\message*"`, `* | fields "_\\message\*"`)
test(t, `* | fields k8s_namespace`, `* | fields k8s_namespace`)
}

func TestParsePipeFieldsExcept(t *testing.T) {
test := func(q, expected string) {
t.Helper()
query, err := ParseSeqQL(q, nil)
require.NoError(t, err)
require.Equal(t, expected, query.SeqQLString())
}
test(t, "* | fields except message,error, level", "* | fields except message, error, level")
test(t, "* | fields except level", "* | fields except level")
test(t, `* | fields except "_id"`, `* | fields except _id`)
test(t, `* | fields except "_\\message\\_"`, `* | fields except "_\\message\\_"`)
test(t, `* | fields except "_\\message*"`, `* | fields except "_\\message\*"`)
test(t, `* | fields except k8s_namespace`, `* | fields except k8s_namespace`)
}

func TestParsePipeHistogram(t *testing.T) {
test(t, `* | histogram 1s`, `* | histogram 1s`)
test(t, `* | histogram 60s`, `* | histogram 1m0s`)
test(t, `* | histogram 1m`, `* | histogram 1m0s`)
test(t, `* | histogram 10m`, `* | histogram 10m0s`)
test(t, `* | histogram 2h`, `* | histogram 2h0m0s`)
}

func TestPipesComposition(t *testing.T) {
test(t, `* | fields level | histogram 1s`, `* | fields level | histogram 1s`)
test(t, `* | histogram 1s | fields level`, `* | histogram 1s | fields level`)
}

test("* | fields except message,error, level", "* | fields except message, error, level")
test("* | fields except level", "* | fields except level")
test(`* | fields except "_id"`, `* | fields except _id`)
test(`* | fields except "_\\message\\_"`, `* | fields except "_\\message\\_"`)
test(`* | fields except "_\\message*"`, `* | fields except "_\\message\*"`)
test(`* | fields except k8s_namespace`, `* | fields except k8s_namespace`)
func test(t *testing.T, q, expected string) {
t.Helper()
query, err := ParseSeqQL(q, nil)
require.NoError(t, err)
assert.Equal(t, expected, query.SeqQLString())
}
2 changes: 1 addition & 1 deletion proxyapi/grpc_complex_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (g *grpcV1) ComplexSearch(
resp.Aggs = makeProtoAggregation(allAggregations)
aggTr.Done()
}
if req.Hist != nil {
if sResp.qpr.Histogram != nil {
histTr := tr.NewChild("histogram")
resp.Hist = makeProtoHistogram(sResp.qpr)
histTr.Done()
Expand Down
29 changes: 22 additions & 7 deletions storeapi/grpc_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (g *GrpcV1) doSearch(
t := time.Now()

parseQueryTr := tr.NewChild("parse query")
ast, err := g.parseQuery(req.Query)
ast, pipes, err := g.parseQuery(req.Query)
if err != nil {
parseQueryTr.Done()
if code, ok := parseStoreError(err); ok {
Expand All @@ -125,7 +125,7 @@ func (g *GrpcV1) doSearch(
zap.String("query", req.Query),
)

parseQuery, err := g.parseQuery(g.config.Filter.Query)
parseQuery, _, err := g.parseQuery(g.config.Filter.Query)
if err != nil {
parseQueryTr.Done()
if code, ok := parseStoreError(err); ok {
Expand All @@ -152,10 +152,20 @@ func (g *GrpcV1) doSearch(
const millisecondsInSecond = float64(time.Second / time.Millisecond)
metric.SearchRangesSeconds.Observe(float64(to-from) / millisecondsInSecond)

histInterval := req.Interval

// extract parameters from pipes
for _, pipe := range pipes {
p, ok := pipe.(*parser.PipeHistogram)
if ok && histInterval == 0 {
histInterval = p.Interval
}
}

searchParams := processor.SearchParams{
AST: ast,
AggQ: aggQ,
HistInterval: uint64(req.Interval),
HistInterval: uint64(histInterval),
From: from,
To: to,
Limit: limit,
Expand Down Expand Up @@ -216,18 +226,23 @@ func (g *GrpcV1) doSearch(
return buildSearchResponse(qpr), nil
}

func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, error) {
func (g *GrpcV1) parseQuery(query string) (*parser.ASTNode, []parser.Pipe, error) {
if query == "" {
query = seq.TokenAll + ":*"
}

var ast *parser.ASTNode
var pipes []parser.Pipe

seqql, err := parser.ParseSeqQL(query, g.mappingProvider.GetMapping())
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "can't parse query %q: %v", query, err)
return nil, nil, status.Errorf(codes.InvalidArgument, "can't parse query %q: %v", query, err)
}
ast := seqql.Root

return ast, nil
ast = seqql.Root
pipes = seqql.Pipes

return ast, pipes, nil
}

func (g *GrpcV1) earlierThanOldestFrac(from uint64) bool {
Expand Down
12 changes: 12 additions & 0 deletions tests/integration_tests/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,18 @@ func (s *IntegrationTestSuite) TestAggNoTotal() {
histSum += v
}
assert.Equal(t, uint64(allDocsNum), histSum, "the sum of the histogram should be equal to the number of all documents")

// histogram from pipe
qpr, _, _, err = env.Search(`service:x* | histogram 60s`, size, setup.NoFetch())
require.NoError(t, err, "should be no errors")
assert.Equal(t, uint64(allDocsNum), qpr.Total, "we must scann all docs in withTotal=true mode")
assert.Equal(t, size, len(qpr.IDs), "we must get only size ids")
assert.Equal(t, histCnt, len(qpr.Histogram))
histSum = uint64(0)
for _, v := range qpr.Histogram {
histSum += v
}
assert.Equal(t, uint64(allDocsNum), histSum, "the sum of the histogram should be equal to the number of all documents")
}

s.T().Run("ActiveFraction", test)
Expand Down
Loading