-
Notifications
You must be signed in to change notification settings - Fork 228
feat(store): support pruning store #2208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 18 commits
291aea5
e5e8080
6b0cc41
7ca9b67
82c76be
cd92e3e
9420944
21c0b13
45acc3a
685dbe7
d5b16c2
350b21d
7a04418
a4d2fc1
29656c9
2422d00
8ed80b4
b16db8c
8f70c51
32be97e
1b3ffcf
da2dfa1
220f802
ab0c7df
7fcf6a8
4639bc9
34d79c2
99c5b39
99dd5fe
cda2d67
8839aad
9585715
fe40d10
b1b2e72
561349e
cca398e
8eb6588
0c295a1
a6541d0
086904a
680c1ba
2a61d47
9429c9f
6cc1f55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
package config | ||
|
||
const ( | ||
// Pruning configuration flags | ||
|
||
// FlagPruningStrategy is a flag for specifying strategy for pruning block store | ||
FlagPruningStrategy = "rollkit.node.pruning.strategy" | ||
// FlagPruningKeepRecent is a flag for specifying how many blocks need to keep in store | ||
FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent" | ||
// FlagPruningInterval is a flag for specifying how often prune blocks store | ||
FlagPruningInterval = "rollkit.node.pruning.interval" | ||
) | ||
|
||
const ( | ||
PruningConfigStrategyNone = "none" | ||
PruningConfigStrategyDefault = "default" | ||
PruningConfigStrategyEverything = "everything" | ||
PruningConfigStrategyCustom = "custom" | ||
) | ||
|
||
var ( | ||
PruningConfigNone = PruningConfig{ | ||
Strategy: PruningConfigStrategyNone, | ||
KeepRecent: 0, | ||
Interval: 0, | ||
} | ||
PruningConfigDefault = PruningConfig{ | ||
Strategy: PruningConfigStrategyDefault, | ||
KeepRecent: 362880, | ||
Interval: 10, | ||
} | ||
PruningConfigEverything = PruningConfig{ | ||
Strategy: PruningConfigStrategyEverything, | ||
KeepRecent: 2, | ||
Interval: 10, | ||
} | ||
PruningConfigCustom = PruningConfig{ | ||
Strategy: PruningConfigStrategyCustom, | ||
KeepRecent: 100, | ||
Interval: 100, | ||
} | ||
) | ||
|
||
// PruningConfig allows node operators to manage storage | ||
type PruningConfig struct { | ||
// todo: support volume-based strategy | ||
Strategy string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"` | ||
KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy"` | ||
Interval uint64 `mapstructure:"interval" yaml:"interval" comment:"how often the pruning process should run, used in \"custom\" strategy"` | ||
|
||
// todo: support volume-based strategy | ||
// VolumeConfig specifies configuration for volume-based storage | ||
// VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"` | ||
} | ||
|
||
func GetPruningConfigFromStrategy(strategy string) PruningConfig { | ||
switch strategy { | ||
case PruningConfigStrategyDefault: | ||
return PruningConfigDefault | ||
case PruningConfigStrategyEverything: | ||
return PruningConfigEverything | ||
case PruningConfigStrategyCustom: | ||
return PruningConfigCustom | ||
} | ||
|
||
// Return strategy "none" if unknown. | ||
return PruningConfigNone | ||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,75 @@ | ||||||||||||||||||||||||||||||||||||||||||||
package store | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||
"context" | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
ds "github.com/ipfs/go-datastore" | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
"github.com/rollkit/rollkit/pkg/config" | ||||||||||||||||||||||||||||||||||||||||||||
"github.com/rollkit/rollkit/types" | ||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The import block should follow the standard Go convention of grouping imports into three sections separated by blank lines:
For better readability and adherence to Go style conventions, consider restructuring the imports like this: import (
"context"
ds "github.com/ipfs/go-datastore"
"github.com/rollkit/rollkit/pkg/config"
"github.com/rollkit/rollkit/types"
) Spotted by Diamond (based on custom rules) |
||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
type DefaultPruningStore struct { | ||||||||||||||||||||||||||||||||||||||||||||
Store | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
Config config.PruningConfig | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
var _ PruningStore = &DefaultPruningStore{} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// NewDefaultPruningStore returns default pruning store. | ||||||||||||||||||||||||||||||||||||||||||||
func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { | ||||||||||||||||||||||||||||||||||||||||||||
return &DefaultPruningStore{ | ||||||||||||||||||||||||||||||||||||||||||||
Store: &DefaultStore{db: ds}, | ||||||||||||||||||||||||||||||||||||||||||||
Config: config, | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// SaveBlockData adds block header and data to the store along with corresponding signature. | ||||||||||||||||||||||||||||||||||||||||||||
// It also prunes the block data if needed. | ||||||||||||||||||||||||||||||||||||||||||||
func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { | ||||||||||||||||||||||||||||||||||||||||||||
if err := s.PruneBlockData(ctx); err != nil { | ||||||||||||||||||||||||||||||||||||||||||||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
return s.Store.SaveBlockData(ctx, header, data, signature) | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation prioritizes pruning over saving new block data, which creates a risk of data loss. Consider reversing this order to ensure block persistence even when pruning encounters issues. Either:
This approach would maintain data integrity by ensuring new blocks are always saved regardless of pruning status. // Option 1: Save first, then prune
func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
if err := s.Store.SaveBlockData(ctx, header, data, signature); err != nil {
return err
}
// Pruning errors won't prevent block from being saved
if err := s.PruneBlockData(ctx); err != nil {
return err // or log error and continue
}
return nil
}
Suggested change
Spotted by Diamond |
||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { | ||||||||||||||||||||||||||||||||||||||||||||
var ( | ||||||||||||||||||||||||||||||||||||||||||||
err error | ||||||||||||||||||||||||||||||||||||||||||||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// Skip if strategy is none. | ||||||||||||||||||||||||||||||||||||||||||||
if s.Config.Strategy == config.PruningConfigStrategyNone { | ||||||||||||||||||||||||||||||||||||||||||||
return nil | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
height, err := s.Height(ctx) | ||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||
return err | ||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||
// Skip if it's a correct interval or latest height is less or equal than number of blocks need to keep. | ||||||||||||||||||||||||||||||||||||||||||||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||
if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { | ||||||||||||||||||||||||||||||||||||||||||||
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
if height%s.Config.Interval != 0 || height < s.Config.KeepRecent { | |
if height%s.Config.Interval != 0 || height <= s.Config.KeepRecent { |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This calculation could result in a uint64 underflow if height+1 < s.Config.KeepRecent
. Since uint64 underflow wraps around to a very large number, this would cause incorrect pruning behavior. Consider adding a safety check before this calculation:
var endHeight uint64
if height+1 <= s.Config.KeepRecent {
// Nothing to prune yet
return nil
}
endHeight = height + 1 - s.Config.KeepRecent
This ensures the pruning logic only executes when there are actually blocks that can be safely pruned.
endHeight := height + 1 - s.Config.KeepRecent | |
var endHeight uint64 | |
if height+1 <= s.Config.KeepRecent { | |
// Nothing to prune yet | |
return nil | |
} | |
endHeight = height + 1 - s.Config.KeepRecent | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The pruning range calculation logic needs adjustment. Currently, it may not prune the intended number of blocks:
endHeight := height + 1 - s.Config.KeepRecent
startHeight := uint64(0)
if endHeight > s.Config.Interval {
startHeight = endHeight - s.Config.Interval
}
For correct behavior, consider:
startHeight := uint64(0)
if height > s.Config.KeepRecent {
endHeight := height + 1 - s.Config.KeepRecent
startHeight = endHeight - s.Config.Interval
if startHeight < 0 {
startHeight = 0
}
}
This ensures exactly s.Config.Interval
blocks are pruned in each operation while maintaining the s.Config.KeepRecent
most recent blocks. The current implementation might prune blocks that should be kept when endHeight
is close to s.Config.Interval
.
endHeight := height + 1 - s.Config.KeepRecent | |
startHeight := uint64(0) | |
if endHeight > s.Config.Interval { | |
startHeight = endHeight - s.Config.Interval | |
} | |
startHeight := uint64(0) | |
if height > s.Config.KeepRecent { | |
endHeight := height + 1 - s.Config.KeepRecent | |
startHeight = endHeight - s.Config.Interval | |
if startHeight < 0 { | |
startHeight = 0 | |
} | |
} | |
Spotted by Diamond
Is this helpful? React 👍 or 👎 to let us know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to protect against concurrent executions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As this is a public method, it can be called by other threads as well. It would not cost much to exit early when pruning is not completed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other threads shouldn't call this method. It's only added for pruning thread
Maybe currently is no need to add some protections?
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pruning should be async, if we have it here it could end up blocking the main thread for an unknown amount of time.
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Eoous marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,7 +92,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
|
||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch: %w", err) | ||
return fmt.Errorf("failed to create a new batch for saving block data: %w", err) | ||
} | ||
|
||
if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { | ||
|
@@ -114,6 +114,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
return nil | ||
} | ||
|
||
// DeleteBlockData deletes block at given height. | ||
func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error { | ||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err) | ||
} | ||
|
||
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete header blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete data blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err) | ||
} | ||
if err := batch.Commit(ctx); err != nil { | ||
return fmt.Errorf("failed to commit batch: %w", err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how do we ensure that we delete everything? there might be more prefix stores in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Structures like block and header may need to refactor in future I think currently it's enough to be same with |
||
|
||
return nil | ||
} | ||
Eoous marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+108
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation of To fix this, I've also added handling for func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
// Get the header to retrieve the block hash for index deletion.
header, err := s.GetHeader(ctx, height)
if err != nil {
// If block not found, it might have been already pruned.
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to get header for block %d: %w", height, err)
}
hash := header.Hash()
batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
return fmt.Errorf("failed to delete header blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
return fmt.Errorf("failed to delete data blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil {
return fmt.Errorf("failed to delete index key in batch: %w", err)
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
return nil
} |
||
|
||
// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed. | ||
|
||
// GetBlockData returns block header and data at given height, or error if it's not found in Store. | ||
func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { | ||
headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(height))) | ||
|
Uh oh!
There was an error while loading. Please reload this page.