Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
291aea5
basic config for pruning
Eoous Apr 22, 2025
e5e8080
prune block data
Eoous Apr 22, 2025
6b0cc41
Merge branch 'rollkit:main' into main
Eoous Apr 22, 2025
7ca9b67
fix: use corresponding config for strategy
Eoous Apr 22, 2025
82c76be
fix: incorrect endHeight
Eoous Apr 22, 2025
cd92e3e
fix: incorrect range for pruning
Eoous Apr 23, 2025
9420944
chore: delete methods without any changes
Eoous Apr 23, 2025
21c0b13
chore: add comment
Eoous Apr 23, 2025
45acc3a
Merge branch 'main' into main
Eoous Apr 23, 2025
685dbe7
feat: use `PruningStore` instead of `Store`
Eoous Apr 28, 2025
d5b16c2
feat: prune block data every time a block is saved
Eoous Apr 28, 2025
350b21d
chore: format comment
Eoous Apr 28, 2025
7a04418
fix: no need to delete block at endHeight
Eoous Apr 28, 2025
a4d2fc1
chore: typo
Eoous May 2, 2025
29656c9
Merge branch 'main' into main
Eoous May 4, 2025
2422d00
Merge branch 'main' into main
Eoous May 7, 2025
8ed80b4
Merge branch 'main' into main
Eoous May 13, 2025
b16db8c
Merge branch 'main' into main
Eoous May 13, 2025
8f70c51
Merge branch 'main' into main
Eoous May 14, 2025
32be97e
Merge branch 'main' into main
Eoous May 21, 2025
1b3ffcf
format
Eoous May 22, 2025
da2dfa1
Merge branch 'main' into main
Eoous May 27, 2025
220f802
Merge branch 'main' of https://github.com/Eoous/rollkit
Eoous May 27, 2025
ab0c7df
clean codes and comments
Eoous May 28, 2025
7fcf6a8
async pruning store
Eoous May 28, 2025
4639bc9
add validation and comments for config
Eoous May 28, 2025
34d79c2
fix: custom strategy
Eoous May 28, 2025
99c5b39
Merge branch 'main' into main
Eoous Jun 4, 2025
99dd5fe
clean codes
Eoous Jun 4, 2025
cda2d67
feat: add AsyncPruner
Eoous Jun 4, 2025
8839aad
chore: clean codes
Eoous Jun 7, 2025
9585715
add log for async pruner
Eoous Jun 7, 2025
fe40d10
add mutex for latestBlockDataHeight
Eoous Jun 7, 2025
b1b2e72
chore: logger
Eoous Jun 7, 2025
561349e
add log
Eoous Jun 7, 2025
cca398e
Merge branch 'main' into main
Eoous Jun 7, 2025
8eb6588
fix: checking err
Eoous Jun 7, 2025
0c295a1
Merge branch 'main' into main
Eoous Jun 8, 2025
a6541d0
Merge branch 'main' into main
Eoous Jun 19, 2025
086904a
Merge branch 'rollkit:main' into main
Eoous Jul 17, 2025
680c1ba
Merge branch 'main' into main
Eoous Jul 18, 2025
2a61d47
Merge branch 'evstack:main' into main
Eoous Jul 26, 2025
9429c9f
Merge branch 'main' into main
Eoous Aug 22, 2025
6cc1f55
Merge branch 'evstack:main' into main
Eoous Sep 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions block/async_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package block

import (
"context"
"time"

"cosmossdk.io/log"
"github.com/rollkit/rollkit/pkg/store"
)

// DefaultFlushInterval is the default period between pruning passes run by the AsyncPruner.
const DefaultFlushInterval = 10 * time.Second

// AsyncPruner is a service that periodically prunes block data in the background.
type AsyncPruner struct {
	ps store.PruningStore // store whose PruneBlockData is invoked on every tick

	flushInterval time.Duration // how often a pruning pass runs
	logger        log.Logger
}

// NewAsyncPruner returns an AsyncPruner that calls pruningStore.PruneBlockData
// every flushInterval once Start is invoked.
func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration, logger log.Logger) *AsyncPruner {
	return &AsyncPruner{
		ps: pruningStore,

		flushInterval: flushInterval,
		logger:        logger,
	}
}

// Start starts the async pruner that periodically prunes block data.
// It blocks until ctx is cancelled; pruning errors are logged, not fatal.
func (s *AsyncPruner) Start(ctx context.Context) {
	s.logger.Info("AsyncPruner started", "interval", s.flushInterval)

	ticker := time.NewTicker(s.flushInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := s.ps.PruneBlockData(ctx); err != nil {
				s.logger.Error("Failed to prune block data", "error", err)
			}
		case <-ctx.Done():
			s.logger.Info("AsyncPruner stopped")
			return
		}
	}
}
10 changes: 9 additions & 1 deletion node/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type FullNode struct {
Store store.Store
blockManager *block.Manager
reaper *block.Reaper
asyncPruner *block.AsyncPruner

prometheusSrv *http.Server
pprofSrv *http.Server
Expand Down Expand Up @@ -95,7 +96,7 @@ func newFullNode(
return nil, err
}

rktStore := store.New(mainKV)
rktStore := store.NewDefaultPruningStore(mainKV, nodeConfig.Node.Pruning)

blockManager, err := initBlockManager(
ctx,
Expand Down Expand Up @@ -129,12 +130,18 @@ func newFullNode(
// Connect the reaper to the manager for transaction notifications
reaper.SetManager(blockManager)

asyncPruner := block.NewAsyncPruner(
rktStore, block.DefaultFlushInterval,
logger.With("module", "AsyncPruner"),
)

node := &FullNode{
genesis: genesis,
nodeConfig: nodeConfig,
p2pClient: p2pClient,
blockManager: blockManager,
reaper: reaper,
asyncPruner: asyncPruner,
da: da,
Store: rktStore,
hSyncService: headerSyncService,
Expand Down Expand Up @@ -388,6 +395,7 @@ func (n *FullNode) Run(parentCtx context.Context) error {
spawnWorker(func() { n.blockManager.SyncLoop(ctx, errCh) })
spawnWorker(func() { n.blockManager.DAIncluderLoop(ctx, errCh) })
}
spawnWorker(func() { n.asyncPruner.Start(ctx) })

select {
case err := <-errCh:
Expand Down
2 changes: 1 addition & 1 deletion node/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func newLightNode(
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
}

store := store.New(database)
store := store.NewDefaultPruningStore(database, conf.Node.Pruning)

node := &LightNode{
P2P: p2pClient,
Expand Down
5 changes: 5 additions & 0 deletions pkg/cmd/run_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ func ParseConfig(cmd *cobra.Command) (rollconf.Config, error) {
return rollconf.Config{}, fmt.Errorf("failed to load node config: %w", err)
}

if nodeConfig.Node.Pruning.Strategy != rollconf.PruningConfigStrategyCustom {
pruningConfig := rollconf.GetPruningConfigFromStrategy(nodeConfig.Node.Pruning.Strategy)
nodeConfig.Node.Pruning = pruningConfig
}

if err := nodeConfig.Validate(); err != nil {
return rollconf.Config{}, fmt.Errorf("failed to validate node config: %w", err)
}
Expand Down
12 changes: 12 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ type NodeConfig struct {

// Header configuration
TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."`

// Pruning management configuration
Pruning PruningConfig `mapstructure:"pruning" yaml:"pruning"`
}

// LogConfig contains all logging configuration parameters
Expand Down Expand Up @@ -243,6 +246,10 @@ func (c *Config) Validate() error {
return fmt.Errorf("could not create directory %q: %w", fullDir, err)
}

if err := c.Node.Pruning.Validate(); err != nil {
return fmt.Errorf("invalid pruning configuration: %w", err)
}

return nil
}

Expand Down Expand Up @@ -286,6 +293,11 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().Uint64(FlagMaxPendingHeadersAndData, def.Node.MaxPendingHeadersAndData, "maximum headers or data pending DA confirmation before pausing block production (0 for no limit)")
cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockInterval.Duration, "maximum interval between blocks in lazy aggregation mode")

// Pruning configuration flags
cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "pruning strategy (none, default, everything, custom)")
cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "number of recent blocks to keep")
cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "frequency of pruning operations")

// Data Availability configuration flags
cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)")
cmd.Flags().String(FlagDAAuthToken, def.DA.AuthToken, "DA auth token")
Expand Down
1 change: 1 addition & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var DefaultConfig = Config{
LazyBlockInterval: DurationWrapper{60 * time.Second},
Light: false,
TrustedHash: "",
Pruning: PruningConfigDefault,
},
DA: DAConfig{
Address: "http://localhost:7980",
Expand Down
82 changes: 82 additions & 0 deletions pkg/config/pruning_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package config

import (
"errors"
)

const (
	// Pruning configuration flags

	// FlagPruningStrategy selects the pruning strategy for the block store
	// (none, default, everything, custom).
	FlagPruningStrategy = "rollkit.node.pruning.strategy"
	// FlagPruningKeepRecent sets how many recent blocks to keep in the store.
	FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent"
	// FlagPruningInterval sets how often the block store is pruned.
	FlagPruningInterval = "rollkit.node.pruning.interval"
)

// Built-in pruning strategy names.
const (
	// PruningConfigStrategyNone disables pruning entirely.
	PruningConfigStrategyNone = "none"
	// PruningConfigStrategyDefault applies the default retention preset.
	PruningConfigStrategyDefault = "default"
	// PruningConfigStrategyEverything keeps only a minimal number of recent blocks.
	PruningConfigStrategyEverything = "everything"
	// PruningConfigStrategyCustom uses operator-supplied KeepRecent/Interval values.
	PruningConfigStrategyCustom = "custom"
)

// Preset configurations corresponding to each built-in strategy.
var (
	// PruningConfigNone disables pruning.
	PruningConfigNone = PruningConfig{
		Strategy:   PruningConfigStrategyNone,
		KeepRecent: 0,
		Interval:   0,
	}
	// PruningConfigDefault is the preset used by the "default" strategy.
	PruningConfigDefault = PruningConfig{
		Strategy:   PruningConfigStrategyDefault,
		KeepRecent: 362880,
		Interval:   10,
	}
	// PruningConfigEverything keeps the minimum of two recent blocks.
	PruningConfigEverything = PruningConfig{
		Strategy:   PruningConfigStrategyEverything,
		KeepRecent: 2,
		Interval:   10,
	}
	// PruningConfigCustom is the starting preset for the "custom" strategy.
	PruningConfigCustom = PruningConfig{
		Strategy:   PruningConfigStrategyCustom,
		KeepRecent: 100,
		Interval:   100,
	}
)

// PruningConfig allows node operators to manage storage
type PruningConfig struct {
	// todo: support volume-based strategy
	Strategy   string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"`
	KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy, must be greater or equal than 2"`
	Interval   uint64 `mapstructure:"interval" yaml:"interval" comment:"How often the pruning process should run, used in \"custom\" strategy"`

	// todo: support volume-based strategy
	// VolumeConfig specifies configuration for volume-based storage
	// VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"`
}

// Validate checks the pruning configuration for consistency. Only the
// "custom" strategy carries operator-supplied values that need checking;
// the built-in presets are always valid.
func (p PruningConfig) Validate() error {
	if p.Strategy != PruningConfigStrategyCustom {
		return nil
	}

	if p.KeepRecent < 2 {
		return errors.New("keep_recent must be greater or equal than 2 for custom pruning strategy")
	}

	// A zero interval would make the pruner's height%Interval check divide
	// by zero, so it must be rejected up front.
	if p.Interval == 0 {
		return errors.New("interval must be greater than 0 for custom pruning strategy")
	}

	return nil
}

// GetPruningConfigFromStrategy returns the preset configuration for the
// given strategy name. "custom" yields the custom starting preset; any
// unknown name falls back to the "none" preset, which disables pruning.
func GetPruningConfigFromStrategy(strategy string) PruningConfig {
	switch strategy {
	case PruningConfigStrategyDefault:
		return PruningConfigDefault
	case PruningConfigStrategyEverything:
		return PruningConfigEverything
	case PruningConfigStrategyCustom:
		return PruningConfigCustom
	default:
		return PruningConfigNone
	}
}
73 changes: 73 additions & 0 deletions pkg/store/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package store

import (
"context"
"sync"

ds "github.com/ipfs/go-datastore"

"github.com/rollkit/rollkit/pkg/config"
"github.com/rollkit/rollkit/types"
)

// DefaultPruningStore is for a store that supports pruning of block data.
type DefaultPruningStore struct {
	Store

	config                config.PruningConfig
	latestBlockDataHeight uint64 // highest height saved via SaveBlockData; guarded by mu
	mu                    sync.Mutex
}

// Compile-time check that DefaultPruningStore satisfies PruningStore.
var _ PruningStore = &DefaultPruningStore{}

// NewDefaultPruningStore wraps a DefaultStore backed by the given datastore
// with pruning support driven by the supplied PruningConfig.
func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore {
	return &DefaultPruningStore{
		Store: &DefaultStore{db: ds},

		config: config,
	}
}

// SaveBlockData saves the block data and updates the latest block data height.
func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
	if err := s.Store.SaveBlockData(ctx, header, data, signature); err != nil {
		return err
	}

	// Record the new tip only after a successful save; mu guards the height
	// against concurrent reads from the pruner.
	s.mu.Lock()
	s.latestBlockDataHeight = header.Height()
	s.mu.Unlock()

	return nil
}

func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error {
// Skip if strategy is none.
if s.config.Strategy == config.PruningConfigStrategyNone {
return nil
}

// Read latest height after calling SaveBlockData. There is a delay between SetHeight and SaveBlockData.
s.mu.Lock()
height := s.latestBlockDataHeight
s.mu.Unlock()

// Skip if not the correct interval or latest height is less or equal than number of blocks need to keep.
if height%s.config.Interval != 0 || height < s.config.KeepRecent {
return nil
}

// Must keep at least 2 blocks(while strategy is everything).
endHeight := height - 1 - s.config.KeepRecent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The calculation for endHeight seems to be off by one, which will result in not pruning a block that should be pruned.

If we want to keep KeepRecent blocks, the highest block to prune is height - KeepRecent. The loop for i := startHeight; i < endHeight; i++ prunes up to endHeight - 1.

With the current logic, endHeight is height - 1 - s.config.KeepRecent. This means the loop prunes up to height - s.config.KeepRecent - 2, leaving one block unpruned.

To fix this, endHeight should be height - s.config.KeepRecent. This makes the loop prune up to height - s.config.KeepRecent - 1, which is what we want.

Suggested change
endHeight := height - 1 - s.config.KeepRecent
endHeight := height - s.config.KeepRecent

startHeight := uint64(0)
if endHeight > s.config.Interval {
startHeight = endHeight - s.config.Interval
}

for i := startHeight; i < endHeight; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to protect against concurrent executions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is a public method, it can be called by other threads as well. It would not cost much to exit early when pruning is not completed.

Copy link
Contributor Author

@Eoous Eoous Jun 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other threads shouldn't call this method. It's only added for pruning thread

Maybe currently is no need to add some protections?

// Could ignore for errors like not found.
_ = s.DeleteBlockData(ctx, i)
}
Comment on lines +67 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Errors from s.DeleteBlockData are being ignored. The comment suggests that ErrNotFound could be ignored, but other errors (like issues with the underlying datastore) should be handled.

Ignoring these errors can hide serious problems and leave the pruning process in an incomplete state without any notification. The error should be propagated up to the AsyncPruner, which will log it.

With the recommended changes to DeleteBlockData in pkg/store/store.go, it will correctly handle ErrNotFound by returning nil, so any error returned from it will be significant.

if err := s.DeleteBlockData(ctx, i); err != nil {
			return err
		}


return nil
}
27 changes: 26 additions & 1 deletion pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe

batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch: %w", err)
return fmt.Errorf("failed to create a new batch for saving block data: %w", err)
}

if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil {
Expand All @@ -104,6 +104,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe
return nil
}

// DeleteBlockData deletes block at given height.
func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
}

if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
return fmt.Errorf("failed to delete header blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
return fmt.Errorf("failed to delete data blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we ensure that we delete everything? there might be more prefix stores in the future?

Copy link
Contributor Author

@Eoous Eoous Jun 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Structures like block and header may need to refactor in future

I think currently it's enough to be same with SaveBlockData


return nil
}
Comment on lines +108 to +128
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The current implementation of DeleteBlockData does not delete the hash-to-height index entry that is created in SaveBlockData. This will lead to dangling index entries pointing to non-existent blocks, which can cause issues when trying to retrieve blocks by hash.

To fix this, DeleteBlockData should first retrieve the block header to get its hash, and then delete the corresponding index entry along with the other block data.

I've also added handling for ds.ErrNotFound when fetching the header, as the block might have already been pruned.

func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
	// Get the header to retrieve the block hash for index deletion.
	header, err := s.GetHeader(ctx, height)
	if err != nil {
		// If block not found, it might have been already pruned.
		if errors.Is(err, ds.ErrNotFound) {
			return nil
		}
		return fmt.Errorf("failed to get header for block %d: %w", height, err)
	}

	hash := header.Hash()

	batch, err := s.db.Batch(ctx)
	if err != nil {
		return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
	}

	if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
		return fmt.Errorf("failed to delete header blob in batch: %w", err)
	}
	if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
		return fmt.Errorf("failed to delete data blob in batch: %w", err)
	}
	if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
		return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
	}
	if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil {
		return fmt.Errorf("failed to delete index key in batch: %w", err)
	}
	if err := batch.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit batch: %w", err)
	}

	return nil
}


// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed.

// GetBlockData returns block header and data at given height, or error if it's not found in Store.
func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) {
header, err := s.GetHeader(ctx, height)
Expand Down
8 changes: 8 additions & 0 deletions pkg/store/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Store interface {

// SaveBlockData saves block along with its seen signature (which will be included in the next block).
SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error
// DeleteBlockData deletes block at given height.
DeleteBlockData(ctx context.Context, height uint64) error

// GetBlockData returns block at given height, or error if it's not found in Store.
GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error)
Expand Down Expand Up @@ -52,3 +54,9 @@ type Store interface {
// Close safely closes underlying data storage, to ensure that data is actually saved.
Close() error
}

type PruningStore interface {
Store

PruneBlockData(ctx context.Context) error
}
Loading