From 291aea5d5f3d5f06baf92727184e8fd74eabaf31 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 22 Apr 2025 19:17:21 +0800 Subject: [PATCH 01/25] basic config for pruning --- pkg/config/config.go | 8 ++++++ pkg/config/defaults.go | 1 + pkg/config/pruning_config.go | 54 ++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+) create mode 100644 pkg/config/pruning_config.go diff --git a/pkg/config/config.go b/pkg/config/config.go index ee1a34bf08..ab193fe686 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -169,6 +169,9 @@ type NodeConfig struct { // Header configuration TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."` + + // Pruning management configuration + Pruning PruningConfig `mapstructure:"pruning" yaml:"pruning"` } // LogConfig contains all logging configuration parameters @@ -244,6 +247,11 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Uint64(FlagMaxPendingBlocks, def.Node.MaxPendingBlocks, "maximum blocks pending DA confirmation before pausing block production (0 for no limit)") cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockTime.Duration, "maximum interval between blocks in lazy aggregation mode") + // Pruning configuration flags + cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "") + cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "") + cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "") + // Data Availability configuration flags cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)") cmd.Flags().String(FlagDAAuthToken, def.DA.AuthToken, "DA auth token") diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index b3d52cd521..cca0fee87e 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -51,6 +51,7 @@ var DefaultConfig = Config{ LazyBlockTime: DurationWrapper{60 * time.Second}, Light: false, TrustedHash: "", + Pruning: PruningConfigDefault, }, DA: DAConfig{ Address: "http://localhost:7980", diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go new file mode 100644 index 0000000000..a9475457ed --- /dev/null +++ b/pkg/config/pruning_config.go @@ -0,0 +1,54 @@ +package config + +const ( + // Pruning configuration flags + + // FlagPruningStrategy is a flag for specifying strategy for pruning block store + FlagPruningStrategy = "rollkit.node.pruning.strategy" + // FlagPruningKeepRecent is a flag for specifying how many blocks need to keep in store + FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent" + // FlagPruningInterval is a flag for specifying how offen prune blocks store + FlagPruningInterval = "rollkit.node.pruning.interval" +) + +const ( + PruningConfigStrategyNone = "none" + PruningConfigStrategyDefault = "default" + PruningConfigStrategyEverything = "everything" + PruningConfigStrategyCustom = "custom" +) + +var ( + PruningConfigNone = PruningConfig{ + Strategy: PruningConfigStrategyNone, + KeepRecent: 0, + Interval: 0, + } + PruningConfigDefault = PruningConfig{ + Strategy: PruningConfigStrategyDefault, + KeepRecent: 362880, + Interval: 10, + } + PruningConfigEverything = PruningConfig{ + Strategy: PruningConfigStrategyEverything, + KeepRecent: 2, + Interval: 10, + } + PruningConfigCustom = PruningConfig{ + Strategy: PruningConfigStrategyCustom, + KeepRecent: 100, + Interval: 100, + } +) + +// PruningConfig allows node operators to manage storage +type PruningConfig struct { + // todo: support volume-based strategy + Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, custom)"` + KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"default\" and \"custom\" strategies"` + Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run"` + + // todo: support volume-based strategy + // VolumeConfig specifies configuration for volume-based storage + // VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"` +} From e5e808050ee5d2c98b1c26838e3c3b9557910981 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 22 Apr 2025 19:18:41 +0800 Subject: [PATCH 02/25] prune block data --- pkg/store/pruning.go | 132 +++++++++++++++++++++++++++++++++++++++++++ pkg/store/store.go | 27 ++++++++- pkg/store/types.go | 8 +++ 3 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 pkg/store/pruning.go diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go new file mode 100644 index 0000000000..cf88de06d0 --- /dev/null +++ b/pkg/store/pruning.go @@ -0,0 +1,132 @@ +package store + +import ( + "context" + "fmt" + + "github.com/rollkit/rollkit/pkg/config" + "github.com/rollkit/rollkit/types" +) + +type DefaultPruningStore struct { + Store + + Config config.PruningConfig +} + +var _ PruningStore = &DefaultPruningStore{} + +// NewDefaultPruningStore returns default pruning store. +func NewDefaultPruningStore(store Store, config config.PruningConfig) PruningStore { + return &DefaultPruningStore{ + Store: store, + Config: config, + } +} + +// Close safely closes underlying data storage, to ensure that data is actually saved. +func (s *DefaultPruningStore) Close() error { + return s.Store.Close() +} + +// SetHeight sets the height saved in the Store if it is higher than the existing height +func (s *DefaultPruningStore) SetHeight(ctx context.Context, height uint64) error { + return s.Store.SetHeight(ctx, height) +} + +// Height returns height of the highest block saved in the Store. +func (s *DefaultPruningStore) Height(ctx context.Context) (uint64, error) { + return s.Store.Height(ctx) +} + +// SaveBlockData adds block header and data to the store along with corresponding signature. +// Stored height is updated if block height is greater than stored value. +func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + if err := s.PruneBlockData(ctx); err != nil { + return fmt.Errorf("failed to prune block data: %w", err) + } + + return s.Store.SaveBlockData(ctx, header, data, signature) +} + +// DeleteBlockData deletes block at given height. +func (s *DefaultPruningStore) DeleteBlockData(ctx context.Context, height uint64) error { + return s.Store.DeleteBlockData(ctx, height) +} + +// GetBlockData returns block header and data at given height, or error if it's not found in Store. +func (s *DefaultPruningStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { + return s.Store.GetBlockData(ctx, height) +} + +// GetBlockByHash returns block with given block header hash, or error if it's not found in Store. +func (s *DefaultPruningStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { + return s.Store.GetBlockByHash(ctx, hash) +} + +// GetSignatureByHash returns signature for a block at given height, or error if it's not found in Store. +func (s *DefaultPruningStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { + return s.Store.GetSignatureByHash(ctx, hash) +} + +// GetSignature returns signature for a block with given block header hash, or error if it's not found in Store. +func (s *DefaultPruningStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { + return s.Store.GetSignature(ctx, height) +} + +// UpdateState updates state saved in Store. Only one State is stored. +// If there is no State in Store, state will be saved. +func (s *DefaultPruningStore) UpdateState(ctx context.Context, state types.State) error { + return s.Store.UpdateState(ctx, state) +} + +// GetState returns last state saved with UpdateState. +func (s *DefaultPruningStore) GetState(ctx context.Context) (types.State, error) { + return s.Store.GetState(ctx) +} + +// SetMetadata saves arbitrary value in the store. +// +// Metadata is separated from other data by using prefix in KV. +func (s *DefaultPruningStore) SetMetadata(ctx context.Context, key string, value []byte) error { + return s.Store.SetMetadata(ctx, key, value) +} + +// GetMetadata returns values stored for given key with SetMetadata. +func (s *DefaultPruningStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { + return s.Store.GetMetadata(ctx, key) +} + +func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { + var ( + err error + ) + + // Skip if strategy is none. + if s.Config.Strategy == config.PruningConfigStrategyNone { + return nil + } + + height, err := s.Height(ctx) + if err != nil { + return err + } + + // Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep. + if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent { + return nil + } + + // Must keep latest 2 blocks. + endHeight := height - 1 - s.Config.KeepRecent + startHeight := min(0, endHeight-s.Config.KeepRecent) + + for i := startHeight; i <= endHeight; i++ { + err = s.DeleteBlockData(ctx, i) + if err != nil { + return err + } + } + + return nil +} diff --git a/pkg/store/store.go b/pkg/store/store.go index be5d1d962c..583e63363c 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -92,7 +92,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe batch, err := s.db.Batch(ctx) if err != nil { - return fmt.Errorf("failed to create a new batch: %w", err) + return fmt.Errorf("failed to create a new batch for saving block data: %w", err) } if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { @@ -114,6 +114,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe return nil } +// DeleteBlockData deletes block at given height. +func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error { + batch, err := s.db.Batch(ctx) + if err != nil { + return fmt.Errorf("failed to create a new batch for deleting block data: %w", err) + } + + if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil { + return fmt.Errorf("failed to delete header blob in batch: %w", err) + } + if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil { + return fmt.Errorf("failed to delete data blob in batch: %w", err) + } + if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil { + return fmt.Errorf("failed to delete signature of block blob in batch: %w", err) + } + if err := batch.Commit(ctx); err != nil { + return fmt.Errorf("failed to commit batch: %w", err) + } + + return nil +} + +// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed. + // GetBlockData returns block header and data at given height, or error if it's not found in Store. func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(height))) diff --git a/pkg/store/types.go b/pkg/store/types.go index efd768ddb3..51304af186 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -16,6 +16,8 @@ type Store interface { // SaveBlock saves block along with its seen signature (which will be included in the next block). SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error + // DeleteBlockData deletes block at given height. + DeleteBlockData(ctx context.Context, height uint64) error // GetBlock returns block at given height, or error if it's not found in Store. GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) @@ -44,3 +46,9 @@ type Store interface { // Close safely closes underlying data storage, to ensure that data is actually saved. Close() error } + +type PruningStore interface { + Store + + PruneBlockData(ctx context.Context) error +} From 7ca9b679a2d7f38abc9010981d99044322c928b9 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 22 Apr 2025 19:46:22 +0800 Subject: [PATCH 03/25] fix: use corresponding config for strategy add comments --- pkg/cmd/run_node.go | 3 +++ pkg/config/config.go | 6 +++--- pkg/config/pruning_config.go | 20 +++++++++++++++++--- 3 files changed, 23 insertions(+), 6 deletions(-) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index cba4a362c3..76c3e49fb0 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -34,6 +34,9 @@ func ParseConfig(cmd *cobra.Command) (rollconf.Config, error) { return rollconf.Config{}, fmt.Errorf("failed to load node config: %w", err) } + pruningConfig := rollconf.GetPruningConfigFromStrategy(nodeConfig.Node.Pruning.Strategy) + nodeConfig.Node.Pruning = pruningConfig + if err := nodeConfig.Validate(); err != nil { return rollconf.Config{}, fmt.Errorf("failed to validate node config: %w", err) } diff --git a/pkg/config/config.go b/pkg/config/config.go index ab193fe686..56a9cc4b1b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -248,9 +248,9 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockTime.Duration, "maximum interval between blocks in lazy aggregation mode") // Pruning configuration flags - cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "") - cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "") - cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "") + cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "pruning strategy (none, default, everything, custom)") + cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "number of recent blocks to keep") + cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "frequency of pruning operations") // Data Availability configuration flags cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)") diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go index a9475457ed..36728f420e 100644 --- a/pkg/config/pruning_config.go +++ b/pkg/config/pruning_config.go @@ -44,11 +44,25 @@ var ( // PruningConfig allows node operators to manage storage type PruningConfig struct { // todo: support volume-based strategy - Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, custom)"` - KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"default\" and \"custom\" strategies"` - Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run"` + Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"` + KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy"` + Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run, used in \"custom\" strategy"` // todo: support volume-based strategy // VolumeConfig specifies configuration for volume-based storage // VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"` } + +func GetPruningConfigFromStrategy(strategy string) PruningConfig { + switch strategy { + case PruningConfigStrategyDefault: + return PruningConfigDefault + case PruningConfigStrategyEverything: + return PruningConfigEverything + case PruningConfigStrategyCustom: + return PruningConfigCustom + } + + // Return strategy "none" if unknown. + return PruningConfigNone +} From 82c76beb1ce186adc50f00edd289b5aec0d65611 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 23 Apr 2025 02:13:35 +0800 Subject: [PATCH 04/25] fix: incorrect endHeight --- pkg/store/pruning.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index cf88de06d0..98c5c40ca2 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -117,8 +117,8 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { return nil } - // Must keep latest 2 blocks. - endHeight := height - 1 - s.Config.KeepRecent + // Must keep at least 2 blocks(while strategy is everything). + endHeight := height - s.Config.KeepRecent startHeight := min(0, endHeight-s.Config.KeepRecent) for i := startHeight; i <= endHeight; i++ { From cd92e3e7f392ba2fc2e9dd9927f937de4e0605df Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Thu, 24 Apr 2025 05:09:31 +0800 Subject: [PATCH 05/25] fix: incorrect range for pruning --- pkg/store/pruning.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index 98c5c40ca2..2de50ec40a 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -2,8 +2,6 @@ package store import ( "context" - "fmt" - "github.com/rollkit/rollkit/pkg/config" "github.com/rollkit/rollkit/types" ) @@ -42,10 +40,6 @@ func (s *DefaultPruningStore) Height(ctx context.Context) (uint64, error) { // SaveBlockData adds block header and data to the store along with corresponding signature. // Stored height is updated if block height is greater than stored value. func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - if err := s.PruneBlockData(ctx); err != nil { - return fmt.Errorf("failed to prune block data: %w", err) - } - return s.Store.SaveBlockData(ctx, header, data, signature) } @@ -113,13 +107,16 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { } // Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep. - if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent { + if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { return nil } // Must keep at least 2 blocks(while strategy is everything). - endHeight := height - s.Config.KeepRecent - startHeight := min(0, endHeight-s.Config.KeepRecent) + endHeight := height + 1 - s.Config.KeepRecent + startHeight := uint64(0) + if endHeight > s.Config.Interval { + startHeight = endHeight - s.Config.Interval + } for i := startHeight; i <= endHeight; i++ { err = s.DeleteBlockData(ctx, i) From 942094478dda73d0fb990887857ccbb080acb468 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Thu, 24 Apr 2025 05:11:34 +0800 Subject: [PATCH 06/25] chore: delete methods without any changes --- pkg/store/pruning.go | 71 +------------------------------------------- 1 file changed, 1 insertion(+), 70 deletions(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index 2de50ec40a..6fd6cdb735 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -2,8 +2,8 @@ package store import ( "context" + "github.com/rollkit/rollkit/pkg/config" - "github.com/rollkit/rollkit/types" ) type DefaultPruningStore struct { @@ -22,75 +22,6 @@ func NewDefaultPruningStore(store Store, config config.PruningConfig) PruningSto } } -// Close safely closes underlying data storage, to ensure that data is actually saved. -func (s *DefaultPruningStore) Close() error { - return s.Store.Close() -} - -// SetHeight sets the height saved in the Store if it is higher than the existing height -func (s *DefaultPruningStore) SetHeight(ctx context.Context, height uint64) error { - return s.Store.SetHeight(ctx, height) -} - -// Height returns height of the highest block saved in the Store. -func (s *DefaultPruningStore) Height(ctx context.Context) (uint64, error) { - return s.Store.Height(ctx) -} - -// SaveBlockData adds block header and data to the store along with corresponding signature. -// Stored height is updated if block height is greater than stored value. -func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - return s.Store.SaveBlockData(ctx, header, data, signature) -} - -// DeleteBlockData deletes block at given height. -func (s *DefaultPruningStore) DeleteBlockData(ctx context.Context, height uint64) error { - return s.Store.DeleteBlockData(ctx, height) -} - -// GetBlockData returns block header and data at given height, or error if it's not found in Store. -func (s *DefaultPruningStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { - return s.Store.GetBlockData(ctx, height) -} - -// GetBlockByHash returns block with given block header hash, or error if it's not found in Store. -func (s *DefaultPruningStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) { - return s.Store.GetBlockByHash(ctx, hash) -} - -// GetSignatureByHash returns signature for a block at given height, or error if it's not found in Store. -func (s *DefaultPruningStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) { - return s.Store.GetSignatureByHash(ctx, hash) -} - -// GetSignature returns signature for a block with given block header hash, or error if it's not found in Store. -func (s *DefaultPruningStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) { - return s.Store.GetSignature(ctx, height) -} - -// UpdateState updates state saved in Store. Only one State is stored. -// If there is no State in Store, state will be saved. -func (s *DefaultPruningStore) UpdateState(ctx context.Context, state types.State) error { - return s.Store.UpdateState(ctx, state) -} - -// GetState returns last state saved with UpdateState. -func (s *DefaultPruningStore) GetState(ctx context.Context) (types.State, error) { - return s.Store.GetState(ctx) -} - -// SetMetadata saves arbitrary value in the store. -// -// Metadata is separated from other data by using prefix in KV. -func (s *DefaultPruningStore) SetMetadata(ctx context.Context, key string, value []byte) error { - return s.Store.SetMetadata(ctx, key, value) -} - -// GetMetadata returns values stored for given key with SetMetadata. -func (s *DefaultPruningStore) GetMetadata(ctx context.Context, key string) ([]byte, error) { - return s.Store.GetMetadata(ctx, key) -} - func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { var ( err error From 21c0b13a6da9b2a9de4c82fe90a5dd062a3d5597 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Thu, 24 Apr 2025 05:17:43 +0800 Subject: [PATCH 07/25] chore: add comment --- pkg/store/pruning.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index 6fd6cdb735..219d93ceee 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -52,7 +52,9 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { for i := startHeight; i <= endHeight; i++ { err = s.DeleteBlockData(ctx, i) if err != nil { - return err + // Could ignore for errors like not found. + // This err is a placeholder for warning if there is a logger in the Store. + continue } } From 685dbe723ed38b9ccb3b6d49dd8c849f57846a95 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 29 Apr 2025 07:23:15 +0800 Subject: [PATCH 08/25] feat: use `PruningStore` instead of `Store` --- node/full.go | 2 +- node/light.go | 2 +- pkg/store/pruning.go | 6 ++++-- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/node/full.go b/node/full.go index b2a8d9815f..8ebec4dc43 100644 --- a/node/full.go +++ b/node/full.go @@ -94,7 +94,7 @@ func newFullNode( return nil, err } - store := store.New(mainKV) + store := store.NewDefaultPruningStore(mainKV, nodeConfig.Node.Pruning) reaper := block.NewReaper( ctx, diff --git a/node/light.go b/node/light.go index ae88f7cf48..536fdb28d8 100644 --- a/node/light.go +++ b/node/light.go @@ -47,7 +47,7 @@ func newLightNode( return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } - store := store.New(database) + store := store.NewDefaultPruningStore(database, conf.Node.Pruning) node := &LightNode{ P2P: p2pClient, diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index 219d93ceee..250ae05b1a 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -3,6 +3,8 @@ package store import ( "context" + ds "github.com/ipfs/go-datastore" + "github.com/rollkit/rollkit/pkg/config" ) @@ -15,9 +17,9 @@ type DefaultPruningStore struct { var _ PruningStore = &DefaultPruningStore{} // NewDefaultPruningStore returns default pruning store. -func NewDefaultPruningStore(store Store, config config.PruningConfig) PruningStore { +func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { return &DefaultPruningStore{ - Store: store, + Store: &DefaultStore{db: ds}, Config: config, } } From d5b16c23eab9b13e7e11f20891d95d68ae981b4d Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 29 Apr 2025 07:40:39 +0800 Subject: [PATCH 09/25] feat: pruning block data when everytime saving --- pkg/store/pruning.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index 250ae05b1a..fd21aca520 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -6,6 +6,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/rollkit/rollkit/pkg/config" + "github.com/rollkit/rollkit/types" ) type DefaultPruningStore struct { @@ -24,6 +25,16 @@ func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) Pruning } } +// SaveBlockData adds block header and data to the store along with corresponding signature. +// It also prunes the block data if needed. +func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + if err := s.PruneBlockData(ctx); err != nil { + return err + } + + return s.Store.SaveBlockData(ctx, header, data, signature) +} + func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { var ( err error From 350b21d45edd20b619c9a542cb691abb99a00246 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 29 Apr 2025 07:41:40 +0800 Subject: [PATCH 10/25] chore: format comment --- pkg/store/types.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/store/types.go b/pkg/store/types.go index 51304af186..c006988639 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -14,12 +14,12 @@ type Store interface { // SetHeight sets the height saved in the Store if it is higher than the existing height. SetHeight(ctx context.Context, height uint64) error - // SaveBlock saves block along with its seen signature (which will be included in the next block). + // SaveBlockData saves block along with its seen signature (which will be included in the next block). SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error // DeleteBlockData deletes block at given height. DeleteBlockData(ctx context.Context, height uint64) error - // GetBlock returns block at given height, or error if it's not found in Store. + // GetBlockData returns block at given height, or error if it's not found in Store. GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) // GetBlockByHash returns block with given block header hash, or error if it's not found in Store. GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) From 7a044180a552813cab4df9c6db72ae90cb321dba Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Tue, 29 Apr 2025 07:56:20 +0800 Subject: [PATCH 11/25] fix: no need to delete block at endHeight --- pkg/store/pruning.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index fd21aca520..e0d993d4aa 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -62,7 +62,7 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { startHeight = endHeight - s.Config.Interval } - for i := startHeight; i <= endHeight; i++ { + for i := startHeight; i < endHeight; i++ { err = s.DeleteBlockData(ctx, i) if err != nil { // Could ignore for errors like not found. From a4d2fc1518e63fa7ffa8cb9cac087deecc111db2 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 3 May 2025 00:34:03 +0800 Subject: [PATCH 12/25] chore: typo --- pkg/config/pruning_config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go index 36728f420e..b8330874fe 100644 --- a/pkg/config/pruning_config.go +++ b/pkg/config/pruning_config.go @@ -7,7 +7,7 @@ const ( FlagPruningStrategy = "rollkit.node.pruning.strategy" // FlagPruningKeepRecent is a flag for specifying how many blocks need to keep in store FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent" - // FlagPruningInterval is a flag for specifying how offen prune blocks store + // FlagPruningInterval is a flag for specifying how often prune blocks store FlagPruningInterval = "rollkit.node.pruning.interval" ) @@ -46,7 +46,7 @@ type PruningConfig struct { // todo: support volume-based strategy Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"` KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy"` - Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run, used in \"custom\" strategy"` + Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how often the pruning process should run, used in \"custom\" strategy"` // todo: support volume-based strategy // VolumeConfig specifies configuration for volume-based storage From 1b3ffcf501eec0ee6bb91fd84bf04eff75213d23 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Fri, 23 May 2025 00:30:45 +0800 Subject: [PATCH 13/25] format --- pkg/config/defaults.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index d481bd1264..61a79bbf1e 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -51,7 +51,7 @@ var DefaultConfig = Config{ LazyBlockInterval: DurationWrapper{60 * time.Second}, Light: false, TrustedHash: "", - Pruning: PruningConfigDefault, + Pruning: PruningConfigDefault, }, DA: DAConfig{ Address: "http://localhost:7980", From ab0c7df2af12a832638e2c0ce8e6ffa33c1e24fe Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 28 May 2025 09:55:06 +0800 Subject: [PATCH 14/25] clean codes and comments --- pkg/store/pruning.go | 32 ++++---------------------------- pkg/store/types.go | 2 +- 2 files changed, 5 insertions(+), 29 deletions(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index e0d993d4aa..a6d2e5ef86 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -6,7 +6,6 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/rollkit/rollkit/pkg/config" - "github.com/rollkit/rollkit/types" ) type DefaultPruningStore struct { @@ -25,32 +24,13 @@ func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) Pruning } } -// SaveBlockData adds block header and data to the store along with corresponding signature. -// It also prunes the block data if needed. -func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - if err := s.PruneBlockData(ctx); err != nil { - return err - } - - return s.Store.SaveBlockData(ctx, header, data, signature) -} - -func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { - var ( - err error - ) - +func (s *DefaultPruningStore) PruneBlockData(ctx context.Context, height uint64) error { // Skip if strategy is none. if s.Config.Strategy == config.PruningConfigStrategyNone { return nil } - height, err := s.Height(ctx) - if err != nil { - return err - } - - // Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep. + // Skip if not the correct interval or latest height is less or equal than number of blocks need to keep. if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { return nil } @@ -63,12 +43,8 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { } for i := startHeight; i < endHeight; i++ { - err = s.DeleteBlockData(ctx, i) - if err != nil { - // Could ignore for errors like not found. - // This err is a placeholder for warning if there is a logger in the Store. - continue - } + // Could ignore for errors like not found. + _ = s.DeleteBlockData(ctx, i) } return nil diff --git a/pkg/store/types.go b/pkg/store/types.go index c006988639..54fa6e0b4c 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -50,5 +50,5 @@ type Store interface { type PruningStore interface { Store - PruneBlockData(ctx context.Context) error + PruneBlockData(ctx context.Context, height uint64) error } From 7fcf6a8b2778a0db29adcbb748c947b8dffb42b2 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 28 May 2025 10:34:02 +0800 Subject: [PATCH 15/25] async pruning store --- node/full.go | 2 +- node/light.go | 2 +- pkg/store/pruning.go | 63 +++++++++++++++++++++++++++++++++++++++----- 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/node/full.go b/node/full.go index afcd13e938..9d9d904bf0 100644 --- a/node/full.go +++ b/node/full.go @@ -93,7 +93,7 @@ func newFullNode( return nil, err } - store := store.NewDefaultPruningStore(mainKV, nodeConfig.Node.Pruning) + store := store.NewAsyncPruningStore(mainKV, nodeConfig.Node.Pruning) blockManager, err := initBlockManager( ctx, diff --git a/node/light.go b/node/light.go index 9513c966d8..6a4099ecc2 100644 --- a/node/light.go +++ b/node/light.go @@ -45,7 +45,7 @@ func newLightNode( return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } - store := store.NewDefaultPruningStore(database, conf.Node.Pruning) + store := store.NewAsyncPruningStore(database, conf.Node.Pruning) node := &LightNode{ P2P: p2pClient, diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index a6d2e5ef86..c603144c58 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -2,29 +2,33 @@ package store import ( "context" + "time" ds "github.com/ipfs/go-datastore" "github.com/rollkit/rollkit/pkg/config" + "github.com/rollkit/rollkit/types" ) -type DefaultPruningStore struct { +const DefaultFlushInterval = 1 * time.Second + +type defaultPruningStore struct { Store Config config.PruningConfig } -var _ PruningStore = &DefaultPruningStore{} +var _ PruningStore = &defaultPruningStore{} -// NewDefaultPruningStore returns default pruning store. -func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { - return &DefaultPruningStore{ +// newDefaultPruningStore returns default pruning store. +func newDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { + return &defaultPruningStore{ Store: &DefaultStore{db: ds}, Config: config, } } -func (s *DefaultPruningStore) PruneBlockData(ctx context.Context, height uint64) error { +func (s *defaultPruningStore) PruneBlockData(ctx context.Context, height uint64) error { // Skip if strategy is none. if s.Config.Strategy == config.PruningConfigStrategyNone { return nil @@ -49,3 +53,50 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context, height uint64) return nil } + +type AsyncPruningStore struct { + PruningStore + + latestBlockDataHeight uint64 + flushInterval time.Duration +} + +var _ PruningStore = &AsyncPruningStore{} + +func NewAsyncPruningStore(ds ds.Batching, config config.PruningConfig) *AsyncPruningStore { + pruningStore := newDefaultPruningStore(ds, config) + + // todo: initialize latestBlockDataHeight from the store + return &AsyncPruningStore{ + PruningStore: pruningStore, + + flushInterval: DefaultFlushInterval, + } +} + +// SaveBlockData saves the block data and updates the latest block data height. +func (s *AsyncPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + err := s.PruningStore.SaveBlockData(ctx, header, data, signature) + if err == nil { + s.latestBlockDataHeight = header.Height() + } + + return err +} + +// Start begins the pruning process at the specified interval. +func (s *AsyncPruningStore) Start(ctx context.Context) { + // todo: use ctx for cancellation + ticker := time.NewTicker(s.flushInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Currently PruneBlockData only returns nil. + _ = s.PruneBlockData(ctx, s.latestBlockDataHeight) + } + } +} From 4639bc9004e974ae210128d8a94e59d4d6a5b216 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 28 May 2025 10:43:33 +0800 Subject: [PATCH 16/25] add validation and comments for config --- pkg/config/config.go | 4 ++++ pkg/config/pruning_config.go | 21 +++++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index e1d73f9348..22a586246f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -212,6 +212,10 @@ func (c *Config) Validate() error { return fmt.Errorf("could not create directory %q: %w", fullDir, err) } + if err := c.Node.Pruning.Validate(); err != nil { + return fmt.Errorf("invalid pruning configuration: %w", err) + } + return nil } diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go index b8330874fe..39dbb16d39 100644 --- a/pkg/config/pruning_config.go +++ b/pkg/config/pruning_config.go @@ -1,5 +1,9 @@ package config +import ( + "errors" +) + const ( // Pruning configuration flags @@ -45,14 +49,27 @@ var ( type PruningConfig struct { // todo: support volume-based strategy Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"` - KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy"` - Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how often the pruning process should run, used in \"custom\" strategy"` + KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy, must be greater or equal than 2"` + Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"How often the pruning process should run, used in \"custom\" strategy"` // todo: support volume-based strategy // VolumeConfig specifies configuration for volume-based storage // VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"` } +func (p PruningConfig) Validate() error { + // Only Custom strategy requires validation. + if p.Strategy != PruningConfigStrategyCustom { + return nil + } + + if p.KeepRecent < 2 { + return errors.New("keep_recent must be greater or equal than 2 for custom pruning strategy") + } + + return nil +} + func GetPruningConfigFromStrategy(strategy string) PruningConfig { switch strategy { case PruningConfigStrategyDefault: From 34d79c280de86c13013e34e9e908a4f4741ce2f7 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 28 May 2025 10:53:29 +0800 Subject: [PATCH 17/25] fix: custom strategy --- pkg/cmd/run_node.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index ada6e33b82..912152eb80 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -33,8 +33,10 @@ func ParseConfig(cmd *cobra.Command) (rollconf.Config, error) { return rollconf.Config{}, fmt.Errorf("failed to load node config: %w", err) } - pruningConfig := rollconf.GetPruningConfigFromStrategy(nodeConfig.Node.Pruning.Strategy) - nodeConfig.Node.Pruning = pruningConfig + if nodeConfig.Node.Pruning.Strategy != rollconf.PruningConfigStrategyCustom { + pruningConfig := rollconf.GetPruningConfigFromStrategy(nodeConfig.Node.Pruning.Strategy) + nodeConfig.Node.Pruning = pruningConfig + } if err := nodeConfig.Validate(); err != nil { return rollconf.Config{}, fmt.Errorf("failed to validate node config: %w", err) From 99dd5fe87d5f26b72ec6708d9df692d7b265b487 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 4 Jun 2025 10:52:10 +0800 Subject: [PATCH 18/25] clean codes --- node/full.go | 2 +- node/light.go | 2 +- pkg/store/pruning.go | 93 ++++++++++++++------------------------------ pkg/store/types.go | 2 +- 4 files changed, 32 insertions(+), 67 deletions(-) diff --git a/node/full.go b/node/full.go index 3fb21bdc73..acfbcccaf9 100644 --- a/node/full.go +++ b/node/full.go @@ -94,7 +94,7 @@ func newFullNode( return nil, err } - rktStore := store.NewAsyncPruningStore(mainKV, nodeConfig.Node.Pruning) + rktStore := store.NewDefaultPruningStore(mainKV, nodeConfig.Node.Pruning) blockManager, err := initBlockManager( ctx, diff --git a/node/light.go b/node/light.go index 6a4099ecc2..9513c966d8 100644 --- a/node/light.go +++ b/node/light.go @@ -45,7 +45,7 @@ func newLightNode( return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } - store := store.NewAsyncPruningStore(database, conf.Node.Pruning) + store := store.NewDefaultPruningStore(database, conf.Node.Pruning) node := &LightNode{ P2P: p2pClient, diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index c603144c58..aa06e889d5 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -2,7 +2,6 @@ package store import ( "context" - "time" ds "github.com/ipfs/go-datastore" @@ -10,40 +9,53 @@ import ( "github.com/rollkit/rollkit/types" ) -const DefaultFlushInterval = 1 * time.Second - -type defaultPruningStore struct { +// DefaultPruningStore is for a store that supports pruning of block data. +type DefaultPruningStore struct { Store - Config config.PruningConfig + config config.PruningConfig + latestBlockDataHeight uint64 } -var _ PruningStore = &defaultPruningStore{} +var _ PruningStore = &DefaultPruningStore{} + +func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { + return &DefaultPruningStore{ + Store: &DefaultStore{db: ds}, -// newDefaultPruningStore returns default pruning store. -func newDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { - return &defaultPruningStore{ - Store: &DefaultStore{db: ds}, - Config: config, + config: config, } } -func (s *defaultPruningStore) PruneBlockData(ctx context.Context, height uint64) error { +// SaveBlockData saves the block data and updates the latest block data height. +func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { + err := s.SaveBlockData(ctx, header, data, signature) + if err == nil { + s.latestBlockDataHeight = header.Height() + } + + return err +} + +func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { // Skip if strategy is none. - if s.Config.Strategy == config.PruningConfigStrategyNone { + if s.config.Strategy == config.PruningConfigStrategyNone { return nil } + // Read latest height after calling SaveBlockData. There is a delay between SetHeight and SaveBlockData. + height := s.latestBlockDataHeight + // Skip if not the correct interval or latest height is less or equal than number of blocks need to keep. - if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { + if height%s.config.Interval != 0 || height < s.config.KeepRecent { return nil } // Must keep at least 2 blocks(while strategy is everything). - endHeight := height + 1 - s.Config.KeepRecent + endHeight := height + 1 - s.config.KeepRecent startHeight := uint64(0) - if endHeight > s.Config.Interval { - startHeight = endHeight - s.Config.Interval + if endHeight > s.config.Interval { + startHeight = endHeight - s.config.Interval } for i := startHeight; i < endHeight; i++ { @@ -53,50 +65,3 @@ func (s *defaultPruningStore) PruneBlockData(ctx context.Context, height uint64) return nil } - -type AsyncPruningStore struct { - PruningStore - - latestBlockDataHeight uint64 - flushInterval time.Duration -} - -var _ PruningStore = &AsyncPruningStore{} - -func NewAsyncPruningStore(ds ds.Batching, config config.PruningConfig) *AsyncPruningStore { - pruningStore := newDefaultPruningStore(ds, config) - - // todo: initialize latestBlockDataHeight from the store - return &AsyncPruningStore{ - PruningStore: pruningStore, - - flushInterval: DefaultFlushInterval, - } -} - -// SaveBlockData saves the block data and updates the latest block data height. -func (s *AsyncPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - err := s.PruningStore.SaveBlockData(ctx, header, data, signature) - if err == nil { - s.latestBlockDataHeight = header.Height() - } - - return err -} - -// Start begins the pruning process at the specified interval. -func (s *AsyncPruningStore) Start(ctx context.Context) { - // todo: use ctx for cancellation - ticker := time.NewTicker(s.flushInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - // Currently PruneBlockData only returns nil. - _ = s.PruneBlockData(ctx, s.latestBlockDataHeight) - } - } -} diff --git a/pkg/store/types.go b/pkg/store/types.go index 54fa6e0b4c..c006988639 100644 --- a/pkg/store/types.go +++ b/pkg/store/types.go @@ -50,5 +50,5 @@ type Store interface { type PruningStore interface { Store - PruneBlockData(ctx context.Context, height uint64) error + PruneBlockData(ctx context.Context) error } From cda2d6719ace8f3617731190fe075890a402fc82 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Wed, 4 Jun 2025 10:56:49 +0800 Subject: [PATCH 19/25] feat: add AsyncPruner --- block/async_store.go | 45 ++++++++++++++++++++++++++++++++++++++++++++ node/full.go | 7 +++++++ 2 files changed, 52 insertions(+) create mode 100644 block/async_store.go diff --git a/block/async_store.go b/block/async_store.go new file mode 100644 index 0000000000..e590c6bf2a --- /dev/null +++ b/block/async_store.go @@ -0,0 +1,45 @@ +package block + +import ( + "context" + "time" + + "github.com/rollkit/rollkit/pkg/store" +) + +const DefaultFlushInterval = 1 * time.Second + +// AsyncPruner is a service that periodically prunes block data in the background. +type AsyncPruner struct { + ps store.PruningStore + + flushInterval time.Duration +} + +func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration) *AsyncPruner { + if flushInterval <= 0 { + flushInterval = DefaultFlushInterval + } + + return &AsyncPruner{ + ps: pruningStore, + + flushInterval: flushInterval, + } +} + +// Start starts the async pruner that periodically prunes block data. +func (s *AsyncPruner) Start(ctx context.Context) { + ticker := time.NewTicker(s.flushInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + // Currently PruneBlockData only returns nil. + _ = s.ps.PruneBlockData(ctx) + } + } +} diff --git a/node/full.go b/node/full.go index acfbcccaf9..3dd1a5d993 100644 --- a/node/full.go +++ b/node/full.go @@ -61,6 +61,7 @@ type FullNode struct { Store store.Store blockManager *block.Manager reaper *block.Reaper + asyncPruner *block.AsyncPruner prometheusSrv *http.Server pprofSrv *http.Server @@ -129,12 +130,17 @@ func newFullNode( // Connect the reaper to the manager for transaction notifications reaper.SetManager(blockManager) + asyncPruner := block.NewAsyncPruner( + rktStore, block.DefaultFlushInterval, + ) + node := &FullNode{ genesis: genesis, nodeConfig: nodeConfig, p2pClient: p2pClient, blockManager: blockManager, reaper: reaper, + asyncPruner: asyncPruner, da: da, Store: rktStore, hSyncService: headerSyncService, @@ -390,6 +396,7 @@ func (n *FullNode) Run(parentCtx context.Context) error { spawnWorker(func() { n.blockManager.SyncLoop(ctx, errCh) }) spawnWorker(func() { n.blockManager.DAIncluderLoop(ctx, errCh) }) } + spawnWorker(func() { n.asyncPruner.Start(ctx) }) select { case err := <-errCh: From 8839aad86695cdfed62bef2c23a9f88022496178 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 7 Jun 2025 18:50:24 +0000 Subject: [PATCH 20/25] chore: clean codes --- block/async_store.go | 6 +----- pkg/config/pruning_config.go | 7 ++----- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/block/async_store.go b/block/async_store.go index e590c6bf2a..433b8033b6 100644 --- a/block/async_store.go +++ b/block/async_store.go @@ -7,7 +7,7 @@ import ( "github.com/rollkit/rollkit/pkg/store" ) -const DefaultFlushInterval = 1 * time.Second +const DefaultFlushInterval = 10 * time.Second // AsyncPruner is a service that periodically prunes block data in the background. type AsyncPruner struct { @@ -17,10 +17,6 @@ type AsyncPruner struct { } func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration) *AsyncPruner { - if flushInterval <= 0 { - flushInterval = DefaultFlushInterval - } - return &AsyncPruner{ ps: pruningStore, diff --git a/pkg/config/pruning_config.go b/pkg/config/pruning_config.go index 39dbb16d39..c8a67f3bad 100644 --- a/pkg/config/pruning_config.go +++ b/pkg/config/pruning_config.go @@ -76,10 +76,7 @@ func GetPruningConfigFromStrategy(strategy string) PruningConfig { return PruningConfigDefault case PruningConfigStrategyEverything: return PruningConfigEverything - case PruningConfigStrategyCustom: - return PruningConfigCustom + default: + return PruningConfigNone } - - // Return strategy "none" if unknown. - return PruningConfigNone } From 9585715df3ddb5686b69ee953a06fc16a10ecab1 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 7 Jun 2025 19:10:23 +0000 Subject: [PATCH 21/25] add log for async pruner --- block/async_store.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/block/async_store.go b/block/async_store.go index 433b8033b6..3aa2427997 100644 --- a/block/async_store.go +++ b/block/async_store.go @@ -4,6 +4,7 @@ import ( "context" "time" + "cosmossdk.io/log" "github.com/rollkit/rollkit/pkg/store" ) @@ -14,13 +15,15 @@ type AsyncPruner struct { ps store.PruningStore flushInterval time.Duration + logger log.Logger } -func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration) *AsyncPruner { +func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration, logger log.Logger) *AsyncPruner { return &AsyncPruner{ ps: pruningStore, flushInterval: flushInterval, + logger: logger, } } @@ -29,13 +32,15 @@ func (s *AsyncPruner) Start(ctx context.Context) { ticker := time.NewTicker(s.flushInterval) defer ticker.Stop() + s.logger.Info("AsyncPruner started", "interval", s.flushInterval) + for { select { case <-ctx.Done(): return case <-ticker.C: - // Currently PruneBlockData only returns nil. - _ = s.ps.PruneBlockData(ctx) + err := s.ps.PruneBlockData(ctx) + s.logger.Error("Failed to prune block data", "error", err) } } } From fe40d10e748f353a2239fad11c3776ca5f39a31a Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 7 Jun 2025 19:14:29 +0000 Subject: [PATCH 22/25] add mutex for latestBlockDataHeight --- pkg/store/pruning.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/store/pruning.go b/pkg/store/pruning.go index aa06e889d5..5c3f4adde6 100644 --- a/pkg/store/pruning.go +++ b/pkg/store/pruning.go @@ -2,6 +2,7 @@ package store import ( "context" + "sync" ds "github.com/ipfs/go-datastore" @@ -15,6 +16,7 @@ type DefaultPruningStore struct { config config.PruningConfig latestBlockDataHeight uint64 + mu sync.Mutex } var _ PruningStore = &DefaultPruningStore{} @@ -29,9 +31,11 @@ func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) Pruning // SaveBlockData saves the block data and updates the latest block data height. func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { - err := s.SaveBlockData(ctx, header, data, signature) + err := s.Store.SaveBlockData(ctx, header, data, signature) if err == nil { + s.mu.Lock() s.latestBlockDataHeight = header.Height() + s.mu.Unlock() } return err @@ -44,7 +48,9 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { } // Read latest height after calling SaveBlockData. There is a delay between SetHeight and SaveBlockData. + s.mu.Lock() height := s.latestBlockDataHeight + s.mu.Unlock() // Skip if not the correct interval or latest height is less or equal than number of blocks need to keep. if height%s.config.Interval != 0 || height < s.config.KeepRecent { @@ -52,7 +58,7 @@ func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { } // Must keep at least 2 blocks(while strategy is everything). - endHeight := height + 1 - s.config.KeepRecent + endHeight := height - 1 - s.config.KeepRecent startHeight := uint64(0) if endHeight > s.config.Interval { startHeight = endHeight - s.config.Interval From b1b2e72ee13a1ec8bb096212778671bc3498750e Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 7 Jun 2025 19:31:34 +0000 Subject: [PATCH 23/25] chore: logger --- node/full.go | 1 + 1 file changed, 1 insertion(+) diff --git a/node/full.go b/node/full.go index 3dd1a5d993..a6a1ae6936 100644 --- a/node/full.go +++ b/node/full.go @@ -132,6 +132,7 @@ func newFullNode( asyncPruner := block.NewAsyncPruner( rktStore, block.DefaultFlushInterval, + logger.With("module", "AsyncPruner"), ) node := &FullNode{ From 561349e0b4aafe2afedb970eb939ce4bab87f9ed Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 7 Jun 2025 19:37:12 +0000 Subject: [PATCH 24/25] add log --- block/async_store.go | 1 + 1 file changed, 1 insertion(+) diff --git a/block/async_store.go b/block/async_store.go index 3aa2427997..1482a9cf6f 100644 --- a/block/async_store.go +++ b/block/async_store.go @@ -37,6 +37,7 @@ func (s *AsyncPruner) Start(ctx context.Context) { for { select { case <-ctx.Done(): + s.logger.Info("AsyncPruner stopped") return case <-ticker.C: err := s.ps.PruneBlockData(ctx) From 8eb6588334ec3acfdb456da6d88b7f7dde46b6e7 Mon Sep 17 00:00:00 2001 From: Eoous <38656355+Eoous@users.noreply.github.com> Date: Sat, 7 Jun 2025 19:54:07 +0000 Subject: [PATCH 25/25] fix: checking err --- block/async_store.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/block/async_store.go b/block/async_store.go index 1482a9cf6f..d84c34171d 100644 --- a/block/async_store.go +++ b/block/async_store.go @@ -41,7 +41,9 @@ func (s *AsyncPruner) Start(ctx context.Context) { return case <-ticker.C: err := s.ps.PruneBlockData(ctx) - s.logger.Error("Failed to prune block data", "error", err) + if err != nil { + s.logger.Error("Failed to prune block data", "error", err) + } } } }