Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ require (
github.com/redis/go-redis/v9 v9.5.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions kq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ type KqConf struct {
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
CommitInOrder bool `json:",default=false"`
Mechanism string `json:",options=plain|scram-sha-256|scram-sha-512,default=plain"`
}
32 changes: 26 additions & 6 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/segmentio/kafka-go"
_ "github.com/segmentio/kafka-go/gzip"
_ "github.com/segmentio/kafka-go/lz4"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
_ "github.com/segmentio/kafka-go/snappy"
"github.com/zeromicro/go-queue/kq/internal"
"github.com/zeromicro/go-zero/core/contextx"
Expand Down Expand Up @@ -102,14 +104,35 @@ func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.Mess
q := kafkaQueues{
group: service.NewServiceGroup(),
}

var mechanism sasl.Mechanism
var err error
switch c.Mechanism {
case "plain":
mechanism = plain.Mechanism{
Username: c.Username,
Password: c.Password,
}
case "scram-sha-256":
mechanism, err = scram.Mechanism(scram.SHA256, c.Username, c.Password)
if err != nil {
return nil, err
}
case "scram-sha-512":
mechanism, err = scram.Mechanism(scram.SHA512, c.Username, c.Password)
if err != nil {
return nil, err
}
}

for i := 0; i < c.Conns; i++ {
q.queues = append(q.queues, newKafkaQueue(c, handler, options))
q.queues = append(q.queues, newKafkaQueue(c, handler, options, mechanism))
}

return q, nil
}

func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions, mechanism sasl.Mechanism) queue.MessageQueue {
var offset int64
if c.Offset == firstOffset {
offset = kafka.FirstOffset
Expand All @@ -130,10 +153,7 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
}
if len(c.Username) > 0 && len(c.Password) > 0 {
readerConfig.Dialer = &kafka.Dialer{
SASLMechanism: plain.Mechanism{
Username: c.Username,
Password: c.Password,
},
SASLMechanism: mechanism,
}
}
if len(c.CaFile) > 0 {
Expand Down