Skip to content

Commit ebf4db1

Browse files
committed
Update to a single large stream with multiple consumers
This feels like the more simple approach since creating a stream per testrun would require us to think a lot about security on the CreateStream endpoint. This way ETOS will handle the stream creation itself.
1 parent 694b8b0 commit ebf4db1

File tree

8 files changed

+60
-48
lines changed

8 files changed

+60
-48
lines changed

cmd/sse/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,14 @@ func main() {
8585

8686
var streamer stream.Streamer
8787
if cfg.RabbitMQURI() != "" {
88-
log.Info("Starting up a RabbitMQStreamer")
89-
streamer, err = stream.NewRabbitMQStreamer(*rabbitMQStream.NewEnvironmentOptions().SetUri(cfg.RabbitMQURI()), log)
88+
if cfg.RabbitMQStreamName() == "" {
89+
log.Fatal("ETOS_RABBITMQ_STREAM_NAME must be set for the SSE server to work.")
90+
}
91+
log.Infof("Starting up a RabbitMQStreamer with stream name: %s", cfg.RabbitMQStreamName())
92+
streamer, err = stream.NewRabbitMQStreamer(*rabbitMQStream.NewEnvironmentOptions().SetUri(cfg.RabbitMQURI()), log, cfg.RabbitMQStreamName())
93+
if err := streamer.CreateStream(ctx, log, cfg.RabbitMQStreamName()); err != nil {
94+
log.Fatal(err.Error())
95+
}
9096
} else {
9197
log.Warning("RabbitMQURI is not set, defaulting to FileStreamer")
9298
streamer, err = stream.NewFileStreamer(100*time.Millisecond, log)

go.mod

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/maxcnunes/httpfake v1.2.4
1515
github.com/package-url/packageurl-go v0.1.3
1616
github.com/rabbitmq/amqp091-go v1.10.0
17-
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10
17+
github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0
1818
github.com/sethvargo/go-retry v0.3.0
1919
github.com/sirupsen/logrus v1.9.3
2020
github.com/snowzach/rotatefilehook v0.0.0-20220211133110-53752135082d
@@ -102,12 +102,12 @@ require (
102102
go.uber.org/atomic v1.7.0 // indirect
103103
go.uber.org/multierr v1.6.0 // indirect
104104
go.uber.org/zap v1.17.0 // indirect
105-
golang.org/x/crypto v0.26.0 // indirect
106-
golang.org/x/net v0.28.0 // indirect
105+
golang.org/x/crypto v0.31.0 // indirect
106+
golang.org/x/net v0.33.0 // indirect
107107
golang.org/x/oauth2 v0.21.0 // indirect
108-
golang.org/x/sys v0.23.0 // indirect
109-
golang.org/x/term v0.23.0 // indirect
110-
golang.org/x/text v0.17.0 // indirect
108+
golang.org/x/sys v0.28.0 // indirect
109+
golang.org/x/term v0.27.0 // indirect
110+
golang.org/x/text v0.21.0 // indirect
111111
golang.org/x/time v0.3.0 // indirect
112112
google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d // indirect
113113
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect

go.sum

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
125125
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
126126
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
127127
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
128-
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af h1:kmjWCqn2qkEml422C2Rrd27c3VGxi6a/6HNq8QmHRKM=
129-
github.com/google/pprof v0.0.0-20240525223248-4bfdf5a9a2af/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
128+
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
129+
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
130130
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
131131
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
132132
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -193,10 +193,10 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
193193
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
194194
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
195195
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
196-
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
197-
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
198-
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
199-
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
196+
github.com/onsi/ginkgo/v2 v2.22.2 h1:/3X8Panh8/WwhU/3Ssa6rCKqPLuAkVY2I0RoyDLySlU=
197+
github.com/onsi/ginkgo/v2 v2.22.2/go.mod h1:oeMosUL+8LtarXBHu/c0bx2D/K9zyQ6uX3cTyztHwsk=
198+
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
199+
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
200200
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
201201
github.com/package-url/packageurl-go v0.1.3 h1:4juMED3hHiz0set3Vq3KeQ75KD1avthoXLtmE3I0PLs=
202202
github.com/package-url/packageurl-go v0.1.3/go.mod h1:nKAWB8E6uk1MHqiS/lQb9pYBGH2+mdJ2PJc2s50dQY0=
@@ -230,8 +230,8 @@ github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3x
230230
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
231231
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
232232
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
233-
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10 h1:1kDn/orisEbfMtxdZwWKpxX9+FahnzoRCuGCLZ66fAc=
234-
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.10/go.mod h1:SdWsW0K5FVo8lIx0lCH17wh7RItXEQb8bfpxVlTVqS8=
233+
github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0 h1:2UWryxhtQmYA3Bx2iajQCre3yQbARiSikpC/8iWbu3k=
234+
github.com/rabbitmq/rabbitmq-stream-go-client v1.5.0/go.mod h1:KDXSNVSqj4QNg6TNMBnQQ/oWHaxLjUI1520j68SyEcY=
235235
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
236236
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
237237
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
@@ -328,8 +328,8 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnf
328328
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
329329
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
330330
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
331-
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
332-
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
331+
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
332+
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
333333
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
334334
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
335335
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -350,8 +350,8 @@ golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81R
350350
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
351351
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
352352
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
353-
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
354-
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
353+
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
354+
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
355355
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
356356
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
357357
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -364,8 +364,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
364364
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
365365
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
366366
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
367-
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
368-
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
367+
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
368+
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
369369
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
370370
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
371371
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -381,15 +381,15 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
381381
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
382382
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
383383
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
384-
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
385-
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
386-
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
387-
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
384+
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
385+
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
386+
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
387+
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
388388
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
389389
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
390390
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
391-
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
392-
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
391+
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
392+
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
393393
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
394394
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
395395
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -400,8 +400,8 @@ golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBn
400400
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
401401
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
402402
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
403-
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
404-
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
403+
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
404+
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
405405
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
406406
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
407407
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

internal/config/sse.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,21 @@ import (
2323
type SSEConfig interface {
2424
Config
2525
RabbitMQURI() string
26+
RabbitMQStreamName() string
2627
}
2728

2829
type sseCfg struct {
2930
Config
30-
rabbitmqURI string
31+
rabbitmqURI string
32+
rabbitmqStreamName string
3133
}
3234

3335
// NewSSEConfig creates a sse config interface based on input parameters or environment variables.
3436
func NewSSEConfig() SSEConfig {
3537
var conf sseCfg
3638

37-
flag.StringVar(&conf.rabbitmqURI, "rabbitmquri", os.Getenv("ETOS_RABBITMQ_URI"), "URI to the RabbitMQ ")
39+
flag.StringVar(&conf.rabbitmqURI, "rabbitmquri", os.Getenv("ETOS_RABBITMQ_URI"), "URI to the RabbitMQ")
40+
flag.StringVar(&conf.rabbitmqStreamName, "rabbitmqstreamname", os.Getenv("ETOS_RABBITMQ_STREAM_NAME"), "Stream name to use with RabbitMQ.")
3841
base := load()
3942
conf.Config = base
4043
flag.Parse()
@@ -45,3 +48,8 @@ func NewSSEConfig() SSEConfig {
4548
func (c *sseCfg) RabbitMQURI() string {
4649
return c.rabbitmqURI
4750
}
51+
52+
// RabbitMQStreamName returns the stream name to use with RabbitMQ.
53+
func (c *sseCfg) RabbitMQStreamName() string {
54+
return c.rabbitmqStreamName
55+
}

internal/stream/rabbitmq.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,15 +35,16 @@ const IgnoreUnfiltered = false
3535
type RabbitMQStreamer struct {
3636
environment *stream.Environment
3737
logger *logrus.Entry
38+
streamName string
3839
}
3940

4041
// NewRabbitMQStreamer creates a new RabbitMQ streamer. Only a single connection should be created.
41-
func NewRabbitMQStreamer(options stream.EnvironmentOptions, logger *logrus.Entry) (Streamer, error) {
42+
func NewRabbitMQStreamer(options stream.EnvironmentOptions, logger *logrus.Entry, streamName string) (Streamer, error) {
4243
env, err := stream.NewEnvironment(&options)
4344
if err != nil {
4445
log.Fatal(err)
4546
}
46-
return &RabbitMQStreamer{environment: env, logger: logger}, err
47+
return &RabbitMQStreamer{environment: env, logger: logger, streamName: streamName}, err
4748
}
4849

4950
// CreateStream creates a new RabbitMQ stream.
@@ -52,16 +53,15 @@ func (s *RabbitMQStreamer) CreateStream(ctx context.Context, logger *logrus.Entr
5253
// This will create the stream if not already created.
5354
return s.environment.DeclareStream(name,
5455
&stream.StreamOptions{
55-
// TODO: More sane numbers
5656
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
57-
MaxAge: time.Second * 10,
57+
MaxAge: time.Hour * 48,
5858
},
5959
)
6060
}
6161

6262
// NewStream creates a new stream struct to consume from.
6363
func (s *RabbitMQStreamer) NewStream(ctx context.Context, logger *logrus.Entry, name string) (Stream, error) {
64-
exists, err := s.environment.StreamExists(name)
64+
exists, err := s.environment.StreamExists(s.streamName)
6565
if err != nil {
6666
return nil, err
6767
}
@@ -73,7 +73,7 @@ func (s *RabbitMQStreamer) NewStream(ctx context.Context, logger *logrus.Entry,
7373
SetConsumerName(name).
7474
SetCRCCheck(false).
7575
SetOffset(stream.OffsetSpecification{}.First())
76-
return &RabbitMQStream{ctx: ctx, logger: logger, streamName: name, environment: s.environment, options: options}, nil
76+
return &RabbitMQStream{ctx: ctx, logger: logger, streamName: s.streamName, environment: s.environment, options: options}, nil
7777
}
7878

7979
// Close the RabbitMQ connection.

pkg/events/events.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ func New(data []byte) (Event, error) {
4141
}
4242
e.Data = string(data)
4343
return e, nil
44-
4544
}
4645

4746
// Write an event to a writer object

python/src/etos_api/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
"""ETOS API module."""
17+
1718
import os
1819
from importlib.metadata import PackageNotFoundError, version
1920

@@ -38,10 +39,6 @@
3839
from .library.providers.register import RegisterProviders
3940
from .main import APP
4041

41-
# The API shall not send logs to RabbitMQ as it
42-
# is too early in the ETOS test run.
43-
os.environ["ETOS_ENABLE_SENDING_LOGS"] = "false"
44-
4542
try:
4643
VERSION = version("etos_api")
4744
except PackageNotFoundError:

python/src/etos_api/library/context_logging.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616
"""ETOS API context based logging."""
17+
1718
import logging
1819
from contextvars import ContextVar
1920
from etos_lib.logging.logger import FORMAT_CONFIG
@@ -32,46 +33,47 @@ class ContextLogging(logging.Logger):
3233
get for each logging method called.
3334
"""
3435

36+
# Default identifier is 'Unknown' which is ignored by the ETOS internal messagebus.
3537
identifier = ContextVar("identifier")
3638

3739
def critical(self, msg, *args, **kwargs):
3840
"""Add identifier to critical calls.
3941
4042
For documentation read :obj:`logging.Logger.critical`
4143
"""
42-
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
44+
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
4345
return super().critical(msg, *args, **kwargs)
4446

4547
def error(self, msg, *args, **kwargs):
4648
"""Add identifier to error calls.
4749
4850
For documentation read :obj:`logging.Logger.error`
4951
"""
50-
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
52+
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
5153
return super().error(msg, *args, **kwargs)
5254

5355
def warning(self, msg, *args, **kwargs):
5456
"""Add identifier to warning calls.
5557
5658
For documentation read :obj:`logging.Logger.warning`
5759
"""
58-
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
60+
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
5961
return super().warning(msg, *args, **kwargs)
6062

6163
def info(self, msg, *args, **kwargs):
6264
"""Add identifier to info calls.
6365
6466
For documentation read :obj:`logging.Logger.info`
6567
"""
66-
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
68+
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
6769
return super().info(msg, *args, **kwargs)
6870

6971
def debug(self, msg, *args, **kwargs):
7072
"""Add identifier to debug calls.
7173
7274
For documentation read :obj:`logging.Logger.debug`
7375
"""
74-
FORMAT_CONFIG.identifier = self.identifier.get("Main") # Default=Main
76+
FORMAT_CONFIG.identifier = self.identifier.get("Unknown") # Default=Unknown
7577
return super().debug(msg, *args, **kwargs)
7678

7779

0 commit comments

Comments
 (0)