diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index 7d83102a1..7452092a2 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -19,12 +19,14 @@ import ( "github.com/ozontech/file.d/pipeline" _ "github.com/ozontech/file.d/plugin/action/add_file_name" _ "github.com/ozontech/file.d/plugin/action/add_host" + _ "github.com/ozontech/file.d/plugin/action/cardinality_limit" _ "github.com/ozontech/file.d/plugin/action/convert_date" _ "github.com/ozontech/file.d/plugin/action/convert_log_level" _ "github.com/ozontech/file.d/plugin/action/convert_utf8_bytes" _ "github.com/ozontech/file.d/plugin/action/debug" _ "github.com/ozontech/file.d/plugin/action/decode" _ "github.com/ozontech/file.d/plugin/action/discard" + _ "github.com/ozontech/file.d/plugin/action/event_to_metrics" _ "github.com/ozontech/file.d/plugin/action/flatten" _ "github.com/ozontech/file.d/plugin/action/hash" _ "github.com/ozontech/file.d/plugin/action/join" @@ -59,6 +61,7 @@ import ( _ "github.com/ozontech/file.d/plugin/output/kafka" _ "github.com/ozontech/file.d/plugin/output/loki" _ "github.com/ozontech/file.d/plugin/output/postgres" + _ "github.com/ozontech/file.d/plugin/output/prometheus" _ "github.com/ozontech/file.d/plugin/output/s3" _ "github.com/ozontech/file.d/plugin/output/splunk" _ "github.com/ozontech/file.d/plugin/output/stdout" diff --git a/go.mod b/go.mod index 468b32619..e2b6f1c99 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/bitly/go-simplejson v0.5.1 github.com/bmatcuk/doublestar/v4 v4.0.2 github.com/bufbuild/protocompile v0.13.0 + github.com/castai/promwrite v0.5.0 github.com/cenkalti/backoff/v4 v4.3.0 github.com/cespare/xxhash/v2 v2.3.0 github.com/dominikbraun/graph v0.23.0 @@ -30,6 +31,7 @@ require ( github.com/klauspost/compress v1.17.8 github.com/minio/minio-go v6.0.14+incompatible github.com/ozontech/insane-json v0.1.9 + github.com/plar/go-adaptive-radix-tree/v2 v2.0.3 github.com/prometheus/client_golang v1.16.0 github.com/prometheus/client_model v0.3.0 github.com/prometheus/common v0.42.0 @@ -67,11 +69,11 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cilium/ebpf v0.9.1 // indirect github.com/containerd/cgroups/v3 v3.0.1 // indirect - github.com/coreos/go-systemd/v22 v22.3.2 // indirect + github.com/coreos/go-systemd/v22 v22.4.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dmarkham/enumer v1.5.8 // indirect - github.com/docker/go-units v0.4.0 // indirect + github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/go-faster/city v1.0.1 // indirect github.com/go-faster/errors v0.6.1 // indirect @@ -85,6 +87,7 @@ require ( github.com/godbus/dbus/v5 v5.0.4 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -98,7 +101,7 @@ require ( github.com/hashicorp/go-sockaddr v1.0.7 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect - github.com/imdario/mergo v0.3.11 // indirect + github.com/imdario/mergo v0.3.12 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect @@ -122,6 +125,7 @@ require ( 
github.com/pascaldekloe/name v1.0.1 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/prometheus v0.40.3 // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/sirupsen/logrus v1.8.1 // indirect @@ -143,14 +147,13 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/oauth2 v0.10.0 // indirect golang.org/x/sync v0.11.0 // indirect - golang.org/x/sys v0.30.0 // indirect + golang.org/x/sys v0.31.0 // indirect golang.org/x/term v0.29.0 // indirect golang.org/x/text v0.22.0 // indirect golang.org/x/time v0.10.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - gopkg.in/ini.v1 v1.62.0 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect diff --git a/go.sum b/go.sum index bb0a9091c..20aeae6ed 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/bufbuild/protocompile v0.13.0 h1:6cwUB0Y2tSvmNxsbunwzmIto3xOlJOV7ALALuVOs92M= github.com/bufbuild/protocompile v0.13.0/go.mod h1:dr++fGGeMPWHv7jPeT06ZKukm45NJscd7rUxQVzEKRk= +github.com/castai/promwrite v0.5.0 h1:AxpHvaeWPqk+GLqLix0JkALzwLk5ZIMUemqvL4AAv5k= +github.com/castai/promwrite v0.5.0/go.mod h1:PCwrucOaNJAcKdR8Tktz+/pQEXOnCWFL+2Yk7c9DmEU= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -47,8 +49,8 @@ github.com/containerd/cgroups/v3 v3.0.1 h1:4hfGvu8rfGIwVIDd+nLzn/B9ZXx4BcCjzt5To github.com/containerd/cgroups/v3 v3.0.1/go.mod h1:/vtwk1VXrtoa5AaZLkypuOJgA/6DyPMZHJPGQNtlHnw= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= -github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/coreos/go-systemd/v22 v22.4.0 h1:y9YHcjnjynCd/DVbg5j9L/33jQM3MxJlbj/zWskzfGU= +github.com/coreos/go-systemd/v22 v22.4.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -58,8 +60,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dmarkham/enumer v1.5.8 h1:fIF11F9l5jyD++YYvxcSH5WgHfeaSGPaN/T4kOQ4qEM= github.com/dmarkham/enumer v1.5.8/go.mod h1:d10o8R3t/gROm2p3BXqTkMt2+HMuxEmWCXzorAruYak= -github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= -github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= 
+github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dominikbraun/graph v0.23.0 h1:TdZB4pPqCLFxYhdyMFb1TBdFxp8XLcJfTTBQucVPgCo= github.com/dominikbraun/graph v0.23.0/go.mod h1:yOjYyogZLY1LSG9E33JWZJiq5k83Qy2C6POAuiViluc= github.com/elliotchance/orderedmap/v2 v2.4.0 h1:6tUmMwD9F998FNpwFxA5E6NQvSpk2PVw7RKsVq3+2Cw= @@ -113,6 +115,8 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgjlRvqsdk= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -121,8 +125,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20221102093814-76f304f74e5e h1:F1LLQqQ8WoIbyoxLUY+JUZe1kuHdxThM6CPUATzE6Io= +github.com/google/pprof v0.0.0-20221102093814-76f304f74e5e/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -155,8 +159,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/vault/api v1.16.0 h1:nbEYGJiAPGzT9U4oWgaaB0g+Rj8E59QuHKyA5LhwQN4= github.com/hashicorp/vault/api v1.16.0/go.mod h1:KhuUhzOD8lDSk29AtzNjgAu2kxRA9jL9NAbkFlqvkBA= -github.com/imdario/mergo v0.3.11 h1:3tnifQM4i+fbajXKBHXWEH+KvNHqojZ778UH75j3bGA= -github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/imdario/mergo v0.3.12 h1:b6R2BslTbIEToALKP7LxUvijTsNI9TAe80pLWN2g/HU= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= @@ -283,6 +287,8 @@ github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFu github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= 
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/plar/go-adaptive-radix-tree/v2 v2.0.3 h1:cJx/EUTduV4q10O5HSzHgPrViApJkJQk9OSeaT7UYUU= +github.com/plar/go-adaptive-radix-tree/v2 v2.0.3/go.mod h1:8yf9K81YK94H4gKh/K3hCBeC2s4JA/PYgqMkkOadwvk= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= @@ -295,6 +301,8 @@ github.com/prometheus/common v0.42.0 h1:EKsfXEYo4JpWMHH5cg+KOUWeuJSov1Id8zGR8eeI github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/prometheus v0.40.3 h1:oMw1vVyrxHTigXAcFY6QHrGUnQEbKEOKo737cPgYBwY= +github.com/prometheus/prometheus v0.40.3/go.mod h1:/UhsWkOXkO11wqTW2Bx5YDOwRweSDcaFBlTIzFe7P0Y= github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rjeczalik/notify v0.9.3 h1:6rJAzHTGKXGj76sbRgDiDcYj/HniypXmSJo1SWakZeY= @@ -481,8 +489,8 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -540,8 +548,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/ini.v1 v1.62.0 h1:duBzk771uxoUuOlyRLkHsygud9+5lrlGjdFBb4mSKDU= -gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= +gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pipeline/metadata/templater.go b/pipeline/metadata/templater.go index 8973e92ac..7de97ab16 100644 --- a/pipeline/metadata/templater.go +++ b/pipeline/metadata/templater.go @@ -4,7 +4,7 @@ import ( "bytes" "fmt" "regexp" - "strconv" + "sort" "strings" "sync" "text/template" @@ -213,8 
+213,14 @@ func (m *MetaTemplater) Render(data Data) (MetaData, error) { } func generateCacheKey(data map[string]any) string { + keys := make([]string, 0, len(data)) + for k := range data { + keys = append(keys, k) + } + sort.Strings(keys) + var builder strings.Builder - builder.Grow(len(data) * 16) // Preallocate memory for the builder (estimate) + builder.Grow(len(data) * 32) // Preallocate memory for the builder (estimate) for k, v := range data { switch v := v.(type) { @@ -224,14 +230,23 @@ func generateCacheKey(data map[string]any) string { builder.WriteString(":") builder.WriteString(v) builder.WriteString("|") - case int: - // Write the key and integer value to the builder + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + builder.WriteString(k) + builder.WriteString(":") + builder.WriteString(fmt.Sprintf("%d", v)) + builder.WriteString("|") + case float32, float64: + builder.WriteString(k) + builder.WriteString(":") + builder.WriteString(fmt.Sprintf("%f", v)) + builder.WriteString("|") + case bool: builder.WriteString(k) builder.WriteString(":") - builder.WriteString(strconv.Itoa(v)) + builder.WriteString(fmt.Sprintf("%t", v)) builder.WriteString("|") } - // If the value is not a string or int, skip it + // If the value is not a string, int, float or bool, skip it } // Convert the builder to a string diff --git a/pipeline/metadata/templater_test.go b/pipeline/metadata/templater_test.go index cd1214106..6142842c8 100644 --- a/pipeline/metadata/templater_test.go +++ b/pipeline/metadata/templater_test.go @@ -194,3 +194,57 @@ func BenchmarkMetaTemplater_Render(b *testing.B) { } } } + +func TestGenerateCacheKey(t *testing.T) { + tests := []struct { + name string + input map[string]any + expected string + }{ + { + name: "empty map", + input: map[string]any{}, + expected: "", + }, + { + name: "string and int", + input: map[string]any{ + "topic": "topic1", + "partition": 2, + }, + expected: "topic:topic1|partition:2", + }, + { + name: "int32 && int64", + input: map[string]any{ + "topic": "topic1", + "partition": int32(2), + "offset": int64(123456789), + }, + expected: "topic:topic1|partition:2|offset:123456789", + }, + { + name: "float", + input: map[string]any{ + "size": float32(2.0), + }, + expected: "size:2.000000", + }, + { + name: "bool", + input: map[string]any{ + "is": true, + }, + expected: "is:true", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := generateCacheKey(tt.input) + if got != tt.expected { + t.Errorf("generateCacheKey() = %v, want %v", got, tt.expected) + } + }) + } +} diff --git a/plugin/action/cardinality_limit/README.idoc.md b/plugin/action/cardinality_limit/README.idoc.md new file mode 100644 index 000000000..c22026fd2 --- /dev/null +++ b/plugin/action/cardinality_limit/README.idoc.md @@ -0,0 +1,3 @@ +# Cardinality limit plugin +@introduction + diff --git a/plugin/action/cardinality_limit/README.md b/plugin/action/cardinality_limit/README.md new file mode 100755 index 000000000..d856baba0 --- /dev/null +++ b/plugin/action/cardinality_limit/README.md @@ -0,0 +1,29 @@ +# Cardinality limit plugin +It limits the cardinality of field values. Events are grouped by the `key` fields; for each group the plugin counts the unique combinations of the `fields` values seen within `ttl`. Once `limit` is exceeded, the event is either discarded or the tracked fields are removed, depending on `action`, and the discard counter metric is incremented. + +**An example for discarding events when a host produces more than 10000 unique `trace_id` values:** +```yaml +pipelines: + example_pipeline: + ... + actions: + - type: cardinality_limit + key: + - host + fields: + - trace_id + limit: 10000 + action: discard + ttl: 1h + ... +``` + +**Config params:** +- `key` - fields that define the cardinality group (required) +- `fields` - fields whose unique value combinations are counted (required) +- `action` - `discard` or `remove_fields`, default `discard` +- `limit` - maximum number of unique combinations per group, default `10000` +- `ttl` - how long a tracked combination is kept, default `1h` +- `metric_prefix` - optional part of the discard counter name: `cardinality_discard_<prefix>_total` + +
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/cardinality_limit/cache.go b/plugin/action/cardinality_limit/cache.go new file mode 100644 index 000000000..5d4cbc2eb --- /dev/null +++ b/plugin/action/cardinality_limit/cache.go @@ -0,0 +1,82 @@ +package discard + +import ( + "sync" + "time" + + art "github.com/plar/go-adaptive-radix-tree/v2" +) + +type Cache struct { + mu *sync.RWMutex + tree art.Tree + ttl time.Duration +} + +func NewCache(ttl time.Duration) (*Cache, error) { + return &Cache{ + tree: art.New(), + ttl: ttl, + mu: &sync.RWMutex{}, + }, nil +} + +func (c *Cache) Set(key string) bool { + c.mu.Lock() + defer c.mu.Unlock() + + c.tree.Insert(art.Key(key), time.Now()) + return true +} + +func (c *Cache) IsExists(key string) bool { + c.mu.RLock() + timeValue, found := c.tree.Search(art.Key(key)) + c.mu.RUnlock() + + if found { + now := time.Now() + isExpire := c.isExpire(now, timeValue.(time.Time)) + if isExpire { + c.delete(key) + return false + } + } + return found +} + +func (c *Cache) isExpire(now, value time.Time) bool { + diff := now.Sub(value) + return diff > c.ttl +} + +func (c *Cache) delete(key string) { + c.mu.Lock() + defer c.mu.Unlock() + + c.tree.Delete(art.Key(key)) +} + +func (c *Cache) CountPrefix(prefix string) (count int) { + var keysToDelete []art.Key + c.mu.RLock() + now := time.Now() + c.tree.ForEachPrefix(art.Key(prefix), func(node art.Node) bool { + timeValue := node.Value().(time.Time) + if c.isExpire(now, timeValue) { + keysToDelete = append(keysToDelete, node.Key()) + return false + } else { + count++ + return true + } + }) + c.mu.RUnlock() + + if len(keysToDelete) > 0 { + for _, key := range keysToDelete { + c.delete(string(key)) + } + } + return +} diff --git a/plugin/action/cardinality_limit/cache_test.go b/plugin/action/cardinality_limit/cache_test.go new file mode 100644 index 000000000..530eef480 --- /dev/null +++ b/plugin/action/cardinality_limit/cache_test.go @@ -0,0 +1,172 @@ +package discard + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewCache(t *testing.T) { + cache, err := NewCache(time.Minute) + assert.NoError(t, err) + assert.NotNil(t, cache) +} + +func TestSetAndExists(t *testing.T) { + cache, _ := NewCache(time.Minute) + + t.Run("basic set and get", func(t *testing.T) { + key := "test-key" + assert.True(t, cache.Set(key)) + + found := cache.IsExists(key) + assert.True(t, found) + }) + + t.Run("non-existent key", func(t *testing.T) { + found := cache.IsExists("non-existent") + assert.False(t, found) + }) +} + +func TestDelete(t *testing.T) { + cache, _ := NewCache(time.Minute) + + key := "to-delete" + cache.Set(key) + + t.Run("delete existing key", func(t *testing.T) { + cache.delete(key) + found := cache.IsExists(key) + assert.False(t, found) + }) + + t.Run("delete non-existent key", func(t *testing.T) { + // Should not panic + cache.delete("never-existed") + }) +} + +func TestCountPrefix(t *testing.T) { + cache, _ := NewCache(time.Minute) + + keys := []string{ + "key1_subkey1", + "key1_subkey1", + "key1_subkey2", + + "key2_subkey1", + } + + for _, key := range keys { + cache.Set(key) + } + + testCases := []struct { + prefix string + count int + }{ + {"key1", 2}, + {"key2", 1}, + {"key3", 0}, + } + + for _, tc := range testCases { + t.Run("prefix "+tc.prefix, func(t *testing.T) { + assert.Equal(t, tc.count, cache.CountPrefix(tc.prefix)) + }) + } + + t.Run("count after delete", func(t *testing.T) { + 
cache.delete("key1_subkey1") + assert.Equal(t, 1, cache.CountPrefix("key1")) + }) +} + +func TestConcurrentOperations(t *testing.T) { + cache, _ := NewCache(time.Minute) + + var wg sync.WaitGroup + keys := []string{"key1", "key2", "key3"} + + // Test concurrent sets + wg.Add(len(keys)) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.Set(k) + } + }(key) + } + wg.Wait() + + // Verify all keys were set + for _, key := range keys { + found := cache.IsExists(key) + assert.True(t, found) + } + + // Test concurrent gets and sets + wg.Add(len(keys)) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.IsExists(k) + cache.Set(k + "-new") + } + }(key) + } + wg.Wait() + + // Test concurrent deletes + wg.Add(len(keys)) + for _, key := range keys { + go func(k string) { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.delete(k) + } + }(key) + } + wg.Wait() + + // Verify prefix counts under concurrent access + wg.Add(2) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.CountPrefix("key") + } + }() + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + cache.Set("key-x") + cache.Set("key-y") + cache.delete("key-x") + } + }() + wg.Wait() +} + +func TestTTL(t *testing.T) { + cache, _ := NewCache(100 * time.Millisecond) + + key := "ttl-key" + cache.Set(key) + + t.Run("key exists before TTL", func(t *testing.T) { + found := cache.IsExists(key) + assert.True(t, found) + }) + + t.Run("key expires after TTL", func(t *testing.T) { + time.Sleep(200 * time.Millisecond) + found := cache.IsExists(key) + assert.False(t, found) + }) +} diff --git a/plugin/action/cardinality_limit/cardinality_limit.go b/plugin/action/cardinality_limit/cardinality_limit.go new file mode 100644 index 000000000..1ec31669a --- /dev/null +++ b/plugin/action/cardinality_limit/cardinality_limit.go @@ -0,0 +1,203 @@ +package discard + +import ( + "fmt" + "sort" + "strings" + "time" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/metric" + "github.com/ozontech/file.d/pipeline" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +type Plugin struct { + cache *Cache + config *Config + keys []string + fields []string + logger *zap.Logger + + cardinalityDiscardCounter *prometheus.CounterVec +} + +const ( + actionDiscard = "discard" + actionRemoveFields = "remove_fields" +) + +type Config struct { + KeyFields []cfg.FieldSelector `json:"key" slice:"true" required:"true"` + Fields []cfg.FieldSelector `json:"fields" slice:"true" required:"true"` + Action string `json:"action" default:"discard" options:"discard|remove_fields"` + MetricPrefix string `json:"metric_prefix" default:""` + Limit int `json:"limit" default:"10000"` + TTL cfg.Duration `json:"ttl" default:"1h" parse:"duration"` // * + TTL_ time.Duration +} + +func init() { + fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ + Type: "cardinality_limit", + Factory: factory, + }) +} + +func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) makeMetric(ctl *metric.Ctl, name, help string, labels ...string) *prometheus.CounterVec { + if name == "" { + return nil + } + + uniq := make(map[string]struct{}) + labelNames := make([]string, 0, len(labels)) + for _, label := range labels { + if label == "" { + p.logger.Fatal("empty label name") + } + if _, ok := uniq[label]; ok { + p.logger.Fatal("metric labels must be 
unique") + } + uniq[label] = struct{}{} + + labelNames = append(labelNames, label) + } + + return ctl.RegisterCounterVec(name, help, labelNames...) +} + +func (p *Plugin) registerMetrics(ctl *metric.Ctl, prefix string) { + var metricName string + if prefix == "" { + metricName = "cardinality_discard_total" + } else { + metricName = fmt.Sprintf(`cardinality_discard_%s_total`, prefix) + } + p.cardinalityDiscardCounter = p.makeMetric(ctl, + metricName, + "Total number of events discarded due to cardinality limits", + p.keys..., + ) +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + p.logger = params.Logger.Desugar() + + var err error + p.cache, err = NewCache(p.config.TTL_) + if err != nil { + panic(err) + } + + p.keys = make([]string, 0, len(p.config.KeyFields)) + for _, fs := range p.config.KeyFields { + if fs != "" { + p.keys = append(p.keys, cfg.ParseFieldSelector(string(fs))[0]) + } + } + + if len(p.config.Fields) == 0 { + p.logger.Fatal("you have to set key fields") + } + + p.fields = make([]string, 0, len(p.config.Fields)) + for _, fs := range p.config.Fields { + if fs != "" { + p.fields = append(p.fields, cfg.ParseFieldSelector(string(fs))[0]) + } + } + + p.registerMetrics(params.MetricCtl, p.config.MetricPrefix) +} + +func (p *Plugin) Stop() { + +} + +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + cacheKey := make(map[string]string, len(p.keys)) + + for _, key := range p.keys { + value := pipeline.CloneString(event.Root.Dig(key).AsString()) + cacheKey[key] = value + } + + cacheValue := make(map[string]string, len(p.fields)) + for _, key := range p.fields { + value := pipeline.CloneString(event.Root.Dig(key).AsString()) + cacheValue[key] = value + } + + key := mapToKey(cacheKey) + value := mapToStringSorted(cacheKey, cacheValue) + keysCount := p.cache.CountPrefix(key) + + if p.config.Limit > 0 && keysCount >= p.config.Limit { + labelsValues := make([]string, 0, len(p.keys)) + for _, key := range p.keys { + if val, exists := cacheKey[key]; exists { + labelsValues = append(labelsValues, val) + } else { + labelsValues = append(labelsValues, "unknown") + } + } + p.cardinalityDiscardCounter.WithLabelValues(labelsValues...).Inc() + switch p.config.Action { + case actionDiscard: + return pipeline.ActionDiscard + case actionRemoveFields: + for _, key := range p.fields { + event.Root.Dig(key).Suicide() + } + } + } else { + p.cache.Set(value) + } + return pipeline.ActionPass +} + +func mapToStringSorted(m, n map[string]string) string { + var sb strings.Builder + sb.WriteString(mapToKey(m)) + + keys := make([]string, 0, len(n)) + for k := range n { + keys = append(keys, k) + } + sort.Strings(keys) + sb.WriteString("map[") + for i, k := range keys { + if i > 0 { + sb.WriteString(" ") + } + sb.WriteString(k + ":" + n[k]) + } + sb.WriteString("]") + return sb.String() +} + +func mapToKey(m map[string]string) string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + + var sb strings.Builder + sb.WriteString("map[") + for i, k := range keys { + if i > 0 { + sb.WriteString(" ") + } + sb.WriteString(k + ":" + m[k]) + } + sb.WriteString("];") + return sb.String() +} diff --git a/plugin/action/cardinality_limit/cardinality_limit_test.go b/plugin/action/cardinality_limit/cardinality_limit_test.go new file mode 100644 index 000000000..4f8c4b796 --- /dev/null +++ b/plugin/action/cardinality_limit/cardinality_limit_test.go @@ -0,0 +1,200 @@ +package 
discard + +import ( + "fmt" + "sort" + "strings" + "sync" + "testing" + "time" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/test" + "github.com/stretchr/testify/assert" +) + +func TestMapToStringSorted(t *testing.T) { + tests := []struct { + name string + m map[string]string + n map[string]string + expected string + }{ + { + name: "empty maps", + m: map[string]string{}, + n: map[string]string{}, + expected: "map[];map[]", + }, + { + name: "simple maps", + m: map[string]string{"service": "test", "host": "localhost"}, + n: map[string]string{"value": "1", "level": "3"}, + expected: "map[host:localhost service:test];map[level:3 value:1]", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := mapToStringSorted(tt.m, tt.n) + if result != tt.expected { + t.Errorf("mapToStringSorted() = %v, want %v", result, tt.expected) + } + + // Verify the output is properly sorted + if strings.HasPrefix(result, "map[") && strings.HasSuffix(result, "]") { + content := result[4 : len(result)-1] + if content != "" { + pairs := strings.Split(content, " ") + if !sort.StringsAreSorted(pairs) { + t.Errorf("output pairs are not sorted: %v", pairs) + } + } + } + }) + } +} + +func TestCardinalityLimitDiscard(t *testing.T) { + limit := 10 + config := &Config{ + KeyFields: []cfg.FieldSelector{"host"}, + Fields: []cfg.FieldSelector{"i"}, + Limit: limit, + Action: actionDiscard, + TTL_: 1 * time.Hour, + } + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + inEventsCnt := 0 + inWg := sync.WaitGroup{} + genEventsCnt := limit + 10 + inWg.Add(genEventsCnt) + + input.SetInFn(func() { + defer inWg.Done() + inEventsCnt++ + }) + + outEventsCnt := 0 + output.SetOutFn(func(e *pipeline.Event) { + outEventsCnt++ + }) + + for i := 0; i < genEventsCnt; i++ { + json := fmt.Sprintf(`{"host":"localhost","i":"%d"}`, i) + input.In(10, "test", test.NewOffset(0), []byte(json)) + } + inWg.Wait() + time.Sleep(100 * time.Millisecond) + + p.Stop() + assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count") + assert.Equal(t, limit, outEventsCnt, "wrong out events count") +} + +func TestCardinalityLimitRemoveFields(t *testing.T) { + limit := 10 + config := &Config{ + KeyFields: []cfg.FieldSelector{"host"}, + Fields: []cfg.FieldSelector{"i"}, + Limit: limit, + Action: actionRemoveFields, + TTL_: 1 * time.Hour, + } + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + inEventsCnt := 0 + inWg := sync.WaitGroup{} + genEventsCnt := limit + 10 + inWg.Add(genEventsCnt) + + input.SetInFn(func() { + defer inWg.Done() + inEventsCnt++ + }) + + outEventsCnt := 0 + outWg := sync.WaitGroup{} + outWg.Add(genEventsCnt) + output.SetOutFn(func(e *pipeline.Event) { + defer outWg.Done() + + // check exists field + value := pipeline.CloneString(e.Root.Dig(string(config.Fields[0])).AsString()) + + if value != "" { + outEventsCnt++ + } + }) + + for i := 0; i < genEventsCnt; i++ { + json := fmt.Sprintf(`{"host":"localhost","i":"%d"}`, i) + input.In(10, "test", test.NewOffset(0), []byte(json)) + } + inWg.Wait() + outWg.Wait() + + p.Stop() + assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count") + assert.Equal(t, limit, outEventsCnt, "wrong out events count") +} + +func TestCardinalityLimitDiscardIfNoSetKeyFields(t *testing.T) { + limit := 10 + config := &Config{ + KeyFields: []cfg.FieldSelector{}, + 
Fields: []cfg.FieldSelector{"i"}, + Limit: limit, + Action: actionDiscard, + TTL_: 1 * time.Hour, + } + + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, config, pipeline.MatchModeAnd, nil, false)) + + inEventsCnt := 0 + inWg := sync.WaitGroup{} + genEventsCnt := limit + 10 + inWg.Add(genEventsCnt) + + input.SetInFn(func() { + defer inWg.Done() + inEventsCnt++ + }) + + outEventsCnt := 0 + output.SetOutFn(func(e *pipeline.Event) { + outEventsCnt++ + }) + + for i := 0; i < genEventsCnt; i++ { + json := fmt.Sprintf(`{"host":"localhost%d","i":"%d"}`, i, i) + input.In(10, "test", test.NewOffset(0), []byte(json)) + } + inWg.Wait() + time.Sleep(100 * time.Millisecond) + + p.Stop() + assert.Equal(t, inEventsCnt, genEventsCnt, "wrong in events count") + assert.Equal(t, limit, outEventsCnt, "wrong out events count") +} + +func TestSetAndCountPrefix(t *testing.T) { + cache, _ := NewCache(time.Minute) + + cacheKey := map[string]string{ + "host": "localhost", + } + cacheValue := map[string]string{ + "i": "0", + } + key := mapToKey(cacheKey) + value := mapToStringSorted(cacheKey, cacheValue) + cache.Set(value) + + keysCount := cache.CountPrefix(key) + assert.Equal(t, 1, keysCount, "wrong in events count") +} diff --git a/plugin/action/event_to_metrics/event_to_metrics.go b/plugin/action/event_to_metrics/event_to_metrics.go new file mode 100644 index 000000000..e08d1eabe --- /dev/null +++ b/plugin/action/event_to_metrics/event_to_metrics.go @@ -0,0 +1,169 @@ +package event_to_metrics + +import ( + "time" + + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/pipeline/doif" + "github.com/ozontech/file.d/xtime" + insaneJSON "github.com/ozontech/insane-json" + "go.uber.org/zap" +) + +/*{ introduction +Get metric from event +}*/ + +type Plugin struct { + config *Config + logger *zap.Logger + pluginController pipeline.ActionPluginController + format string +} + +// ! config-params +// ^ config-params +type Config struct { + // > @3@4@5@6 + // > + // > The event field which defines the time when event was fired. + // > It is used to detect the event throughput in a particular time range. + // > If not set, the current time will be taken. + TimeField cfg.FieldSelector `json:"time_field" default:"time" parse:"selector"` // * + TimeField_ []string + + // > @3@4@5@6 + // > + // > It defines how to parse the time field format. Can be specified as a datetime layout in Go [time.Parse](https://pkg.go.dev/time#Parse) format or by alias. + // > List of available datetime format aliases can be found [here](/pipeline/README.md#datetime-parse-formats). 
+ TimeFieldFormat string `json:"time_field_format" default:"rfc3339nano"` // * + + Metrics []Metric +} + +type Metric struct { + Name string `json:"name"` + Type string `json:"type"` + Value string `json:"value"` + Labels map[string]string `json:"labels"` + + DoIfCheckerMap map[string]any `json:"do_if"` + DoIfChecker *doif.Checker + + use bool +} + +func init() { + fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ + Type: "event_to_metrics", + Factory: factory, + }) +} + +func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { + return &Plugin{}, &Config{} +} + +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + p.logger = params.Logger.Desugar() + p.pluginController = params.Controller + p.config.Metrics = prepareCheckersForMetrics(p.config.Metrics, p.logger) + + format, err := xtime.ParseFormatName(p.config.TimeFieldFormat) + if err != nil { + format = p.config.TimeFieldFormat + } + p.format = format +} + +func prepareCheckersForMetrics(metrics []Metric, logger *zap.Logger) []Metric { + for i := range metrics { + m := &metrics[i] + if m.DoIfCheckerMap != nil { + var err error + m.DoIfChecker, err = doif.NewFromMap(m.DoIfCheckerMap) + if err != nil { + logger.Fatal("can't init do_if for mask", zap.Error(err)) + } + } else { + m.use = true + } + } + + return metrics +} + +func (p *Plugin) Stop() { +} + +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + for i := range p.config.Metrics { + if p.config.Metrics[i].DoIfChecker != nil { + p.config.Metrics[i].use = p.config.Metrics[i].DoIfChecker.Check(event.Root) + } + } + + var ts time.Time + + if len(p.config.TimeField_) != 0 { + tsValue := event.Root.Dig(p.config.TimeField_...).AsString() + t, err := xtime.ParseTime(p.format, tsValue) + if err != nil || t.IsZero() { + p.logger.Warn( + "can't parse field with timestamp using format", + zap.Any("time_field", p.config.TimeField), + zap.String("TimeFieldFormat", p.config.TimeFieldFormat), + zap.String("value", tsValue), + ) + ts = time.Now() + } else { + ts = t + } + } else { + ts = time.Now() + } + + children := make([]*insaneJSON.Node, 0, len(p.config.Metrics)) + for _, metric := range p.config.Metrics { + if !metric.use { + continue + } + + elem := new(insaneJSON.Node) + object := elem.MutateToObject() + + object.AddField("name").MutateToBytes([]byte(metric.Name)) + object.AddField("type").MutateToBytes([]byte(metric.Type)) + object.AddField("timestamp").MutateToInt64(ts.UnixNano()) + + if len(metric.Value) == 0 { + object.AddField("value").MutateToInt(1) + } else { + valueNode := event.Root.Dig(metric.Value).AsFloat() + object.AddField("value").MutateToFloat(valueNode) + } + + if len(metric.Labels) > 0 { + labelsObject := object.AddField("labels").MutateToObject() + + for labelName, labelValue := range metric.Labels { + node := event.Root.Dig(labelValue) + value := node.AsString() + labelsObject.AddField(labelName).MutateToBytes([]byte(value)) + } + } + + children = append(children, elem) + } + + if len(children) == 0 { + // zero array or an array that does not contain objects + return pipeline.ActionDiscard + } + + p.pluginController.Spawn(event, children) + return pipeline.ActionBreak +} diff --git a/plugin/output/prometheus/metric_collector.go b/plugin/output/prometheus/metric_collector.go new file mode 100644 index 000000000..d4ed0445c --- /dev/null +++ b/plugin/output/prometheus/metric_collector.go @@ -0,0 +1,175 @@ +package prometheus + +import ( + "fmt" + "sort" + "strings" + 
"sync" + "time" + + "github.com/castai/promwrite" + "go.uber.org/zap" +) + +type metricCollector struct { + collector *sync.Map + timeout time.Duration + flushTicker *time.Ticker + shutdownChan chan struct{} + sender storageSender + logger *zap.Logger +} + +type metricValue struct { + value float64 + timestamp int64 + lastUpdateTime time.Time + lastValueIsSended bool +} + +type storageSender interface { + sendToStorage(values []promwrite.TimeSeries) error +} + +func newCollector(timeout time.Duration, sender storageSender, logger *zap.Logger) *metricCollector { + c := &metricCollector{ + collector: new(sync.Map), + timeout: timeout, + flushTicker: time.NewTicker(1 * time.Second), // Check every second + shutdownChan: make(chan struct{}), + sender: sender, + logger: logger, + } + go c.flushOldMetrics() + return c +} + +func (p *metricCollector) handleMetric(labels []promwrite.Label, value float64, timestamp int64, metricType string) []promwrite.TimeSeries { + labelsKey := labelsToKey(labels) + now := time.Now() + currentTimestampSec := timestamp / 1_000_000_000 + + var values []promwrite.TimeSeries + var shouldSend bool + var currentValue float64 + var prevMetric metricValue + + if prev, ok := p.collector.Load(labelsKey); ok { + prevMetric = prev.(metricValue) + prevTimestampSec := prevMetric.timestamp / 1_000_000_000 + + // For counters, accumulate values + currentValue = value + if metricType == "counter" { + currentValue += prevMetric.value + } + + // Check if time window advanced + shouldSend = prevTimestampSec < currentTimestampSec + + if shouldSend { + prevMetric.timestamp = max(prevMetric.timestamp, timestamp-1_000_000_000) + values = append(values, createTimeSeries(labels, prevMetric)) + } + } else { + currentValue = value + shouldSend = false // First value, don't send yet + } + + // Always store the current value + p.collector.Store(labelsKey, metricValue{ + value: currentValue, + timestamp: timestamp, + lastUpdateTime: now, + lastValueIsSended: shouldSend, + }) + + return values +} + +func (p *metricCollector) flushOldMetrics() { + for { + select { + case <-p.flushTicker.C: + var toSend []promwrite.TimeSeries + now := time.Now() + + p.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + if now.Sub(metric.lastUpdateTime) > p.timeout && !metric.lastValueIsSended { + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + metric.lastValueIsSended = true + p.collector.Store(key, metric) + } + return true + }) + + if len(toSend) > 0 { + // Send these metrics to your storage + err := p.sender.sendToStorage(toSend) + if err != nil { + p.logger.Error("can't send data", zap.Error(err)) + } + } + + case <-p.shutdownChan: + p.flushTicker.Stop() + return + } + } +} + +func (p *metricCollector) shutdown() { + close(p.shutdownChan) + // Flush all remaining metrics + var toSend []promwrite.TimeSeries + p.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + return true + }) + p.sender.sendToStorage(toSend) +} + +// Helper function +func createTimeSeries(labels []promwrite.Label, metric metricValue) promwrite.TimeSeries { + return promwrite.TimeSeries{ + Labels: labels, + Sample: promwrite.Sample{ + Time: time.Unix(0, metric.timestamp), + Value: metric.value, + }, + } +} + +func keyToLabels(key string) []promwrite.Label { + if len(key) == 0 { + return nil + } + key = key[:len(key)-1] // Remove 
trailing comma + labels := make([]promwrite.Label, 0, strings.Count(key, ",")+1) + + for key != "" { + pair, rest, _ := strings.Cut(key, ",") + name, value, _ := strings.Cut(pair, "=") + labels = append(labels, promwrite.Label{Name: name, Value: value}) + key = rest + } + return labels +} + +func labelsToKey(labels []promwrite.Label) string { + sorted := make([]promwrite.Label, len(labels)) + copy(sorted, labels) + sort.Slice(sorted, func(i, j int) bool { + return sorted[i].Name < sorted[j].Name + }) + + var b strings.Builder + for _, l := range sorted { + fmt.Fprintf(&b, "%s=%s,", l.Name, l.Value) + } + return b.String() +} diff --git a/plugin/output/prometheus/metric_collector_test.go b/plugin/output/prometheus/metric_collector_test.go new file mode 100644 index 000000000..cf0c33227 --- /dev/null +++ b/plugin/output/prometheus/metric_collector_test.go @@ -0,0 +1,397 @@ +package prometheus + +import ( + "errors" + "strings" + "sync" + "testing" + "time" + + "github.com/castai/promwrite" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" +) + +// TestStorageSender implements storageSender for testing +type TestStorageSender struct { + sentMetrics [][]promwrite.TimeSeries + mu sync.Mutex + returnError error +} + +func (t *TestStorageSender) sendToStorage(values []promwrite.TimeSeries) error { + t.mu.Lock() + defer t.mu.Unlock() + t.sentMetrics = append(t.sentMetrics, values) + return t.returnError +} + +func (t *TestStorageSender) getSentMetrics() [][]promwrite.TimeSeries { + t.mu.Lock() + defer t.mu.Unlock() + return t.sentMetrics +} + +func (t *TestStorageSender) reset() { + t.mu.Lock() + defer t.mu.Unlock() + t.sentMetrics = nil + t.returnError = nil +} + +func (t *TestStorageSender) setError(err error) { + t.mu.Lock() + defer t.mu.Unlock() + t.returnError = err +} + +func TestMetricCollector(t *testing.T) { + t.Run("labelsToKey and keyToLabels roundtrip", func(t *testing.T) { + labels := []promwrite.Label{ + {Name: "job", Value: "test"}, + {Name: "instance", Value: "localhost"}, + {Name: "__name__", Value: "test_metric"}, + } + + key := labelsToKey(labels) + convertedLabels := keyToLabels(key) + + assert.Len(t, convertedLabels, 3) + // Since labels are sorted in key generation, we need to check values + labelMap := make(map[string]string) + for _, l := range convertedLabels { + labelMap[l.Name] = l.Value + } + assert.Equal(t, "test", labelMap["job"]) + assert.Equal(t, "localhost", labelMap["instance"]) + assert.Equal(t, "test_metric", labelMap["__name__"]) + }) + + t.Run("handleMetric counter accumulation", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_counter"}, + {Name: "job", Value: "test"}, + } + + // First value - should not be sent + now := time.Now() + values := collector.handleMetric(labels, 10.0, now.UnixNano(), "counter") + assert.Empty(t, values) + assert.Empty(t, testSender.getSentMetrics()) + + // Second value in same time window - should accumulate but not send + values = collector.handleMetric(labels, 5.0, now.UnixNano(), "counter") + assert.Empty(t, values) + assert.Empty(t, testSender.getSentMetrics()) + + // Third value in next time window - should send accumulated value + nextTime := now.Add(time.Second) + values = collector.handleMetric(labels, 3.0, nextTime.UnixNano(), "counter") + assert.Len(t, values, 1) + assert.Equal(t, 15.0, values[0].Sample.Value) // 10 
+ 5 + }) + + t.Run("handleMetric gauge behavior", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_gauge"}, + {Name: "job", Value: "test"}, + } + + now := time.Now() + + // First value - should not be sent + values := collector.handleMetric(labels, 100.0, now.UnixNano(), "gauge") + assert.Empty(t, values) + + // Second value in same time window - should replace but not send + values = collector.handleMetric(labels, 200.0, now.UnixNano(), "gauge") + assert.Empty(t, values) + + // Third value in next time window - should send latest value + nextTime := now.Add(time.Second) + values = collector.handleMetric(labels, 300.0, nextTime.UnixNano(), "gauge") + assert.Len(t, values, 1) + assert.Equal(t, 200.0, values[0].Sample.Value) // Latest value before time window change + }) + + t.Run("flushOldMetrics manual check", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + shortTimeout := 100 * time.Millisecond + collector := newCollector(shortTimeout, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_flush"}, + {Name: "job", Value: "test"}, + } + + // Add a metric that will timeout + collector.handleMetric(labels, 42.0, time.Now().UnixNano(), "gauge") + + // Wait for timeout + a bit more + time.Sleep(shortTimeout + 50*time.Millisecond) + + // Manually trigger the flush logic that would be called by the ticker + var toSend []promwrite.TimeSeries + now := time.Now() + collector.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + if now.Sub(metric.lastUpdateTime) > shortTimeout && !metric.lastValueIsSended { + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + } + return true + }) + + assert.Len(t, toSend, 1) + assert.Equal(t, 42.0, toSend[0].Sample.Value) + }) + + t.Run("flushOldMetrics sends timed out metrics", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + shortTimeout := 1 * time.Second + collector := newCollector(shortTimeout, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_flush"}, + {Name: "job", Value: "test"}, + } + + // Add a metric that will timeout + sendedValues := collector.handleMetric(labels, 42.0, time.Now().UnixNano(), "gauge") + assert.Len(t, sendedValues, 0) + + // Wait for timeout + a bit more + time.Sleep(shortTimeout + 1*time.Second) + now := time.Now() + + var toSend []promwrite.TimeSeries + collector.collector.Range(func(key, value interface{}) bool { + metric := value.(metricValue) + if now.Sub(metric.lastUpdateTime) > shortTimeout && !metric.lastValueIsSended { + labels := keyToLabels(key.(string)) + toSend = append(toSend, createTimeSeries(labels, metric)) + } + return true + }) + + assert.Len(t, toSend, 0) + }) + + t.Run("shutdown flushes all metrics", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels1 := []promwrite.Label{ + {Name: "__name__", Value: "metric1"}, + {Name: "job", Value: "test"}, + } + labels2 := []promwrite.Label{ + {Name: "__name__", Value: "metric2"}, + {Name: "job", Value: "test"}, + } + + // Add multiple metrics + collector.handleMetric(labels1, 10.0, time.Now().UnixNano(), "gauge") + collector.handleMetric(labels2, 20.0, 
time.Now().UnixNano(), "gauge") + + collector.shutdown() + + sentMetrics := testSender.getSentMetrics() + assert.Len(t, sentMetrics, 1) + assert.Len(t, sentMetrics[0], 2) + }) + + t.Run("concurrent access", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + var wg sync.WaitGroup + numGoroutines := 10 + numMetrics := 100 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for j := 0; j < numMetrics; j++ { + labels := []promwrite.Label{ + {Name: "__name__", Value: "concurrent_metric"}, + {Name: "worker", Value: string(rune(workerID))}, + {Name: "index", Value: string(rune(j))}, + } + collector.handleMetric(labels, float64(j), time.Now().UnixNano(), "counter") + } + }(i) + } + + wg.Wait() + + // Verify all metrics are stored + count := 0 + collector.collector.Range(func(_, _ interface{}) bool { + count++ + return true + }) + + assert.Equal(t, numGoroutines*numMetrics, count) + }) + + t.Run("storage sender error handling", func(t *testing.T) { + logger := zap.NewNop() + testSender := &TestStorageSender{} + testSender.setError(errors.New("storage error")) + collector := newCollector(100*time.Millisecond, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "error_metric"}, + {Name: "job", Value: "test"}, + } + + collector.handleMetric(labels, 99.0, time.Now().UnixNano(), "gauge") + + // Wait for flush + time.Sleep(150 * time.Millisecond) + + // Error should be handled gracefully (logged but not panic) + sentMetrics := testSender.getSentMetrics() + assert.Equal(t, [][]promwrite.TimeSeries(nil), sentMetrics) + }) + + t.Run("metric value struct validation", func(t *testing.T) { + now := time.Now() + mv := metricValue{ + value: 42.0, + timestamp: now.UnixNano(), + lastUpdateTime: now, + lastValueIsSended: false, + } + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test"}, + } + + ts := createTimeSeries(labels, mv) + assert.Equal(t, 42.0, ts.Sample.Value) + assert.Equal(t, now.Truncate(time.Nanosecond), ts.Sample.Time.Truncate(time.Nanosecond)) + assert.Equal(t, labels, ts.Labels) + }) + + t.Run("label sorting in key generation", func(t *testing.T) { + unsortedLabels := []promwrite.Label{ + {Name: "z", Value: "last"}, + {Name: "a", Value: "first"}, + {Name: "m", Value: "middle"}, + } + + key := labelsToKey(unsortedLabels) + + // Key should be sorted alphabetically by label name + assert.True(t, strings.HasPrefix(key, "a=first,")) + assert.Contains(t, key, "m=middle,") + assert.True(t, strings.HasSuffix(key, "z=last,")) + }) +} + +func TestEdgeCases(t *testing.T) { + t.Run("very old timestamps", func(t *testing.T) { + logger := zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "old_metric"}, + } + + // Very old timestamp + oldTime := time.Now().Add(-1 * time.Hour) + collector.handleMetric(labels, 1.0, oldTime.UnixNano(), "gauge") + + now := time.Now() + // New timestamp to trigger send + values := collector.handleMetric(labels, 2.0, now.UnixNano(), "gauge") + + // Should have sent the old value despite being very old (timestamp: 1 second before now) + assert.Len(t, values, 1) + assert.Equal(t, 1.0, values[0].Sample.Value) + assert.Equal(t, float64(1), now.Sub(values[0].Sample.Time).Seconds()) + }) + + t.Run("metric type handling", func(t *testing.T) { + logger := 
zaptest.NewLogger(t) + testSender := &TestStorageSender{} + collector := newCollector(5*time.Minute, testSender, logger) + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_type"}, + } + + now := time.Now() + + // Test unknown metric type (should behave like gauge) + collector.handleMetric(labels, 10.0, now.UnixNano(), "unknown") + values := collector.handleMetric(labels, 20.0, now.Add(time.Second).UnixNano(), "unknown") + + // Should send the last value (10.0) since time window advanced + if len(values) > 0 { + assert.Equal(t, 10.0, values[0].Sample.Value) + } + }) +} + +func TestCreateTimeSeries(t *testing.T) { + t.Run("createTimeSeries with valid metricValue", func(t *testing.T) { + now := time.Now() + mv := metricValue{ + value: 123.45, + timestamp: now.UnixNano(), + } + + labels := []promwrite.Label{ + {Name: "__name__", Value: "test_metric"}, + {Name: "instance", Value: "localhost"}, + } + + ts := createTimeSeries(labels, mv) + + assert.Equal(t, labels, ts.Labels) + assert.Equal(t, 123.45, ts.Sample.Value) + assert.Equal(t, now.Truncate(time.Nanosecond), ts.Sample.Time.Truncate(time.Nanosecond)) + }) +} + +// Benchmark tests +func BenchmarkLabelsToKey(b *testing.B) { + labels := []promwrite.Label{ + {Name: "__name__", Value: "benchmark_metric"}, + {Name: "job", Value: "benchmark"}, + {Name: "instance", Value: "localhost:9090"}, + {Name: "environment", Value: "production"}, + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + labelsToKey(labels) + } +} + +func BenchmarkKeyToLabels(b *testing.B) { + key := "__name__=test_metric,environment=production,instance=localhost:9090,job=test," + + b.ResetTimer() + for i := 0; i < b.N; i++ { + keyToLabels(key) + } +} diff --git a/plugin/output/prometheus/prometheus.go b/plugin/output/prometheus/prometheus.go new file mode 100644 index 000000000..4b0f2f532 --- /dev/null +++ b/plugin/output/prometheus/prometheus.go @@ -0,0 +1,402 @@ +package prometheus + +import ( + "context" + "net" + "net/http" + "strings" + "time" + + "github.com/castai/promwrite" + "github.com/ozontech/file.d/cfg" + "github.com/ozontech/file.d/fd" + "github.com/ozontech/file.d/metric" + "github.com/ozontech/file.d/pipeline" + + insaneJSON "github.com/ozontech/insane-json" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +/*{ introduction +It sends metric batches to Prometheus or any other remote-write compatible storage using the Prometheus remote write HTTP API. Events are expected in the format produced by the `event_to_metrics` action plugin. +}*/ + +const ( + outPluginType = "prometheus" +) + +type data struct { + outBuf []byte +} + +type Label struct { + Label string `json:"label" required:"true"` + Value string `json:"value" required:"true"` +} + +// ! config-params +// ^ config-params +type Config struct { + Endpoint string `json:"endpoint" required:"true" default:"http://localhost:9090/api/v1/write"` // * + + // > @3@4@5@6 + // > + // > Auth config. + // > + // > `AuthConfig` params: + // > * `strategy` describes strategy to use; options:"disabled|tenant|basic|bearer" + // > By default strategy is `disabled`. + // > * `tenant_id` should be provided if strategy is `tenant`. + // > * `username` should be provided if strategy is `basic`. + // > Username is used for HTTP Basic Authentication. + // > * `password` should be provided if strategy is `basic`. + // > Password is used for HTTP Basic Authentication. + // > * `bearer_token` should be provided if strategy is `bearer`. + // > Token is used for HTTP Bearer Authentication.
Auth AuthConfig `json:"auth" child:"true"` // * + + // > @3@4@5@6 + // > + // > If set true, the plugin will use SSL/TLS connections method. + TLSEnabled bool `json:"tls_enabled" default:"false"` // * + + // > @3@4@5@6 + // > + // > If set, the plugin will skip SSL/TLS verification. + TLSSkipVerify bool `json:"tls_skip_verify" default:"false"` // * + + // > @3@4@5@6 + // > + // > Client timeout when sending requests to the Prometheus remote write API. + RequestTimeout cfg.Duration `json:"request_timeout" default:"1s" parse:"duration"` // * + RequestTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > It defines how much time to wait for the connection. + ConnectionTimeout cfg.Duration `json:"connection_timeout" default:"5s" parse:"duration"` // * + ConnectionTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Keep-alive config. + // > + // > `KeepAliveConfig` params: + // > * `max_idle_conn_duration` - idle keep-alive connections are closed after this duration. + // > By default idle connections are closed after `10s`. + // > * `max_conn_duration` - keep-alive connections are closed after this duration. + // > If set to `0` - connection duration is unlimited. + // > By default connection duration is `5m`. + KeepAlive KeepAliveConfig `json:"keep_alive" child:"true"` // * + + // > @3@4@5@6 + // > + // > How much workers will be instantiated to send batches. + // > It also limits the number of concurrent requests to the remote write endpoint. + WorkersCount cfg.Expression `json:"workers_count" default:"gomaxprocs*4" parse:"expression"` // * + WorkersCount_ int + + // > @3@4@5@6 + // > + // > Maximum quantity of events to pack into one batch. + BatchSize cfg.Expression `json:"batch_size" default:"capacity/4" parse:"expression"` // * + BatchSize_ int + + // > @3@4@5@6 + // > + // > A minimum size of events in a batch to send. + // > If both batch_size and batch_size_bytes are set, they will work together. + BatchSizeBytes cfg.Expression `json:"batch_size_bytes" default:"0" parse:"expression"` // * + BatchSizeBytes_ int + + // > @3@4@5@6 + // > + // > After this timeout batch will be sent even if batch isn't completed. + BatchFlushTimeout cfg.Duration `json:"batch_flush_timeout" default:"200ms" parse:"duration"` // * + BatchFlushTimeout_ time.Duration + + // > @3@4@5@6 + // > + // > Retention milliseconds for retry to Prometheus. + Retention cfg.Duration `json:"retention" default:"1s" parse:"duration"` // * + Retention_ time.Duration + + // > @3@4@5@6 + // > + // > Retries of insertion. If File.d cannot insert for this number of attempts, + // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + Retry int `json:"retry" default:"10"` // * + + // > @3@4@5@6 + // > + // > After an insert error, fall with a non-zero exit code or not + // > **Experimental feature** + FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * + + // > @3@4@5@6 + // > + // > Multiplier for exponential increase of retention between retries + RetentionExponentMultiplier int `json:"retention_exponentially_multiplier" default:"2"` // * +} + +type AuthStrategy byte + +const ( + StrategyDisabled AuthStrategy = iota + StrategyTenant + StrategyBasic + StrategyBearer +) + +// ! config-params +// ^ config-params +type AuthConfig struct { + // > AuthStrategy.Strategy describes strategy to use. + Strategy string `json:"strategy" default:"disabled" options:"disabled|tenant|basic|bearer"` + Strategy_ AuthStrategy + + // > TenantID for Tenant Authentication.
+	TenantID string `json:"tenant_id"`
+
+	// > Username for HTTP Basic Authentication.
+	Username string `json:"username"`
+
+	// > Password for HTTP Basic Authentication.
+	Password string `json:"password"`
+
+	// > Token for HTTP Bearer Authentication.
+	BearerToken string `json:"bearer_token"`
+}
+
+type KeepAliveConfig struct {
+	// Idle keep-alive connections are closed after this duration.
+	MaxIdleConnDuration cfg.Duration `json:"max_idle_conn_duration" parse:"duration" default:"10s"`
+	MaxIdleConnDuration_ time.Duration
+
+	// Keep-alive connections are closed after this duration.
+	MaxConnDuration cfg.Duration `json:"max_conn_duration" parse:"duration" default:"5m"`
+	MaxConnDuration_ time.Duration
+}
+
+type Plugin struct {
+	controller   pipeline.OutputPluginController
+	logger       *zap.Logger
+	config       *Config
+	avgEventSize int
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	client  PrometheusClient
+	batcher *pipeline.RetriableBatcher
+
+	// plugin metrics
+	sendErrorMetric prometheus.Counter
+
+	collector *metricCollector
+
+	router *pipeline.Router
+}
+
+type PrometheusClient interface {
+	Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error)
+}
+
+func init() {
+	fd.DefaultPluginRegistry.RegisterOutput(&pipeline.PluginStaticInfo{
+		Type:    outPluginType,
+		Factory: Factory,
+	})
+}
+
+func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
+	return &Plugin{}, &Config{}
+}
+
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
+	p.controller = params.Controller
+	p.config = config.(*Config)
+	p.logger = params.Logger.Desugar()
+	p.avgEventSize = params.PipelineSettings.AvgEventSize
+	p.registerMetrics(params.MetricCtl)
+	p.collector = newCollector(10*time.Second, p, p.logger)
+
+	p.prepareClient()
+
+	batcherOpts := &pipeline.BatcherOptions{
+		PipelineName:   params.PipelineName,
+		OutputType:     outPluginType,
+		Controller:     p.controller,
+		Workers:        p.config.WorkersCount_,
+		BatchSizeCount: p.config.BatchSize_,
+		BatchSizeBytes: p.config.BatchSizeBytes_,
+		FlushTimeout:   p.config.BatchFlushTimeout_,
+		MetricCtl:      params.MetricCtl,
+	}
+
+	p.router = params.Router
+	backoffOpts := pipeline.BackoffOpts{
+		MinRetention:         p.config.Retention_,
+		Multiplier:           float64(p.config.RetentionExponentMultiplier),
+		AttemptNum:           p.config.Retry,
+		IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(),
+	}
+
+	onError := func(err error, events []*pipeline.Event) {
+		var level zapcore.Level
+		if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() {
+			level = zapcore.FatalLevel
+		} else {
+			level = zapcore.ErrorLevel
+		}
+
+		p.logger.Log(level, "can't send data to Prometheus", zap.Error(err),
+			zap.Int("retries", p.config.Retry))
+
+		for i := range events {
+			p.router.Fail(events[i])
+		}
+	}
+
+	p.batcher = pipeline.NewRetriableBatcher(
+		batcherOpts,
+		p.out,
+		backoffOpts,
+		onError,
+	)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	p.ctx = ctx
+	p.cancel = cancel
+
+	p.batcher.Start(ctx)
+}
+
+func (p *Plugin) Stop() {
+	p.batcher.Stop()
+	p.collector.shutdown()
+	p.cancel()
+}
+
+func (p *Plugin) Out(event *pipeline.Event) {
+	p.batcher.Add(event)
+}
+
+func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) error {
+	if *workerData == nil {
+		*workerData = &data{
+			outBuf: make([]byte, 0, p.config.BatchSize_*p.avgEventSize),
+		}
+	}
+
+	data := (*workerData).(*data)
+
+	// handle too much memory consumption
+	if cap(data.outBuf) > p.config.BatchSize_*p.avgEventSize {
+		data.outBuf = make([]byte, 0, p.config.BatchSize_*p.avgEventSize)
+	}
+
+	data.outBuf = data.outBuf[:0]
+
+	root := insaneJSON.Spawn()
+	defer insaneJSON.Release(root)
+
+	dataArr := root.AddFieldNoAlloc(root, "data").MutateToArray()
+	batch.ForEach(func(event *pipeline.Event) {
+		dataArr.AddElementNoAlloc(root).MutateToNode(event.Root.Node)
+	})
+
+	err := p.send(root)
+	if err != nil {
+		p.sendErrorMetric.Inc()
+		p.logger.Error("can't send data to Prometheus", zap.String("address", p.config.Endpoint), zap.Error(err))
+	} else {
+		p.logger.Debug("successfully sent", zap.String("data", string(data.outBuf)))
+	}
+
+	return err
+}
+
+func (p *Plugin) send(root *insaneJSON.Root) error {
+	messages := root.Dig("data").AsArray()
+	// # {"name":"partitions_total","labels":{"partition":"1"},"timestamp":"2025-06-05T05:19:28.129666195Z","value": 1}
+	values := make([]promwrite.TimeSeries, 0, len(messages))
+
+	for _, msg := range messages {
+		nameNode := msg.Dig("name")
+		name := nameNode.AsString()
+		nameNode.Suicide()
+
+		typeNode := msg.Dig("type")
+		metricType := typeNode.AsString()
+		typeNode.Suicide()
+
+		timestampNode := msg.Dig("timestamp")
+		timestamp := timestampNode.AsInt64()
+		timestampNode.Suicide()
+
+		valueNode := msg.Dig("value")
+		value := valueNode.AsFloat()
+		valueNode.Suicide()
+
+		labelsNode := msg.Dig("labels")
+		labelValues := labelsNode.AsFields()
+
+		labels := make([]promwrite.Label, 0, len(labelValues)+1)
+		labels = append(labels, promwrite.Label{
+			Name:  "__name__",
+			Value: name,
+		})
+		for _, l := range labelValues {
+			labels = append(labels, promwrite.Label{
+				Name:  l.AsString(),
+				Value: labelsNode.Dig(l.AsString()).AsString(),
+			})
+		}
+		// remove the labels node only after its values have been read
+		labelsNode.Suicide()
+
+		values = append(values, p.collector.handleMetric(
+			labels,
+			value,
+			timestamp,
+			metricType,
+		)...)
+	}
+
+	err := p.sendToStorage(values)
+	if err != nil {
+		if strings.Contains(err.Error(), "out of order sample") || strings.Contains(err.Error(), "duplicate sample for") {
+			p.sendErrorMetric.Inc()
+			p.logger.Warn("can't send data to Prometheus", zap.Error(err))
+			return nil
+		}
+	}
+
+	return err
+}
+
+func (p *Plugin) sendToStorage(values []promwrite.TimeSeries) error {
+	_, err := p.client.Write(context.Background(), &promwrite.WriteRequest{TimeSeries: values})
+	return err
+}
+
+func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
+	p.sendErrorMetric = ctl.RegisterCounter("output_prometheus_send_error", "Total Prometheus send errors")
+}
+
+func (p *Plugin) prepareClient() {
+	customClient := &http.Client{
+		Transport: &http.Transport{
+			DialContext: (&net.Dialer{
+				Timeout:   p.config.ConnectionTimeout_,
+				KeepAlive: p.config.KeepAlive.MaxConnDuration_,
+			}).DialContext,
+			IdleConnTimeout: p.config.KeepAlive.MaxIdleConnDuration_,
+		},
+		Timeout: p.config.RequestTimeout_,
+	}
+
+	p.client = promwrite.NewClient(p.config.Endpoint, promwrite.HttpClient(customClient))
+}
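
For reference, a minimal, self-contained sketch of how the castai/promwrite client introduced by this patch is driven, using only the calls that already appear above (NewClient, Write, TimeSeries/Label/Sample). The endpoint and the label values are illustrative placeholders, not part of the change:

package main

import (
	"context"
	"log"
	"time"

	"github.com/castai/promwrite"
)

func main() {
	// Placeholder endpoint; the plugin's default is http://localhost:9090/api/v1/write.
	client := promwrite.NewClient("http://localhost:9090/api/v1/write")

	// One sample shaped the way the plugin shapes events: the metric name goes
	// into the reserved __name__ label, the event's labels follow it.
	ts := promwrite.TimeSeries{
		Labels: []promwrite.Label{
			{Name: "__name__", Value: "partitions_total"},
			{Name: "partition", Value: "1"},
		},
		Sample: promwrite.Sample{
			Time:  time.Now(),
			Value: 1,
		},
	}

	if _, err := client.Write(context.Background(), &promwrite.WriteRequest{
		TimeSeries: []promwrite.TimeSeries{ts},
	}); err != nil {
		log.Fatal(err)
	}
}

sendToStorage wraps exactly this Write call; batching, retries, and the out-of-order/duplicate-sample handling above are layered on top of it.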