Skip to content

Commit 8453edc

Browse files
Merge pull request #110 from bitrise-io/retries
fix: ACI-3629 ACI-3603 Add missing retries and error handling
2 parents 4faba4d + f7d4b08 commit 8453edc

File tree

6 files changed

+160
-64
lines changed

6 files changed

+160
-64
lines changed

cmd/client.go

Lines changed: 20 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,11 +2,13 @@ package cmd
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"time"
78

89
"github.com/bitrise-io/bitrise-build-cache-cli/internal/build_cache/kv"
910
"github.com/bitrise-io/bitrise-build-cache-cli/internal/config/common"
11+
"github.com/bitrise-io/go-utils/retry"
1012
"github.com/bitrise-io/go-utils/v2/log"
1113
)
1214

@@ -52,8 +54,24 @@ func createKVClient(ctx context.Context,
5254
return nil, fmt.Errorf("new kv client: %w", err)
5355
}
5456

55-
if err := kvClient.GetCapabilities(ctx); err != nil {
56-
return nil, fmt.Errorf("get capabilities: %w", err)
57+
err = retry.Times(10).Wait(3 * time.Second).TryWithAbort(func(attempt uint) (error, bool) {
58+
if attempt > 0 {
59+
params.Logger.Debugf("Retrying GetCapabilities... (attempt %d)", attempt)
60+
}
61+
62+
if err := kvClient.GetCapabilities(ctx); err != nil {
63+
params.Logger.Errorf("Error in GetCapabilities attempt %d: %s", attempt, err)
64+
if errors.Is(err, kv.ErrCacheUnauthenticated) {
65+
return kv.ErrCacheUnauthenticated, true
66+
}
67+
68+
return fmt.Errorf("get capabilities: %w", err), false
69+
}
70+
71+
return nil, false
72+
})
73+
if err != nil {
74+
return nil, fmt.Errorf("with retries: %w", err)
5775
}
5876

5977
return kvClient, nil

internal/build_cache/kv/download.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ import (
1616

1717
// ErrCacheNotFound ...
1818
var ErrCacheNotFound = errors.New("no cache archive found for the provided keys")
19+
var ErrCacheUnauthenticated = errors.New("unauthenticated")
1920

2021
// ErrFileExistsAndNotWritable ...
2122
var ErrFileExistsAndNotWritable = errors.New("file already exists and is not writable")
@@ -94,6 +95,9 @@ func (c *Client) DownloadStream(ctx context.Context, destination io.Writer, key
9495
if ok && st.Code() == codes.NotFound {
9596
return ErrCacheNotFound
9697
}
98+
if ok && st.Code() == codes.Unauthenticated {
99+
return ErrCacheUnauthenticated
100+
}
97101

98102
return fmt.Errorf("download archive: %w", err)
99103
}

internal/build_cache/kv/download_multi.go

Lines changed: 19 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -2,19 +2,16 @@ package kv
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"os"
78
"sync"
9+
"sync/atomic"
810
"time"
911

12+
"github.com/bitrise-io/bitrise-build-cache-cli/internal/filegroup"
1013
"github.com/bitrise-io/go-utils/retry"
1114
"github.com/dustin/go-humanize"
12-
13-
"errors"
14-
15-
"sync/atomic"
16-
17-
"github.com/bitrise-io/bitrise-build-cache-cli/internal/filegroup"
1815
)
1916

2017
type DownloadFilesStats struct {
@@ -56,25 +53,35 @@ func (c *Client) DownloadFileGroupFromBuildCache(ctx context.Context, dd filegro
5653
defer func() { <-semaphore }() // Release a slot in the semaphore
5754

5855
const retries = 3
59-
err := retry.Times(retries).Wait(3 * time.Second).TryWithAbort(func(_ uint) (error, bool) {
56+
err := retry.Times(retries).Wait(3 * time.Second).TryWithAbort(func(attempt uint) (error, bool) {
57+
if attempt > 0 {
58+
c.logger.Debugf("Retrying download... (attempt %d)", attempt)
59+
}
60+
6061
skipped, err := c.DownloadFile(ctx, file.Path, file.Hash, file.Mode, isDebugLogMode, forceOverwrite, skipExisting)
6162
if skipped {
6263
skippedFiles.Add(1)
6364

6465
return nil, false
6566
}
6667

67-
if errors.Is(err, ErrCacheNotFound) {
68+
if err != nil {
69+
c.logger.Errorf("Error in download file attempt %d: %s", attempt, err)
70+
}
71+
72+
switch {
73+
case errors.Is(err, ErrCacheUnauthenticated):
6874
return err, true
69-
} else if errors.Is(err, ErrFileExistsAndNotWritable) {
75+
case errors.Is(err, ErrCacheNotFound):
7076
return err, true
71-
}
72-
if err != nil {
77+
case errors.Is(err, ErrFileExistsAndNotWritable):
78+
return err, true
79+
case err != nil:
7380
return fmt.Errorf("download file: %w", err), false
7481
}
7582

7683
if err := os.Chtimes(file.Path, file.ModTime, file.ModTime); err != nil {
77-
return fmt.Errorf("failed to set file mod time: %w", err), true
84+
return fmt.Errorf("set file mod time: %w", err), true
7885
}
7986

8087
return nil, false

internal/build_cache/kv/methods.go

Lines changed: 84 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -9,9 +9,12 @@ import (
99
"time"
1010

1111
remoteexecution "github.com/bitrise-io/bitrise-build-cache-cli/proto/build/bazel/remote/execution/v2"
12+
"github.com/bitrise-io/go-utils/retry"
1213
"github.com/dustin/go-humanize"
1314
"google.golang.org/genproto/googleapis/bytestream"
15+
"google.golang.org/grpc/codes"
1416
"google.golang.org/grpc/metadata"
17+
"google.golang.org/grpc/status"
1518
)
1619

1720
type PutParams struct {
@@ -26,12 +29,17 @@ type FileDigest struct {
2629
}
2730

2831
func (c *Client) GetCapabilities(ctx context.Context) error {
29-
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
32+
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
3033
defer cancel()
3134
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
3235

3336
_, err := c.capabilitiesClient.GetCapabilities(callCtx, &remoteexecution.GetCapabilitiesRequest{})
3437
if err != nil {
38+
st, ok := status.FromError(err)
39+
if ok && st.Code() == codes.Unauthenticated {
40+
return ErrCacheUnauthenticated
41+
}
42+
3543
return fmt.Errorf("get capabilities: %w", err)
3644
}
3745

@@ -49,6 +57,11 @@ func (c *Client) InitiatePut(ctx context.Context, params PutParams) (io.WriteClo
4957

5058
stream, err := c.bitriseKVClient.Put(ctx)
5159
if err != nil {
60+
st, ok := status.FromError(err)
61+
if ok && st.Code() == codes.Unauthenticated {
62+
return nil, ErrCacheUnauthenticated
63+
}
64+
5265
return nil, fmt.Errorf("initiate put: %w", err)
5366
}
5467

@@ -75,6 +88,11 @@ func (c *Client) InitiateGet(ctx context.Context, name string) (io.ReadCloser, e
7588
}
7689
stream, err := c.bitriseKVClient.Get(ctx, readReq)
7790
if err != nil {
91+
st, ok := status.FromError(err)
92+
if ok && st.Code() == codes.Unauthenticated {
93+
return nil, ErrCacheUnauthenticated
94+
}
95+
7896
return nil, fmt.Errorf("initiate get: %w", err)
7997
}
8098

@@ -104,55 +122,86 @@ func (c *Client) Delete(ctx context.Context, name string) error {
104122
return nil
105123
}
106124

107-
func (c *Client) FindMissing(ctx context.Context, digests []*FileDigest) ([]*FileDigest, error) {
108-
var missingBlobs []*FileDigest
109-
blobDigests := convertToBlobDigests(digests)
110-
req := &remoteexecution.FindMissingBlobsRequest{
111-
BlobDigests: blobDigests,
112-
}
113-
c.logger.Debugf("Size of FindMissingBlobs request for %d blobs is %s", len(digests), humanize.Bytes(uint64(len(req.String()))))
114-
gRPCLimitBytes := 4 * 1024 * 1024 // gRPC limit is 4 MiB
115-
if len(req.String()) > gRPCLimitBytes {
116-
// Chunk up request blobs to fit into gRPC limits
117-
// Calculate the unit size of a blob (in practice can differ to the theoretical sha256(32 bytes) + size(8 bytes) = 40 bytes)
118-
digestUnitSize := float64(len(req.String())) / float64(len(digests))
119-
maxDigests := int(float64(gRPCLimitBytes) / digestUnitSize)
120-
for startIndex := 0; startIndex < len(digests); startIndex += maxDigests {
121-
endIndex := startIndex + maxDigests
122-
if endIndex > len(digests) {
123-
endIndex = len(digests)
124-
}
125-
req.BlobDigests = blobDigests[startIndex:endIndex]
126-
127-
timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
128-
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
129-
130-
c.logger.Debugf("Calling FindMissingBlobs for chunk: digests[%d:%d]", startIndex, endIndex)
131-
resp, err := c.casClient.FindMissingBlobs(callCtx, req)
132-
133-
cancel()
134-
if err != nil {
135-
return nil, fmt.Errorf("find missing blobs[%d:%d]: %w", startIndex, endIndex, err)
136-
}
137-
missingBlobs = append(missingBlobs, convertToFileDigests(resp.GetMissingBlobDigests())...)
125+
func (c *Client) findMissing(ctx context.Context,
126+
req *remoteexecution.FindMissingBlobsRequest) ([]*FileDigest, error) {
127+
var resp *remoteexecution.FindMissingBlobsResponse
128+
err := retry.Times(3).Wait(3 * time.Second).TryWithAbort(func(attempt uint) (error, bool) {
129+
if attempt > 0 {
130+
c.logger.Debugf("Retrying FindMissingBlobs... (attempt %d)", attempt)
138131
}
139-
} else {
132+
140133
timeoutCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
141134
callCtx := metadata.NewOutgoingContext(timeoutCtx, c.getMethodCallMetadata())
142135

143-
resp, err := c.casClient.FindMissingBlobs(callCtx, req)
136+
var err error
137+
resp, err = c.casClient.FindMissingBlobs(callCtx, req)
144138

145139
cancel()
146140

147141
if err != nil {
142+
c.logger.Errorf("Error in FindMissingBlobs attempt %d: %s", attempt, err)
143+
144+
st, ok := status.FromError(err)
145+
if ok && st.Code() == codes.Unauthenticated {
146+
return ErrCacheUnauthenticated, false
147+
}
148+
149+
return fmt.Errorf("find missing blobs: %w", err), true
150+
}
151+
152+
return nil, false
153+
})
154+
if err != nil {
155+
return nil, fmt.Errorf("with retries: %w", err)
156+
}
157+
158+
return convertToFileDigests(resp.GetMissingBlobDigests()), nil
159+
}
160+
161+
func (c *Client) findMissingChunked(ctx context.Context,
162+
req *remoteexecution.FindMissingBlobsRequest,
163+
digests []*FileDigest,
164+
blobDigests []*remoteexecution.Digest,
165+
gRPCLimitBytes int) ([]*FileDigest, error) {
166+
var missingBlobs []*FileDigest
167+
// Chunk up request blobs to fit into gRPC limits
168+
// Calculate the unit size of a blob (in practice can differ to the theoretical sha256(32 bytes) + size(8 bytes) = 40 bytes)
169+
digestUnitSize := float64(len(req.String())) / float64(len(digests))
170+
maxDigests := int(float64(gRPCLimitBytes) / digestUnitSize)
171+
for startIndex := 0; startIndex < len(digests); startIndex += maxDigests {
172+
endIndex := startIndex + maxDigests
173+
if endIndex > len(digests) {
174+
endIndex = len(digests)
175+
}
176+
req.BlobDigests = blobDigests[startIndex:endIndex]
177+
c.logger.Debugf("Calling FindMissingBlobs for chunk: digests[%d:%d]", startIndex, endIndex)
178+
179+
var resp []*FileDigest
180+
var err error
181+
if resp, err = c.findMissing(ctx, req); err != nil {
148182
return nil, fmt.Errorf("find missing blobs: %w", err)
149183
}
150-
missingBlobs = convertToFileDigests(resp.GetMissingBlobDigests())
184+
185+
missingBlobs = append(missingBlobs, resp...)
151186
}
152187

153188
return missingBlobs, nil
154189
}
155190

191+
func (c *Client) FindMissing(ctx context.Context, digests []*FileDigest) ([]*FileDigest, error) {
192+
blobDigests := convertToBlobDigests(digests)
193+
req := &remoteexecution.FindMissingBlobsRequest{
194+
BlobDigests: blobDigests,
195+
}
196+
c.logger.Debugf("Size of FindMissingBlobs request for %d blobs is %s", len(digests), humanize.Bytes(uint64(len(req.String()))))
197+
gRPCLimitBytes := 4 * 1024 * 1024 // gRPC limit is 4 MiB
198+
if len(req.String()) > gRPCLimitBytes {
199+
return c.findMissingChunked(ctx, req, digests, blobDigests, gRPCLimitBytes)
200+
}
201+
202+
return c.findMissing(ctx, req)
203+
}
204+
156205
func convertToBlobDigests(digests []*FileDigest) []*remoteexecution.Digest {
157206
out := make([]*remoteexecution.Digest, 0, len(digests))
158207

internal/build_cache/kv/upload.go

Lines changed: 22 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -3,15 +3,17 @@ package kv
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"fmt"
78
"io"
89
"os"
910
"time"
1011

11-
"github.com/dustin/go-humanize"
12-
1312
"github.com/bitrise-io/bitrise-build-cache-cli/internal/hash"
1413
"github.com/bitrise-io/go-utils/retry"
14+
"github.com/dustin/go-humanize"
15+
"google.golang.org/grpc/codes"
16+
"google.golang.org/grpc/status"
1517
)
1618

1719
func (c *Client) UploadFileToBuildCache(ctx context.Context, filePath, key string) error {
@@ -58,11 +60,17 @@ func (c *Client) uploadToBuildCache(ctx context.Context, upload func(ctx context
5860
const retries = 3
5961
err := retry.Times(retries).Wait(5 * time.Second).TryWithAbort(func(attempt uint) (error, bool) {
6062
if attempt != 0 {
61-
c.logger.Debugf("Retrying archive upload... (attempt %d)", attempt+1)
63+
c.logger.Debugf("Retrying upload... (attempt %d)", attempt)
6264
}
6365

6466
if err := upload(ctx); err != nil {
65-
return err, false
67+
c.logger.Errorf("Error in upload attempt %d: %s", attempt, err)
68+
69+
if errors.Is(err, ErrCacheUnauthenticated) {
70+
return ErrCacheUnauthenticated, true
71+
}
72+
73+
return fmt.Errorf("upload: %w", err), false
6674
}
6775

6876
return nil, false
@@ -106,14 +114,18 @@ func (c *Client) uploadStream(ctx context.Context, source io.Reader, key, checks
106114
}
107115

108116
if size > 0 {
109-
if _, err := io.Copy(kvWriter, source); err != nil {
110-
return fmt.Errorf("upload archive: %w", err)
111-
}
117+
_, err = io.Copy(kvWriter, source)
112118
} else {
113119
// io.Copy does not write if there was no read
114-
if _, err := kvWriter.Write([]byte{}); err != nil {
115-
return fmt.Errorf("upload archive: %w", err)
116-
}
120+
_, err = kvWriter.Write([]byte{})
121+
}
122+
123+
st, ok := status.FromError(err)
124+
if ok && st.Code() == codes.Unauthenticated {
125+
return ErrCacheUnauthenticated
126+
}
127+
if err != nil {
128+
return fmt.Errorf("upload archive: %w", err)
117129
}
118130

119131
if err := kvWriter.Close(); err != nil {

0 commit comments

Comments
 (0)