diff --git a/readme-cn.md b/readme-cn.md index 8e6934f..1dd56c3 100644 --- a/readme-cn.md +++ b/readme-cn.md @@ -85,6 +85,7 @@ Clusters: GracePeriod: 10s Compress: false TimeZone: UTC + AddTimestamp: true ``` ## 详细说明 @@ -177,6 +178,9 @@ Offset: first #### TimeZone 默认值为UTC,世界标准时间 +#### AddTimestamp + 自动为每个文档添加 @timestamp 时间戳字段(如果不存在)。时间戳使用 ISO 8601 格式(例如:2022-01-01T00:00:00.000Z)。默认值为 true。设置为 false 可禁用自动时间戳生成。 + diff --git a/readme.md b/readme.md index 7e80d32..1a6de43 100644 --- a/readme.md +++ b/readme.md @@ -88,6 +88,7 @@ Clusters: GracePeriod: 10s Compress: false TimeZone: UTC + AddTimestamp: true ``` ## Details @@ -180,6 +181,9 @@ Offset: first #### TimeZone * Default value is UTC, Universal Standard Time +#### AddTimestamp +* Automatically add a @timestamp field to each document if it doesn't exist. The timestamp uses ISO 8601 format (e.g., 2022-01-01T00:00:00.000Z). Default is true. Set to false to disable automatic timestamp generation. + ## ES performance write test diff --git a/stash/config/config.go b/stash/config/config.go index e03ce46..1e1620b 100644 --- a/stash/config/config.go +++ b/stash/config/config.go @@ -23,6 +23,7 @@ type ( Compress bool `json:",default=false"` Username string `json:",optional"` Password string `json:",optional"` + AddTimestamp bool `json:",default=true"` // Add @timestamp field to documents } Filter struct { diff --git a/stash/etc/config.yaml b/stash/etc/config.yaml index be23a2f..528305b 100644 --- a/stash/etc/config.yaml +++ b/stash/etc/config.yaml @@ -44,3 +44,4 @@ Clusters: - http://172.16.141.4:9200 - http://172.16.141.5:9200 Index: "{.event}-{{yyyy-MM-dd}}" + AddTimestamp: true diff --git a/stash/filter/timestampfilter.go b/stash/filter/timestampfilter.go new file mode 100644 index 0000000..dbf7fc8 --- /dev/null +++ b/stash/filter/timestampfilter.go @@ -0,0 +1,20 @@ +package filter + +import ( + "time" +) + +const ( + timestampFormat = "2006-01-02T15:04:05.000Z" + timestampKey = "@timestamp" +) + +// TimestampFilter adds a @timestamp field to the message if it doesn't exist +func TimestampFilter() FilterFunc { + return func(m map[string]interface{}) map[string]interface{} { + if _, ok := m[timestampKey]; !ok { + m[timestampKey] = time.Now().UTC().Format(timestampFormat) + } + return m + } +} diff --git a/stash/filter/timestampfilter_test.go b/stash/filter/timestampfilter_test.go new file mode 100644 index 0000000..e827dbf --- /dev/null +++ b/stash/filter/timestampfilter_test.go @@ -0,0 +1,49 @@ +package filter + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestTimestampFilter(t *testing.T) { + tests := []struct { + name string + input map[string]interface{} + hasField bool + }{ + { + name: "add timestamp when not present", + input: map[string]interface{}{"field1": "value1"}, + hasField: false, + }, + { + name: "preserve existing timestamp", + input: map[string]interface{}{"@timestamp": "2022-01-01T00:00:00.000Z", "field1": "value1"}, + hasField: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + filter := TimestampFilter() + result := filter(tt.input) + + assert.NotNil(t, result) + assert.Contains(t, result, "@timestamp") + + timestamp, ok := result["@timestamp"].(string) + assert.True(t, ok) + assert.NotEmpty(t, timestamp) + + if tt.hasField { + assert.Equal(t, "2022-01-01T00:00:00.000Z", timestamp) + } else { + // Verify the timestamp format is valid + _, err := time.Parse(timestampFormat, timestamp) + assert.NoError(t, err) + } + }) + } +} diff --git a/stash/handler/handler_test.go b/stash/handler/handler_test.go new file mode 100644 index 0000000..5efb310 --- /dev/null +++ b/stash/handler/handler_test.go @@ -0,0 +1,65 @@ +package handler + +import ( + "testing" + "time" + + jsoniter "github.com/json-iterator/go" + "github.com/kevwan/go-stash/stash/filter" + "github.com/stretchr/testify/assert" +) + +func TestTimestampFilterIntegration(t *testing.T) { + tests := []struct { + name string + input string + expectField bool + expectValue string + }{ + { + name: "message without timestamp should get one", + input: `{"message":"test","level":"info"}`, + expectField: true, + }, + { + name: "message with timestamp should keep it", + input: `{"message":"test","@timestamp":"2022-01-01T00:00:00.000Z"}`, + expectField: true, + expectValue: "2022-01-01T00:00:00.000Z", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Apply the timestamp filter like it would be in production + timestampFilter := filter.TimestampFilter() + + var m map[string]interface{} + err := jsoniter.Unmarshal([]byte(tt.input), &m) + assert.NoError(t, err) + + // Apply filter + result := timestampFilter(m) + assert.NotNil(t, result) + + // Verify timestamp field exists + if tt.expectField { + timestamp, exists := result["@timestamp"] + assert.True(t, exists, "@timestamp field should exist") + assert.NotEmpty(t, timestamp) + + // If we expect a specific value, check it + if tt.expectValue != "" { + assert.Equal(t, tt.expectValue, timestamp) + } else { + // Verify it's a valid timestamp format (ISO 8601) + const iso8601Format = "2006-01-02T15:04:05.000Z" + timestampStr, ok := timestamp.(string) + assert.True(t, ok) + _, err := time.Parse(iso8601Format, timestampStr) + assert.NoError(t, err, "Timestamp should be in ISO 8601 format") + } + } + }) + } +} diff --git a/stash/stash.go b/stash/stash.go index 49aeee5..f544d6d 100644 --- a/stash/stash.go +++ b/stash/stash.go @@ -72,6 +72,11 @@ func main() { } indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc) handle := handler.NewHandler(writer, indexer) + // Add timestamp filter first if enabled (default is true) + // This ensures documents have timestamps before other filters process them + if processor.Output.ElasticSearch.AddTimestamp { + handle.AddFilters(filter.TimestampFilter()) + } handle.AddFilters(filters...) handle.AddFilters(filter.AddUriFieldFilter("url", "uri")) for _, k := range toKqConf(processor.Input.Kafka) {