From 058505c5b88f75058a1baba9299967ec28ad1310 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 11 Jun 2025 16:58:56 +0700 Subject: [PATCH 01/19] add plugin event_to_metrics --- cmd/file.d/file.d.go | 1 + .../event_to_metrics/event_to_metrics.go | 81 +++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 plugin/action/event_to_metrics/event_to_metrics.go diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index 7d83102a1..cd152166d 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -25,6 +25,7 @@ import ( _ "github.com/ozontech/file.d/plugin/action/debug" _ "github.com/ozontech/file.d/plugin/action/decode" _ "github.com/ozontech/file.d/plugin/action/discard" + _ "github.com/ozontech/file.d/plugin/action/event_to_metrics" _ "github.com/ozontech/file.d/plugin/action/flatten" _ "github.com/ozontech/file.d/plugin/action/hash" _ "github.com/ozontech/file.d/plugin/action/join" diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go new file mode 100644 index 000000000..e0ecd790f --- /dev/null +++ b/plugin/action/event_to_metrics/event_to_metrics.go @@ -0,0 +1,81 @@ +package event_to_metrics + +import ( + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/pipeline" + insaneJSON "github.com/ozontech/insane-json" + "go.uber.org/zap" +) + +/*{ introduction +Get metric from event +}*/ + +type Plugin struct { + config *Config + logger *zap.Logger + pluginController pipeline.ActionPluginController +} + +// ! config-params +// ^ config-params +type Config struct { + Metrics []Metric +} + +type Metric struct { + Name string `json:"name"` + Value string `json:"value"` + Labels map[string]string `json:"labels"` +} + +func init() { + fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ + Type: "event_to_metrics", + Factory: factory, + }) +} + +func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + p.logger = params.Logger.Desugar() + p.pluginController = params.Controller +} + +func (p *Plugin) Stop() { +} + +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + children := make([]*insaneJSON.Node, 0, len(p.config.Metrics)) + for _, metric := range p.config.Metrics { + elem := new(insaneJSON.Node) + object := elem.MutateToObject() + + object.AddField("name").MutateToBytes([]byte(metric.Name)) + object.AddField("value").MutateToInt(1) + + if len(metric.Labels) > 0 { + labelsObject := object.AddField("labels").MutateToObject() + + for labelName, labelValue := range metric.Labels { + node := event.Root.Dig(labelValue) + value := node.AsString() + labelsObject.AddField(labelName).MutateToBytes([]byte(value)) + } + } + + children = append(children, elem) + } + + if len(children) == 0 { + // zero array or an array that does not contain objects + return pipeline.ActionPass + } + + p.pluginController.Spawn(event, children) + return pipeline.ActionBreak +} From 3233ca14a3143e375793c6bf4c4b097eb4bbc9a8 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 16 Jun 2025 19:20:27 +0700 Subject: [PATCH 02/19] sample of prometheus output plugin --- cmd/file.d/file.d.go | 1 + go.mod | 10 +- go.sum | 26 +- .../event_to_metrics/event_to_metrics.go | 7 +- plugin/output/prometheus/prometheus.go | 362 ++++++++++++++++++ 5 files changed, 391 insertions(+), 15 deletions(-) create mode 100644 plugin/output/prometheus/prometheus.go diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index cd152166d..9e7e3e8e3 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -60,6 +60,7 @@ import ( _ "github.com/ozontech/file.d/plugin/output/kafka" _ "github.com/ozontech/file.d/plugin/output/loki" _ "github.com/ozontech/file.d/plugin/output/postgres" + _ "github.com/ozontech/file.d/plugin/output/prometheus" _ "github.com/ozontech/file.d/plugin/output/s3" _ "github.com/ozontech/file.d/plugin/output/splunk" _ "github.com/ozontech/file.d/plugin/output/stdout" diff --git a/go.mod b/go.mod index 468b32619..7bb9d7d53 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/bitly/go-simplejson v0.5.1 github.com/bmatcuk/doublestar/v4 v4.0.2 github.com/bufbuild/protocompile v0.13.0 + github.com/castai/promwrite v0.5.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/dominikbraun/graph v0.23.0 @@ -67,11 +68,11 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect - github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/coreos/go-systemd/v22 v22.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dmarkham/enumer v1.5.8 // indirect - github.com/docker/go-units v0.4.0 // indirect + github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.6.1 // indirect @@ -85,6 +86,7 @@ require ( github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -98,7 +100,7 @@ require ( github.com/hashicorp/go-sockaddr v1.0.7 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect - github.com/imdario/mergo v0.3.11 // indirect + github.com/imdario/mergo v0.3.12 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -122,6 +124,7 @@ require ( github.com/pascaldekloe/name v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/prometheus v0.40.3 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sirupsen/logrus v1.8.1 // indirect @@ -150,7 +153,6 @@ require ( golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/ini.v1 v1.62.0 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect diff --git a/go.sum b/go.sum index bb0a9091c..fe5ed79f7 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M= github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk= +github.com/castai/promwrite v0.5.0 h1:AxpHvaeWPqk+GLqLix0JkALzwLk5ZIMUemqvL4AAv5k= +github.com/castai/promwrite v0.5.0/go.mod h1:PCwrucOaNJAcKdR8Tktz+/pQEXOnCWFL+2Yk7c9DmEU= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -47,8 +49,8 @@ github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5To github.com/containerd/cgroups/v3 v3.0.1/go.mod h1:/vtwk1VXrtoa5AaZLkypuOJgA/6DyPMZHJPGQNtlHnw= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= -github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/go-systemd/v22 v22.4.0 h1:y9YHcjnjynCd/DVbg5j9L/33jQM3MxJlbj/zWskzfGU= +github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -58,8 +60,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dmarkham/enumer v1.5.8 h1:fIF11F9l5jyD++YYvxcSH5WgHfeaSGPaN/T4kOQ4qEM= github.com/dmarkham/enumer v1.5.8/go.mod h1:d10o8R3t/gROm2p3BXqTkMt2+HMuxEmWCXzorAruYak= -github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= -github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw= @@ -113,6 +115,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -121,8 +125,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20221102093814-76f304f74e5e h1:F1LLQqQ8WoIbyoxLUY+JUZe1kuHdxThM6CPUATzE6Io= +github.com/google/pprof v0.0.0-20221102093814-76f304f74e5e/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -155,8 +159,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5LhwQN4= github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA= -github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= -github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -295,6 +299,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/prometheus v0.40.3 h1:oMw1vVyrxHTigXAcFY6QHrGUnQEbKEOKo737cPgYBwY= +github.com/prometheus/prometheus v0.40.3/go.mod h1:/UhsWkOXkO11wqTW2Bx5YDOwRweSDcaFBlTIzFe7P0Y= github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY= @@ -540,8 +546,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= -gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= +gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go index e0ecd790f..9657c3ecf 100644 --- a/plugin/action/event_to_metrics/event_to_metrics.go +++ b/plugin/action/event_to_metrics/event_to_metrics.go @@ -56,7 +56,12 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { object := elem.MutateToObject() object.AddField("name").MutateToBytes([]byte(metric.Name)) - object.AddField("value").MutateToInt(1) + if len(metric.Value) == 0 { + object.AddField("value").MutateToInt(1) + } else { + valueNode := event.Root.Dig(metric.Value).AsFloat() + object.AddField("value").MutateToFloat(valueNode) + } if len(metric.Labels) > 0 { labelsObject := object.AddField("labels").MutateToObject() diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go new file mode 100644 index 000000000..2cdee1203 --- /dev/null +++ b/plugin/output/prometheus/prometheus.go @@ -0,0 +1,362 @@ +package prometheus + +import ( + "context" + "errors" + "time" + + "github.com/castai/promwrite" + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/metric" + "github.com/ozontech/file.d/pipeline" + + insaneJSON "github.com/ozontech/insane-json" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +/*{ introduction +It sends the logs batches to Loki using HTTP API. +}*/ + +var errUnixNanoFormat = errors.New("please send time in UnixNano format or add a convert_date action") + +const ( + outPluginType = "prometheus" +) + +type data struct { + outBuf []byte +} + +type Label struct { + Label string `json:"label" required:"true"` + Value string `json:"value" required:"true"` +} + +// ! config-params +// ^ config-params +type Config struct { + Endpoint string `json:"endpoint" required:"true" default:"http://localhost:9090/api/v1/write"` // * + + // > @3@4@5@6 + // > + // > Auth config. + // > + // > `AuthConfig` params: + // > * `strategy` describes strategy to use; options:"disabled|tenant|basic|bearer" + // > By default strategy is `disabled`. + // > * `tenant_id` should be provided if strategy is `tenant`. + // > * `username` should be provided if strategy is `basic`. + // > Username is used for HTTP Basic Authentication. + // > * `password` should be provided if strategy is `basic`. + // > Password is used for HTTP Basic Authentication. + // > * `bearer_token` should be provided if strategy is `bearer`. + // > Token is used for HTTP Bearer Authentication. + Auth AuthConfig `json:"auth" child:"true"` // * + + // > @3@4@5@6 + // > + // > If set true, the plugin will use SSL/TLS connections method. + TLSEnabled bool `json:"tls_enabled" default:"false"` // * + + // > @3@4@5@6 + // > + // > If set, the plugin will skip SSL/TLS verification. + TLSSkipVerify bool `json:"tls_skip_verify" default:"false"` // * + + // > @3@4@5@6 + // > + // > Client timeout when sends requests to Loki HTTP API. + RequestTimeout cfg.Duration `json:"request_timeout" default:"1s" parse:"duration"` // * + RequestTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > It defines how much time to wait for the connection. + ConnectionTimeout cfg.Duration `json:"connection_timeout" default:"5s" parse:"duration"` // * + ConnectionTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Keep-alive config. + // > + // > `KeepAliveConfig` params: + // > * `max_idle_conn_duration` - idle keep-alive connections are closed after this duration. + // > By default idle connections are closed after `10s`. + // > * `max_conn_duration` - keep-alive connections are closed after this duration. + // > If set to `0` - connection duration is unlimited. + // > By default connection duration is `5m`. + KeepAlive KeepAliveConfig `json:"keep_alive" child:"true"` // * + + // > @3@4@5@6 + // > + // > How much workers will be instantiated to send batches. + // > It also configures the amount of minimum and maximum number of database connections. + WorkersCount cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // * + WorkersCount_ int + + // > @3@4@5@6 + // > + // > Maximum quantity of events to pack into one batch. + BatchSize cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // * + BatchSize_ int + + // > @3@4@5@6 + // > + // > A minimum size of events in a batch to send. + // > If both batch_size and batch_size_bytes are set, they will work together. + BatchSizeBytes cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // * + BatchSizeBytes_ int + + // > @3@4@5@6 + // > + // > After this timeout batch will be sent even if batch isn't completed. + BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * + BatchFlushTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry to Loki. + Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * +} + +type AuthStrategy byte + +const ( + StrategyDisabled AuthStrategy = iota + StrategyTenant + StrategyBasic + StrategyBearer +) + +// ! config-params +// ^ config-params +type AuthConfig struct { + // > AuthStrategy.Strategy describes strategy to use. + Strategy string `json:"strategy" default:"disabled" options:"disabled|tenant|basic|bearer"` + Strategy_ AuthStrategy + + // > TenantID for Tenant Authentication. + TenantID string `json:"tenant_id"` + + // > Username for HTTP Basic Authentication. + Username string `json:"username"` + + // > Password for HTTP Basic Authentication. + Password string `json:"password"` + + // > Token for HTTP Bearer Authentication. + BearerToken string `json:"bearer_token"` +} + +type KeepAliveConfig struct { + // Idle keep-alive connections are closed after this duration. + MaxIdleConnDuration cfg.Duration `json:"max_idle_conn_duration" parse:"duration" default:"10s"` + MaxIdleConnDuration_ time.Duration + + // Keep-alive connections are closed after this duration. + MaxConnDuration cfg.Duration `json:"max_conn_duration" parse:"duration" default:"5m"` + MaxConnDuration_ time.Duration +} + +type Plugin struct { + controller pipeline.OutputPluginController + logger *zap.Logger + config *Config + avgEventSize int + + ctx context.Context + cancel context.CancelFunc + + client *promwrite.Client + batcher *pipeline.RetriableBatcher + + // plugin metrics + sendErrorMetric *prometheus.CounterVec +} + +func init() { + fd.DefaultPluginRegistry.RegisterOutput(&pipeline.PluginStaticInfo{ + Type: outPluginType, + Factory: Factory, + }) +} + +func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) { + p.controller = params.Controller + p.config = config.(*Config) + p.logger = params.Logger.Desugar() + p.avgEventSize = params.PipelineSettings.AvgEventSize + p.registerMetrics(params.MetricCtl) + + p.prepareClient() + + batcherOpts := &pipeline.BatcherOptions{ + PipelineName: params.PipelineName, + OutputType: outPluginType, + Controller: p.controller, + Workers: p.config.WorkersCount_, + BatchSizeCount: p.config.BatchSize_, + BatchSizeBytes: p.config.BatchSizeBytes_, + FlushTimeout: p.config.BatchFlushTimeout_, + MetricCtl: params.MetricCtl, + } + + backoffOpts := pipeline.BackoffOpts{ + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + } + + onError := func(err error) { + var level zapcore.Level + if p.config.FatalOnFailedInsert { + level = zapcore.FatalLevel + } else { + level = zapcore.ErrorLevel + } + + p.logger.Log(level, "can't send data to loki", zap.Error(err), + zap.Int("retries", p.config.Retry)) + } + + p.batcher = pipeline.NewRetriableBatcher( + batcherOpts, + p.out, + backoffOpts, + onError, + ) + + ctx, cancel := context.WithCancel(context.Background()) + p.ctx = ctx + p.cancel = cancel + + p.batcher.Start(ctx) +} + +func (p *Plugin) Stop() { + p.batcher.Stop() + p.cancel() +} + +func (p *Plugin) Out(event *pipeline.Event) { + p.batcher.Add(event) +} + +func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error { + if *workerData == nil { + *workerData = &data{ + outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize), + } + } + + data := (*workerData).(*data) + + // handle too much memory consumption + if cap(data.outBuf) > p.config.BatchSize_*p.avgEventSize { + data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgEventSize) + } + + data.outBuf = data.outBuf[:0] + + root := insaneJSON.Spawn() + defer insaneJSON.Release(root) + + dataArr := root.AddFieldNoAlloc(root, "data").MutateToArray() + batch.ForEach(func(event *pipeline.Event) { + dataArr.AddElementNoAlloc(root).MutateToNode(event.Root.Node) + }) + + err := p.send(root) + if err != nil { + p.sendErrorMetric.WithLabelValues().Inc() + p.logger.Error("can't send data to Loki", zap.String("address", p.config.Endpoint), zap.Error(err)) + } else { + p.logger.Debug("successfully sent", zap.String("data", string(data.outBuf))) + } + + return err +} + +func (p *Plugin) send(root *insaneJSON.Root) error { + messages := root.Dig("data").AsArray() + // # {"name":"partitions_total","labels":{"partition":"1"},"timestamp":"2025-06-05T05:19:28.129666195Z","value": 1}} + values := make([]promwrite.TimeSeries, 0, len(messages)) + + for _, msg := range messages { + nameNode := msg.Dig("name") + name := nameNode.AsString() + nameNode.Suicide() + + // timestampNode := msg.Dig("timestamp") + // timestamp := timestampNode.AsString() + // timestampNode.Suicide() + + valueNode := msg.Dig("value") + value := valueNode.AsFloat() + valueNode.Suicide() + + labelsNode := msg.Dig("labels") + labelValues := labelsNode.AsFields() + labelsNode.Suicide() + + labels := make([]promwrite.Label, 0, len(labelValues)+1) + labels = append(labels, promwrite.Label{ + Name: "__name__", + Value: name, + }) + for _, l := range labelValues { + labels = append(labels, promwrite.Label{ + Name: l.AsString(), + Value: labelsNode.Dig(l.AsString()).AsString(), + }) + } + + logLine := promwrite.TimeSeries{ + Labels: labels, + Sample: promwrite.Sample{ + Time: time.Now(), + Value: value, + }, + } + + values = append(values, logLine) + } + + _, err := p.client.Write(context.Background(), &promwrite.WriteRequest{TimeSeries: values}) + + return err +} + +func (p *Plugin) registerMetrics(ctl *metric.Ctl) { + p.sendErrorMetric = ctl.RegisterCounterVec("output_loki_send_error", "Total Loki send errors") +} + +func (p *Plugin) prepareClient() { + p.client = promwrite.NewClient(p.config.Endpoint) +} From 96ca7873764e1a6840278ede0572fa2d79ad70b8 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 18 Jun 2025 11:55:59 +0700 Subject: [PATCH 03/19] add types in generateCacheKey --- pipeline/metadata/templater.go | 27 +++++++++++---- pipeline/metadata/templater_test.go | 54 +++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/pipeline/metadata/templater.go b/pipeline/metadata/templater.go index 8973e92ac..7de97ab16 100644 --- a/pipeline/metadata/templater.go +++ b/pipeline/metadata/templater.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" "regexp" - "strconv" + "sort" "strings" "sync" "text/template" @@ -213,8 +213,14 @@ func (m *MetaTemplater) Render(data Data) (MetaData, error) { } func generateCacheKey(data map[string]any) string { + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + sort.Strings(keys) + var builder strings.Builder - builder.Grow(len(data) * 16) // Preallocate memory for the builder (estimate) + builder.Grow(len(data) * 32) // Preallocate memory for the builder (estimate) for k, v := range data { switch v := v.(type) { @@ -224,14 +230,23 @@ func generateCacheKey(data map[string]any) string { builder.WriteString(":") builder.WriteString(v) builder.WriteString("|") - case int: - // Write the key and integer value to the builder + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + builder.WriteString(k) + builder.WriteString(":") + builder.WriteString(fmt.Sprintf("%d", v)) + builder.WriteString("|") + case float32, float64: + builder.WriteString(k) + builder.WriteString(":") + builder.WriteString(fmt.Sprintf("%f", v)) + builder.WriteString("|") + case bool: builder.WriteString(k) builder.WriteString(":") - builder.WriteString(strconv.Itoa(v)) + builder.WriteString(fmt.Sprintf("%t", v)) builder.WriteString("|") } - // If the value is not a string or int, skip it + // If the value is not a string, int, float or bool, skip it } // Convert the builder to a string diff --git a/pipeline/metadata/templater_test.go b/pipeline/metadata/templater_test.go index cd1214106..6142842c8 100644 --- a/pipeline/metadata/templater_test.go +++ b/pipeline/metadata/templater_test.go @@ -194,3 +194,57 @@ func BenchmarkMetaTemplater_Render(b *testing.B) { } } } + +func TestGenerateCacheKey(t *testing.T) { + tests := []struct { + name string + input map[string]any + expected string + }{ + { + name: "empty map", + input: map[string]any{}, + expected: "", + }, + { + name: "string and int", + input: map[string]any{ + "topic": "topic1", + "partition": 2, + }, + expected: "topic:topic1|partition:2", + }, + { + name: "int32 && int64", + input: map[string]any{ + "topic": "topic1", + "partition": int32(2), + "offset": int64(123456789), + }, + expected: "topic:topic1|partition:2|offset:123456789", + }, + { + name: "float", + input: map[string]any{ + "size": float32(2.0), + }, + expected: "size:2.000000", + }, + { + name: "bool", + input: map[string]any{ + "is": true, + }, + expected: "is:true", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := generateCacheKey(tt.input) + if got != tt.expected { + t.Errorf("generateCacheKey() = %v, want %v", got, tt.expected) + } + }) + } +} From 150d4713b846be07260135ca12ed688a43432968 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 18 Jun 2025 14:17:18 +0700 Subject: [PATCH 04/19] prometheus output: send timestamp --- .../event_to_metrics/event_to_metrics.go | 47 +++++++++++++++++++ plugin/output/prometheus/prometheus.go | 8 ++-- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go index 9657c3ecf..61827c5f6 100644 --- a/plugin/action/event_to_metrics/event_to_metrics.go +++ b/plugin/action/event_to_metrics/event_to_metrics.go @@ -1,8 +1,12 @@ package event_to_metrics import ( + "time" + + "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/xtime" insaneJSON "github.com/ozontech/insane-json" "go.uber.org/zap" ) @@ -15,11 +19,26 @@ type Plugin struct { config *Config logger *zap.Logger pluginController pipeline.ActionPluginController + format string } // ! config-params // ^ config-params type Config struct { + // > @3@4@5@6 + // > + // > The event field which defines the time when event was fired. + // > It is used to detect the event throughput in a particular time range. + // > If not set, the current time will be taken. + TimeField cfg.FieldSelector `json:"time_field" default:"time" parse:"selector"` // * + TimeField_ []string + + // > @3@4@5@6 + // > + // > It defines how to parse the time field format. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). + TimeFieldFormat string `json:"time_field_format" default:"rfc3339nano"` // * + Metrics []Metric } @@ -44,18 +63,46 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.config = config.(*Config) p.logger = params.Logger.Desugar() p.pluginController = params.Controller + + format, err := xtime.ParseFormatName(p.config.TimeFieldFormat) + if err != nil { + format = p.config.TimeFieldFormat + } + p.format = format } func (p *Plugin) Stop() { } func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + var ts time.Time + + if len(p.config.TimeField_) != 0 { + tsValue := event.Root.Dig(p.config.TimeField_...).AsString() + t, err := xtime.ParseTime(p.format, tsValue) + if err != nil || t.IsZero() { + p.logger.Warn( + "can't parse field with timestamp using format", + zap.Any("time_field", p.config.TimeField), + zap.String("TimeFieldFormat", p.config.TimeFieldFormat), + zap.String("value", tsValue), + ) + ts = time.Now() + } else { + ts = t + } + } else { + ts = time.Now() + } + children := make([]*insaneJSON.Node, 0, len(p.config.Metrics)) for _, metric := range p.config.Metrics { elem := new(insaneJSON.Node) object := elem.MutateToObject() object.AddField("name").MutateToBytes([]byte(metric.Name)) + object.AddField("timestamp").MutateToInt64(ts.UnixNano()) + if len(metric.Value) == 0 { object.AddField("value").MutateToInt(1) } else { diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index 2cdee1203..5fa05a760 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -313,9 +313,9 @@ func (p *Plugin) send(root *insaneJSON.Root) error { name := nameNode.AsString() nameNode.Suicide() - // timestampNode := msg.Dig("timestamp") - // timestamp := timestampNode.AsString() - // timestampNode.Suicide() + timestampNode := msg.Dig("timestamp") + timestamp := timestampNode.AsInt64() + timestampNode.Suicide() valueNode := msg.Dig("value") value := valueNode.AsFloat() @@ -340,7 +340,7 @@ func (p *Plugin) send(root *insaneJSON.Root) error { logLine := promwrite.TimeSeries{ Labels: labels, Sample: promwrite.Sample{ - Time: time.Now(), + Time: time.Unix(0, timestamp), Value: value, }, } From 6dcd7f9f8842e7e09d7a196167179d62d7c1da93 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 18 Jun 2025 19:39:19 +0700 Subject: [PATCH 05/19] prometheus output: use collector for counter metric --- .../event_to_metrics/event_to_metrics.go | 2 + plugin/output/prometheus/prometheus.go | 38 +++++++++++++++++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go index 61827c5f6..a002e8350 100644 --- a/plugin/action/event_to_metrics/event_to_metrics.go +++ b/plugin/action/event_to_metrics/event_to_metrics.go @@ -44,6 +44,7 @@ type Config struct { type Metric struct { Name string `json:"name"` + Type string `json:"type"` Value string `json:"value"` Labels map[string]string `json:"labels"` } @@ -101,6 +102,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { object := elem.MutateToObject() object.AddField("name").MutateToBytes([]byte(metric.Name)) + object.AddField("type").MutateToBytes([]byte(metric.Type)) object.AddField("timestamp").MutateToInt64(ts.UnixNano()) if len(metric.Value) == 0 { diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index 5fa05a760..edbbd5dcf 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -2,7 +2,9 @@ package prometheus import ( "context" - "errors" + "fmt" + "strings" + "sync" "time" "github.com/castai/promwrite" @@ -21,8 +23,6 @@ import ( It sends the logs batches to Loki using HTTP API. }*/ -var errUnixNanoFormat = errors.New("please send time in UnixNano format or add a convert_date action") - const ( outPluginType = "prometheus" ) @@ -194,6 +194,8 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec + + collector *sync.Map } func init() { @@ -213,6 +215,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger = params.Logger.Desugar() p.avgEventSize = params.PipelineSettings.AvgEventSize p.registerMetrics(params.MetricCtl) + p.collector = new(sync.Map) p.prepareClient() @@ -313,6 +316,10 @@ func (p *Plugin) send(root *insaneJSON.Root) error { name := nameNode.AsString() nameNode.Suicide() + typeNode := msg.Dig("type") + metricType := typeNode.AsString() + typeNode.Suicide() + timestampNode := msg.Dig("timestamp") timestamp := timestampNode.AsInt64() timestampNode.Suicide() @@ -337,6 +344,15 @@ func (p *Plugin) send(root *insaneJSON.Root) error { }) } + if metricType == "counter" { + labelsKeys := labelsToKey(labels) + prevValue, ok := p.collector.Load(labelsKeys) + if ok { + value += prevValue.(float64) + } + p.collector.Store(labelsKeys, value) + } + logLine := promwrite.TimeSeries{ Labels: labels, Sample: promwrite.Sample{ @@ -350,9 +366,25 @@ func (p *Plugin) send(root *insaneJSON.Root) error { _, err := p.client.Write(context.Background(), &promwrite.WriteRequest{TimeSeries: values}) + if err != nil { + // TODO: add metrics + if strings.Contains(err.Error(), "out of order sample") || strings.Contains(err.Error(), "duplicate sample for") { + p.logger.Warn("can't send data to Prometheus", zap.Error(err)) + return nil + } + } + return err } +func labelsToKey(labels []promwrite.Label) string { + var b strings.Builder + for _, l := range labels { + fmt.Fprintf(&b, "%s=%s,", l.Name, l.Value) + } + return b.String() +} + func (p *Plugin) registerMetrics(ctl *metric.Ctl) { p.sendErrorMetric = ctl.RegisterCounterVec("output_loki_send_error", "Total Loki send errors") } From e73df9d1834814c8cd4ef8aa6af030e284abe83e Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 23 Jun 2025 19:53:21 +0700 Subject: [PATCH 06/19] promtheus: remove dublicates --- plugin/output/prometheus/prometheus.go | 33 +++++++++++++++++++------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index edbbd5dcf..fbec94c54 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -311,7 +311,12 @@ func (p *Plugin) send(root *insaneJSON.Root) error { // # {"name":"partitions_total","labels":{"partition":"1"},"timestamp":"2025-06-05T05:19:28.129666195Z","value": 1}} values := make([]promwrite.TimeSeries, 0, len(messages)) + type metricValue struct { + value float64 + timestamp int64 + } for _, msg := range messages { + skipSendValue := false nameNode := msg.Dig("name") name := nameNode.AsString() nameNode.Suicide() @@ -348,20 +353,30 @@ func (p *Plugin) send(root *insaneJSON.Root) error { labelsKeys := labelsToKey(labels) prevValue, ok := p.collector.Load(labelsKeys) if ok { - value += prevValue.(float64) + value += prevValue.(metricValue).value + prevTimestamp := prevValue.(metricValue).timestamp / 1000000 + if prevTimestamp >= timestamp/1000000 { + skipSendValue = true + } } - p.collector.Store(labelsKeys, value) + p.collector.Store(labelsKeys, metricValue{ + timestamp: timestamp, + value: value, + }) } - logLine := promwrite.TimeSeries{ - Labels: labels, - Sample: promwrite.Sample{ - Time: time.Unix(0, timestamp), - Value: value, - }, + if !skipSendValue { + logLine := promwrite.TimeSeries{ + Labels: labels, + Sample: promwrite.Sample{ + Time: time.Unix(0, timestamp), + Value: value, + }, + } + + values = append(values, logLine) } - values = append(values, logLine) } _, err := p.client.Write(context.Background(), &promwrite.WriteRequest{TimeSeries: values}) From 2f997b59050860d4f348d8576fbad14fdf16a654 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 24 Jun 2025 14:11:58 +0700 Subject: [PATCH 07/19] prometheus: add metric collector --- plugin/output/prometheus/metric_collector.go | 175 +++++++++++++++++++ plugin/output/prometheus/prometheus.go | 58 ++---- 2 files changed, 188 insertions(+), 45 deletions(-) create mode 100644 plugin/output/prometheus/metric_collector.go diff --git a/plugin/output/prometheus/metric_collector.go b/plugin/output/prometheus/metric_collector.go new file mode 100644 index 000000000..7d1e0dc54 --- /dev/null +++ b/plugin/output/prometheus/metric_collector.go @@ -0,0 +1,175 @@ +package prometheus + +import ( + "fmt" + "sort" + "strings" + "sync" + "time" + + "github.com/castai/promwrite" + "go.uber.org/zap" +) + +type metricCollector struct { + collector *sync.Map + timeout time.Duration + flushTicker *time.Ticker + shutdownChan chan struct{} + sender storageSender + logger *zap.Logger +} + +type metricValue struct { + value float64 + timestamp int64 + lastUpdateTime time.Time + lastValueIsSended bool +} + +type storageSender interface { + sendToStorage(values []promwrite.TimeSeries) error +} + +func newCollector(timeout time.Duration, sender storageSender, logger *zap.Logger) *metricCollector { + c := &metricCollector{ + collector: new(sync.Map), + timeout: timeout, + flushTicker: time.NewTicker(1 * time.Second), // Check every second + shutdownChan: make(chan struct{}), + sender: sender, + logger: logger, + } + go c.flushOldMetrics() + return c +} + +func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string) []promwrite.TimeSeries { + labelsKey := labelsToKey(labels) + now := time.Now() + currentTimestampSec := timestamp / 1_000_000_000 + + var values []promwrite.TimeSeries + var shouldSend bool + var currentValue float64 + var prevMetric metricValue + + if prev, ok := p.collector.Load(labelsKey); ok { + prevMetric = prev.(metricValue) + prevTimestampSec := prevMetric.timestamp / 1_000_000_000 + + // For counters, accumulate values + currentValue = value + if metricType == "counter" { + currentValue += prevMetric.value + } + + // Check if time window advanced + shouldSend = prevTimestampSec < currentTimestampSec + + if shouldSend { + values = append(values, createTimeSeries(labels, prevMetric)) + } + } else { + currentValue = value + shouldSend = false // First value, don't send yet + } + + // Always store the current value + p.collector.Store(labelsKey, metricValue{ + value: currentValue, + timestamp: timestamp, + lastUpdateTime: now, + lastValueIsSended: shouldSend, + }) + + return values +} + +func (p *metricCollector) flushOldMetrics() { + for { + select { + case <-p.flushTicker.C: + var toSend []promwrite.TimeSeries + now := time.Now() + + p.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended { + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + + metric.lastValueIsSended = true + p.collector.Store(key, metric) + } + return true + }) + + if len(toSend) > 0 { + // Send these metrics to your storage + err := p.sender.sendToStorage(toSend) + if err != nil { + p.logger.Error("can't send data", zap.Error(err)) + } + } + + case <-p.shutdownChan: + p.flushTicker.Stop() + return + } + } +} + +func (p *metricCollector) shutdown() { + close(p.shutdownChan) + // Flush all remaining metrics + var toSend []promwrite.TimeSeries + p.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + return true + }) + p.sender.sendToStorage(toSend) +} + +// Helper function +func createTimeSeries(labels []promwrite.Label, metric metricValue) promwrite.TimeSeries { + return promwrite.TimeSeries{ + Labels: labels, + Sample: promwrite.Sample{ + Time: time.Unix(0, metric.timestamp), + Value: metric.value, + }, + } +} + +func keyToLabels(key string) []promwrite.Label { + if len(key) == 0 { + return nil + } + key = key[:len(key)-1] // Remove trailing comma + labels := make([]promwrite.Label, 0, strings.Count(key, ",")+1) + + for key != "" { + pair, rest, _ := strings.Cut(key, ",") + name, value, _ := strings.Cut(pair, "=") + labels = append(labels, promwrite.Label{Name: name, Value: value}) + key = rest + } + return labels +} + +func labelsToKey(labels []promwrite.Label) string { + sorted := make([]promwrite.Label, len(labels)) + copy(sorted, labels) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Name < sorted[j].Name + }) + + var b strings.Builder + for _, l := range sorted { + fmt.Fprintf(&b, "%s=%s,", l.Name, l.Value) + } + return b.String() +} diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index fbec94c54..9bc6c322c 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -2,9 +2,7 @@ package prometheus import ( "context" - "fmt" "strings" - "sync" "time" "github.com/castai/promwrite" @@ -195,7 +193,7 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec - collector *sync.Map + collector *metricCollector } func init() { @@ -215,7 +213,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger = params.Logger.Desugar() p.avgEventSize = params.PipelineSettings.AvgEventSize p.registerMetrics(params.MetricCtl) - p.collector = new(sync.Map) + p.collector = newCollector(10*time.Second, p, p.logger) p.prepareClient() @@ -264,6 +262,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP func (p *Plugin) Stop() { p.batcher.Stop() + p.collector.shutdown() p.cancel() } @@ -311,12 +310,7 @@ func (p *Plugin) send(root *insaneJSON.Root) error { // # {"name":"partitions_total","labels":{"partition":"1"},"timestamp":"2025-06-05T05:19:28.129666195Z","value": 1}} values := make([]promwrite.TimeSeries, 0, len(messages)) - type metricValue struct { - value float64 - timestamp int64 - } for _, msg := range messages { - skipSendValue := false nameNode := msg.Dig("name") name := nameNode.AsString() nameNode.Suicide() @@ -349,38 +343,15 @@ func (p *Plugin) send(root *insaneJSON.Root) error { }) } - if metricType == "counter" { - labelsKeys := labelsToKey(labels) - prevValue, ok := p.collector.Load(labelsKeys) - if ok { - value += prevValue.(metricValue).value - prevTimestamp := prevValue.(metricValue).timestamp / 1000000 - if prevTimestamp >= timestamp/1000000 { - skipSendValue = true - } - } - p.collector.Store(labelsKeys, metricValue{ - timestamp: timestamp, - value: value, - }) - } - - if !skipSendValue { - logLine := promwrite.TimeSeries{ - Labels: labels, - Sample: promwrite.Sample{ - Time: time.Unix(0, timestamp), - Value: value, - }, - } - - values = append(values, logLine) - } - + values = append(values, p.collector.handleMetric( + labels, + value, + timestamp, + metricType, + )...) } - _, err := p.client.Write(context.Background(), &promwrite.WriteRequest{TimeSeries: values}) - + err := p.sendToStorage(values) if err != nil { // TODO: add metrics if strings.Contains(err.Error(), "out of order sample") || strings.Contains(err.Error(), "duplicate sample for") { @@ -392,12 +363,9 @@ func (p *Plugin) send(root *insaneJSON.Root) error { return err } -func labelsToKey(labels []promwrite.Label) string { - var b strings.Builder - for _, l := range labels { - fmt.Fprintf(&b, "%s=%s,", l.Name, l.Value) - } - return b.String() +func (p *Plugin) sendToStorage(values []promwrite.TimeSeries) error { + _, err := p.client.Write(context.Background(), &promwrite.WriteRequest{TimeSeries: values}) + return err } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { From 657b2184bdb64bee5c20eee4677fac05dc69c319 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 14 Jul 2025 17:46:05 +0700 Subject: [PATCH 08/19] event_to_metric: add do_if --- .../event_to_metrics/event_to_metrics.go | 36 ++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go index a002e8350..e08d1eabe 100644 --- a/plugin/action/event_to_metrics/event_to_metrics.go +++ b/plugin/action/event_to_metrics/event_to_metrics.go @@ -6,6 +6,7 @@ import ( "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/pipeline/doif" "github.com/ozontech/file.d/xtime" insaneJSON "github.com/ozontech/insane-json" "go.uber.org/zap" @@ -47,6 +48,11 @@ type Metric struct { Type string `json:"type"` Value string `json:"value"` Labels map[string]string `json:"labels"` + + DoIfCheckerMap map[string]any `json:"do_if"` + DoIfChecker *doif.Checker + + use bool } func init() { @@ -64,6 +70,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.config = config.(*Config) p.logger = params.Logger.Desugar() p.pluginController = params.Controller + p.config.Metrics = prepareCheckersForMetrics(p.config.Metrics, p.logger) format, err := xtime.ParseFormatName(p.config.TimeFieldFormat) if err != nil { @@ -72,10 +79,33 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.format = format } +func prepareCheckersForMetrics(metrics []Metric, logger *zap.Logger) []Metric { + for i := range metrics { + m := &metrics[i] + if m.DoIfCheckerMap != nil { + var err error + m.DoIfChecker, err = doif.NewFromMap(m.DoIfCheckerMap) + if err != nil { + logger.Fatal("can't init do_if for mask", zap.Error(err)) + } + } else { + m.use = true + } + } + + return metrics +} + func (p *Plugin) Stop() { } func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + for i := range p.config.Metrics { + if p.config.Metrics[i].DoIfChecker != nil { + p.config.Metrics[i].use = p.config.Metrics[i].DoIfChecker.Check(event.Root) + } + } + var ts time.Time if len(p.config.TimeField_) != 0 { @@ -98,6 +128,10 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { children := make([]*insaneJSON.Node, 0, len(p.config.Metrics)) for _, metric := range p.config.Metrics { + if !metric.use { + continue + } + elem := new(insaneJSON.Node) object := elem.MutateToObject() @@ -127,7 +161,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { if len(children) == 0 { // zero array or an array that does not contain objects - return pipeline.ActionPass + return pipeline.ActionDiscard } p.pluginController.Spawn(event, children) From 0a2dd7356d3a38b021ee32c4abc2fdef0eeb1fb9 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 24 Jul 2025 20:21:39 +0700 Subject: [PATCH 09/19] add cardinality_limit action plugin --- cmd/file.d/file.d.go | 1 + go.mod | 5 +- go.sum | 9 ++ plugin/action/cardinality/README.idoc.md | 4 + plugin/action/cardinality/README.md | 18 ++++ plugin/action/cardinality/cache.go | 107 ++++++++++++++++++ plugin/action/cardinality/cardinality.go | 131 +++++++++++++++++++++++ 7 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 plugin/action/cardinality/README.idoc.md create mode 100755 plugin/action/cardinality/README.md create mode 100644 plugin/action/cardinality/cache.go create mode 100644 plugin/action/cardinality/cardinality.go diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index 9e7e3e8e3..41c232dc8 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -19,6 +19,7 @@ import ( "github.com/ozontech/file.d/pipeline" _ "github.com/ozontech/file.d/plugin/action/add_file_name" _ "github.com/ozontech/file.d/plugin/action/add_host" + _ "github.com/ozontech/file.d/plugin/action/cardinality" _ "github.com/ozontech/file.d/plugin/action/convert_date" _ "github.com/ozontech/file.d/plugin/action/convert_log_level" _ "github.com/ozontech/file.d/plugin/action/convert_utf8_bytes" diff --git a/go.mod b/go.mod index 7bb9d7d53..9e46611b9 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/castai/promwrite v0.5.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/cespare/xxhash/v2 v2.3.0 + github.com/dgraph-io/ristretto/v2 v2.0.0 github.com/dominikbraun/graph v0.23.0 github.com/elliotchance/orderedmap/v2 v2.4.0 github.com/euank/go-kmsg-parser/v3 v3.0.0 @@ -73,6 +74,7 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dmarkham/enumer v1.5.8 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.6.1 // indirect @@ -123,6 +125,7 @@ require ( github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pascaldekloe/name v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/prometheus v0.40.3 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect @@ -146,7 +149,7 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/oauth2 v0.10.0 // indirect golang.org/x/sync v0.11.0 // indirect - golang.org/x/sys v0.30.0 // indirect + golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.29.0 // indirect golang.org/x/text v0.22.0 // indirect golang.org/x/time v0.10.0 // indirect diff --git a/go.sum b/go.sum index fe5ed79f7..8e4980bf9 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,10 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMbxscNOAQ= +github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk= +github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= +github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dmarkham/enumer v1.5.8 h1:fIF11F9l5jyD++YYvxcSH5WgHfeaSGPaN/T4kOQ4qEM= @@ -64,6 +68,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw= github.com/elliotchance/orderedmap/v2 v2.4.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= @@ -345,6 +351,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -489,6 +496,8 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/plugin/action/cardinality/README.idoc.md b/plugin/action/cardinality/README.idoc.md new file mode 100644 index 000000000..b7a08ee86 --- /dev/null +++ b/plugin/action/cardinality/README.idoc.md @@ -0,0 +1,4 @@ +# Cardinality plugin +@introduction + +> No config params diff --git a/plugin/action/cardinality/README.md b/plugin/action/cardinality/README.md new file mode 100755 index 000000000..d856baba0 --- /dev/null +++ b/plugin/action/cardinality/README.md @@ -0,0 +1,18 @@ +# Discard plugin +It drops an event. It is used in a combination with `match_fields`/`match_mode` parameters to filter out the events. + +**An example for discarding informational and debug logs:** +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: discard + match_fields: + level: /info|debug/ + ... +``` + +> No config params + +
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality/cache.go new file mode 100644 index 000000000..9702f7e08 --- /dev/null +++ b/plugin/action/cardinality/cache.go @@ -0,0 +1,107 @@ +package discard + +import ( + "sync" + "time" + + "github.com/dgraph-io/ristretto/v2" +) + +type Cache struct { + rc *ristretto.Cache[string, string] // Ristretto v2 cache instance + prefixTrie *prefixTrie // For efficient prefix counting + mu sync.RWMutex // Protects trie operations + ttl time.Duration +} + +type prefixTrie struct { + children map[rune]*prefixTrie + count int +} + +// NewCache creates a new cache instance with Ristretto v2 +func NewCache(maxItems int64, ttl time.Duration) (*Cache, error) { + rc, err := ristretto.NewCache(&ristretto.Config[string, string]{ + NumCounters: maxItems * 10, // Number of keys to track frequency + MaxCost: maxItems, // Maximum number of items in cache + BufferItems: 64, // Size of Get buffer + }) + if err != nil { + return nil, err + } + + return &Cache{ + rc: rc, + prefixTrie: &prefixTrie{children: make(map[rune]*prefixTrie)}, + ttl: ttl, + }, nil +} + +// Set adds an item to cache with TTL and updates prefix counts +func (c *Cache) Set(key string) bool { + // Set item in Ristretto with TTL + success := c.rc.SetWithTTL(key, "1", 1, c.ttl) + if !success { + return false + } + + // Update prefix trie + c.mu.Lock() + defer c.mu.Unlock() + c.updateTrie(key, 1) + return true +} + +// Get retrieves an item from cache +func (c *Cache) Get(key string) (string, bool) { + return c.rc.Get(key) +} + +// Delete removes an item from cache and updates prefix counts +func (c *Cache) Delete(key string) { + c.rc.Del(key) + c.mu.Lock() + defer c.mu.Unlock() + c.updateTrie(key, -1) +} + +// CountPrefix returns number of keys matching the prefix +func (c *Cache) CountPrefix(prefix string) int { + c.mu.RLock() + defer c.mu.RUnlock() + + node := c.prefixTrie + for _, char := range prefix { + if child, ok := node.children[char]; ok { + node = child + } else { + return 0 + } + } + return node.count +} + +// Clear completely resets the cache +func (c *Cache) Clear() { + c.rc.Clear() + c.mu.Lock() + defer c.mu.Unlock() + c.prefixTrie = &prefixTrie{children: make(map[rune]*prefixTrie)} +} + +// Close shuts down the cache +func (c *Cache) Close() { + c.rc.Close() +} + +// updateTrie maintains the prefix count trie +func (c *Cache) updateTrie(key string, delta int) { + node := c.prefixTrie + for _, char := range key { + if _, ok := node.children[char]; !ok { + node.children[char] = &prefixTrie{children: make(map[rune]*prefixTrie)} + } + node = node.children[char] + node.count += delta + } +} diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go new file mode 100644 index 000000000..88b077c2c --- /dev/null +++ b/plugin/action/cardinality/cardinality.go @@ -0,0 +1,131 @@ +package discard + +import ( + "sort" + "strings" + "time" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/pipeline" +) + +type Plugin struct { + cache *Cache + config *Config + keys []string + fields []string +} + +type Config struct { + KeyFields []cfg.FieldSelector `json:"key" slice:"true" required:"true"` + Fields []cfg.FieldSelector `json:"fields" slice:"true" required:"true"` + Limit int `json:"limit" default:"10000"` + TTL cfg.Duration `json:"ttl" default:"1h" parse:"duration"` // * + TTL_ time.Duration +} + +func init() { + fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ + Type: "cardinality", + Factory: factory, + }) +} + +func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + + var err error + p.cache, err = NewCache(int64(p.config.Limit*10), p.config.TTL_) + if err != nil { + panic(err) + } + + p.keys = make([]string, 0, len(p.config.KeyFields)) + for _, fs := range p.config.KeyFields { + if fs != "" { + p.keys = append(p.keys, cfg.ParseFieldSelector(string(fs))[0]) + } + } + + p.fields = make([]string, 0, len(p.config.Fields)) + for _, fs := range p.config.Fields { + if fs != "" { + p.fields = append(p.fields, cfg.ParseFieldSelector(string(fs))[0]) + } + } +} + +func (p *Plugin) Stop() { + p.cache.Close() +} + +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + cacheKey := map[string]string{} + for _, key := range p.keys { + node := event.Root.Dig(key) + value := node.AsString() + cacheKey[key] = value + } + + cacheValue := map[string]string{} + for _, key := range p.fields { + node := event.Root.Dig(key) + value := node.AsString() + cacheValue[key] = value + } + + key := mapToKey(cacheKey) + value := mapToStringSorted(cacheKey, cacheValue) + p.cache.Set(value) + + if p.cache.CountPrefix(key) > p.config.Limit { + return pipeline.ActionDiscard + // TODO: add metrics + } else { + return pipeline.ActionPass + } +} + +func mapToStringSorted(m, n map[string]string) string { + var sb strings.Builder + sb.WriteString(mapToKey(m)) + + keys := make([]string, 0, len(n)) + for k := range n { + keys = append(keys, k) + } + sort.Strings(keys) + sb.WriteString("map[") + for i, k := range keys { + if i > 0 { + sb.WriteString(" ") + } + sb.WriteString(k + ":" + m[k]) + } + sb.WriteString("]") + return sb.String() +} + +func mapToKey(m map[string]string) string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + + var sb strings.Builder + sb.WriteString("map[") + for i, k := range keys { + if i > 0 { + sb.WriteString(" ") + } + sb.WriteString(k + ":" + m[k]) + } + sb.WriteString("];") + return sb.String() +} From 3faa43aca666e181e1517ce9eca42251d810f7f9 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 8 Aug 2025 14:11:34 +0700 Subject: [PATCH 10/19] add cache test for cardinality limit --- go.mod | 4 +- go.sum | 11 +- plugin/action/cardinality/cache.go | 127 ++++++++--------- plugin/action/cardinality/cache_test.go | 172 +++++++++++++++++++++++ plugin/action/cardinality/cardinality.go | 62 +++++++- 5 files changed, 286 insertions(+), 90 deletions(-) create mode 100644 plugin/action/cardinality/cache_test.go diff --git a/go.mod b/go.mod index 9e46611b9..e2b6f1c99 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/castai/promwrite v0.5.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/cespare/xxhash/v2 v2.3.0 - github.com/dgraph-io/ristretto/v2 v2.0.0 github.com/dominikbraun/graph v0.23.0 github.com/elliotchance/orderedmap/v2 v2.4.0 github.com/euank/go-kmsg-parser/v3 v3.0.0 @@ -32,6 +31,7 @@ require ( github.com/klauspost/compress v1.17.8 github.com/minio/minio-go v6.0.14+incompatible github.com/ozontech/insane-json v0.1.9 + github.com/plar/go-adaptive-radix-tree/v2 v2.0.3 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.42.0 @@ -74,7 +74,6 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dmarkham/enumer v1.5.8 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.6.1 // indirect @@ -125,7 +124,6 @@ require ( github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/pascaldekloe/name v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect - github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/prometheus v0.40.3 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect diff --git a/go.sum b/go.sum index 8e4980bf9..20aeae6ed 100644 --- a/go.sum +++ b/go.sum @@ -56,10 +56,6 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/ristretto/v2 v2.0.0 h1:l0yiSOtlJvc0otkqyMaDNysg8E9/F/TYZwMbxscNOAQ= -github.com/dgraph-io/ristretto/v2 v2.0.0/go.mod h1:FVFokF2dRqXyPyeMnK1YDy8Fc6aTe0IKgbcd03CYeEk= -github.com/dgraph-io/ristretto/v2 v2.2.0 h1:bkY3XzJcXoMuELV8F+vS8kzNgicwQFAaGINAEJdWGOM= -github.com/dgraph-io/ristretto/v2 v2.2.0/go.mod h1:RZrm63UmcBAaYWC1DotLYBmTvgkrs0+XhBd7Npn7/zI= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dmarkham/enumer v1.5.8 h1:fIF11F9l5jyD++YYvxcSH5WgHfeaSGPaN/T4kOQ4qEM= @@ -68,8 +64,6 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw= github.com/elliotchance/orderedmap/v2 v2.4.0/go.mod h1:85lZyVbpGaGvHvnKa7Qhx7zncAdBIBq6u56Hb1PRU5Q= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= @@ -293,6 +287,8 @@ github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/plar/go-adaptive-radix-tree/v2 v2.0.3 h1:cJx/EUTduV4q10O5HSzHgPrViApJkJQk9OSeaT7UYUU= +github.com/plar/go-adaptive-radix-tree/v2 v2.0.3/go.mod h1:8yf9K81YK94H4gKh/K3hCBeC2s4JA/PYgqMkkOadwvk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= @@ -351,7 +347,6 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= @@ -494,8 +489,6 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality/cache.go index 9702f7e08..8ae9e0f8a 100644 --- a/plugin/action/cardinality/cache.go +++ b/plugin/action/cardinality/cache.go @@ -4,104 +4,87 @@ import ( "sync" "time" - "github.com/dgraph-io/ristretto/v2" + art "github.com/plar/go-adaptive-radix-tree/v2" ) type Cache struct { - rc *ristretto.Cache[string, string] // Ristretto v2 cache instance - prefixTrie *prefixTrie // For efficient prefix counting - mu sync.RWMutex // Protects trie operations - ttl time.Duration + mu *sync.RWMutex + tree art.Tree + ttl time.Duration } -type prefixTrie struct { - children map[rune]*prefixTrie - count int -} - -// NewCache creates a new cache instance with Ristretto v2 -func NewCache(maxItems int64, ttl time.Duration) (*Cache, error) { - rc, err := ristretto.NewCache(&ristretto.Config[string, string]{ - NumCounters: maxItems * 10, // Number of keys to track frequency - MaxCost: maxItems, // Maximum number of items in cache - BufferItems: 64, // Size of Get buffer - }) - if err != nil { - return nil, err - } - +func NewCache(ttl time.Duration) (*Cache, error) { return &Cache{ - rc: rc, - prefixTrie: &prefixTrie{children: make(map[rune]*prefixTrie)}, - ttl: ttl, + tree: art.New(), + ttl: ttl, + mu: &sync.RWMutex{}, }, nil } -// Set adds an item to cache with TTL and updates prefix counts func (c *Cache) Set(key string) bool { - // Set item in Ristretto with TTL - success := c.rc.SetWithTTL(key, "1", 1, c.ttl) - if !success { - return false - } - - // Update prefix trie c.mu.Lock() defer c.mu.Unlock() - c.updateTrie(key, 1) - return true -} - -// Get retrieves an item from cache -func (c *Cache) Get(key string) (string, bool) { - return c.rc.Get(key) -} -// Delete removes an item from cache and updates prefix counts -func (c *Cache) Delete(key string) { - c.rc.Del(key) - c.mu.Lock() - defer c.mu.Unlock() - c.updateTrie(key, -1) + c.tree.Insert(art.Key(key), time.Now()) + return true } -// CountPrefix returns number of keys matching the prefix -func (c *Cache) CountPrefix(prefix string) int { +func (c *Cache) IsExists(key string) bool { c.mu.RLock() - defer c.mu.RUnlock() + timeValue, found := c.tree.Search(art.Key(key)) + c.mu.RUnlock() - node := c.prefixTrie - for _, char := range prefix { - if child, ok := node.children[char]; ok { - node = child - } else { - return 0 + if found { + now := time.Now() + isExpire := c.isExpire(now, timeValue.(time.Time)) + if isExpire { + c.delete(key) + return false } } - return node.count + return found +} + +func (c *Cache) isExpire(now, value time.Time) bool { + diff := now.Sub(value) + return diff > c.ttl } -// Clear completely resets the cache -func (c *Cache) Clear() { - c.rc.Clear() +func (c *Cache) delete(key string) { c.mu.Lock() defer c.mu.Unlock() - c.prefixTrie = &prefixTrie{children: make(map[rune]*prefixTrie)} -} -// Close shuts down the cache -func (c *Cache) Close() { - c.rc.Close() + c.tree.Delete(art.Key(key)) } -// updateTrie maintains the prefix count trie -func (c *Cache) updateTrie(key string, delta int) { - node := c.prefixTrie - for _, char := range key { - if _, ok := node.children[char]; !ok { - node.children[char] = &prefixTrie{children: make(map[rune]*prefixTrie)} +func (c *Cache) CountPrefix(prefix string) (count int) { + var keysToDelete []art.Key + c.mu.RLock() + now := time.Now() + c.tree.ForEachPrefix(art.Key(prefix), func(node art.Node) bool { + timeValue := node.Value().(time.Time) + if c.isExpire(now, timeValue) { + keysToDelete = append(keysToDelete, node.Key()) + return false + } else { + count++ + return true + } + }) + c.mu.RUnlock() + + if len(keysToDelete) > 0 { + for _, key := range keysToDelete { + c.delete(string(key)) } - node = node.children[char] - node.count += delta } + return +} + +type PrefixCache interface { + Set(key string) bool + IsExists(key string) bool + Delete(key string) + CountPrefix(prefix string) int + Cleanup() int } diff --git a/plugin/action/cardinality/cache_test.go b/plugin/action/cardinality/cache_test.go new file mode 100644 index 000000000..530eef480 --- /dev/null +++ b/plugin/action/cardinality/cache_test.go @@ -0,0 +1,172 @@ +package discard + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewCache(t *testing.T) { + cache, err := NewCache(time.Minute) + assert.NoError(t, err) + assert.NotNil(t, cache) +} + +func TestSetAndExists(t *testing.T) { + cache, _ := NewCache(time.Minute) + + t.Run("basic set and get", func(t *testing.T) { + key := "test-key" + assert.True(t, cache.Set(key)) + + found := cache.IsExists(key) + assert.True(t, found) + }) + + t.Run("non-existent key", func(t *testing.T) { + found := cache.IsExists("non-existent") + assert.False(t, found) + }) +} + +func TestDelete(t *testing.T) { + cache, _ := NewCache(time.Minute) + + key := "to-delete" + cache.Set(key) + + t.Run("delete existing key", func(t *testing.T) { + cache.delete(key) + found := cache.IsExists(key) + assert.False(t, found) + }) + + t.Run("delete non-existent key", func(t *testing.T) { + // Should not panic + cache.delete("never-existed") + }) +} + +func TestCountPrefix(t *testing.T) { + cache, _ := NewCache(time.Minute) + + keys := []string{ + "key1_subkey1", + "key1_subkey1", + "key1_subkey2", + + "key2_subkey1", + } + + for _, key := range keys { + cache.Set(key) + } + + testCases := []struct { + prefix string + count int + }{ + {"key1", 2}, + {"key2", 1}, + {"key3", 0}, + } + + for _, tc := range testCases { + t.Run("prefix "+tc.prefix, func(t *testing.T) { + assert.Equal(t, tc.count, cache.CountPrefix(tc.prefix)) + }) + } + + t.Run("count after delete", func(t *testing.T) { + cache.delete("key1_subkey1") + assert.Equal(t, 1, cache.CountPrefix("key1")) + }) +} + +func TestConcurrentOperations(t *testing.T) { + cache, _ := NewCache(time.Minute) + + var wg sync.WaitGroup + keys := []string{"key1", "key2", "key3"} + + // Test concurrent sets + wg.Add(len(keys)) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.Set(k) + } + }(key) + } + wg.Wait() + + // Verify all keys were set + for _, key := range keys { + found := cache.IsExists(key) + assert.True(t, found) + } + + // Test concurrent gets and sets + wg.Add(len(keys)) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.IsExists(k) + cache.Set(k + "-new") + } + }(key) + } + wg.Wait() + + // Test concurrent deletes + wg.Add(len(keys)) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.delete(k) + } + }(key) + } + wg.Wait() + + // Verify prefix counts under concurrent access + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.CountPrefix("key") + } + }() + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.Set("key-x") + cache.Set("key-y") + cache.delete("key-x") + } + }() + wg.Wait() +} + +func TestTTL(t *testing.T) { + cache, _ := NewCache(100 * time.Millisecond) + + key := "ttl-key" + cache.Set(key) + + t.Run("key exists before TTL", func(t *testing.T) { + found := cache.IsExists(key) + assert.True(t, found) + }) + + t.Run("key expires after TTL", func(t *testing.T) { + time.Sleep(200 * time.Millisecond) + found := cache.IsExists(key) + assert.False(t, found) + }) +} diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index 88b077c2c..b99d2939d 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -7,7 +7,10 @@ import ( "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" ) type Plugin struct { @@ -15,6 +18,9 @@ type Plugin struct { config *Config keys []string fields []string + logger *zap.Logger + + cardinalityDiscardMetric *prometheus.GaugeVec } type Config struct { @@ -36,11 +42,42 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } +func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string) *prometheus.GaugeVec { + if name == "" { + return nil + } + + uniq := make(map[string]struct{}) + labelNames := make([]string, 0, len(labels)) + for _, label := range labels { + if label == "" { + p.logger.Fatal("empty label name") + } + if _, ok := uniq[label]; ok { + p.logger.Fatal("metric labels must be unique") + } + uniq[label] = struct{}{} + + labelNames = append(labelNames, label) + } + + return ctl.RegisterGaugeVec(name, help, labelNames...) +} + +func (p *Plugin) registerMetrics(ctl *metric.Ctl) { + p.cardinalityDiscardMetric = p.makeMetric(ctl, + "unique_count", + "Number of uniqie values", + p.keys..., + ) +} + func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { p.config = config.(*Config) + p.logger = params.Logger.Desugar() var err error - p.cache, err = NewCache(int64(p.config.Limit*10), p.config.TTL_) + p.cache, err = NewCache(p.config.TTL_) if err != nil { panic(err) } @@ -58,21 +95,24 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP p.fields = append(p.fields, cfg.ParseFieldSelector(string(fs))[0]) } } + + p.registerMetrics(params.MetricCtl) } func (p *Plugin) Stop() { - p.cache.Close() + } func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { - cacheKey := map[string]string{} + cacheKey := make(map[string]string, len(p.keys)) + for _, key := range p.keys { node := event.Root.Dig(key) value := node.AsString() cacheKey[key] = value } - cacheValue := map[string]string{} + cacheValue := make(map[string]string, len(p.fields)) for _, key := range p.fields { node := event.Root.Dig(key) value := node.AsString() @@ -83,9 +123,19 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { value := mapToStringSorted(cacheKey, cacheValue) p.cache.Set(value) - if p.cache.CountPrefix(key) > p.config.Limit { + keysCount := p.cache.CountPrefix(key) + + labelsValues := make([]string, 0, len(p.keys)) + for _, key := range p.keys { + if val, exists := cacheKey[key]; exists { + labelsValues = append(labelsValues, val) + } else { + labelsValues = append(labelsValues, "unknown") + } + } + p.cardinalityDiscardMetric.WithLabelValues(labelsValues...).Set(float64(keysCount)) + if p.config.Limit > 0 && keysCount > p.config.Limit { return pipeline.ActionDiscard - // TODO: add metrics } else { return pipeline.ActionPass } From 271497a9f09c483547355665f5a46cc0145a7622 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 19 Aug 2025 17:25:17 +0700 Subject: [PATCH 11/19] fix some bugs --- plugin/action/cardinality/cardinality.go | 11 ++--- plugin/action/cardinality/cardinality_test.go | 49 +++++++++++++++++++ 2 files changed, 54 insertions(+), 6 deletions(-) create mode 100644 plugin/action/cardinality/cardinality_test.go diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index b99d2939d..f1daf5e9d 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -107,15 +107,13 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { cacheKey := make(map[string]string, len(p.keys)) for _, key := range p.keys { - node := event.Root.Dig(key) - value := node.AsString() + value := pipeline.CloneString(event.Root.Dig(key).AsString()) cacheKey[key] = value } cacheValue := make(map[string]string, len(p.fields)) for _, key := range p.fields { - node := event.Root.Dig(key) - value := node.AsString() + value := pipeline.CloneString(event.Root.Dig(key).AsString()) cacheValue[key] = value } @@ -133,8 +131,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { labelsValues = append(labelsValues, "unknown") } } - p.cardinalityDiscardMetric.WithLabelValues(labelsValues...).Set(float64(keysCount)) + if p.config.Limit > 0 && keysCount > p.config.Limit { + p.cardinalityDiscardMetric.WithLabelValues(labelsValues...).Set(float64(keysCount)) return pipeline.ActionDiscard } else { return pipeline.ActionPass @@ -155,7 +154,7 @@ func mapToStringSorted(m, n map[string]string) string { if i > 0 { sb.WriteString(" ") } - sb.WriteString(k + ":" + m[k]) + sb.WriteString(k + ":" + n[k]) } sb.WriteString("]") return sb.String() diff --git a/plugin/action/cardinality/cardinality_test.go b/plugin/action/cardinality/cardinality_test.go new file mode 100644 index 000000000..d908a86fc --- /dev/null +++ b/plugin/action/cardinality/cardinality_test.go @@ -0,0 +1,49 @@ +package discard + +import ( + "sort" + "strings" + "testing" +) + +func TestMapToStringSorted(t *testing.T) { + tests := []struct { + name string + m map[string]string + n map[string]string + expected string + }{ + { + name: "empty maps", + m: map[string]string{}, + n: map[string]string{}, + expected: "map[];map[]", + }, + { + name: "simple maps", + m: map[string]string{"service": "test", "host": "localhost"}, + n: map[string]string{"value": "1", "level": "3"}, + expected: "map[host:localhost service:test];map[level:3 value:1]", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := mapToStringSorted(tt.m, tt.n) + if result != tt.expected { + t.Errorf("mapToStringSorted() = %v, want %v", result, tt.expected) + } + + // Verify the output is properly sorted + if strings.HasPrefix(result, "map[") && strings.HasSuffix(result, "]") { + content := result[4 : len(result)-1] + if content != "" { + pairs := strings.Split(content, " ") + if !sort.StringsAreSorted(pairs) { + t.Errorf("output pairs are not sorted: %v", pairs) + } + } + } + }) + } +} From 506319e2be3453e1bfa470d0060148c3f5e981b1 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 20 Aug 2025 13:51:41 +0700 Subject: [PATCH 12/19] actionRemoveFields in cardinality limit action --- plugin/action/cardinality/cardinality.go | 51 ++++++++++++++---------- 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality/cardinality.go index f1daf5e9d..384b88475 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality/cardinality.go @@ -20,12 +20,18 @@ type Plugin struct { fields []string logger *zap.Logger - cardinalityDiscardMetric *prometheus.GaugeVec + cardinalityDiscardCounter *prometheus.CounterVec } +const ( + actionDiscard = "discard" + actionRemoveFields = "remove_fields" +) + type Config struct { KeyFields []cfg.FieldSelector `json:"key" slice:"true" required:"true"` Fields []cfg.FieldSelector `json:"fields" slice:"true" required:"true"` + Action string `json:"action" default:"discard" options:"discard|remove_fields"` Limit int `json:"limit" default:"10000"` TTL cfg.Duration `json:"ttl" default:"1h" parse:"duration"` // * TTL_ time.Duration @@ -42,7 +48,7 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } -func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string) *prometheus.GaugeVec { +func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string) *prometheus.CounterVec { if name == "" { return nil } @@ -61,13 +67,13 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string labelNames = append(labelNames, label) } - return ctl.RegisterGaugeVec(name, help, labelNames...) + return ctl.RegisterCounterVec(name, help, labelNames...) } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { - p.cardinalityDiscardMetric = p.makeMetric(ctl, - "unique_count", - "Number of uniqie values", + p.cardinalityDiscardCounter = p.makeMetric(ctl, + "cardinality_discard_total", + "Total number of events discarded due to cardinality limits", p.keys..., ) } @@ -119,25 +125,30 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { key := mapToKey(cacheKey) value := mapToStringSorted(cacheKey, cacheValue) - p.cache.Set(value) - keysCount := p.cache.CountPrefix(key) - labelsValues := make([]string, 0, len(p.keys)) - for _, key := range p.keys { - if val, exists := cacheKey[key]; exists { - labelsValues = append(labelsValues, val) - } else { - labelsValues = append(labelsValues, "unknown") - } - } - if p.config.Limit > 0 && keysCount > p.config.Limit { - p.cardinalityDiscardMetric.WithLabelValues(labelsValues...).Set(float64(keysCount)) - return pipeline.ActionDiscard + labelsValues := make([]string, 0, len(p.keys)) + for _, key := range p.keys { + if val, exists := cacheKey[key]; exists { + labelsValues = append(labelsValues, val) + } else { + labelsValues = append(labelsValues, "unknown") + } + } + p.cardinalityDiscardCounter.WithLabelValues(labelsValues...).Inc() + switch p.config.Action { + case actionDiscard: + return pipeline.ActionDiscard + case actionRemoveFields: + for _, key := range p.fields { + event.Root.Dig(key).Suicide() + } + } } else { - return pipeline.ActionPass + p.cache.Set(value) } + return pipeline.ActionPass } func mapToStringSorted(m, n map[string]string) string { From 195a61831fe24e20ce6ca5f12cbec45fd44bae02 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 20 Aug 2025 13:56:59 +0700 Subject: [PATCH 13/19] rename cardinality to cardinality_limit --- cmd/file.d/file.d.go | 2 +- plugin/action/cardinality/README.idoc.md | 4 ---- plugin/action/cardinality_limit/README.idoc.md | 3 +++ .../action/{cardinality => cardinality_limit}/README.md | 0 plugin/action/{cardinality => cardinality_limit}/cache.go | 8 -------- .../{cardinality => cardinality_limit}/cache_test.go | 0 .../cardinality_limit.go} | 2 +- .../cardinality_limit_test.go} | 0 8 files changed, 5 insertions(+), 14 deletions(-) delete mode 100644 plugin/action/cardinality/README.idoc.md create mode 100644 plugin/action/cardinality_limit/README.idoc.md rename plugin/action/{cardinality => cardinality_limit}/README.md (100%) rename plugin/action/{cardinality => cardinality_limit}/cache.go (90%) rename plugin/action/{cardinality => cardinality_limit}/cache_test.go (100%) rename plugin/action/{cardinality/cardinality.go => cardinality_limit/cardinality_limit.go} (99%) rename plugin/action/{cardinality/cardinality_test.go => cardinality_limit/cardinality_limit_test.go} (100%) diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index 41c232dc8..7452092a2 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -19,7 +19,7 @@ import ( "github.com/ozontech/file.d/pipeline" _ "github.com/ozontech/file.d/plugin/action/add_file_name" _ "github.com/ozontech/file.d/plugin/action/add_host" - _ "github.com/ozontech/file.d/plugin/action/cardinality" + _ "github.com/ozontech/file.d/plugin/action/cardinality_limit" _ "github.com/ozontech/file.d/plugin/action/convert_date" _ "github.com/ozontech/file.d/plugin/action/convert_log_level" _ "github.com/ozontech/file.d/plugin/action/convert_utf8_bytes" diff --git a/plugin/action/cardinality/README.idoc.md b/plugin/action/cardinality/README.idoc.md deleted file mode 100644 index b7a08ee86..000000000 --- a/plugin/action/cardinality/README.idoc.md +++ /dev/null @@ -1,4 +0,0 @@ -# Cardinality plugin -@introduction - -> No config params diff --git a/plugin/action/cardinality_limit/README.idoc.md b/plugin/action/cardinality_limit/README.idoc.md new file mode 100644 index 000000000..c22026fd2 --- /dev/null +++ b/plugin/action/cardinality_limit/README.idoc.md @@ -0,0 +1,3 @@ +# Cardinality limit plugin +@introduction + diff --git a/plugin/action/cardinality/README.md b/plugin/action/cardinality_limit/README.md similarity index 100% rename from plugin/action/cardinality/README.md rename to plugin/action/cardinality_limit/README.md diff --git a/plugin/action/cardinality/cache.go b/plugin/action/cardinality_limit/cache.go similarity index 90% rename from plugin/action/cardinality/cache.go rename to plugin/action/cardinality_limit/cache.go index 8ae9e0f8a..5d4cbc2eb 100644 --- a/plugin/action/cardinality/cache.go +++ b/plugin/action/cardinality_limit/cache.go @@ -80,11 +80,3 @@ func (c *Cache) CountPrefix(prefix string) (count int) { } return } - -type PrefixCache interface { - Set(key string) bool - IsExists(key string) bool - Delete(key string) - CountPrefix(prefix string) int - Cleanup() int -} diff --git a/plugin/action/cardinality/cache_test.go b/plugin/action/cardinality_limit/cache_test.go similarity index 100% rename from plugin/action/cardinality/cache_test.go rename to plugin/action/cardinality_limit/cache_test.go diff --git a/plugin/action/cardinality/cardinality.go b/plugin/action/cardinality_limit/cardinality_limit.go similarity index 99% rename from plugin/action/cardinality/cardinality.go rename to plugin/action/cardinality_limit/cardinality_limit.go index 384b88475..3becf27d4 100644 --- a/plugin/action/cardinality/cardinality.go +++ b/plugin/action/cardinality_limit/cardinality_limit.go @@ -39,7 +39,7 @@ type Config struct { func init() { fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ - Type: "cardinality", + Type: "cardinality_limit", Factory: factory, }) } diff --git a/plugin/action/cardinality/cardinality_test.go b/plugin/action/cardinality_limit/cardinality_limit_test.go similarity index 100% rename from plugin/action/cardinality/cardinality_test.go rename to plugin/action/cardinality_limit/cardinality_limit_test.go From c2b7efac28c27773731fd1db97570609137bf085 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 21 Aug 2025 18:46:10 +0700 Subject: [PATCH 14/19] add cardinality limit tests --- .../cardinality_limit/cardinality_limit.go | 6 +- .../cardinality_limit_test.go | 151 ++++++++++++++++++ 2 files changed, 156 insertions(+), 1 deletion(-) diff --git a/plugin/action/cardinality_limit/cardinality_limit.go b/plugin/action/cardinality_limit/cardinality_limit.go index 3becf27d4..6106b399f 100644 --- a/plugin/action/cardinality_limit/cardinality_limit.go +++ b/plugin/action/cardinality_limit/cardinality_limit.go @@ -95,6 +95,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP } } + if len(p.config.Fields) == 0 { + p.logger.Fatal("you have to set key fields") + } + p.fields = make([]string, 0, len(p.config.Fields)) for _, fs := range p.config.Fields { if fs != "" { @@ -127,7 +131,7 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { value := mapToStringSorted(cacheKey, cacheValue) keysCount := p.cache.CountPrefix(key) - if p.config.Limit > 0 && keysCount > p.config.Limit { + if p.config.Limit > 0 && keysCount >= p.config.Limit { labelsValues := make([]string, 0, len(p.keys)) for _, key := range p.keys { if val, exists := cacheKey[key]; exists { diff --git a/plugin/action/cardinality_limit/cardinality_limit_test.go b/plugin/action/cardinality_limit/cardinality_limit_test.go index d908a86fc..4f8c4b796 100644 --- a/plugin/action/cardinality_limit/cardinality_limit_test.go +++ b/plugin/action/cardinality_limit/cardinality_limit_test.go @@ -1,9 +1,17 @@ package discard import ( + "fmt" "sort" "strings" + "sync" "testing" + "time" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/test" + "github.com/stretchr/testify/assert" ) func TestMapToStringSorted(t *testing.T) { @@ -47,3 +55,146 @@ func TestMapToStringSorted(t *testing.T) { }) } } + +func TestCardinalityLimitDiscard(t *testing.T) { + limit := 10 + config := &Config{ + KeyFields: []cfg.FieldSelector{"host"}, + Fields: []cfg.FieldSelector{"i"}, + Limit: limit, + Action: actionDiscard, + TTL_: 1 * time.Hour, + } + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + inEventsCnt := 0 + inWg := sync.WaitGroup{} + genEventsCnt := limit + 10 + inWg.Add(genEventsCnt) + + input.SetInFn(func() { + defer inWg.Done() + inEventsCnt++ + }) + + outEventsCnt := 0 + output.SetOutFn(func(e *pipeline.Event) { + outEventsCnt++ + }) + + for i := 0; i < genEventsCnt; i++ { + json := fmt.Sprintf(`{"host":"localhost","i":"%d"}`, i) + input.In(10, "test", test.NewOffset(0), []byte(json)) + } + inWg.Wait() + time.Sleep(100 * time.Millisecond) + + p.Stop() + assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count") + assert.Equal(t, limit, outEventsCnt, "wrong out events count") +} + +func TestCardinalityLimitRemoveFields(t *testing.T) { + limit := 10 + config := &Config{ + KeyFields: []cfg.FieldSelector{"host"}, + Fields: []cfg.FieldSelector{"i"}, + Limit: limit, + Action: actionRemoveFields, + TTL_: 1 * time.Hour, + } + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + inEventsCnt := 0 + inWg := sync.WaitGroup{} + genEventsCnt := limit + 10 + inWg.Add(genEventsCnt) + + input.SetInFn(func() { + defer inWg.Done() + inEventsCnt++ + }) + + outEventsCnt := 0 + outWg := sync.WaitGroup{} + outWg.Add(genEventsCnt) + output.SetOutFn(func(e *pipeline.Event) { + defer outWg.Done() + + // check exists field + value := pipeline.CloneString(e.Root.Dig(string(config.Fields[0])).AsString()) + + if value != "" { + outEventsCnt++ + } + }) + + for i := 0; i < genEventsCnt; i++ { + json := fmt.Sprintf(`{"host":"localhost","i":"%d"}`, i) + input.In(10, "test", test.NewOffset(0), []byte(json)) + } + inWg.Wait() + outWg.Wait() + + p.Stop() + assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count") + assert.Equal(t, limit, outEventsCnt, "wrong out events count") +} + +func TestCardinalityLimitDiscardIfNoSetKeyFields(t *testing.T) { + limit := 10 + config := &Config{ + KeyFields: []cfg.FieldSelector{}, + Fields: []cfg.FieldSelector{"i"}, + Limit: limit, + Action: actionDiscard, + TTL_: 1 * time.Hour, + } + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + inEventsCnt := 0 + inWg := sync.WaitGroup{} + genEventsCnt := limit + 10 + inWg.Add(genEventsCnt) + + input.SetInFn(func() { + defer inWg.Done() + inEventsCnt++ + }) + + outEventsCnt := 0 + output.SetOutFn(func(e *pipeline.Event) { + outEventsCnt++ + }) + + for i := 0; i < genEventsCnt; i++ { + json := fmt.Sprintf(`{"host":"localhost%d","i":"%d"}`, i, i) + input.In(10, "test", test.NewOffset(0), []byte(json)) + } + inWg.Wait() + time.Sleep(100 * time.Millisecond) + + p.Stop() + assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count") + assert.Equal(t, limit, outEventsCnt, "wrong out events count") +} + +func TestSetAndCountPrefix(t *testing.T) { + cache, _ := NewCache(time.Minute) + + cacheKey := map[string]string{ + "host": "localhost", + } + cacheValue := map[string]string{ + "i": "0", + } + key := mapToKey(cacheKey) + value := mapToStringSorted(cacheKey, cacheValue) + cache.Set(value) + + keysCount := cache.CountPrefix(key) + assert.Equal(t, 1, keysCount, "wrong in events count") +} From 1a1c370ddf17945cefbe82d57e0a6ec1cc559150 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 21 Aug 2025 19:25:34 +0700 Subject: [PATCH 15/19] metric prefix --- .../cardinality_limit/cardinality_limit.go | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/plugin/action/cardinality_limit/cardinality_limit.go b/plugin/action/cardinality_limit/cardinality_limit.go index 6106b399f..1ec31669a 100644 --- a/plugin/action/cardinality_limit/cardinality_limit.go +++ b/plugin/action/cardinality_limit/cardinality_limit.go @@ -1,6 +1,7 @@ package discard import ( + "fmt" "sort" "strings" "time" @@ -29,12 +30,13 @@ const ( ) type Config struct { - KeyFields []cfg.FieldSelector `json:"key" slice:"true" required:"true"` - Fields []cfg.FieldSelector `json:"fields" slice:"true" required:"true"` - Action string `json:"action" default:"discard" options:"discard|remove_fields"` - Limit int `json:"limit" default:"10000"` - TTL cfg.Duration `json:"ttl" default:"1h" parse:"duration"` // * - TTL_ time.Duration + KeyFields []cfg.FieldSelector `json:"key" slice:"true" required:"true"` + Fields []cfg.FieldSelector `json:"fields" slice:"true" required:"true"` + Action string `json:"action" default:"discard" options:"discard|remove_fields"` + MetricPrefix string `json:"metric_prefix" default:""` + Limit int `json:"limit" default:"10000"` + TTL cfg.Duration `json:"ttl" default:"1h" parse:"duration"` // * + TTL_ time.Duration } func init() { @@ -70,9 +72,15 @@ func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string return ctl.RegisterCounterVec(name, help, labelNames...) } -func (p *Plugin) registerMetrics(ctl *metric.Ctl) { +func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) { + var metricName string + if prefix == "" { + metricName = "cardinality_discard_total" + } else { + metricName = fmt.Sprintf(`cardinality_discard_%s_total`, prefix) + } p.cardinalityDiscardCounter = p.makeMetric(ctl, - "cardinality_discard_total", + metricName, "Total number of events discarded due to cardinality limits", p.keys..., ) @@ -106,7 +114,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginP } } - p.registerMetrics(params.MetricCtl) + p.registerMetrics(params.MetricCtl, p.config.MetricPrefix) } func (p *Plugin) Stop() { From e3601c8f9a9c9605d566c431166679b2311c0e2c Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 29 Aug 2025 13:38:59 +0700 Subject: [PATCH 16/19] fix prometheus output error messages --- plugin/output/prometheus/prometheus.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index 9bc6c322c..b000b827e 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -242,7 +242,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP level = zapcore.ErrorLevel } - p.logger.Log(level, "can't send data to loki", zap.Error(err), + p.logger.Log(level, "can't send data to Prometheus", zap.Error(err), zap.Int("retries", p.config.Retry)) } @@ -297,7 +297,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err err := p.send(root) if err != nil { p.sendErrorMetric.WithLabelValues().Inc() - p.logger.Error("can't send data to Loki", zap.String("address", p.config.Endpoint), zap.Error(err)) + p.logger.Error("can't send data to Prometheus", zap.String("address", p.config.Endpoint), zap.Error(err)) } else { p.logger.Debug("successfully sent", zap.String("data", string(data.outBuf))) } From dfdae238c1df5739be6785c759b526185282138b Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 1 Sep 2025 18:57:51 +0700 Subject: [PATCH 17/19] add debug logs && fix too old sample --- plugin/output/prometheus/metric_collector.go | 42 ++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/plugin/output/prometheus/metric_collector.go b/plugin/output/prometheus/metric_collector.go index 7d1e0dc54..48ebf31cc 100644 --- a/plugin/output/prometheus/metric_collector.go +++ b/plugin/output/prometheus/metric_collector.go @@ -68,6 +68,28 @@ func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, shouldSend = prevTimestampSec < currentTimestampSec if shouldSend { + prevMetric.timestamp = max(prevMetric.timestamp, timestamp-1_000_000_000) + + zapLabels := make([]zap.Field, 0, len(labels)+2) + timeMetric := time.Unix(0, prevMetric.timestamp) + + zapLabels = append(zapLabels, zap.Time("prom_timestamp", timeMetric)) + zapLabels = append(zapLabels, zap.Time("current_timestamp", time.Unix(0, timestamp))) + for i := range labels { + zapLabels = append(zapLabels, zap.String("prom_label_"+labels[i].Name, labels[i].Value)) + } + p.logger.Info( + "send metric", + zapLabels..., + ) + + if time.Since(timeMetric) > 5*time.Minute { + p.logger.Error( + "too old sample", + zapLabels..., + ) + } + values = append(values, createTimeSeries(labels, prevMetric)) } } else { @@ -97,6 +119,26 @@ func (p *metricCollector) flushOldMetrics() { metric := value.(metricValue) if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended { labels := keyToLabels(key.(string)) + + zapLabels := make([]zap.Field, 0, len(labels)+2) + timeMetric := time.Unix(0, metric.timestamp) + zapLabels = append(zapLabels, zap.Time("prom_timestamp", timeMetric)) + zapLabels = append(zapLabels, zap.Bool("flush_old_metric", true)) + for i := range labels { + zapLabels = append(zapLabels, zap.String("prom_label_"+labels[i].Name, labels[i].Value)) + } + p.logger.Info( + "send metric", + zapLabels..., + ) + + if time.Since(timeMetric) > 10*time.Minute { + p.logger.Error( + "too old sample", + zapLabels..., + ) + } + toSend = append(toSend, createTimeSeries(labels, metric)) metric.lastValueIsSended = true From 26fb142300c7c2e415746d70e5fe002d68ae5a17 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 1 Sep 2025 20:18:09 +0700 Subject: [PATCH 18/19] add metric_collector_test --- plugin/output/prometheus/metric_collector.go | 42 -- .../prometheus/metric_collector_test.go | 397 ++++++++++++++++++ plugin/output/prometheus/prometheus.go | 8 +- 3 files changed, 401 insertions(+), 46 deletions(-) create mode 100644 plugin/output/prometheus/metric_collector_test.go diff --git a/plugin/output/prometheus/metric_collector.go b/plugin/output/prometheus/metric_collector.go index 48ebf31cc..d4ed0445c 100644 --- a/plugin/output/prometheus/metric_collector.go +++ b/plugin/output/prometheus/metric_collector.go @@ -69,27 +69,6 @@ func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, if shouldSend { prevMetric.timestamp = max(prevMetric.timestamp, timestamp-1_000_000_000) - - zapLabels := make([]zap.Field, 0, len(labels)+2) - timeMetric := time.Unix(0, prevMetric.timestamp) - - zapLabels = append(zapLabels, zap.Time("prom_timestamp", timeMetric)) - zapLabels = append(zapLabels, zap.Time("current_timestamp", time.Unix(0, timestamp))) - for i := range labels { - zapLabels = append(zapLabels, zap.String("prom_label_"+labels[i].Name, labels[i].Value)) - } - p.logger.Info( - "send metric", - zapLabels..., - ) - - if time.Since(timeMetric) > 5*time.Minute { - p.logger.Error( - "too old sample", - zapLabels..., - ) - } - values = append(values, createTimeSeries(labels, prevMetric)) } } else { @@ -119,28 +98,7 @@ func (p *metricCollector) flushOldMetrics() { metric := value.(metricValue) if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended { labels := keyToLabels(key.(string)) - - zapLabels := make([]zap.Field, 0, len(labels)+2) - timeMetric := time.Unix(0, metric.timestamp) - zapLabels = append(zapLabels, zap.Time("prom_timestamp", timeMetric)) - zapLabels = append(zapLabels, zap.Bool("flush_old_metric", true)) - for i := range labels { - zapLabels = append(zapLabels, zap.String("prom_label_"+labels[i].Name, labels[i].Value)) - } - p.logger.Info( - "send metric", - zapLabels..., - ) - - if time.Since(timeMetric) > 10*time.Minute { - p.logger.Error( - "too old sample", - zapLabels..., - ) - } - toSend = append(toSend, createTimeSeries(labels, metric)) - metric.lastValueIsSended = true p.collector.Store(key, metric) } diff --git a/plugin/output/prometheus/metric_collector_test.go b/plugin/output/prometheus/metric_collector_test.go new file mode 100644 index 000000000..cf0c33227 --- /dev/null +++ b/plugin/output/prometheus/metric_collector_test.go @@ -0,0 +1,397 @@ +package prometheus + +import ( + "errors" + "strings" + "sync" + "testing" + "time" + + "github.com/castai/promwrite" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" +) + +// TestStorageSender implements storageSender for testing +type TestStorageSender struct { + sentMetrics [][]promwrite.TimeSeries + mu sync.Mutex + returnError error +} + +func (t *TestStorageSender) sendToStorage(values []promwrite.TimeSeries) error { + t.mu.Lock() + defer t.mu.Unlock() + t.sentMetrics = append(t.sentMetrics, values) + return t.returnError +} + +func (t *TestStorageSender) getSentMetrics() [][]promwrite.TimeSeries { + t.mu.Lock() + defer t.mu.Unlock() + return t.sentMetrics +} + +func (t *TestStorageSender) reset() { + t.mu.Lock() + defer t.mu.Unlock() + t.sentMetrics = nil + t.returnError = nil +} + +func (t *TestStorageSender) setError(err error) { + t.mu.Lock() + defer t.mu.Unlock() + t.returnError = err +} + +func TestMetricCollector(t *testing.T) { + t.Run("labelsToKey and keyToLabels roundtrip", func(t *testing.T) { + labels := []promwrite.Label{ + {Name: "job", Value: "test"}, + {Name: "instance", Value: "localhost"}, + {Name: "__name__", Value: "test_metric"}, + } + + key := labelsToKey(labels) + convertedLabels := keyToLabels(key) + + assert.Len(t, convertedLabels, 3) + // Since labels are sorted in key generation, we need to check values + labelMap := make(map[string]string) + for _, l := range convertedLabels { + labelMap[l.Name] = l.Value + } + assert.Equal(t, "test", labelMap["job"]) + assert.Equal(t, "localhost", labelMap["instance"]) + assert.Equal(t, "test_metric", labelMap["__name__"]) + }) + + t.Run("handleMetric counter accumulation", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_counter"}, + {Name: "job", Value: "test"}, + } + + // First value - should not be sent + now := time.Now() + values := collector.handleMetric(labels, 10.0, now.UnixNano(), "counter") + assert.Empty(t, values) + assert.Empty(t, testSender.getSentMetrics()) + + // Second value in same time window - should accumulate but not send + values = collector.handleMetric(labels, 5.0, now.UnixNano(), "counter") + assert.Empty(t, values) + assert.Empty(t, testSender.getSentMetrics()) + + // Third value in next time window - should send accumulated value + nextTime := now.Add(time.Second) + values = collector.handleMetric(labels, 3.0, nextTime.UnixNano(), "counter") + assert.Len(t, values, 1) + assert.Equal(t, 15.0, values[0].Sample.Value) // 10 + 5 + }) + + t.Run("handleMetric gauge behavior", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_gauge"}, + {Name: "job", Value: "test"}, + } + + now := time.Now() + + // First value - should not be sent + values := collector.handleMetric(labels, 100.0, now.UnixNano(), "gauge") + assert.Empty(t, values) + + // Second value in same time window - should replace but not send + values = collector.handleMetric(labels, 200.0, now.UnixNano(), "gauge") + assert.Empty(t, values) + + // Third value in next time window - should send latest value + nextTime := now.Add(time.Second) + values = collector.handleMetric(labels, 300.0, nextTime.UnixNano(), "gauge") + assert.Len(t, values, 1) + assert.Equal(t, 200.0, values[0].Sample.Value) // Latest value before time window change + }) + + t.Run("flushOldMetrics manual check", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + shortTimeout := 100 * time.Millisecond + collector := newCollector(shortTimeout, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_flush"}, + {Name: "job", Value: "test"}, + } + + // Add a metric that will timeout + collector.handleMetric(labels, 42.0, time.Now().UnixNano(), "gauge") + + // Wait for timeout + a bit more + time.Sleep(shortTimeout + 50*time.Millisecond) + + // Manually trigger the flush logic that would be called by the ticker + var toSend []promwrite.TimeSeries + now := time.Now() + collector.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + if now.Sub(metric.lastUpdateTime) > shortTimeout && !metric.lastValueIsSended { + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + } + return true + }) + + assert.Len(t, toSend, 1) + assert.Equal(t, 42.0, toSend[0].Sample.Value) + }) + + t.Run("flushOldMetrics sends timed out metrics", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + shortTimeout := 1 * time.Second + collector := newCollector(shortTimeout, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_flush"}, + {Name: "job", Value: "test"}, + } + + // Add a metric that will timeout + sendedValues := collector.handleMetric(labels, 42.0, time.Now().UnixNano(), "gauge") + assert.Len(t, sendedValues, 0) + + // Wait for timeout + a bit more + time.Sleep(shortTimeout + 1*time.Second) + now := time.Now() + + var toSend []promwrite.TimeSeries + collector.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + if now.Sub(metric.lastUpdateTime) > shortTimeout && !metric.lastValueIsSended { + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + } + return true + }) + + assert.Len(t, toSend, 0) + }) + + t.Run("shutdown flushes all metrics", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels1 := []promwrite.Label{ + {Name: "__name__", Value: "metric1"}, + {Name: "job", Value: "test"}, + } + labels2 := []promwrite.Label{ + {Name: "__name__", Value: "metric2"}, + {Name: "job", Value: "test"}, + } + + // Add multiple metrics + collector.handleMetric(labels1, 10.0, time.Now().UnixNano(), "gauge") + collector.handleMetric(labels2, 20.0, time.Now().UnixNano(), "gauge") + + collector.shutdown() + + sentMetrics := testSender.getSentMetrics() + assert.Len(t, sentMetrics, 1) + assert.Len(t, sentMetrics[0], 2) + }) + + t.Run("concurrent access", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + var wg sync.WaitGroup + numGoroutines := 10 + numMetrics := 100 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < numMetrics; j++ { + labels := []promwrite.Label{ + {Name: "__name__", Value: "concurrent_metric"}, + {Name: "worker", Value: string(rune(workerID))}, + {Name: "index", Value: string(rune(j))}, + } + collector.handleMetric(labels, float64(j), time.Now().UnixNano(), "counter") + } + }(i) + } + + wg.Wait() + + // Verify all metrics are stored + count := 0 + collector.collector.Range(func(_, _ interface{}) bool { + count++ + return true + }) + + assert.Equal(t, numGoroutines*numMetrics, count) + }) + + t.Run("storage sender error handling", func(t *testing.T) { + logger := zap.NewNop() + testSender := &TestStorageSender{} + testSender.setError(errors.New("storage error")) + collector := newCollector(100*time.Millisecond, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "error_metric"}, + {Name: "job", Value: "test"}, + } + + collector.handleMetric(labels, 99.0, time.Now().UnixNano(), "gauge") + + // Wait for flush + time.Sleep(150 * time.Millisecond) + + // Error should be handled gracefully (logged but not panic) + sentMetrics := testSender.getSentMetrics() + assert.Equal(t, [][]promwrite.TimeSeries(nil), sentMetrics) + }) + + t.Run("metric value struct validation", func(t *testing.T) { + now := time.Now() + mv := metricValue{ + value: 42.0, + timestamp: now.UnixNano(), + lastUpdateTime: now, + lastValueIsSended: false, + } + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test"}, + } + + ts := createTimeSeries(labels, mv) + assert.Equal(t, 42.0, ts.Sample.Value) + assert.Equal(t, now.Truncate(time.Nanosecond), ts.Sample.Time.Truncate(time.Nanosecond)) + assert.Equal(t, labels, ts.Labels) + }) + + t.Run("label sorting in key generation", func(t *testing.T) { + unsortedLabels := []promwrite.Label{ + {Name: "z", Value: "last"}, + {Name: "a", Value: "first"}, + {Name: "m", Value: "middle"}, + } + + key := labelsToKey(unsortedLabels) + + // Key should be sorted alphabetically by label name + assert.True(t, strings.HasPrefix(key, "a=first,")) + assert.Contains(t, key, "m=middle,") + assert.True(t, strings.HasSuffix(key, "z=last,")) + }) +} + +func TestEdgeCases(t *testing.T) { + t.Run("very old timestamps", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "old_metric"}, + } + + // Very old timestamp + oldTime := time.Now().Add(-1 * time.Hour) + collector.handleMetric(labels, 1.0, oldTime.UnixNano(), "gauge") + + now := time.Now() + // New timestamp to trigger send + values := collector.handleMetric(labels, 2.0, now.UnixNano(), "gauge") + + // Should have sent the old value despite being very old (timestamp: 1 second before now) + assert.Len(t, values, 1) + assert.Equal(t, 1.0, values[0].Sample.Value) + assert.Equal(t, float64(1), now.Sub(values[0].Sample.Time).Seconds()) + }) + + t.Run("metric type handling", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_type"}, + } + + now := time.Now() + + // Test unknown metric type (should behave like gauge) + collector.handleMetric(labels, 10.0, now.UnixNano(), "unknown") + values := collector.handleMetric(labels, 20.0, now.Add(time.Second).UnixNano(), "unknown") + + // Should send the last value (10.0) since time window advanced + if len(values) > 0 { + assert.Equal(t, 10.0, values[0].Sample.Value) + } + }) +} + +func TestCreateTimeSeries(t *testing.T) { + t.Run("createTimeSeries with valid metricValue", func(t *testing.T) { + now := time.Now() + mv := metricValue{ + value: 123.45, + timestamp: now.UnixNano(), + } + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_metric"}, + {Name: "instance", Value: "localhost"}, + } + + ts := createTimeSeries(labels, mv) + + assert.Equal(t, labels, ts.Labels) + assert.Equal(t, 123.45, ts.Sample.Value) + assert.Equal(t, now.Truncate(time.Nanosecond), ts.Sample.Time.Truncate(time.Nanosecond)) + }) +} + +// Benchmark tests +func BenchmarkLabelsToKey(b *testing.B) { + labels := []promwrite.Label{ + {Name: "__name__", Value: "benchmark_metric"}, + {Name: "job", Value: "benchmark"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "environment", Value: "production"}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + labelsToKey(labels) + } +} + +func BenchmarkKeyToLabels(b *testing.B) { + key := "__name__=test_metric,environment=production,instance=localhost:9090,job=test," + + b.ResetTimer() + for i := 0; i < b.N; i++ { + keyToLabels(key) + } +} diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index b000b827e..7a63bd69b 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -191,7 +191,7 @@ type Plugin struct { batcher *pipeline.RetriableBatcher // plugin metrics - sendErrorMetric *prometheus.CounterVec + sendErrorMetric prometheus.Counter collector *metricCollector } @@ -296,7 +296,7 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err err := p.send(root) if err != nil { - p.sendErrorMetric.WithLabelValues().Inc() + p.sendErrorMetric.Inc() p.logger.Error("can't send data to Prometheus", zap.String("address", p.config.Endpoint), zap.Error(err)) } else { p.logger.Debug("successfully sent", zap.String("data", string(data.outBuf))) @@ -353,8 +353,8 @@ func (p *Plugin) send(root *insaneJSON.Root) error { err := p.sendToStorage(values) if err != nil { - // TODO: add metrics if strings.Contains(err.Error(), "out of order sample") || strings.Contains(err.Error(), "duplicate sample for") { + p.sendErrorMetric.Inc() p.logger.Warn("can't send data to Prometheus", zap.Error(err)) return nil } @@ -369,7 +369,7 @@ func (p *Plugin) sendToStorage(values []promwrite.TimeSeries) error { } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { - p.sendErrorMetric = ctl.RegisterCounterVec("output_loki_send_error", "Total Loki send errors") + p.sendErrorMetric = ctl.RegisterCounter("output_prometheus_send_error", "Total Prometheus send errors") } func (p *Plugin) prepareClient() { From 4cd3c68c46412468377c09bad87e414d2b3c960a Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 8 Sep 2025 19:55:09 +0700 Subject: [PATCH 19/19] keep-alive in prometheus output --- plugin/output/prometheus/prometheus.go | 39 +++++++++++++++++++++----- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go index 7a63bd69b..4b0f2f532 100644 --- a/plugin/output/prometheus/prometheus.go +++ b/plugin/output/prometheus/prometheus.go @@ -2,6 +2,8 @@ package prometheus import ( "context" + "net" + "net/http" "strings" "time" @@ -187,13 +189,19 @@ type Plugin struct { ctx context.Context cancel context.CancelFunc - client *promwrite.Client + client PrometheusClient batcher *pipeline.RetriableBatcher // plugin metrics sendErrorMetric prometheus.Counter collector *metricCollector + + router *pipeline.Router +} + +type PrometheusClient interface { + Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error) } func init() { @@ -228,15 +236,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -244,6 +254,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't send data to Prometheus", zap.Error(err), zap.Int("retries", p.config.Retry)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( @@ -373,5 +387,16 @@ func (p *Plugin) registerMetrics(ctl *metric.Ctl) { } func (p *Plugin) prepareClient() { - p.client = promwrite.NewClient(p.config.Endpoint) + customClient := &http.Client{ + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: p.config.ConnectionTimeout_, + KeepAlive: p.config.KeepAlive.MaxConnDuration_, + }).DialContext, + IdleConnTimeout: p.config.KeepAlive.MaxIdleConnDuration_, + }, + Timeout: p.config.RequestTimeout_, + } + + p.client = promwrite.NewClient(p.config.Endpoint, promwrite.HttpClient(customClient)) }