Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions readme-cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ Clusters:
GracePeriod: 10s
Compress: false
TimeZone: UTC
AddTimestamp: true
```

## 详细说明
Expand Down Expand Up @@ -177,6 +178,9 @@ Offset: first
#### TimeZone
默认值为UTC,世界标准时间

#### AddTimestamp
自动为每个文档添加 @timestamp 时间戳字段(如果不存在)。时间戳使用 ISO 8601 格式(例如:2022-01-01T00:00:00.000Z)。默认值为 true。设置为 false 可禁用自动时间戳生成。




Expand Down
4 changes: 4 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Clusters:
GracePeriod: 10s
Compress: false
TimeZone: UTC
AddTimestamp: true
```

## Details
Expand Down Expand Up @@ -180,6 +181,9 @@ Offset: first
#### TimeZone
* Default value is UTC, Universal Standard Time

#### AddTimestamp
* Automatically add a @timestamp field to each document if it doesn't exist. The timestamp uses ISO 8601 format (e.g., 2022-01-01T00:00:00.000Z). Default is true. Set to false to disable automatic timestamp generation.

## ES performance write test


Expand Down
1 change: 1 addition & 0 deletions stash/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type (
Compress bool `json:",default=false"`
Username string `json:",optional"`
Password string `json:",optional"`
AddTimestamp bool `json:",default=true"` // Add @timestamp field to documents
}

Filter struct {
Expand Down
1 change: 1 addition & 0 deletions stash/etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ Clusters:
- http://172.16.141.4:9200
- http://172.16.141.5:9200
Index: "{.event}-{{yyyy-MM-dd}}"
AddTimestamp: true
20 changes: 20 additions & 0 deletions stash/filter/timestampfilter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package filter

import (
"time"
)

const (
timestampFormat = "2006-01-02T15:04:05.000Z"
timestampKey = "@timestamp"
)

// TimestampFilter adds a @timestamp field to the message if it doesn't exist
func TimestampFilter() FilterFunc {
return func(m map[string]interface{}) map[string]interface{} {
if _, ok := m[timestampKey]; !ok {
m[timestampKey] = time.Now().UTC().Format(timestampFormat)
}
return m
}
}
49 changes: 49 additions & 0 deletions stash/filter/timestampfilter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package filter

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTimestampFilter(t *testing.T) {
tests := []struct {
name string
input map[string]interface{}
hasField bool
}{
{
name: "add timestamp when not present",
input: map[string]interface{}{"field1": "value1"},
hasField: false,
},
{
name: "preserve existing timestamp",
input: map[string]interface{}{"@timestamp": "2022-01-01T00:00:00.000Z", "field1": "value1"},
hasField: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
filter := TimestampFilter()
result := filter(tt.input)

assert.NotNil(t, result)
assert.Contains(t, result, "@timestamp")

timestamp, ok := result["@timestamp"].(string)
assert.True(t, ok)
assert.NotEmpty(t, timestamp)

if tt.hasField {
assert.Equal(t, "2022-01-01T00:00:00.000Z", timestamp)
} else {
// Verify the timestamp format is valid
_, err := time.Parse(timestampFormat, timestamp)
assert.NoError(t, err)
}
})
}
}
65 changes: 65 additions & 0 deletions stash/handler/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package handler

import (
"testing"
"time"

jsoniter "github.com/json-iterator/go"
"github.com/kevwan/go-stash/stash/filter"
"github.com/stretchr/testify/assert"
)

func TestTimestampFilterIntegration(t *testing.T) {
tests := []struct {
name string
input string
expectField bool
expectValue string
}{
{
name: "message without timestamp should get one",
input: `{"message":"test","level":"info"}`,
expectField: true,
},
{
name: "message with timestamp should keep it",
input: `{"message":"test","@timestamp":"2022-01-01T00:00:00.000Z"}`,
expectField: true,
expectValue: "2022-01-01T00:00:00.000Z",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Apply the timestamp filter like it would be in production
timestampFilter := filter.TimestampFilter()

var m map[string]interface{}
err := jsoniter.Unmarshal([]byte(tt.input), &m)
assert.NoError(t, err)

// Apply filter
result := timestampFilter(m)
assert.NotNil(t, result)

// Verify timestamp field exists
if tt.expectField {
timestamp, exists := result["@timestamp"]
assert.True(t, exists, "@timestamp field should exist")
assert.NotEmpty(t, timestamp)

// If we expect a specific value, check it
if tt.expectValue != "" {
assert.Equal(t, tt.expectValue, timestamp)
} else {
// Verify it's a valid timestamp format (ISO 8601)
const iso8601Format = "2006-01-02T15:04:05.000Z"
timestampStr, ok := timestamp.(string)
assert.True(t, ok)
_, err := time.Parse(iso8601Format, timestampStr)
assert.NoError(t, err, "Timestamp should be in ISO 8601 format")
}
}
})
}
}
5 changes: 5 additions & 0 deletions stash/stash.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func main() {
}
indexer := es.NewIndex(client, processor.Output.ElasticSearch.Index, loc)
handle := handler.NewHandler(writer, indexer)
// Add timestamp filter first if enabled (default is true)
// This ensures documents have timestamps before other filters process them
if processor.Output.ElasticSearch.AddTimestamp {
handle.AddFilters(filter.TimestampFilter())
}
handle.AddFilters(filters...)
handle.AddFilters(filter.AddUriFieldFilter("url", "uri"))
for _, k := range toKqConf(processor.Input.Kafka) {
Expand Down