diff --git a/consts/consts.go b/consts/consts.go
index 23f98a61..de1cc9a2 100644
--- a/consts/consts.go
+++ b/consts/consts.go
@@ -82,7 +82,7 @@ const (
 )
 
 var (
-	TimeFields  = [][]string{{"timestamp"}, {"time"}, {"ts"}}
+	TimeFields  = []string{"timestamp", "time", "ts"}
 	TimeFormats = []string{ESTimeFormat, time.RFC3339Nano, time.RFC3339}
 )
 
diff --git a/frac/doc_provider.go b/frac/doc_provider.go
index 708b5db0..f6204501 100644
--- a/frac/doc_provider.go
+++ b/frac/doc_provider.go
@@ -84,11 +84,11 @@ func encodeMeta(buf []byte, tokens []seq.Token, id seq.ID, size int) []byte {
 
 // extractDocTime extract time from doc by supported fields and return that field
 // if fields are absent or values are not parsable, zero time and empty string are returned
-func extractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) {
+func extractDocTime(docRoot *insaneJSON.Root) (time.Time, string) {
 	var t time.Time
 	var err error
 	for _, field := range consts.TimeFields {
-		timeNode := docRoot.Dig(field...)
+		timeNode := docRoot.Dig(field)
 		if timeNode == nil {
 			continue
 		}
@@ -102,13 +102,13 @@ func extractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) {
 		}
 	}
 
-	return t, nil
+	return t, ""
 }
 
 // ExtractDocTime extracts timestamp from doc
 // It searches by one of supported field name and parses by supported formats
 // If no field was found or not parsable it returns time.Now()
-func ExtractDocTime(docRoot *insaneJSON.Root) (time.Time, []string) {
+func ExtractDocTime(docRoot *insaneJSON.Root) (time.Time, string) {
 	t, f := extractDocTime(docRoot)
 	if t.IsZero() {
 		t = time.Now()
diff --git a/go.mod b/go.mod
index 56adbad7..eb6519bc 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	contrib.go.opencensus.io/exporter/jaeger v0.2.1
 	github.com/c2h5oh/datasize v0.0.0-20200112174442-28bbd4740fee
 	github.com/cep21/circuit/v3 v3.2.2
+	github.com/go-faster/jx v1.1.0
 	github.com/golang/mock v1.6.0
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1
 	github.com/klauspost/compress v1.17.11
@@ -36,12 +37,14 @@ require (
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.3.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/go-faster/errors v0.6.1 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.6.1 // indirect
 	github.com/prometheus/common v0.55.0 // indirect
 	github.com/prometheus/procfs v0.15.1 // indirect
+	github.com/segmentio/asm v1.2.0 // indirect
 	github.com/uber/jaeger-client-go v2.25.0+incompatible // indirect
 	golang.org/x/net v0.33.0 // indirect
 	golang.org/x/sync v0.11.0 // indirect
diff --git a/go.sum b/go.sum
index bf0f43c2..c85e5dec 100644
--- a/go.sum
+++ b/go.sum
@@ -105,6 +105,10 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
 github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE=
 github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
 github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
+github.com/go-faster/errors v0.6.1 h1:nNIPOBkprlKzkThvS/0YaX8Zs9KewLCOSFQS5BU06FI=
+github.com/go-faster/errors v0.6.1/go.mod h1:5MGV2/2T9yvlrbhe9pD9LO5Z/2zCSq2T8j+Jpi2LAyY=
+github.com/go-faster/jx v1.1.0 h1:ZsW3wD+snOdmTDy9eIVgQdjUpXRRV4rqW8NS3t+20bg=
+github.com/go-faster/jx v1.1.0/go.mod h1:vKDNikrKoyUmpzaJ0OkIkRQClNHFX/nF3dnTJZb3skg=
 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@@ -248,6 +252,8 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
 github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
+github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
+github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
@@ -316,6 +322,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20230116083435-1de6713980de h1:DBWn//IJw30uYCgERoxCg84hWtA97F4wMiKOIh00Uf0=
+golang.org/x/exp v0.0.0-20230116083435-1de6713980de/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
 golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
diff --git a/proxy/bulk/indexer.go b/proxy/bulk/indexer.go
index 8eb4eb4e..7c546ee6 100644
--- a/proxy/bulk/indexer.go
+++ b/proxy/bulk/indexer.go
@@ -5,7 +5,7 @@ import (
 	"slices"
 	"unsafe"
 
-	insaneJSON "github.com/ozontech/insane-json"
+	"github.com/go-faster/jx"
 
 	"github.com/ozontech/seq-db/frac"
 	"github.com/ozontech/seq-db/seq"
@@ -29,13 +29,12 @@ type indexer struct {
 // we use it to find documents that have this field in search and aggregate requests.
 //
 // Careful: any reference to i.metas will be invalidated after the call.
-func (i *indexer) Index(node *insaneJSON.Node, id seq.ID, size uint32) {
+func (i *indexer) Index(d *jx.Decoder, id seq.ID, size uint32) {
 	// Reset previous state.
 	i.metas = i.metas[:0]
 	i.appendMeta(id, size)
-
-	i.decodeInternal(node, id, nil, 0)
+	i.decodeObj(d, nil, 0)
 
 	m := i.metas
 	parent := m[0]
@@ -51,18 +50,25 @@ func (i *indexer) Metas() []frac.MetaData {
 
 var fieldSeparator = []byte(".")
 
-func (i *indexer) decodeInternal(n *insaneJSON.Node, id seq.ID, name []byte, metaIndex int) {
-	for _, field := range n.AsFields() {
-		fieldName := field.AsBytes()
+// rawField returns the current field value as raw bytes; strings are returned unquoted,
+// since the default d.Raw() would return string values still quoted, e.g. `"somevalue"`
+func rawField(d *jx.Decoder) ([]byte, error) {
+	switch d.Next() {
+	case jx.String:
+		return d.StrBytes()
+	default:
+		return d.Raw()
+	}
+}
 
-		if len(name) != 0 {
-			fieldName = bytes.Join([][]byte{name, fieldName}, fieldSeparator)
+func (i *indexer) decodeObj(d *jx.Decoder, prevName []byte, metaIndex int) {
+	_ = d.ObjBytes(func(d *jx.Decoder, fieldName []byte) error {
+		// nested field: join the name with its parent path
+		if prevName != nil {
+			fieldName = bytes.Join([][]byte{prevName, fieldName}, fieldSeparator)
 		}
 
-		var (
-			mappingTypes seq.MappingTypes
-			mainType     seq.TokenizerType
-		)
+		var mappingTypes seq.MappingTypes
 
 		switch i.mapping {
 		case nil:
@@ -78,35 +84,45 @@
 			mappingTypes = i.mapping[string(fieldName)]
 		}
 
-		mainType = mappingTypes.Main.TokenizerType
-		if mainType == seq.TokenizerTypeNoop {
-			// Field is not in the mapping.
-			continue
-		}
+		switch mappingTypes.Main.TokenizerType {
+		// Field is not in the mapping.
+		case seq.TokenizerTypeNoop:
+			return d.Skip()
 
-		if mainType == seq.TokenizerTypeObject && field.AsFieldValue().IsObject() {
-			i.decodeInternal(field.AsFieldValue(), id, fieldName, metaIndex)
-			continue
-		}
+		case seq.TokenizerTypeObject:
+			if d.Next() == jx.Object {
+				i.decodeObj(d, fieldName, metaIndex)
+				return nil
+			}
 
-		if mainType == seq.TokenizerTypeTags && field.AsFieldValue().IsArray() {
-			i.decodeTags(field.AsFieldValue(), fieldName, metaIndex)
-			continue
-		}
+		case seq.TokenizerTypeTags:
+			if d.Next() == jx.Array {
+				i.decodeTags(d, fieldName, metaIndex)
+				return nil
+			}
 
-		if mainType == seq.TokenizerTypeNested && field.AsFieldValue().IsArray() {
-			for _, nested := range field.AsFieldValue().AsArray() {
-				i.appendNestedMeta()
-				nestedMetaIndex := len(i.metas) - 1
+		case seq.TokenizerTypeNested:
+			if d.Next() == jx.Array {
+				_ = d.Arr(func(d *jx.Decoder) error {
+					i.appendNestedMeta()
+					nestedMetaIndex := len(i.metas) - 1
 
-				i.decodeInternal(nested, id, fieldName, nestedMetaIndex)
+					i.decodeObj(d, fieldName, nestedMetaIndex)
+					return nil
+				})
+				return nil
 			}
-			continue
 		}
 
-		nodeValue := encodeInsaneNode(field.AsFieldValue())
-		i.metas[metaIndex].Tokens = i.index(mappingTypes, i.metas[metaIndex].Tokens, fieldName, nodeValue)
-	}
+		fieldValue, err := rawField(d)
+		// in case of an error, do not index the field
+		if err != nil {
+			return err
+		}
+
+		i.metas[metaIndex].Tokens = i.index(mappingTypes, i.metas[metaIndex].Tokens, fieldName, fieldValue)
+		return nil
+	})
 }
 
 func (i *indexer) index(tokenTypes seq.MappingTypes, tokens []frac.MetaToken, key, value []byte) []frac.MetaToken {
@@ -133,13 +149,41 @@
 	return tokens
 }
 
-func (i *indexer) decodeTags(n *insaneJSON.Node, name []byte, tokensIndex int) {
-	for _, tag := range n.AsArray() {
-		fieldName := tag.Dig("key").AsBytes()
+func (i *indexer) decodeTags(d *jx.Decoder, name []byte, tokensIndex int) {
+	_ = d.Arr(func(d *jx.Decoder) error {
+		var fieldName, fieldValue []byte
+		err := d.Obj(func(d *jx.Decoder, objKey string) error {
+			switch objKey {
+			case "key":
+				v, err := rawField(d)
+				if err != nil {
+					return err
+				}
+				fieldName = v
+
+			case "value":
+				v, err := rawField(d)
+				if err != nil {
+					return err
+				}
+				fieldValue = v
+
+			default:
+				return d.Skip()
+			}
+
+			return nil
+		})
+		// if the tag failed to parse, do not index it
+		if err != nil {
+			return err
+		}
+
 		fieldName = bytes.Join([][]byte{name, fieldName}, fieldSeparator)
-		nodeValue := encodeInsaneNode(tag.Dig("value"))
-		i.metas[tokensIndex].Tokens = i.index(i.mapping[string(fieldName)], i.metas[tokensIndex].Tokens, fieldName, nodeValue)
-	}
+		i.metas[tokensIndex].Tokens = i.index(i.mapping[string(fieldName)], i.metas[tokensIndex].Tokens, fieldName, fieldValue)
+
+		return nil
+	})
 }
 
 // appendMeta increases metas size by 1 and reuses the underlying slices capacity.
@@ -165,13 +209,3 @@ func (i *indexer) appendNestedMeta() {
 	const nestedMetadataSize = 0
 	i.appendMeta(parent.ID, nestedMetadataSize)
 }
-
-func encodeInsaneNode(field *insaneJSON.Node) []byte {
-	if field.IsNil() {
-		return nil
-	}
-	if field.IsArray() || field.IsObject() || field.IsNull() || field.IsTrue() || field.IsFalse() {
-		return field.Encode(nil)
-	}
-	return field.AsBytes()
-}
diff --git a/proxy/bulk/ingestor.go b/proxy/bulk/ingestor.go
index 45e8cfd7..c182fd36 100644
--- a/proxy/bulk/ingestor.go
+++ b/proxy/bulk/ingestor.go
@@ -270,8 +270,12 @@ func (i *Ingestor) processDocsToCompressor(ctx context.Context, compressor *frac
 		if originalDoc == nil {
 			break
 		}
+
+		poolDoc := bytespool.AcquireReset(len(originalDoc))
+		poolDoc.B = append(poolDoc.B, originalDoc...)
+
 		parseStart := time.Now()
-		doc, metas, err := proc.Process(originalDoc, requestTime)
+		metas, err := proc.Process(poolDoc.B, requestTime)
 		if err != nil {
 			if errors.Is(err, errNotAnObject) {
 				logger.Error("unable to process the document because it is not an object", zap.Any("document", json.RawMessage(originalDoc)))
@@ -282,11 +286,13 @@ func (i *Ingestor) processDocsToCompressor(ctx context.Context, compressor *frac
 		}
 		parseDuration += time.Since(parseStart)
 
-		binaryDocs.B = binary.LittleEndian.AppendUint32(binaryDocs.B, uint32(len(doc)))
-		binaryDocs.B = append(binaryDocs.B, doc...)
+		binaryDocs.B = binary.LittleEndian.AppendUint32(binaryDocs.B, uint32(len(originalDoc)))
+		binaryDocs.B = append(binaryDocs.B, originalDoc...)
 		for _, meta := range metas {
 			binaryMetas.B = marshalAppendMeta(binaryMetas.B, meta)
 		}
+
+		bytespool.Release(poolDoc)
 		total++
 	}
diff --git a/proxy/bulk/ingestor_test.go b/proxy/bulk/ingestor_test.go
index e9f276f8..9b8539fa 100644
--- a/proxy/bulk/ingestor_test.go
+++ b/proxy/bulk/ingestor_test.go
@@ -493,9 +493,7 @@ func BenchmarkProcessDocuments(b *testing.B) {
 		"level":"error",
 		"timestamp":%q,
 		"message":"невозможно сохранить данные в шарде",
-		"error":"circuit breaker execute: can't receive bulk acceptance:
-	host=***REMOVED***, err=rpc error: code = Unavailable desc = connection error:
-	desc = \"transport: Error while dialing: dial tcp 10.233.140.20:9002: connect: connection refused\"",
+		"error":"circuit breaker execute: can't receive bulk acceptance:\n host=***REMOVED***, err=rpc error: code = Unavailable desc = connection error:\n desc = \"transport: Error while dialing: dial tcp 10.233.140.20:9002: connect: connection refused\"",
 		"shard":0
 	}`, time.Now().Format(consts.ESTimeFormat)))
diff --git a/proxy/bulk/processor.go b/proxy/bulk/processor.go
index 439c7c90..b86df7a7 100644
--- a/proxy/bulk/processor.go
+++ b/proxy/bulk/processor.go
@@ -4,11 +4,13 @@ import (
 	"errors"
 	"math"
 	"math/rand/v2"
+	"slices"
 	"time"
 
-	insaneJSON "github.com/ozontech/insane-json"
+	"github.com/go-faster/jx"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+
 	"github.com/ozontech/seq-db/consts"
 	"github.com/ozontech/seq-db/frac"
 	"github.com/ozontech/seq-db/seq"
@@ -37,12 +39,7 @@ type processor struct {
 	futureDrift time.Duration
 
 	indexer *indexer
-	decoder *insaneJSON.Root
-}
-
-func init() {
-	// Disable cache for the Dig() method.
-	insaneJSON.MapUseThreshold = math.MaxInt32
+	decoder *jx.Decoder
 }
 
func newBulkProcessor(mapping seq.Mapping, tokenizers map[seq.TokenizerType]toke
 			mapping: mapping,
 			metas:   []frac.MetaData{},
 		},
-		decoder: insaneJSON.Spawn(),
+		decoder: jx.GetDecoder(),
 	}
 }
 
 var errNotAnObject = errors.New("not an object")
 
-func (p *processor) Process(doc []byte, requestTime time.Time) ([]byte, []frac.MetaData, error) {
-	err := p.decoder.DecodeBytes(doc)
-	if err != nil {
-		return nil, nil, err
-	}
-	if !p.decoder.IsObject() {
-		return nil, nil, errNotAnObject
+func (p *processor) Process(doc []byte, requestTime time.Time) ([]frac.MetaData, error) {
+	p.decoder.ResetBytes(doc)
+	if p.decoder.Next() != jx.Object {
+		return nil, errNotAnObject
 	}
-	docTime, timeField := extractDocTime(p.decoder.Node, requestTime)
-	docDelay := requestTime.Sub(docTime)
-	if timeField == nil {
+
+	docTime, _, err := extractDocTime(p.decoder)
+	if err != nil {
 		// couldn't parse given event time
 		parseErrors.Inc()
-	} else if documentDelayed(docDelay, p.drift, p.futureDrift) {
+		docTime = requestTime
+	}
+
+	docDelay := requestTime.Sub(docTime)
+	if documentDelayed(docDelay, p.drift, p.futureDrift) {
 		docTime = requestTime
 	}
 
 	id := seq.NewID(docTime, (rand.Uint64()<<16)+p.proxyIndex)
 
-	p.indexer.Index(p.decoder.Node, id, uint32(len(doc)))
+	p.indexer.Index(p.decoder, id, uint32(len(doc)))
 
-	return doc, p.indexer.Metas(), nil
+	return p.indexer.Metas(), nil
 }
 
 func documentDelayed(docDelay, drift, futureDrift time.Duration) bool {
@@ -98,31 +96,53 @@
 	return delayed
 }
 
-func extractDocTime(node *insaneJSON.Node, requestTime time.Time) (time.Time, []string) {
-	for _, field := range consts.TimeFields {
-		timeVal := node.Dig(field...).AsBytes()
-		if len(timeVal) == 0 {
-			continue
-		}
+func extractDocTime(d *jx.Decoder) (time.Time, string, error) {
+	var (
+		timeValue time.Time
+		timeField string
+		ok        bool
+	)
+
+	err := d.Capture(func(d *jx.Decoder) error {
+		return d.ObjBytes(func(d *jx.Decoder, key []byte) error {
+			pos := slices.Index(consts.TimeFields, util.ByteToStringUnsafe(key))
+			if ok || pos == -1 {
+				return d.Skip()
+			}
 
-		for _, f := range consts.TimeFormats {
-			var t time.Time
-			var ok bool
-			if f == consts.ESTimeFormat {
-				// Fallback to optimized es time parsing.
-				t, ok = parseESTime(util.ByteToStringUnsafe(timeVal))
-			} else {
-				var err error
-				t, err = time.Parse(f, util.ByteToStringUnsafe(timeVal))
-				ok = err == nil
+			timeField = consts.TimeFields[pos]
+			timeVal, err := d.StrBytes()
+			if err != nil {
+				return err
 			}
-			if ok {
-				return t, field
+
+			var timeValStr = util.ByteToStringUnsafe(timeVal)
+			for _, f := range consts.TimeFormats {
+				if f == consts.ESTimeFormat {
+					// Fallback to optimized es time parsing.
+					timeValue, ok = parseESTime(timeValStr)
+				} else {
+					timeValue, err = time.Parse(f, timeValStr)
+					ok = err == nil
+				}
+
+				if ok {
+					return nil
+				}
 			}
-		}
+			return nil
+		})
+	})
+
+	if err != nil {
+		return time.Time{}, "", errors.New("parse time error")
 	}
-	defaultTime := requestTime
-	return defaultTime, nil
+
+	if !ok {
+		return time.Time{}, "", errors.New("no time fields found")
+	}
+
+	return timeValue, timeField, nil
 }
 
 // parseESTime parses time in "2006-01-02 15:04:05.999" format.
diff --git a/proxy/bulk/processor_test.go b/proxy/bulk/processor_test.go
index 9818ea7b..d2d07592 100644
--- a/proxy/bulk/processor_test.go
+++ b/proxy/bulk/processor_test.go
@@ -4,7 +4,7 @@ import (
 	"testing"
 	"time"
 
-	insaneJSON "github.com/ozontech/insane-json"
+	"github.com/go-faster/jx"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -16,7 +16,7 @@ func TestExtractDocTime(t *testing.T) {
 		title         string
 		input         []byte
 		expectedTime  time.Time
-		expectedField []string
+		expectedField string
 	}
 
 	// time.Parse uses time.Local if possible
@@ -35,30 +35,32 @@
 			title:         "ESTimeFormat, ts field",
 			input:         []byte(`{"message": "hello world", "ts": "2024-04-19 18:04:25.999"}`),
 			expectedTime:  time.Date(2024, 4, 19, 18, 4, 25, 999000000, time.UTC),
-			expectedField: []string{"ts"},
+			expectedField: "ts",
 		},
 		{
 			title:         "time.RFC3339Nano, time field",
 			input:         []byte(`{"message": "hello world", "time": "2024-04-19T18:04:25.999999999+03:00"}`),
 			expectedTime:  time.Date(2024, 4, 19, 18, 4, 25, 999999999, newFixedZone(3*60*60)),
-			expectedField: []string{"time"},
+			expectedField: "time",
 		},
 		{
 			title:         "time.RFC3339, timestamp field",
 			input:         []byte(`{"message": "hello world", "timestamp": "2024-04-19T18:04:25+03:00"}`),
 			expectedTime:  time.Date(2024, 4, 19, 18, 4, 25, 0, newFixedZone(3*60*60)),
-			expectedField: []string{"timestamp"},
+			expectedField: "timestamp",
 		},
 	}
 
 	for _, tc := range testCases {
 		t.Run(tc.title, func(t *testing.T) {
-			root := insaneJSON.Spawn()
-			defer insaneJSON.Release(root)
+			d := jx.GetDecoder()
+			defer jx.PutDecoder(d)
 
-			require.NoError(t, root.DecodeBytes(tc.input))
+			d.ResetBytes(tc.input)
 
-			docTime, timeField := extractDocTime(root.Node, time.Now())
+			docTime, timeField, err := extractDocTime(d)
+
+			require.NoError(t, err)
 			assert.Equal(t, tc.expectedTime, docTime)
 			assert.Equal(t, tc.expectedField, timeField)
 		})
@@ -72,15 +74,14 @@ func TestExtractDocTimeUnknownTimeFormat(t *testing.T) {
 		[]byte(`{"message": "hello world 3", "timestamp": "2024-04-19T17:08:21.203+0500"}`),
 	}
 
-	root := insaneJSON.Spawn()
-	defer insaneJSON.Release(root)
+	d := jx.GetDecoder()
+	defer jx.PutDecoder(d)
 
 	for _, input := range inputs {
-		assert.NoError(t, root.DecodeBytes(input))
+		d.ResetBytes(input)
 
-		docTime, timeField := extractDocTime(root.Node, time.Now())
-		assert.Nil(t, timeField)
-		assert.NotEqual(t, 1, docTime.Year())
+		_, _, err := extractDocTime(d)
+		require.Error(t, err)
 	}
 }
diff --git a/tests/integration_tests/integration_test.go b/tests/integration_tests/integration_test.go
index 726b7366..89919c3f 100644
--- a/tests/integration_tests/integration_test.go
+++ b/tests/integration_tests/integration_test.go
@@ -770,7 +770,6 @@ func (s *IntegrationTestSuite) TestAggStat() {
 			`{"service": "sum1", "v":1}`,
 			`{"service": "sum1", "v":-1}`,
 			`{"service": "sum1", "v":-0}`,
-			`{"service": "sum1", "v":+0}`,
 			`{"service": "sum1", "v":0}`,
 			`{"service": "sum1"}`,
 			// test negative values
@@ -942,7 +941,6 @@ func (s *IntegrationTestSuite) TestAggStat() {
 				`{"v":1, "service":"sum_without_group_by"}`,
 				`{"v":2, "service":"sum_without_group_by"}`,
 				`{"v":-0, "service":"sum_without_group_by"}`,
-				`{"v":+0, "service":"sum_without_group_by"}`,
 				`{"v":0, "service":"sum_without_group_by"}`,
 			},
 			SearchQuery: `service:"sum_without_group_by"`,
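// --- Usage sketch (not part of the patch) ---
//
// A minimal, self-contained illustration of the jx decoding pattern this PR
// adopts in extractDocTime: scan the top-level object once, match the known
// time fields, and parse the first value that fits a supported layout. The
// helper name findDocTime, the trimmed format list (parseESTime is internal
// to the repo and omitted here), and the sample document are assumptions for
// illustration only. The PR's real version additionally runs the scan under
// d.Capture so the same decoder can be replayed for indexing afterwards.
package main

import (
	"fmt"
	"slices"
	"time"

	"github.com/go-faster/jx"
)

var (
	timeFields  = []string{"timestamp", "time", "ts"} // mirrors consts.TimeFields
	timeFormats = []string{time.RFC3339Nano, time.RFC3339}
)

// findDocTime returns the parsed time, the field it came from,
// and whether any supported time field was found.
func findDocTime(doc []byte) (time.Time, string, bool) {
	var (
		t     time.Time
		field string
		ok    bool
	)
	d := jx.DecodeBytes(doc)
	_ = d.ObjBytes(func(d *jx.Decoder, key []byte) error {
		if ok || !slices.Contains(timeFields, string(key)) {
			return d.Skip() // not a time field, or time already found
		}
		raw, err := d.StrBytes()
		if err != nil {
			return err // time field is not a string value
		}
		for _, layout := range timeFormats {
			if parsed, perr := time.Parse(layout, string(raw)); perr == nil {
				t, field, ok = parsed, string(key), true
				break
			}
		}
		return nil
	})
	return t, field, ok
}

func main() {
	t, f, ok := findDocTime([]byte(`{"msg":"hi","time":"2024-04-19T18:04:25+03:00"}`))
	fmt.Println(t, f, ok) // 2024-04-19 18:04:25 +0300 +0300 time true
}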