Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ type NodeConfig struct {

// Header configuration
TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."`

// Pruning management configuration
Pruning PruningConfig `mapstructure:"pruning" yaml:"pruning"`
}

// LogConfig contains all logging configuration parameters
Expand Down Expand Up @@ -244,6 +247,11 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().Uint64(FlagMaxPendingBlocks, def.Node.MaxPendingBlocks, "maximum blocks pending DA confirmation before pausing block production (0 for no limit)")
cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockTime.Duration, "maximum interval between blocks in lazy aggregation mode")

// Pruning configuration flags
cmd.Flags().String(FlagPruningStrategy, def.Node.Pruning.Strategy, "")
cmd.Flags().Uint64(FlagPruningKeepRecent, def.Node.Pruning.KeepRecent, "")
cmd.Flags().Uint64(FlagPruningInterval, def.Node.Pruning.Interval, "")

// Data Availability configuration flags
cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)")
cmd.Flags().String(FlagDAAuthToken, def.DA.AuthToken, "DA auth token")
Expand Down
1 change: 1 addition & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var DefaultConfig = Config{
LazyBlockTime: DurationWrapper{60 * time.Second},
Light: false,
TrustedHash: "",
Pruning: PruningConfigDefault,
},
DA: DAConfig{
Address: "http://localhost:7980",
Expand Down
54 changes: 54 additions & 0 deletions pkg/config/pruning_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package config

const (
// Pruning configuration flags

// FlagPruningStrategy is a flag for specifying strategy for pruning block store
FlagPruningStrategy = "rollkit.node.pruning.strategy"
// FlagPruningKeepRecent is a flag for specifying how many blocks need to keep in store
FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent"
// FlagPruningInterval is a flag for specifying how offen prune blocks store
FlagPruningInterval = "rollkit.node.pruning.interval"
)

const (
PruningConfigStrategyNone = "none"
PruningConfigStrategyDefault = "default"
PruningConfigStrategyEverything = "everything"
PruningConfigStrategyCustom = "custom"
)

var (
PruningConfigNone = PruningConfig{
Strategy: PruningConfigStrategyNone,
KeepRecent: 0,
Interval: 0,
}
PruningConfigDefault = PruningConfig{
Strategy: PruningConfigStrategyDefault,
KeepRecent: 362880,
Interval: 10,
}
PruningConfigEverything = PruningConfig{
Strategy: PruningConfigStrategyEverything,
KeepRecent: 2,
Interval: 10,
}
PruningConfigCustom = PruningConfig{
Strategy: PruningConfigStrategyCustom,
KeepRecent: 100,
Interval: 100,
}
)

// PruningConfig allows node operators to manage storage
type PruningConfig struct {
// todo: support volume-based strategy
Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, custom)"`
KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"default\" and \"custom\" strategies"`
Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how offen the pruning process should run"`

// todo: support volume-based strategy
// VolumeConfig specifies configuration for volume-based storage
// VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"`
}
132 changes: 132 additions & 0 deletions pkg/store/pruning.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package store

import (
"context"
"fmt"

"github.com/rollkit/rollkit/pkg/config"
"github.com/rollkit/rollkit/types"
)

type DefaultPruningStore struct {
Store

Config config.PruningConfig
}

var _ PruningStore = &DefaultPruningStore{}

// NewDefaultPruningStore returns default pruning store.
func NewDefaultPruningStore(store Store, config config.PruningConfig) PruningStore {
return &DefaultPruningStore{
Store: store,
Config: config,
}
}

// Close safely closes underlying data storage, to ensure that data is actually saved.
func (s *DefaultPruningStore) Close() error {
return s.Store.Close()
}

// SetHeight sets the height saved in the Store if it is higher than the existing height
func (s *DefaultPruningStore) SetHeight(ctx context.Context, height uint64) error {
return s.Store.SetHeight(ctx, height)
}

// Height returns height of the highest block saved in the Store.
func (s *DefaultPruningStore) Height(ctx context.Context) (uint64, error) {
return s.Store.Height(ctx)
}

// SaveBlockData adds block header and data to the store along with corresponding signature.
// Stored height is updated if block height is greater than stored value.
func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
if err := s.PruneBlockData(ctx); err != nil {
return fmt.Errorf("failed to prune block data: %w", err)
}

return s.Store.SaveBlockData(ctx, header, data, signature)
}

// DeleteBlockData deletes block at given height.
func (s *DefaultPruningStore) DeleteBlockData(ctx context.Context, height uint64) error {
return s.Store.DeleteBlockData(ctx, height)
}

// GetBlockData returns block header and data at given height, or error if it's not found in Store.
func (s *DefaultPruningStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) {
return s.Store.GetBlockData(ctx, height)
}

// GetBlockByHash returns block with given block header hash, or error if it's not found in Store.
func (s *DefaultPruningStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) {
return s.Store.GetBlockByHash(ctx, hash)
}

// GetSignatureByHash returns signature for a block at given height, or error if it's not found in Store.
func (s *DefaultPruningStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) {
return s.Store.GetSignatureByHash(ctx, hash)
}

// GetSignature returns signature for a block with given block header hash, or error if it's not found in Store.
func (s *DefaultPruningStore) GetSignature(ctx context.Context, height uint64) (*types.Signature, error) {
return s.Store.GetSignature(ctx, height)
}

// UpdateState updates state saved in Store. Only one State is stored.
// If there is no State in Store, state will be saved.
func (s *DefaultPruningStore) UpdateState(ctx context.Context, state types.State) error {
return s.Store.UpdateState(ctx, state)
}

// GetState returns last state saved with UpdateState.
func (s *DefaultPruningStore) GetState(ctx context.Context) (types.State, error) {
return s.Store.GetState(ctx)
}

// SetMetadata saves arbitrary value in the store.
//
// Metadata is separated from other data by using prefix in KV.
func (s *DefaultPruningStore) SetMetadata(ctx context.Context, key string, value []byte) error {
return s.Store.SetMetadata(ctx, key, value)
}

// GetMetadata returns values stored for given key with SetMetadata.
func (s *DefaultPruningStore) GetMetadata(ctx context.Context, key string) ([]byte, error) {
return s.Store.GetMetadata(ctx, key)
}

func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error {
var (
err error
)

// Skip if strategy is none.
if s.Config.Strategy == config.PruningConfigStrategyNone {
return nil
}

height, err := s.Height(ctx)
if err != nil {
return err
}

// Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep.
if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent {
return nil
}

// Must keep latest 2 blocks.
endHeight := height - 1 - s.Config.KeepRecent
startHeight := min(0, endHeight-s.Config.KeepRecent)

for i := startHeight; i <= endHeight; i++ {
err = s.DeleteBlockData(ctx, i)
if err != nil {
return err
}
}

return nil
}
27 changes: 26 additions & 1 deletion pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe

batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch: %w", err)
return fmt.Errorf("failed to create a new batch for saving block data: %w", err)
}

if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil {
Expand All @@ -114,6 +114,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe
return nil
}

// DeleteBlockData deletes block at given height.
func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
}

if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
return fmt.Errorf("failed to delete header blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
return fmt.Errorf("failed to delete data blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

return nil
}

// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed.

// GetBlockData returns block header and data at given height, or error if it's not found in Store.
func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) {
headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(height)))
Expand Down
8 changes: 8 additions & 0 deletions pkg/store/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Store interface {

// SaveBlock saves block along with its seen signature (which will be included in the next block).
SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error
// DeleteBlockData deletes block at given height.
DeleteBlockData(ctx context.Context, height uint64) error

// GetBlock returns block at given height, or error if it's not found in Store.
GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error)
Expand Down Expand Up @@ -44,3 +46,9 @@ type Store interface {
// Close safely closes underlying data storage, to ensure that data is actually saved.
Close() error
}

type PruningStore interface {
Store

PruneBlockData(ctx context.Context) error
}
Loading