Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ require (
github.com/redis/go-redis/v9 v9.5.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions kq/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ type KqConf struct {
Password string `json:",optional"`
ForceCommit bool `json:",default=true"`
CommitInOrder bool `json:",default=false"`
Mechanism string `json:",options=plain|scram-sha-256|scram-sha-512,default=plain"`
}
34 changes: 28 additions & 6 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/segmentio/kafka-go"
_ "github.com/segmentio/kafka-go/gzip"
_ "github.com/segmentio/kafka-go/lz4"
"github.com/segmentio/kafka-go/sasl"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"
_ "github.com/segmentio/kafka-go/snappy"
"github.com/zeromicro/go-queue/kq/internal"
"github.com/zeromicro/go-zero/core/contextx"
Expand Down Expand Up @@ -102,14 +104,37 @@ func NewQueue(c KqConf, handler ConsumeHandler, opts ...QueueOption) (queue.Mess
q := kafkaQueues{
group: service.NewServiceGroup(),
}

var mechanism sasl.Mechanism
var err error
switch c.Mechanism {
case "plain":
mechanism = plain.Mechanism{
Username: c.Username,
Password: c.Password,
}
case "scram-sha-256":
mechanism, err = scram.Mechanism(scram.SHA256, c.Username, c.Password)
if err != nil {
return nil, err
}
case "scram-sha-512":
mechanism, err = scram.Mechanism(scram.SHA512, c.Username, c.Password)
if err != nil {
return nil, err
}
default:
return nil, errors.New("unsupported mechanism: " + c.Mechanism)
}

for i := 0; i < c.Conns; i++ {
q.queues = append(q.queues, newKafkaQueue(c, handler, options))
q.queues = append(q.queues, newKafkaQueue(c, handler, options, mechanism))
}

return q, nil
}

func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue.MessageQueue {
func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions, mechanism sasl.Mechanism) queue.MessageQueue {
var offset int64
if c.Offset == firstOffset {
offset = kafka.FirstOffset
Expand All @@ -130,10 +155,7 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
}
if len(c.Username) > 0 && len(c.Password) > 0 {
readerConfig.Dialer = &kafka.Dialer{
SASLMechanism: plain.Mechanism{
Username: c.Username,
Password: c.Password,
},
SASLMechanism: mechanism,
}
}
if len(c.CaFile) > 0 {
Expand Down