Skip to content

Commit c84eb12

Browse files
authored
Change DDB kv to watch indefinetly (#7088)
1 parent ec404d2 commit c84eb12

File tree

3 files changed

+43
-2
lines changed

3 files changed

+43
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072
88
* [BUGFIX] Compactor: Avoid race condition which allow a grouper to not compact all partitions. #7082
99
* [BUGFIX] Fix bug where validating metric names uses the wrong validation logic. #7086
10+
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinetly for WatchKey. #7088
1011

1112
## 1.20.0 in progress
1213

pkg/ring/kv/dynamodb/client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ func (c *Client) CAS(ctx context.Context, key string, f func(in any) (out any, r
230230
}
231231

232232
func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
233-
bo := backoff.New(ctx, c.backoffConfig)
233+
watchBackoffConfig := c.backoffConfig
234+
watchBackoffConfig.MaxRetries = 0
235+
bo := backoff.New(ctx, watchBackoffConfig)
234236

235237
for bo.Ongoing() {
236238
out, _, err := c.kv.Query(ctx, dynamodbKey{
@@ -272,7 +274,9 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
272274
}
273275

274276
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, any) bool) {
275-
bo := backoff.New(ctx, c.backoffConfig)
277+
watchBackoffConfig := c.backoffConfig
278+
watchBackoffConfig.MaxRetries = 0
279+
bo := backoff.New(ctx, watchBackoffConfig)
276280

277281
for bo.Ongoing() {
278282
out, _, err := c.kv.Query(ctx, dynamodbKey{

pkg/ring/kv/dynamodb/client_test.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,42 @@ func Test_WatchKey_UpdateStale(t *testing.T) {
263263
})
264264
}
265265

266+
func Test_WatchKey_AlwaysRetry(t *testing.T) {
267+
casBackoffConfig := backoff.Config{
268+
MinBackoff: 1 * time.Millisecond,
269+
MaxBackoff: 1 * time.Millisecond,
270+
MaxRetries: 5, // CAS should retry, but WatchKey should not
271+
}
272+
273+
ddbMock := NewDynamodbClientMock()
274+
codecMock := &CodecMock{}
275+
c := NewClientMock(ddbMock, codecMock, TestLogger{}, prometheus.NewPedanticRegistry(), defaultPullTime, casBackoffConfig)
276+
277+
// Mock Query to always fail
278+
ddbMock.On("Query").Return(map[string]dynamodbItem{}, errors.Errorf("query failed"))
279+
280+
// WatchKey should not retry on failure (MaxRetries=0), so it should only call Query once
281+
// and then fall back to stale data
282+
staleData := &DescMock{}
283+
staleData.On("Clone").Return(staleData).Once()
284+
285+
// Set up some stale data first
286+
c.updateStaleData(key, staleData, time.Now())
287+
288+
callCount := 0
289+
c.WatchKey(context.TODO(), key, func(i any) bool {
290+
callCount++
291+
// Should only be called once with stale data after the first query fails
292+
require.EqualValues(t, staleData, i)
293+
return false // Stop watching
294+
})
295+
296+
// Verify that Query was called exactly 11 times (1 initial + 10 retries due to hardcoded limit in WatchKey)
297+
// This confirms WatchKey has its own retry logic separate from backoff MaxRetries
298+
ddbMock.AssertNumberOfCalls(t, "Query", 11)
299+
require.Equal(t, 1, callCount, "Callback should be called once with stale data")
300+
}
301+
266302
func Test_CAS_UpdateStale(t *testing.T) {
267303
ddbMock := NewDynamodbClientMock()
268304
codecMock := &CodecMock{}

0 commit comments

Comments
 (0)