Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions exp/api/remote/remote_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type apiOpts struct {
compression Compression
path string
retryOnRateLimit bool
retryCallback RetryCallback
}

var defaultAPIOpts = &apiOpts{
Expand Down Expand Up @@ -116,15 +115,6 @@ func WithAPIBackoff(backoff backoff.Config) APIOption {
}
}

// WithAPIRetryCallback sets a callback to be invoked on each retry attempt.
// This is useful for tracking retry metrics and debugging retry behavior.
func WithAPIRetryCallback(callback RetryCallback) APIOption {
return func(o *apiOpts) error {
o.retryCallback = callback
return nil
}
}

type nopSlogHandler struct{}

func (n nopSlogHandler) Enabled(context.Context, slog.Level) bool { return false }
Expand Down Expand Up @@ -174,6 +164,21 @@ func (r retryableError) RetryAfter() time.Duration {
return r.retryAfter
}

// WriteOption represents an option for Write method.
type WriteOption func(o *writeOpts)

type writeOpts struct {
retryCallback RetryCallback
}

// WithWriteRetryCallback sets a retry callback for this Write request.
// The callback is invoked each time the request is retried.
func WithWriteRetryCallback(callback RetryCallback) WriteOption {
return func(o *writeOpts) {
o.retryCallback = callback
}
}

type vtProtoEnabled interface {
SizeVT() int
MarshalToSizedBufferVT(dAtA []byte) (int, error)
Expand All @@ -193,7 +198,13 @@ type gogoProtoEnabled interface {
// will be used
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
// error out on unknown scheme.
func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ WriteResponseStats, err error) {
func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any, opts ...WriteOption) (_ WriteResponseStats, err error) {
// Parse write options.
var writeOpts writeOpts
for _, opt := range opts {
opt(&writeOpts)
}

buf := r.bufPool.Get().(*[]byte)

if err := msgType.Validate(); err != nil {
Expand Down Expand Up @@ -280,9 +291,9 @@ func (r *API) Write(ctx context.Context, msgType WriteMessageType, msg any) (_ W

backoffDelay := b.NextDelay() + retryableErr.RetryAfter()

// Invoke retry callback if provided (after NextDelay which increments the retry counter).
if r.opts.retryCallback != nil {
r.opts.retryCallback(retryableErr.error)
// Invoke retry callback if provided.
if writeOpts.retryCallback != nil {
writeOpts.retryCallback(retryableErr.error)
}

r.opts.logger.Error("failed to send remote write request; retrying after backoff", "err", err, "backoff", backoffDelay)
Expand Down
20 changes: 11 additions & 9 deletions exp/api/remote/remote_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,17 +231,18 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
Max: 1 * time.Millisecond,
MaxRetries: 3,
}),
WithAPIRetryCallback(func(err error) {
retryCount++
retryErrors = append(retryErrors, err)
}),
)
if err != nil {
t.Fatal(err)
}

req := testV2()
_, err = client.Write(context.Background(), WriteV2MessageType, req)
_, err = client.Write(context.Background(), WriteV2MessageType, req,
WithWriteRetryCallback(func(err error) {
retryCount++
retryErrors = append(retryErrors, err)
}),
)
if err == nil {
t.Fatal("expected error, got nil")
}
Expand Down Expand Up @@ -274,16 +275,17 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
WithAPIHTTPClient(srv.Client()),
WithAPILogger(tLogger),
WithAPIPath("api/v1/write"),
WithAPIRetryCallback(func(err error) {
callbackInvoked = true
}),
)
if err != nil {
t.Fatal(err)
}

req := testV2()
_, err = client.Write(context.Background(), WriteV2MessageType, req)
_, err = client.Write(context.Background(), WriteV2MessageType, req,
WithWriteRetryCallback(func(err error) {
callbackInvoked = true
}),
)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading