diff --git a/block/async_store.go b/block/async_store.go
new file mode 100644
index 000000000..d84c34171
--- /dev/null
+++ b/block/async_store.go
@@ -0,0 +1,49 @@
+package block
+
+import (
+	"context"
+	"time"
+
+	"cosmossdk.io/log"
+	"github.com/rollkit/rollkit/pkg/store"
+)
+
+const DefaultFlushInterval = 10 * time.Second
+
+// AsyncPruner is a service that periodically prunes block data in the background.
+type AsyncPruner struct {
+	ps store.PruningStore
+
+	flushInterval time.Duration
+	logger        log.Logger
+}
+
+func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration, logger log.Logger) *AsyncPruner {
+	return &AsyncPruner{
+		ps: pruningStore,
+
+		flushInterval: flushInterval,
+		logger:        logger,
+	}
+}
+
+// Start starts the async pruner that periodically prunes block data.
+func (s *AsyncPruner) Start(ctx context.Context) {
+	ticker := time.NewTicker(s.flushInterval)
+	defer ticker.Stop()
+
+	s.logger.Info("AsyncPruner started", "interval", s.flushInterval)
+
+	for {
+		select {
+		case <-ctx.Done():
+			s.logger.Info("AsyncPruner stopped")
+			return
+		case <-ticker.C:
+			err := s.ps.PruneBlockData(ctx)
+			if err != nil {
+				s.logger.Error("Failed to prune block data", "error", err)
+			}
+		}
+	}
+}
diff --git a/node/full.go b/node/full.go
index ee8e223f2..5180be3d3 100644
--- a/node/full.go
+++ b/node/full.go
@@ -61,6 +61,7 @@ type FullNode struct {
 	Store         store.Store
 	blockManager  *block.Manager
 	reaper        *block.Reaper
+	asyncPruner   *block.AsyncPruner
 
 	prometheusSrv *http.Server
 	pprofSrv      *http.Server
@@ -95,7 +96,7 @@ func newFullNode(
 		return nil, err
 	}
 
-	rktStore := store.New(mainKV)
+	rktStore := store.NewDefaultPruningStore(mainKV, nodeConfig.Node.Pruning)
 
 	blockManager, err := initBlockManager(
 		ctx,
@@ -129,12 +130,18 @@ func newFullNode(
 	// Connect the reaper to the manager for transaction notifications
 	reaper.SetManager(blockManager)
 
+	asyncPruner := block.NewAsyncPruner(
+		rktStore, block.DefaultFlushInterval,
+		logger.With("module", "AsyncPruner"),
+	)
+
 	node := &FullNode{
 		genesis:       genesis,
 		nodeConfig:    nodeConfig,
 		p2pClient:     p2pClient,
 		blockManager:  blockManager,
 		reaper:        reaper,
+		asyncPruner:   asyncPruner,
 		da:            da,
 		Store:         rktStore,
 		hSyncService:  headerSyncService,
@@ -388,6 +395,7 @@ func (n *FullNode) Run(parentCtx context.Context) error {
 		spawnWorker(func() { n.blockManager.SyncLoop(ctx, errCh) })
 		spawnWorker(func() { n.blockManager.DAIncluderLoop(ctx, errCh) })
 	}
+	spawnWorker(func() { n.asyncPruner.Start(ctx) })
 
 	select {
 	case err := <-errCh:
diff --git a/node/light.go b/node/light.go
index 75c85947b..60b25b2d3 100644
--- a/node/light.go
+++ b/node/light.go
@@ -47,7 +47,7 @@ func newLightNode(
 		return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
 	}
 
-	store := store.New(database)
+	store := store.NewDefaultPruningStore(database, conf.Node.Pruning)
 
 	node := &LightNode{
 		P2P: p2pClient,
diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go
index 49719bd5e..b5f0e4baf 100644
--- a/pkg/cmd/run_node.go
+++ b/pkg/cmd/run_node.go
@@ -32,6 +32,11 @@ func ParseConfig(cmd *cobra.Command) (rollconf.Config, error) {
 		return rollconf.Config{}, fmt.Errorf("failed to load node config: %w", err)
 	}
 
+	if nodeConfig.Node.Pruning.Strategy != rollconf.PruningConfigStrategyCustom {
+		pruningConfig := rollconf.GetPruningConfigFromStrategy(nodeConfig.Node.Pruning.Strategy)
+		nodeConfig.Node.Pruning = pruningConfig
+	}
+
 	if err := nodeConfig.Validate(); err != nil {
 		return rollconf.Config{}, fmt.Errorf("failed to validate node config: %w", err)
 	}
diff --git a/pkg/config/config.go b/pkg/config/config.go
index bdfaeb64a..554f04593 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -202,6 +202,9 @@ type NodeConfig struct {
 
 	// Header configuration
 	TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."`
+
+	// Pruning management configuration
+	Pruning PruningConfig `mapstructure:"pruning" yaml:"pruning"`
 }
 
 // LogConfig contains all logging configuration parameters
@@ -243,6 +246,10 @@ func (c *Config) Validate() error {
 		return fmt.Errorf("could not create directory %q: %w", fullDir, err)
 	}
 
+	if err := c.Node.Pruning.Validate(); err != nil {
+		return fmt.Errorf("invalid pruning configuration: %w", err)
+	}
+
 	return nil
 }
 
@@ -286,6 +293,11 @@ func AddFlags(cmd *cobra.Command) {
 	cmd.Flags().Uint64(FlagMaxPendingHeadersAndData, def.Node.MaxPendingHeadersAndData, "maximum headers or data pending DA confirmation before pausing block production (0 for no limit)")
 	cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockInterval.Duration, "maximum interval between blocks in lazy aggregation mode")
 
+	// Pruning configuration flags
+	cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "pruning strategy (none, default, everything, custom)")
+	cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "number of recent blocks to keep (used with the custom strategy)")
+	cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "how often to run pruning, in blocks (used with the custom strategy)")
+
 	// Data Availability configuration flags
 	cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)")
 	cmd.Flags().String(FlagDAAuthToken, def.DA.AuthToken, "DA auth token")
diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go
index a04bffa94..6cb03180f 100644
--- a/pkg/config/defaults.go
+++ b/pkg/config/defaults.go
@@ -50,6 +50,7 @@ var DefaultConfig = Config{
 		LazyBlockInterval: DurationWrapper{60 * time.Second},
 		Light:             false,
 		TrustedHash:       "",
+		Pruning:           PruningConfigDefault,
 	},
 	DA: DAConfig{
 		Address: "http://localhost:7980",
diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go
new file mode 100644
index 000000000..c8a67f3ba
--- /dev/null
+++ b/pkg/config/pruning_config.go
@@ -0,0 +1,82 @@
+package config
+
+import (
+	"errors"
+)
+
+const (
+	// Pruning configuration flags
+
+	// FlagPruningStrategy is a flag for specifying the pruning strategy for the block store
+	FlagPruningStrategy = "rollkit.node.pruning.strategy"
+	// FlagPruningKeepRecent is a flag for specifying how many recent blocks to keep in the store
+	FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent"
+	// FlagPruningInterval is a flag for specifying how often to prune the block store
+	FlagPruningInterval = "rollkit.node.pruning.interval"
+)
+
+const (
+	PruningConfigStrategyNone       = "none"
+	PruningConfigStrategyDefault    = "default"
+	PruningConfigStrategyEverything = "everything"
+	PruningConfigStrategyCustom     = "custom"
+)
+
+var (
+	PruningConfigNone = PruningConfig{
+		Strategy:   PruningConfigStrategyNone,
+		KeepRecent: 0,
+		Interval:   0,
+	}
+	PruningConfigDefault = PruningConfig{
+		Strategy:   PruningConfigStrategyDefault,
+		KeepRecent: 362880,
+		Interval:   10,
+	}
+	PruningConfigEverything = PruningConfig{
+		Strategy:   PruningConfigStrategyEverything,
+		KeepRecent: 2,
+		Interval:   10,
+	}
+	PruningConfigCustom = PruningConfig{
+		Strategy:   PruningConfigStrategyCustom,
+		KeepRecent: 100,
+		Interval:   100,
+	}
+)
+
+// PruningConfig allows node operators to control how much block data is retained in the store
+type PruningConfig struct {
+	// todo: support volume-based strategy
+	Strategy   string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"`
+	KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep; used in the \"custom\" strategy, must be greater than or equal to 2"`
+	Interval   uint64 `mapstructure:"interval" yaml:"interval" comment:"How often the pruning process should run, in blocks; used in the \"custom\" strategy"`
+
+	// todo: support volume-based strategy
+	// VolumeConfig specifies configuration for volume-based storage
+	// VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"`
+}
+
+func (p PruningConfig) Validate() error {
+	// Only the custom strategy requires validation.
+	if p.Strategy != PruningConfigStrategyCustom {
+		return nil
+	}
+
+	if p.KeepRecent < 2 {
+		return errors.New("keep_recent must be greater than or equal to 2 for the custom pruning strategy")
+	}
+
+	return nil
+}
+
+func GetPruningConfigFromStrategy(strategy string) PruningConfig {
+	switch strategy {
+	case PruningConfigStrategyDefault:
+		return PruningConfigDefault
+	case PruningConfigStrategyEverything:
+		return PruningConfigEverything
+	default:
+		return PruningConfigNone
+	}
+}
diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go
new file mode 100644
index 000000000..5c3f4adde
--- /dev/null
+++ b/pkg/store/pruning.go
@@ -0,0 +1,73 @@
+package store
+
+import (
+	"context"
+	"sync"
+
+	ds "github.com/ipfs/go-datastore"
+
+	"github.com/rollkit/rollkit/pkg/config"
+	"github.com/rollkit/rollkit/types"
+)
+
+// DefaultPruningStore is a store that supports pruning of block data.
+type DefaultPruningStore struct {
+	Store
+
+	config                config.PruningConfig
+	latestBlockDataHeight uint64
+	mu                    sync.Mutex
+}
+
+var _ PruningStore = &DefaultPruningStore{}
+
+func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore {
+	return &DefaultPruningStore{
+		Store: &DefaultStore{db: ds},
+
+		config: config,
+	}
+}
+
+// SaveBlockData saves the block data and updates the latest block data height.
+func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
+	err := s.Store.SaveBlockData(ctx, header, data, signature)
+	if err == nil {
+		s.mu.Lock()
+		s.latestBlockDataHeight = header.Height()
+		s.mu.Unlock()
+	}
+
+	return err
+}
+
+func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error {
+	// Skip pruning entirely when the strategy is none.
+	if s.config.Strategy == config.PruningConfigStrategyNone {
+		return nil
+	}
+
+	// Read the latest height recorded by SaveBlockData; there is a delay between SetHeight and SaveBlockData.
+	s.mu.Lock()
+	height := s.latestBlockDataHeight
+	s.mu.Unlock()
+
+	// Skip if this is not a pruning height (an interval of 0 never triggers), or the latest height is less than or equal to the number of blocks to keep.
+	if s.config.Interval == 0 || height%s.config.Interval != 0 || height <= s.config.KeepRecent {
+		return nil
+	}
+
+	// Must keep at least 2 blocks (when the strategy is everything).
+	endHeight := height - 1 - s.config.KeepRecent
+	startHeight := uint64(0)
+	if endHeight > s.config.Interval {
+		startHeight = endHeight - s.config.Interval
+	}
+
+	for i := startHeight; i < endHeight; i++ {
+		// Errors such as not-found can safely be ignored here.
+		_ = s.DeleteBlockData(ctx, i)
+	}
+
+	return nil
+}
diff --git a/pkg/store/store.go b/pkg/store/store.go
index 1d911f1e8..6dedfec44 100644
--- a/pkg/store/store.go
+++ b/pkg/store/store.go
@@ -82,7 +82,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe
 
 	batch, err := s.db.Batch(ctx)
 	if err != nil {
-		return fmt.Errorf("failed to create a new batch: %w", err)
+		return fmt.Errorf("failed to create a new batch for saving block data: %w", err)
 	}
 
 	if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil {
@@ -104,6 +104,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe
 	return nil
 }
 
+// DeleteBlockData deletes the block data at the given height.
+func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
+	batch, err := s.db.Batch(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
+	}
+
+	if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
+		return fmt.Errorf("failed to delete header blob in batch: %w", err)
+	}
+	if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
+		return fmt.Errorf("failed to delete data blob in batch: %w", err)
+	}
+	if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
+		return fmt.Errorf("failed to delete signature blob in batch: %w", err)
+	}
+	if err := batch.Commit(ctx); err != nil {
+		return fmt.Errorf("failed to commit batch: %w", err)
+	}
+
+	return nil
+}
+
+// TODO: We unmarshal the header and data here, but then re-marshal them to proto to hash them or send them to DA; we should not unmarshal here and should let the caller handle the raw blobs as needed.
+
 // GetBlockData returns block header and data at given height, or error if it's not found in Store.
 func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) {
 	header, err := s.GetHeader(ctx, height)
diff --git a/pkg/store/types.go b/pkg/store/types.go
index ca935c08b..8b57913f6 100644
--- a/pkg/store/types.go
+++ b/pkg/store/types.go
@@ -16,6 +16,8 @@ type Store interface {
 
 	// SaveBlockData saves block along with its seen signature (which will be included in the next block).
 	SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error
+	// DeleteBlockData deletes the block at the given height.
+	DeleteBlockData(ctx context.Context, height uint64) error
 
 	// GetBlockData returns block at given height, or error if it's not found in Store.
 	GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error)
@@ -52,3 +54,9 @@ type Store interface {
 	// Close safely closes underlying data storage, to ensure that data is actually saved.
 	Close() error
 }
+
+type PruningStore interface {
+	Store
+
+	PruneBlockData(ctx context.Context) error
+}
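
Not part of the patch: a minimal sketch, for review purposes, of how the new pieces are intended to compose. The package name and the startPruning helper are hypothetical; NewDefaultPruningStore, PruningConfigEverything, NewAsyncPruner, DefaultFlushInterval, and PruningStore all come from this change, and any ds.Batching datastore plus a cosmossdk.io/log logger will do.

package pruningexample

import (
	"context"

	"cosmossdk.io/log"
	ds "github.com/ipfs/go-datastore"

	"github.com/rollkit/rollkit/block"
	"github.com/rollkit/rollkit/pkg/config"
	"github.com/rollkit/rollkit/pkg/store"
)

// startPruning is a hypothetical helper: it wraps a raw datastore in the
// pruning-aware store and runs the background pruner until ctx is cancelled.
func startPruning(ctx context.Context, db ds.Batching, logger log.Logger) store.PruningStore {
	// The preset decides how aggressively to prune; "everything" keeps only a
	// couple of recent blocks (KeepRecent: 2), pruning every 10 blocks.
	ps := store.NewDefaultPruningStore(db, config.PruningConfigEverything)

	// The pruner calls PruneBlockData once per DefaultFlushInterval (10s).
	pruner := block.NewAsyncPruner(ps, block.DefaultFlushInterval, logger)
	go pruner.Start(ctx)

	// Blocks saved through ps.SaveBlockData update the latest height that
	// PruneBlockData uses to decide which old heights to delete.
	return ps
}

In the full node this wiring happens in newFullNode: the main KV store is wrapped with NewDefaultPruningStore and the AsyncPruner goroutine is spawned from Run.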