2 changes: 1 addition & 1 deletion consts/consts.go
@@ -82,7 +82,7 @@ const (
)

var (
TimeFields = [][]string{{"timestamp"}, {"time"}, {"ts"}}
TimeFields = []string{"timestamp", "time", "ts"}
TimeFormats = []string{ESTimeFormat, time.RFC3339Nano, time.RFC3339}
)

8 changes: 4 additions & 4 deletions frac/doc_provider.go
@@ -84,11 +84,11 @@ func encodeMeta(buf []byte, tokens []seq.Token, id seq.ID, size int) []byte {

// extractDocTime extracts the time from the doc using the supported fields and returns the name of the matched field
// if the fields are absent or the values are not parsable, a zero time and an empty string are returned
func extractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) {
func extractDocTime(docRoot *insaneJSON.Root) (time.Time, string) {
var t time.Time
var err error
for _, field := range consts.TimeFields {
timeNode := docRoot.Dig(field...)
timeNode := docRoot.Dig(field)
if timeNode == nil {
continue
}
@@ -102,13 +102,13 @@ func extractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) {
}
}

return t, nil
return t, ""
}

// ExtractDocTime extracts the timestamp from the doc
// It searches for one of the supported field names and parses the value with the supported formats
// If no field was found or the value is not parsable, it returns time.Now()
func ExtractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) {
func ExtractDocTime(docRoot *insaneJSON.Root) (time.Time, string) {
t, f := extractDocTime(docRoot)
if t.IsZero() {
t = time.Now()
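With consts.TimeFields flattened to plain field names and the matched field now returned as a string, callers can tell whether a document carried an explicit timestamp. Below is a minimal sketch of such a caller; the docTime wrapper is a hypothetical illustration, only frac.ExtractDocTime's new (time.Time, string) signature and the "timestamp"/"time"/"ts" field names come from this PR:

	// docTime is a hypothetical wrapper around the new API: it returns the parsed
	// timestamp and reports whether it came from one of the supported time fields
	// ("timestamp", "time" or "ts"). An empty field name means ExtractDocTime fell
	// back to time.Now() because no field was present or its value was not parsable.
	func docTime(docRoot *insaneJSON.Root) (ts time.Time, explicit bool) {
		t, field := frac.ExtractDocTime(docRoot)
		return t, field != ""
	}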
3 changes: 3 additions & 0 deletions go.mod
@@ -6,6 +6,7 @@ require (
contrib.go.opencensus.io/exporter/jaeger v0.2.1
github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
github.com/cep21/circuit/v3 v3.2.2
github.com/go-faster/jx v1.1.0
github.com/golang/mock v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1
github.com/klauspost/compress v1.17.11
@@ -36,12 +37,14 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sync v0.11.0 // indirect
8 changes: 8 additions & 0 deletions go.sum
@@ -105,6 +105,10 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@@ -248,6 +252,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -316,6 +322,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0=
golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
136 changes: 85 additions & 51 deletions proxy/bulk/indexer.go
@@ -5,7 +5,7 @@ import (
"slices"
"unsafe"

insaneJSON "github.com/ozontech/insane-json"
"github.com/go-faster/jx"

"github.com/ozontech/seq-db/frac"
"github.com/ozontech/seq-db/seq"
@@ -29,13 +29,12 @@ type indexer struct {
// we use it to find documents that have this field in search and aggregate requests.
//
// Careful: any reference to i.metas will be invalidated after the call.
func (i *indexer) Index(node *insaneJSON.Node, id seq.ID, size uint32) {
func (i *indexer) Index(d *jx.Decoder, id seq.ID, size uint32) {
// Reset previous state.
i.metas = i.metas[:0]

i.appendMeta(id, size)

i.decodeInternal(node, id, nil, 0)
i.decodeObj(d, nil, 0)

m := i.metas
parent := m[0]
@@ -51,18 +50,25 @@ func (i *indexer) Metas() []frac.MetaData {

var fieldSeparator = []byte(".")

func (i *indexer) decodeInternal(n *insaneJSON.Node, id seq.ID, name []byte, metaIndex int) {
for _, field := range n.AsFields() {
fieldName := field.AsBytes()
// rawField returns the raw field value, with the surrounding quotes stripped in the string case:
// the default d.Raw() would return a string value still quoted, e.g. `"somevalue"`
func rawField(d *jx.Decoder) ([]byte, error) {
switch d.Next() {
case jx.String:
return d.StrBytes()
default:
return d.Raw()
}
}

if len(name) != 0 {
fieldName = bytes.Join([][]byte{name, fieldName}, fieldSeparator)
func (i *indexer) decodeObj(d *jx.Decoder, prevName []byte, metaIndex int) {
_ = d.ObjBytes(func(d *jx.Decoder, fieldName []byte) error {
// not the first level of nesting: prefix the field name with its parent's name
if prevName != nil {
fieldName = bytes.Join([][]byte{prevName, fieldName}, fieldSeparator)
}

var (
mappingTypes seq.MappingTypes
mainType seq.TokenizerType
)
var mappingTypes seq.MappingTypes

switch i.mapping {
case nil:
@@ -78,35 +84,45 @@ func (i *indexer) decodeInternal(n *insaneJSON.Node, id seq.ID, name []byte, met
mappingTypes = i.mapping[string(fieldName)]
}

mainType = mappingTypes.Main.TokenizerType
if mainType == seq.TokenizerTypeNoop {
// Field is not in the mapping.
continue
}
switch mappingTypes.Main.TokenizerType {
// Field is not in the mapping.
case seq.TokenizerTypeNoop:
return d.Skip()

if mainType == seq.TokenizerTypeObject && field.AsFieldValue().IsObject() {
i.decodeInternal(field.AsFieldValue(), id, fieldName, metaIndex)
continue
}
case seq.TokenizerTypeObject:
if d.Next() == jx.Object {
i.decodeObj(d, fieldName, metaIndex)
return nil
}

if mainType == seq.TokenizerTypeTags && field.AsFieldValue().IsArray() {
i.decodeTags(field.AsFieldValue(), fieldName, metaIndex)
continue
}
case seq.TokenizerTypeTags:
if d.Next() == jx.Array {
i.decodeTags(d, fieldName, metaIndex)
return nil
}

if mainType == seq.TokenizerTypeNested && field.AsFieldValue().IsArray() {
for _, nested := range field.AsFieldValue().AsArray() {
i.appendNestedMeta()
nestedMetaIndex := len(i.metas) - 1
case seq.TokenizerTypeNested:
if d.Next() == jx.Array {
_ = d.Arr(func(d *jx.Decoder) error {
i.appendNestedMeta()
nestedMetaIndex := len(i.metas) - 1

i.decodeInternal(nested, id, fieldName, nestedMetaIndex)
i.decodeObj(d, fieldName, nestedMetaIndex)
return nil
})
return nil
}
continue
}

nodeValue := encodeInsaneNode(field.AsFieldValue())
i.metas[metaIndex].Tokens = i.index(mappingTypes, i.metas[metaIndex].Tokens, fieldName, nodeValue)
}
fieldValue, err := rawField(d)
// in case of error, do not index field
if err != nil {
return err
}

i.metas[metaIndex].Tokens = i.index(mappingTypes, i.metas[metaIndex].Tokens, fieldName, fieldValue)
return nil
})
}

func (i *indexer) index(tokenTypes seq.MappingTypes, tokens []frac.MetaToken, key, value []byte) []frac.MetaToken {
@@ -133,13 +149,41 @@ func (i *indexer) index(tokenTypes seq.MappingTypes, tokens []frac.MetaToken, ke
return tokens
}

func (i *indexer) decodeTags(n *insaneJSON.Node, name []byte, tokensIndex int) {
for _, tag := range n.AsArray() {
fieldName := tag.Dig("key").AsBytes()
func (i *indexer) decodeTags(d *jx.Decoder, name []byte, tokensIndex int) {
_ = d.Arr(func(d *jx.Decoder) error {
var fieldName, fieldValue []byte
err := d.Obj(func(d *jx.Decoder, objKey string) error {
switch objKey {
case "key":
v, err := rawField(d)
if err != nil {
return err
}
fieldName = v

case "value":
v, err := rawField(d)
if err != nil {
return err
}
fieldValue = v

default:
return d.Skip()
}

return nil
})
// ignoring data in case of parsing error
if err != nil {
return err
}

fieldName = bytes.Join([][]byte{name, fieldName}, fieldSeparator)
nodeValue := encodeInsaneNode(tag.Dig("value"))
i.metas[tokensIndex].Tokens = i.index(i.mapping[string(fieldName)], i.metas[tokensIndex].Tokens, fieldName, nodeValue)
}
i.metas[tokensIndex].Tokens = i.index(i.mapping[string(fieldName)], i.metas[tokensIndex].Tokens, fieldName, fieldValue)

return nil
})
}

// appendMeta increases metas size by 1 and reuses the underlying slices capacity.
@@ -165,13 +209,3 @@ func (i *indexer) appendNestedMeta() {
const nestedMetadataSize = 0
i.appendMeta(parent.ID, nestedMetadataSize)
}

func encodeInsaneNode(field *insaneJSON.Node) []byte {
if field.IsNil() {
return nil
}
if field.IsArray() || field.IsObject() || field.IsNull() || field.IsTrue() || field.IsFalse() {
return field.Encode(nil)
}
return field.AsBytes()
}
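A side note on the rawField helper added above: the behavior it works around can be reproduced with go-faster/jx directly. The snippet below is a standalone illustration of the library's Raw versus StrBytes behavior, not code from this PR:

	// Raw returns the value bytes exactly as they appear in the document, so a JSON
	// string keeps its surrounding quotes; StrBytes decodes the string instead.
	d := jx.DecodeStr(`"somevalue"`)
	raw, _ := d.Raw()      // []byte(`"somevalue"`), quotes included
	d = jx.DecodeStr(`"somevalue"`)
	str, _ := d.StrBytes() // []byte(`somevalue`), quotes stripped
	_, _ = raw, str
	// Numbers, booleans, nulls, objects and arrays pass through Raw unchanged,
	// which is why rawField only special-cases jx.String.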
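And a sketch of driving the reworked Index end to end. The document, the idx and id variables and the expected token names are invented for illustration (they depend on the configured mapping); only the jx.Decoder-based signature, Metas and the dotted/tag field naming come from this diff:

	// Hypothetical driver: index one document exercising a plain field, a nested
	// object and a tags array of {"key": ..., "value": ...} pairs.
	doc := []byte(`{"message":"hello","request":{"status":500},"tags":[{"key":"env","value":"prod"}]}`)
	d := jx.DecodeBytes(doc)
	idx.Index(d, id, uint32(len(doc)))
	metas := idx.Metas() // with a suitable mapping: tokens for "message", "request.status", "tags.env"
	// As the comment on Index warns, metas references idx's internal buffers and is
	// invalidated by the next Index call, so copy whatever must outlive it.
	_ = metas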
12 changes: 9 additions & 3 deletions proxy/bulk/ingestor.go
@@ -270,8 +270,12 @@
if originalDoc == nil {
break
}

poolDoc := bytespool.AcquireReset(len(originalDoc))

Check failure on line 274 in proxy/bulk/ingestor.go (GitHub Actions / lint): undefined: bytespool.AcquireReset (typecheck)
Check failure on line 274 in proxy/bulk/ingestor.go (GitHub Actions / test): undefined: bytespool.AcquireReset
poolDoc.B = append(poolDoc.B, originalDoc...)

parseStart := time.Now()
doc, metas, err := proc.Process(originalDoc, requestTime)
metas, err := proc.Process(poolDoc.B, requestTime)
if err != nil {
if errors.Is(err, errNotAnObject) {
logger.Error("unable to process the document because it is not an object", zap.Any("document", json.RawMessage(originalDoc)))
@@ -282,11 +286,13 @@
}
parseDuration += time.Since(parseStart)

binaryDocs.B = binary.LittleEndian.AppendUint32(binaryDocs.B, uint32(len(doc)))
binaryDocs.B = append(binaryDocs.B, doc...)
binaryDocs.B = binary.LittleEndian.AppendUint32(binaryDocs.B, uint32(len(originalDoc)))
binaryDocs.B = append(binaryDocs.B, originalDoc...)
for _, meta := range metas {
binaryMetas.B = marshalAppendMeta(binaryMetas.B, meta)
}

bytespool.Release(poolDoc)
total++
}

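The lint and test failures above flag bytespool.AcquireReset as undefined, so the pooled-buffer helper still has to land before this hunk compiles. Purely as a guess at its intended shape, inferred from how poolDoc.B and bytespool.Release are used in this hunk and not taken from the repository:

	// Hypothetical sketch of the missing helper: take a buffer from the pool, make sure
	// it has at least `size` capacity, and return it with an empty .B for appending.
	func AcquireReset(size int) *Buffer {
		buf := Acquire() // assumed existing pool accessor returning *Buffer with a B []byte field
		if cap(buf.B) < size {
			buf.B = make([]byte, 0, size)
		}
		buf.B = buf.B[:0]
		return buf
	}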
4 changes: 1 addition & 3 deletions proxy/bulk/ingestor_test.go
@@ -493,9 +493,7 @@ func BenchmarkProcessDocuments(b *testing.B) {
"level":"error",
"timestamp":%q,
"message":"невозможно сохранить данные в шарде",
"error":"circuit breaker execute: can't receive bulk acceptance:
host=***REMOVED***, err=rpc error: code = Unavailable desc = connection error:
desc = \"transport: Error while dialing: dial tcp 10.233.140.20:9002: connect: connection refused\"",
"error":"circuit breaker execute: can't receive bulk acceptance:\n host=***REMOVED***, err=rpc error: code = Unavailable desc = connection error:\n desc = \"transport: Error while dialing: dial tcp 10.233.140.20:9002: connect: connection refused\"",
"shard":0
}`, time.Now().Format(consts.ESTimeFormat)))
