diff --git a/pkg/config/config.go b/pkg/config/config.go index ee1a34bf0..ab193fe68 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -169,6 +169,9 @@ type NodeConfig struct { // Header configuration TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."` + + // Pruning management configuration + Pruning PruningConfig `mapstructure:"pruning" yaml:"pruning"` } // LogConfig contains all logging configuration parameters @@ -244,6 +247,11 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagMaxPendingBlocks, def.Node.MaxPendingBlocks, "maximum blocks pending DA confirmation before pausing block production (0 for no limit)") cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockTime.Duration, "maximum interval between blocks in lazy aggregation mode") + // Pruning configuration flags + cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "") + cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "") + cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "") + // Data Availability configuration flags cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)") cmd.Flags().String(FlagDAAuthToken, def.DA.AuthToken, "DA auth token") diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index b3d52cd52..cca0fee87 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -51,6 +51,7 @@ var DefaultConfig = Config{ LazyBlockTime: DurationWrapper{60 * time.Second}, Light: false, TrustedHash: "", + Pruning: PruningConfigDefault, }, DA: DAConfig{ Address: "http://localhost:7980", diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go new file mode 100644 index 000000000..a9475457e --- /dev/null +++ b/pkg/config/pruning_config.go @@ -0,0 +1,54 @@ +package config + +const ( + // Pruning configuration flags + + // FlagPruningStrategy is a flag for specifying strategy for pruning block store + FlagPruningStrategy = "rollkit.node.pruning.strategy" + // FlagPruningKeepRecent is a flag for specifying how many blocks need to keep in store + FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent" + // FlagPruningInterval is a flag for specifying how offen prune blocks store + FlagPruningInterval = "rollkit.node.pruning.interval" +) + +const ( + PruningConfigStrategyNone = "none" + PruningConfigStrategyDefault = "default" + PruningConfigStrategyEverything = "everything" + PruningConfigStrategyCustom = "custom" +) + +var ( + PruningConfigNone = PruningConfig{ + Strategy: PruningConfigStrategyNone, + KeepRecent: 0, + Interval: 0, + } + PruningConfigDefault = PruningConfig{ + Strategy: PruningConfigStrategyDefault, + KeepRecent: 362880, + Interval: 10, + } + PruningConfigEverything = PruningConfig{ + Strategy: PruningConfigStrategyEverything, + KeepRecent: 2, + Interval: 10, + } + PruningConfigCustom = PruningConfig{ + Strategy: PruningConfigStrategyCustom, + KeepRecent: 100, + Interval: 100, + } +) + +// PruningConfig allows node operators to manage storage +type PruningConfig struct { + // todo: support volume-based strategy + Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, custom)"` + KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"default\" and \"custom\" strategies"` + Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run"` + + // todo: support volume-based strategy + // VolumeConfig specifies configuration for volume-based storage + // VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"` +} diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go new file mode 100644 index 000000000..cf88de06d --- /dev/null +++ b/pkg/store/pruning.go @@ -0,0 +1,132 @@ +package store + +import ( + "context" + "fmt" + + "github.com/rollkit/rollkit/pkg/config" + "github.com/rollkit/rollkit/types" +) + +type DefaultPruningStore struct { + Store + + Config config.PruningConfig +} + +var _ PruningStore = &DefaultPruningStore{} + +// NewDefaultPruningStore returns default pruning store. +func NewDefaultPruningStore(store Store, config config.PruningConfig) PruningStore { + return &DefaultPruningStore{ + Store: store, + Config: config, + } +} + +// Close safely closes underlying data storage, to ensure that data is actually saved. +func (s *DefaultPruningStore) Close() error { + return s.Store.Close() +} + +// SetHeight sets the height saved in the Store if it is higher than the existing height +func (s *DefaultPruningStore) SetHeight(ctx context.Context, height uint64) error { + return s.Store.SetHeight(ctx, height) +} + +// Height returns height of the highest block saved in the Store. +func (s *DefaultPruningStore) Height(ctx context.Context) (uint64, error) { + return s.Store.Height(ctx) +} + +// SaveBlockData adds block header and data to the store along with corresponding signature. +// Stored height is updated if block height is greater than stored value. +func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + if err := s.PruneBlockData(ctx); err != nil { + return fmt.Errorf("failed to prune block data: %w", err) + } + + return s.Store.SaveBlockData(ctx, header, data, signature) +} + +// DeleteBlockData deletes block at given height. +func (s *DefaultPruningStore) DeleteBlockData(ctx context.Context, height uint64) error { + return s.Store.DeleteBlockData(ctx, height) +} + +// GetBlockData returns block header and data at given height, or error if it's not found in Store. +func (s *DefaultPruningStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + return s.Store.GetBlockData(ctx, height) +} + +// GetBlockByHash returns block with given block header hash, or error if it's not found in Store. +func (s *DefaultPruningStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { + return s.Store.GetBlockByHash(ctx, hash) +} + +// GetSignatureByHash returns signature for a block at given height, or error if it's not found in Store. +func (s *DefaultPruningStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { + return s.Store.GetSignatureByHash(ctx, hash) +} + +// GetSignature returns signature for a block with given block header hash, or error if it's not found in Store. +func (s *DefaultPruningStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { + return s.Store.GetSignature(ctx, height) +} + +// UpdateState updates state saved in Store. Only one State is stored. +// If there is no State in Store, state will be saved. +func (s *DefaultPruningStore) UpdateState(ctx context.Context, state types.State) error { + return s.Store.UpdateState(ctx, state) +} + +// GetState returns last state saved with UpdateState. +func (s *DefaultPruningStore) GetState(ctx context.Context) (types.State, error) { + return s.Store.GetState(ctx) +} + +// SetMetadata saves arbitrary value in the store. +// +// Metadata is separated from other data by using prefix in KV. +func (s *DefaultPruningStore) SetMetadata(ctx context.Context, key string, value []byte) error { + return s.Store.SetMetadata(ctx, key, value) +} + +// GetMetadata returns values stored for given key with SetMetadata. +func (s *DefaultPruningStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { + return s.Store.GetMetadata(ctx, key) +} + +func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { + var ( + err error + ) + + // Skip if strategy is none. + if s.Config.Strategy == config.PruningConfigStrategyNone { + return nil + } + + height, err := s.Height(ctx) + if err != nil { + return err + } + + // Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep. + if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent { + return nil + } + + // Must keep latest 2 blocks. + endHeight := height - 1 - s.Config.KeepRecent + startHeight := min(0, endHeight-s.Config.KeepRecent) + + for i := startHeight; i <= endHeight; i++ { + err = s.DeleteBlockData(ctx, i) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/store/store.go b/pkg/store/store.go index be5d1d962..583e63363 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -92,7 +92,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe batch, err := s.db.Batch(ctx) if err != nil { - return fmt.Errorf("failed to create a new batch: %w", err) + return fmt.Errorf("failed to create a new batch for saving block data: %w", err) } if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { @@ -114,6 +114,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe return nil } +// DeleteBlockData deletes block at given height. +func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error { + batch, err := s.db.Batch(ctx) + if err != nil { + return fmt.Errorf("failed to create a new batch for deleting block data: %w", err) + } + + if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil { + return fmt.Errorf("failed to delete header blob in batch: %w", err) + } + if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil { + return fmt.Errorf("failed to delete data blob in batch: %w", err) + } + if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil { + return fmt.Errorf("failed to delete signature of block blob in batch: %w", err) + } + if err := batch.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + + return nil +} + +// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed. + // GetBlockData returns block header and data at given height, or error if it's not found in Store. func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(height))) diff --git a/pkg/store/types.go b/pkg/store/types.go index efd768ddb..51304af18 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -16,6 +16,8 @@ type Store interface { // SaveBlock saves block along with its seen signature (which will be included in the next block). SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error + // DeleteBlockData deletes block at given height. + DeleteBlockData(ctx context.Context, height uint64) error // GetBlock returns block at given height, or error if it's not found in Store. GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) @@ -44,3 +46,9 @@ type Store interface { // Close safely closes underlying data storage, to ensure that data is actually saved. Close() error } + +type PruningStore interface { + Store + + PruneBlockData(ctx context.Context) error +}