-
Notifications
You must be signed in to change notification settings - Fork 219
feat(store): support pruning store #2208
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
291aea5
e5e8080
6b0cc41
7ca9b67
82c76be
cd92e3e
9420944
21c0b13
45acc3a
685dbe7
d5b16c2
350b21d
7a04418
a4d2fc1
29656c9
2422d00
8ed80b4
b16db8c
8f70c51
32be97e
1b3ffcf
da2dfa1
220f802
ab0c7df
7fcf6a8
4639bc9
34d79c2
99c5b39
99dd5fe
cda2d67
8839aad
9585715
fe40d10
b1b2e72
561349e
cca398e
8eb6588
0c295a1
a6541d0
086904a
680c1ba
2a61d47
9429c9f
6cc1f55
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
package block | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"cosmossdk.io/log" | ||
"github.com/rollkit/rollkit/pkg/store" | ||
) | ||
|
||
// DefaultFlushInterval is the default period between background pruning passes.
const DefaultFlushInterval = 10 * time.Second

// AsyncPruner is a service that periodically prunes block data in the background.
type AsyncPruner struct {
	// ps is the pruning-capable store whose PruneBlockData is invoked on each tick.
	ps store.PruningStore

	// flushInterval is how often a pruning pass is triggered.
	flushInterval time.Duration
	// logger records pruner lifecycle events and pruning failures.
	logger log.Logger
}
|
||
func NewAsyncPruner(pruningStore store.PruningStore, flushInterval time.Duration, logger log.Logger) *AsyncPruner { | ||
return &AsyncPruner{ | ||
ps: pruningStore, | ||
|
||
flushInterval: flushInterval, | ||
logger: logger, | ||
} | ||
} | ||
|
||
// Start starts the async pruner that periodically prunes block data. | ||
func (s *AsyncPruner) Start(ctx context.Context) { | ||
ticker := time.NewTicker(s.flushInterval) | ||
defer ticker.Stop() | ||
|
||
s.logger.Info("AsyncPruner started", "interval", s.flushInterval) | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
s.logger.Info("AsyncPruner stopped") | ||
return | ||
case <-ticker.C: | ||
err := s.ps.PruneBlockData(ctx) | ||
if err != nil { | ||
s.logger.Error("Failed to prune block data", "error", err) | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
package config | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
// Pruning configuration flags, used to wire PruningConfig fields to the CLI.
const (
	// FlagPruningStrategy selects the pruning strategy for the block store.
	FlagPruningStrategy = "rollkit.node.pruning.strategy"
	// FlagPruningKeepRecent sets how many recent blocks are kept in the store.
	FlagPruningKeepRecent = "rollkit.node.pruning.keep_recent"
	// FlagPruningInterval sets how often the block store is pruned.
	FlagPruningInterval = "rollkit.node.pruning.interval"
)
|
||
// Names of the supported pruning strategies.
const (
	// PruningConfigStrategyNone disables pruning entirely.
	PruningConfigStrategyNone = "none"
	// PruningConfigStrategyDefault uses the recommended retention settings.
	PruningConfigStrategyDefault = "default"
	// PruningConfigStrategyEverything prunes as aggressively as possible.
	PruningConfigStrategyEverything = "everything"
	// PruningConfigStrategyCustom uses operator-supplied KeepRecent/Interval.
	PruningConfigStrategyCustom = "custom"
)

// Predefined configurations for each strategy.
var (
	// PruningConfigNone keeps all block data.
	PruningConfigNone = PruningConfig{
		Strategy:   PruningConfigStrategyNone,
		KeepRecent: 0,
		Interval:   0,
	}
	// PruningConfigDefault balances history retention against disk usage.
	PruningConfigDefault = PruningConfig{
		Strategy:   PruningConfigStrategyDefault,
		KeepRecent: 362880,
		Interval:   10,
	}
	// PruningConfigEverything keeps only the minimum number of recent blocks.
	PruningConfigEverything = PruningConfig{
		Strategy:   PruningConfigStrategyEverything,
		KeepRecent: 2,
		Interval:   10,
	}
	// PruningConfigCustom holds example values for the custom strategy.
	PruningConfigCustom = PruningConfig{
		Strategy:   PruningConfigStrategyCustom,
		KeepRecent: 100,
		Interval:   100,
	}
)

// PruningConfig allows node operators to manage storage
type PruningConfig struct {
	// todo: support volume-based strategy
	Strategy   string `mapstructure:"strategy" yaml:"strategy" comment:"Strategy determines the pruning approach (none, default, everything, custom)"`
	KeepRecent uint64 `mapstructure:"keep_recent" yaml:"keep_recent" comment:"Number of recent blocks to keep, used in \"custom\" strategy, must be greater or equal than 2"`
	Interval   uint64 `mapstructure:"interval" yaml:"interval" comment:"How often the pruning process should run, used in \"custom\" strategy"`

	// todo: support volume-based strategy
	// VolumeConfig specifies configuration for volume-based storage
	// VolumeConfig *VolumeStorageConfig `mapstructure:"volume_config" yaml:"volume_config"`
}

// Validate checks that the configuration is internally consistent.
//
// Only the "custom" strategy requires validation: the predefined strategies
// are built from the known-good values above.
func (p PruningConfig) Validate() error {
	if p.Strategy != PruningConfigStrategyCustom {
		return nil
	}

	if p.KeepRecent < 2 {
		return errors.New("keep_recent must be greater or equal than 2 for custom pruning strategy")
	}

	// A zero interval would make the pruner's "height % Interval" check divide
	// by zero and gives no meaningful schedule, so reject it up front.
	if p.Interval == 0 {
		return errors.New("interval must be greater than 0 for custom pruning strategy")
	}

	return nil
}

// GetPruningConfigFromStrategy maps a strategy name to its predefined config.
//
// NOTE(review): "custom" (and any unknown value) falls through to
// PruningConfigNone; the custom KeepRecent/Interval values are presumably
// supplied separately via flags — confirm this fallback is intended.
func GetPruningConfigFromStrategy(strategy string) PruningConfig {
	switch strategy {
	case PruningConfigStrategyDefault:
		return PruningConfigDefault
	case PruningConfigStrategyEverything:
		return PruningConfigEverything
	default:
		return PruningConfigNone
	}
}
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,73 @@ | ||||||
package store | ||||||
|
||||||
import (
	"context"
	"fmt"
	"sync"

	ds "github.com/ipfs/go-datastore"

	"github.com/rollkit/rollkit/pkg/config"
	"github.com/rollkit/rollkit/types"
)
|
||||||
// DefaultPruningStore is for a store that supports pruning of block data.
type DefaultPruningStore struct {
	Store

	// config selects the pruning strategy, retention depth, and cadence.
	config config.PruningConfig
	// latestBlockDataHeight is the height most recently persisted via
	// SaveBlockData; guarded by mu.
	latestBlockDataHeight uint64
	mu                    sync.Mutex
}

// Compile-time check that DefaultPruningStore satisfies PruningStore.
var _ PruningStore = &DefaultPruningStore{}
|
||||||
func NewDefaultPruningStore(ds ds.Batching, config config.PruningConfig) PruningStore { | ||||||
return &DefaultPruningStore{ | ||||||
Store: &DefaultStore{db: ds}, | ||||||
|
||||||
config: config, | ||||||
} | ||||||
} | ||||||
|
||||||
// SaveBlockData saves the block data and updates the latest block data height. | ||||||
func (s *DefaultPruningStore) SaveBlockData(ctx context.Context, header *types.SignedHeader, data *types.Data, signature *types.Signature) error { | ||||||
err := s.Store.SaveBlockData(ctx, header, data, signature) | ||||||
if err == nil { | ||||||
s.mu.Lock() | ||||||
s.latestBlockDataHeight = header.Height() | ||||||
Eoous marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
s.mu.Unlock() | ||||||
} | ||||||
|
||||||
return err | ||||||
} | ||||||
|
||||||
func (s *DefaultPruningStore) PruneBlockData(ctx context.Context) error { | ||||||
// Skip if strategy is none. | ||||||
if s.config.Strategy == config.PruningConfigStrategyNone { | ||||||
return nil | ||||||
} | ||||||
|
||||||
// Read latest height after calling SaveBlockData. There is a delay between SetHeight and SaveBlockData. | ||||||
s.mu.Lock() | ||||||
height := s.latestBlockDataHeight | ||||||
s.mu.Unlock() | ||||||
|
||||||
// Skip if not the correct interval or latest height is less or equal than number of blocks need to keep. | ||||||
if height%s.config.Interval != 0 || height < s.config.KeepRecent { | ||||||
return nil | ||||||
} | ||||||
|
||||||
// Must keep at least 2 blocks(while strategy is everything). | ||||||
endHeight := height - 1 - s.config.KeepRecent | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The calculation for If we want to keep With the current logic, To fix this,
Suggested change
|
||||||
startHeight := uint64(0) | ||||||
if endHeight > s.config.Interval { | ||||||
startHeight = endHeight - s.config.Interval | ||||||
} | ||||||
|
||||||
for i := startHeight; i < endHeight; i++ { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you need to protect against concurrent executions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As this is a public method, it can be called by other threads as well. It would not cost much to exit early when pruning is not completed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Other threads shouldn't call this method. It's only added for pruning thread Maybe currently is no need to add some protections? |
||||||
// Could ignore for errors like not found. | ||||||
_ = s.DeleteBlockData(ctx, i) | ||||||
} | ||||||
Comment on lines
+67
to
+70
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Errors from Ignoring these errors can hide serious problems and leave the pruning process in an incomplete state without any notification. The error should be propagated up to the With the recommended changes to if err := s.DeleteBlockData(ctx, i); err != nil {
return err
} |
||||||
|
||||||
return nil | ||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -82,7 +82,7 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
|
||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch: %w", err) | ||
return fmt.Errorf("failed to create a new batch for saving block data: %w", err) | ||
} | ||
|
||
if err := batch.Put(ctx, ds.NewKey(getHeaderKey(height)), headerBlob); err != nil { | ||
|
@@ -104,6 +104,31 @@ func (s *DefaultStore) SaveBlockData(ctx context.Context, header *types.SignedHe | |
return nil | ||
} | ||
|
||
// DeleteBlockData deletes block at given height. | ||
func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error { | ||
batch, err := s.db.Batch(ctx) | ||
if err != nil { | ||
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err) | ||
} | ||
|
||
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete header blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete data blob in batch: %w", err) | ||
} | ||
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil { | ||
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err) | ||
} | ||
if err := batch.Commit(ctx); err != nil { | ||
return fmt.Errorf("failed to commit batch: %w", err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how do we ensure that we delete everything? there might be more prefix stores in the future? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Structures like block and header may need to refactor in future I think currently it's enough to be same with |
||
|
||
return nil | ||
} | ||
Eoous marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
+108
to
+128
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation of To fix this, I've also added handling for func (s *DefaultStore) DeleteBlockData(ctx context.Context, height uint64) error {
// Get the header to retrieve the block hash for index deletion.
header, err := s.GetHeader(ctx, height)
if err != nil {
// If block not found, it might have been already pruned.
if errors.Is(err, ds.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to get header for block %d: %w", height, err)
}
hash := header.Hash()
batch, err := s.db.Batch(ctx)
if err != nil {
return fmt.Errorf("failed to create a new batch for deleting block data: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(height))); err != nil {
return fmt.Errorf("failed to delete header blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getDataKey(height))); err != nil {
return fmt.Errorf("failed to delete data blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(height))); err != nil {
return fmt.Errorf("failed to delete signature of block blob in batch: %w", err)
}
if err := batch.Delete(ctx, ds.NewKey(getIndexKey(hash))); err != nil {
return fmt.Errorf("failed to delete index key in batch: %w", err)
}
if err := batch.Commit(ctx); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
return nil
} |
||
|
||
// TODO: We unmarshal the header and data here, but then we re-marshal them to proto to hash or send them to DA, we should not unmarshal them here and allow the caller to handle them as needed. | ||
|
||
// GetBlockData returns block header and data at given height, or error if it's not found in Store. | ||
func (s *DefaultStore) GetBlockData(ctx context.Context, height uint64) (*types.SignedHeader, *types.Data, error) { | ||
header, err := s.GetHeader(ctx, height) | ||
|
Uh oh!
There was an error while loading. Please reload this page.