Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 26 additions & 30 deletions internal/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"bytes"
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/producer"
"runtime"
"strconv"
"strings"
Expand Down Expand Up @@ -237,8 +239,8 @@ type traceDispatcher struct {
// support deliver trace message to other cluster.
namesrvs *namesrvs
// round robin index
rrindex int32
cli RMQClient
rrindex int32
traceProducer rocketmq.Producer
}

func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
Expand Down Expand Up @@ -273,28 +275,30 @@ func NewTraceDispatcher(traceCfg *primitive.TraceConfig) *traceDispatcher {
srvs.SetCredentials(traceCfg.Credentials)
}

cliOp := DefaultClientOptions()
cliOp.GroupName = traceCfg.GroupName
cliOp.NameServerAddrs = traceCfg.NamesrvAddrs
cliOp.InstanceName = "INNER_TRACE_CLIENT_DEFAULT"
cliOp.RetryTimes = 0
cliOp.Namesrv = srvs
cliOp.Credentials = traceCfg.Credentials
cli := GetOrNewRocketMQClient(cliOp, nil)
if cli == nil {
traceProducer, err := producer.NewDefaultProducer(
producer.WithGroupName(traceCfg.GroupName),
producer.WithNameServerAddrs(traceCfg.NamesrvAddrs),
producer.WithInstanceName("INNER_TRACE_CLIENT_DEFAULT"),
producer.WithRetry(0),
producer.WithNamesrv(srvs),
producer.WithCredentials(traceCfg.Credentials),
)
if err != nil {
panic(errors.Wrap(err, "new producer failed."))
}
if traceProducer == nil {
return nil
}
cliOp.Namesrv = cli.GetNameSrv()
return &traceDispatcher{
ctx: ctx,
cancel: cancel,

traceTopic: t,
access: traceCfg.Access,
input: make(chan TraceContext, 1024),
batchCh: make(chan []*TraceContext, 2048),
cli: cli,
namesrvs: srvs,
traceTopic: t,
access: traceCfg.Access,
input: make(chan TraceContext, 1024),
batchCh: make(chan []*TraceContext, 2048),
namesrvs: srvs,
traceProducer: traceProducer,
}
}

Expand All @@ -304,7 +308,7 @@ func (td *traceDispatcher) GetTraceTopicName() string {

func (td *traceDispatcher) Start() {
td.running = true
td.cli.Start()
td.traceProducer.Start()
maxWaitDuration := 5 * time.Millisecond
td.ticker = time.NewTicker(maxWaitDuration)
maxWaitTime := maxWaitDuration.Nanoseconds()
Expand Down Expand Up @@ -462,28 +466,20 @@ func (td *traceDispatcher) sendTraceDataByMQ(keySet Keyset, regionID string, dat
msg := primitive.NewMessage(traceTopic, []byte(data))
msg.WithKeys(keySet.slice())

mq, addr := td.findMq(regionID)
if mq == nil {
return
}

var req = td.buildSendRequest(mq, msg)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := td.cli.InvokeAsync(ctx, addr, req, func(command *remote.RemotingCommand, e error) {
err := td.traceProducer.SendAsync(ctx, func(ctx context.Context, resp *primitive.SendResult, err error) {
cancel()
resp := primitive.NewSendResult()
if e != nil {
if err != nil {
rlog.Info("send trace data error.", map[string]interface{}{
"traceData": data,
})
} else {
td.cli.ProcessSendResponse(mq.BrokerName, command, resp, msg)
rlog.Debug("send trace data success:", map[string]interface{}{
"SendResult": resp,
"traceData": data,
})
}
})
}, msg)
if err != nil {
cancel()
rlog.Info("send trace data error when invoke", map[string]interface{}{
Expand Down
12 changes: 12 additions & 0 deletions producer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ func WithGroupName(group string) Option {
}
}

func WithNameServerAddrs(nameServerAddrs primitive.NamesrvAddr) Option {
return func(opts *producerOptions) {
opts.NameServerAddrs = nameServerAddrs
}
}

func WithNamesrv(namesrv internal.Namesrvs) Option {
return func(opts *producerOptions) {
opts.Namesrv = namesrv
}
}

func WithInstanceName(name string) Option {
return func(opts *producerOptions) {
opts.InstanceName = name
Expand Down