From 5a1ed706b48e2c51dd6c2a74f6a9b09637de012c Mon Sep 17 00:00:00 2001 From: assamluo Date: Thu, 7 Aug 2025 17:34:02 +0800 Subject: [PATCH 1/3] feat(kafka-queue): plain|scram-sha-256|scram-sha-512 mechanism config support --- kq/config.go | 1 + kq/queue.go | 32 ++++++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/kq/config.go b/kq/config.go index 6429dd8..f0b0187 100644 --- a/kq/config.go +++ b/kq/config.go @@ -23,4 +23,5 @@ type KqConf struct { Password string `json:",optional"` ForceCommit bool `json:",default=true"` CommitInOrder bool `json:",default=false"` + Mechanism string `json:",options=plain|scram-sha-256|scram-sha-512,default=plain"` } diff --git a/kq/queue.go b/kq/queue.go index b7f23ac..a5388dd 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -13,7 +13,9 @@ import ( "github.com/segmentio/kafka-go" _ "github.com/segmentio/kafka-go/gzip" _ "github.com/segmentio/kafka-go/lz4" + "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" _ "github.com/segmentio/kafka-go/snappy" "github.com/zeromicro/go-queue/kq/internal" "github.com/zeromicro/go-zero/core/contextx" @@ -102,14 +104,35 @@ func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.Mess q := kafkaQueues{ group: service.NewServiceGroup(), } + + var mechanism sasl.Mechanism + var err error + switch c.Mechanism { + case "plain": + mechanism = plain.Mechanism{ + Username: c.Username, + Password: c.Password, + } + case "scram-sha-256": + mechanism, err = scram.Mechanism(scram.SHA256, c.Username, c.Password) + if err != nil { + return nil, err + } + case "scram-sha-512": + mechanism, err = scram.Mechanism(scram.SHA512, c.Username, c.Password) + if err != nil { + return nil, err + } + } + for i := 0; i < c.Conns; i++ { - q.queues = append(q.queues, newKafkaQueue(c, handler, options)) + q.queues = append(q.queues, newKafkaQueue(c, handler, options, mechanism)) } return q, nil } -func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue { +func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions, mechanism sasl.Mechanism) queue.MessageQueue { var offset int64 if c.Offset == firstOffset { offset = kafka.FirstOffset @@ -130,10 +153,7 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue } if len(c.Username) > 0 && len(c.Password) > 0 { readerConfig.Dialer = &kafka.Dialer{ - SASLMechanism: plain.Mechanism{ - Username: c.Username, - Password: c.Password, - }, + SASLMechanism: mechanism, } } if len(c.CaFile) > 0 { From 8cce566f5e45ad08ca373d0f9328845d653a6bca Mon Sep 17 00:00:00 2001 From: assamluo Date: Thu, 7 Aug 2025 17:34:34 +0800 Subject: [PATCH 2/3] chore: go mod tidy --- go.mod | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go.mod b/go.mod index 6b4c053..3466d7d 100644 --- a/go.mod +++ b/go.mod @@ -43,6 +43,9 @@ require ( github.com/redis/go-redis/v9 v9.5.3 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect From 9f24b09cc9d11c05f78c42eceb5af4bf08a57322 Mon Sep 17 00:00:00 2001 From: assamluo Date: Thu, 7 Aug 2025 17:37:13 +0800 Subject: [PATCH 3/3] fix: unhandled mechanism option --- kq/queue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kq/queue.go b/kq/queue.go index a5388dd..9662113 100644 --- a/kq/queue.go +++ b/kq/queue.go @@ -123,6 +123,8 @@ func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.Mess if err != nil { return nil, err } + default: + return nil, errors.New("unsupported mechanism: " + c.Mechanism) } for i := 0; i < c.Conns; i++ {