Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 32 additions & 18 deletions checkpoint/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"path/filepath"
"slices"
"time"

"github.com/spf13/afero"
"go.uber.org/zap"
Expand Down Expand Up @@ -39,22 +40,31 @@ type Config struct {

// set to false if atxs are not compatible before and after the checkpoint recovery.
PreserveOwnAtx bool `mapstructure:"preserve-own-atx"`

RetryMax int `mapstructure:"retry-max"`
RetryDelay time.Duration `mapstructure:"retry-delay"`
IgnoreCheckpointReqErrors bool `mapstructure:"ignore-checkpoint-req-errors"`
}

func DefaultConfig() Config {
return Config{
PreserveOwnAtx: true,
RetryMax: 5,
RetryDelay: 3 * time.Second,
}
}

type RecoverConfig struct {
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
GoldenAtx types.ATXID
DataDir string
DbFile string
LocalDbFile string
NodeIDs []types.NodeID // IDs to preserve own ATXs
Uri string
Restore types.LayerID
RetryMax int
RetryDelay time.Duration
IgnoreCheckpointReqErrors bool
}

func (c *RecoverConfig) DbPath() string {
Expand All @@ -73,25 +83,25 @@ func copyToLocalFile(
ctx context.Context,
logger *zap.Logger,
fs afero.Fs,
dataDir, uri string,
restore types.LayerID,
cfg *RecoverConfig,
) (string, error) {
parsed, err := url.Parse(uri)
parsed, err := url.Parse(cfg.Uri)
if err != nil {
return "", fmt.Errorf("%w: parse recovery URI %v", err, uri)
return "", fmt.Errorf("%w: parse recovery URI %v", err, cfg.Uri)
}
if parsed.Scheme != "http" && parsed.Scheme != "https" {
return "", fmt.Errorf("%w: %s", ErrUrlSchemeNotSupported, uri)
return "", fmt.Errorf("%w: %s", ErrUrlSchemeNotSupported, cfg.Uri)
}
if bdir, err := backupRecovery(fs, RecoveryDir(dataDir)); err != nil {
if bdir, err := backupRecovery(fs, RecoveryDir(cfg.DataDir)); err != nil {
return "", err
} else if bdir != "" {
logger.Info("old recovery data backed up", log.ZContext(ctx), zap.String("dir", bdir))
}
dst := RecoveryFilename(dataDir, filepath.Base(parsed.String()), restore)
if err = httpToLocalFile(ctx, parsed, fs, dst); err != nil {
dst := RecoveryFilename(cfg.DataDir, filepath.Base(parsed.String()), cfg.Restore)
if err = httpToLocalFile(ctx, parsed, fs, dst, cfg.RetryMax, cfg.RetryDelay); err != nil {
return "", err
}

logger.Info("checkpoint data persisted", log.ZContext(ctx), zap.String("file", dst))
return dst, nil
}
Expand Down Expand Up @@ -144,10 +154,14 @@ func Recover(
case errors.Is(err, ErrCheckpointNotFound):
logger.Info("no checkpoint file available. not recovering", zap.String("uri", cfg.Uri))
return nil, nil
case err != nil:
case err == nil:
return preserve, nil
case cfg.IgnoreCheckpointReqErrors && errors.Is(err, ErrCheckpointRequestFailed):
logger.Error("ignoring checkpoint request error", zap.Error(err))
return nil, nil
default:
return nil, err
}
return preserve, nil
}

func RecoverWithDb(
Expand All @@ -170,7 +184,7 @@ func RecoverWithDb(
return nil, fmt.Errorf("remove old bootstrap data: %w", err)
}
logger.Info("recover from uri", zap.String("uri", cfg.Uri))
cpFile, err := copyToLocalFile(ctx, logger, fs, cfg.DataDir, cfg.Uri, cfg.Restore)
cpFile, err := copyToLocalFile(ctx, logger, fs, cfg)
if err != nil {
return nil, err
}
Expand Down
160 changes: 107 additions & 53 deletions checkpoint/recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -131,65 +133,117 @@ func checkpointServer(t testing.TB) string {

func TestRecover(t *testing.T) {
t.Parallel()
url := checkpointServer(t)

tt := []struct {
name string
uri string
expErr error
}{
{
name: "http",
uri: fmt.Sprintf("%s/snapshot-15", url),
},
{
name: "url unreachable",
uri: "http://nowhere/snapshot-15",
expErr: checkpoint.ErrCheckpointNotFound,
},
{
name: "ftp",
uri: "ftp://snapshot-15",
expErr: checkpoint.ErrUrlSchemeNotSupported,
},
setup := func(t *testing.T, handler http.HandlerFunc) (afero.Fs, *checkpoint.RecoverConfig) {
t.Helper()
fs := afero.NewMemMapFs()

mux := http.NewServeMux()
mux.HandleFunc("GET /snapshot-15", handler)
ts := httptest.NewServer(mux)
t.Cleanup(ts.Close)
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: fmt.Sprintf("%s/snapshot-15", ts.URL),
Restore: types.LayerID(recoverLayer),
RetryMax: 5,
RetryDelay: 100 * time.Millisecond,
}
require.NoError(t, fs.MkdirAll(filepath.Join(cfg.DataDir, bootstrap.DirName), 0o700))
return fs, cfg
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
fs := afero.NewMemMapFs()
cfg := &checkpoint.RecoverConfig{
GoldenAtx: goldenAtx,
DataDir: t.TempDir(),
DbFile: "test.sql",
LocalDbFile: "local.sql",
NodeIDs: []types.NodeID{types.RandomNodeID()},
Uri: tc.uri,
Restore: types.LayerID(recoverLayer),
}
bsdir := filepath.Join(cfg.DataDir, bootstrap.DirName)
require.NoError(t, fs.MkdirAll(bsdir, 0o700))
db := sql.InMemory()
localDB := localsql.InMemory()
data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
if tc.expErr != nil {
require.ErrorIs(t, err, tc.expErr)
t.Run("http", func(t *testing.T) {
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(checkpointData))
})

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.NoError(t, err)
require.Nil(t, data)

newDB, err := sql.Open("file:" + filepath.Join(cfg.DataDir, cfg.DbFile))
require.NoError(t, err)
require.NotNil(t, newDB)
defer newDB.Close()
verifyDbContent(t, newDB)

restore, err := recovery.CheckpointInfo(newDB)
require.NoError(t, err)
require.EqualValues(t, recoverLayer, restore)
})

t.Run("http+retry", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
var fail atomic.Bool
fail.Store(true)
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {
if fail.CompareAndSwap(true, false) { // fail on first request
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("service unavailable"))
return
}
require.NoError(t, err)
require.Nil(t, data)
newDB, err := sql.Open("file:" + filepath.Join(cfg.DataDir, cfg.DbFile))
require.NoError(t, err)
require.NotNil(t, newDB)
defer newDB.Close()
verifyDbContent(t, newDB)
restore, err := recovery.CheckpointInfo(newDB)
require.NoError(t, err)
require.EqualValues(t, recoverLayer, restore)
exist, err := afero.Exists(fs, bsdir)
require.NoError(t, err)
require.False(t, exist)
w.Write([]byte(checkpointData))
})
}

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.NoError(t, err)
require.Nil(t, data)

newDB, err := sql.Open("file:" + filepath.Join(cfg.DataDir, cfg.DbFile))
require.NoError(t, err)
require.NotNil(t, newDB)
defer newDB.Close()
verifyDbContent(t, newDB)

restore, err := recovery.CheckpointInfo(newDB)
require.NoError(t, err)
require.EqualValues(t, recoverLayer, restore)
})

t.Run("not found", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {})
cfg.Uri = strings.Replace(cfg.Uri, "/snapshot-15", "/snapshot-42", -1) // unavailable snapshot

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.ErrorIs(t, err, checkpoint.ErrCheckpointNotFound)
require.Nil(t, data)
})

t.Run("url unreachable", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {})
cfg.Uri = "http://nowhere/snapshot-15" // unreachable url

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.ErrorIs(t, err, checkpoint.ErrCheckpointRequestFailed)
require.Nil(t, data)
})

t.Run("unsupported scheme", func(t *testing.T) {
t.Parallel()
db := sql.InMemory()
localDB := localsql.InMemory()
fs, cfg := setup(t, func(w http.ResponseWriter, r *http.Request) {})
cfg.Uri = "ftp://snapshot-15" // unsupported scheme

data, err := checkpoint.RecoverWithDb(context.Background(), zaptest.NewLogger(t), db, localDB, fs, cfg)
require.ErrorIs(t, err, checkpoint.ErrUrlSchemeNotSupported)
require.Nil(t, data)
})
}

func TestRecover_SameRecoveryInfo(t *testing.T) {
Expand Down
40 changes: 28 additions & 12 deletions checkpoint/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path/filepath"
"time"

"github.com/hashicorp/go-retryablehttp"
"github.com/santhosh-tekuri/jsonschema/v5"
"github.com/spf13/afero"

Expand All @@ -24,8 +25,9 @@ import (
)

var (
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
ErrCheckpointNotFound = errors.New("checkpoint not found")
ErrCheckpointRequestFailed = errors.New("checkpoint request failed")
ErrUrlSchemeNotSupported = errors.New("url scheme not supported")
)

type RecoveryFile struct {
Expand Down Expand Up @@ -109,22 +111,36 @@ func CopyFile(fs afero.Fs, src, dst string) error {
return rf.Copy(fs, srcf)
}

func httpToLocalFile(ctx context.Context, resource *url.URL, fs afero.Fs, dst string) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
func httpToLocalFile(
ctx context.Context,
resource *url.URL,
fs afero.Fs,
dst string,
retryMax int,
retryDelay time.Duration,
) error {
c := retryablehttp.NewClient()
c.RetryMax = retryMax
c.RetryWaitMin = retryDelay
c.RetryWaitMax = retryDelay * 2
c.Backoff = retryablehttp.LinearJitterBackoff

req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodGet, resource.String(), nil)
if err != nil {
return fmt.Errorf("create http request: %w", err)
}
resp, err := (&http.Client{}).Do(req)
urlErr := &url.Error{}
switch {
case errors.As(err, &urlErr):
return ErrCheckpointNotFound
case err != nil:
return fmt.Errorf("http get recovery file: %w", err)

resp, err := c.Do(req)
if err != nil {
return fmt.Errorf("%w: %w", ErrCheckpointRequestFailed, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
switch resp.StatusCode {
case http.StatusOK:
case http.StatusNotFound:
return ErrCheckpointNotFound
default:
return fmt.Errorf("%w: status code %d", ErrCheckpointRequestFailed, resp.StatusCode)
}
rf, err := NewRecoveryFile(fs, dst)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ func AddFlags(flagSet *pflag.FlagSet, cfg *config.Config) (configPath *string) {
"recovery-uri", cfg.Recovery.Uri, "reset the node state based on the supplied checkpoint file")
flagSet.Uint32Var(&cfg.Recovery.Restore,
"recovery-layer", cfg.Recovery.Restore, "restart the mesh with the checkpoint file at this layer")
flagSet.BoolVar(&cfg.Recovery.IgnoreCheckpointReqErrors,
"ignore-checkpoint-req-errors", cfg.Recovery.IgnoreCheckpointReqErrors,
"ignore checkpoint request errors")

/** ======================== BaseConfig Flags ========================== **/
flagSet.StringVarP(&cfg.BaseConfig.DataDirParent, "data-folder", "d",
Expand Down
17 changes: 10 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,16 @@ func (app *App) loadCheckpoint(ctx context.Context) (*checkpoint.PreservedData,
}
}
cfg := &checkpoint.RecoverConfig{
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
GoldenAtx: types.ATXID(app.Config.Genesis.GoldenATX()),
DataDir: app.Config.DataDir(),
DbFile: dbFile,
LocalDbFile: localDbFile,
NodeIDs: nodeIDs,
Uri: app.Config.Recovery.Uri,
Restore: types.LayerID(app.Config.Recovery.Restore),
RetryMax: app.Config.Recovery.RetryMax,
RetryDelay: app.Config.Recovery.RetryDelay,
IgnoreCheckpointReqErrors: app.Config.Recovery.IgnoreCheckpointReqErrors,
}

return checkpoint.Recover(ctx, app.log.Zap(), afero.NewOsFs(), cfg)
Expand Down
Loading