Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1b93aac
duties: log more details
iurii-ssv Sep 25, 2025
a3d6730
move stuff around
iurii-ssv Sep 25, 2025
54068da
adjustments
iurii-ssv Sep 26, 2025
e467736
simplify dutyMeasurements struct
iurii-ssv Sep 26, 2025
a641687
Merge branch 'stage' into logs-log-more-details-per-validator-slot-epoch
iurii-ssv Sep 29, 2025
b87fadd
remove duplicate current_slot entry in some log lines
iurii-ssv Sep 29, 2025
c0fe386
clarify runner-prunning log-line
iurii-ssv Sep 30, 2025
aadaad1
Merge branch 'stage' into logs-log-more-details-per-validator-slot-epoch
iurii-ssv Sep 30, 2025
300b6ca
add units to durations
iurii-ssv Oct 1, 2025
51a133c
minor improvements
iurii-ssv Oct 1, 2025
55d84b7
cleanup
iurii-ssv Oct 1, 2025
e8a03a3
dedup some functionality
iurii-ssv Oct 1, 2025
c1f66b8
fix(add) consensus_time units
iurii-ssv Oct 1, 2025
d1d4c55
fix the consensus_time calculation for aggregator duty
iurii-ssv Oct 1, 2025
0f24cff
Merge branch 'stage' into logs-log-more-details-per-validator-slot-epoch
iurii-ssv Oct 22, 2025
6a8887c
Merge branch 'stage' into logs-log-more-details-per-validator-slot-epoch
iurii-ssv Oct 27, 2025
be24f7a
minor improvements (+ adjustments after rebase)
iurii-ssv Oct 29, 2025
9387920
Merge branch 'stage' into logs-log-more-details-per-validator-slot-epoch
iurii-ssv Oct 29, 2025
3e17951
clean up the logs around message-retries
iurii-ssv Oct 29, 2025
91f7b29
tmp
iurii-ssv Oct 29, 2025
f9818ca
committee-runner: log all roots (msg-roots vs quorum-roots)
iurii-ssv Oct 29, 2025
aacf87c
print the actual roots (not the lengths)
iurii-ssv Oct 29, 2025
e954078
fix merge artifacts & improve logs
iurii-ssv Oct 29, 2025
0537b71
log own operator-id next to qbft leader
iurii-ssv Oct 29, 2025
dff5512
clean up logs
iurii-ssv Oct 29, 2025
8651427
print quorumRoots length instead of contents
iurii-ssv Oct 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions api/handlers/exporter/decided.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/api"
"github.com/ssvlabs/ssv/ibft/storage"
dutytracer "github.com/ssvlabs/ssv/operator/dutytracer"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
)

// TraceDecideds godoc
Expand Down Expand Up @@ -132,7 +132,7 @@ func (e *Exporter) Decideds(w http.ResponseWriter, r *http.Request) error {
role := spectypes.BeaconRole(r)
store := e.participantStores.Get(role)

var participantsRange []qbftstorage.ParticipantsRangeEntry
var participantsRange []storage.ParticipantsRangeEntry

if len(pubkeys) == 0 {
var err error
Expand Down Expand Up @@ -216,12 +216,12 @@ func (e *Exporter) getValidatorDecidedsForRole(slot phase0.Slot, indices []phase

// toParticipantsRangeEntry converts an index-based entry into a ParticipantsRangeEntry
// by resolving the validator's pubkey from the registry store.
func (e *Exporter) toParticipantsRangeEntry(ent dutytracer.ParticipantsRangeIndexEntry) (qbftstorage.ParticipantsRangeEntry, error) {
func (e *Exporter) toParticipantsRangeEntry(ent dutytracer.ParticipantsRangeIndexEntry) (storage.ParticipantsRangeEntry, error) {
pk, found := e.validators.ValidatorPubkey(ent.Index)
if !found {
return qbftstorage.ParticipantsRangeEntry{}, fmt.Errorf("validator not found by index: %d", ent.Index)
return storage.ParticipantsRangeEntry{}, fmt.Errorf("validator not found by index: %d", ent.Index)
}
return qbftstorage.ParticipantsRangeEntry{
return storage.ParticipantsRangeEntry{
Slot: ent.Slot,
PubKey: pk,
Signers: ent.Signers,
Expand Down
4 changes: 2 additions & 2 deletions api/handlers/exporter/decided_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/api"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
"github.com/ssvlabs/ssv/ibft/storage"
)

// DecidedParticipant describes a decided duty participant entry.
Expand Down Expand Up @@ -72,7 +72,7 @@ func (r *DecidedsRequest) hasFilters() bool {
return len(r.PubKeys) > 0 || len(r.Indices) > 0
}

func toParticipantResponse(role spectypes.BeaconRole, entry qbftstorage.ParticipantsRangeEntry) *DecidedParticipant {
func toParticipantResponse(role spectypes.BeaconRole, entry storage.ParticipantsRangeEntry) *DecidedParticipant {
response := &DecidedParticipant{
Role: role.String(),
Slot: uint64(entry.Slot),
Expand Down
32 changes: 15 additions & 17 deletions api/handlers/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,31 @@ import (
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethereum/go-ethereum/common"
"github.com/hashicorp/go-multierror"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"

"github.com/ssvlabs/ssv/api"
"github.com/ssvlabs/ssv/exporter"
"github.com/ssvlabs/ssv/exporter/rolemask"
estore "github.com/ssvlabs/ssv/exporter/store"
ibftstorage "github.com/ssvlabs/ssv/ibft/storage"
dutytracer "github.com/ssvlabs/ssv/operator/dutytracer"
"github.com/ssvlabs/ssv/operator/slotticker"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
"github.com/ssvlabs/ssv/registry/storage"
)

// mockParticipantStore is a basic mock for qbftstorage.ParticipantStore.
// mockParticipantStore is a basic mock for ibftstorage.ParticipantStore.
type mockParticipantStore struct {
participantsRangeEntries map[string][]qbftstorage.ParticipantsRangeEntry
participantsRangeEntries map[string][]ibftstorage.ParticipantsRangeEntry
}

// newMockParticipantStore creates a new instance of mockParticipantStore.
func newMockParticipantStore() *mockParticipantStore {
return &mockParticipantStore{
participantsRangeEntries: make(map[string][]qbftstorage.ParticipantsRangeEntry),
participantsRangeEntries: make(map[string][]ibftstorage.ParticipantsRangeEntry),
}
}

Expand All @@ -54,8 +52,8 @@ func (m *mockParticipantStore) SaveParticipants(spectypes.ValidatorPK, phase0.Sl
}

// GetAllParticipantsInRange returns all participant entries within the given slot range.
func (m *mockParticipantStore) GetAllParticipantsInRange(from, to phase0.Slot) ([]qbftstorage.ParticipantsRangeEntry, error) {
var result []qbftstorage.ParticipantsRangeEntry
func (m *mockParticipantStore) GetAllParticipantsInRange(from, to phase0.Slot) ([]ibftstorage.ParticipantsRangeEntry, error) {
var result []ibftstorage.ParticipantsRangeEntry
for _, entries := range m.participantsRangeEntries {
for _, entry := range entries {
if entry.Slot >= from && entry.Slot <= to {
Expand All @@ -67,9 +65,9 @@ func (m *mockParticipantStore) GetAllParticipantsInRange(from, to phase0.Slot) (
}

// GetParticipantsInRange returns participant entries for a given public key and slot range.
func (m *mockParticipantStore) GetParticipantsInRange(pk spectypes.ValidatorPK, from, to phase0.Slot) ([]qbftstorage.ParticipantsRangeEntry, error) {
func (m *mockParticipantStore) GetParticipantsInRange(pk spectypes.ValidatorPK, from, to phase0.Slot) ([]ibftstorage.ParticipantsRangeEntry, error) {
key := hex.EncodeToString(pk[:])
var result []qbftstorage.ParticipantsRangeEntry
var result []ibftstorage.ParticipantsRangeEntry
entries, ok := m.participantsRangeEntries[key]
if !ok {
return result, nil
Expand All @@ -90,14 +88,14 @@ func (m *mockParticipantStore) Prune(context.Context, phase0.Slot) {
// no-op.
}

func (m *mockParticipantStore) PruneContinously(context.Context, slotticker.Provider, phase0.Slot) {
func (m *mockParticipantStore) PruneContinuously(context.Context, slotticker.Provider, phase0.Slot) {
// no-op.
}

// AddEntry adds an entry to the mock store.
func (m *mockParticipantStore) AddEntry(pk spectypes.ValidatorPK, slot phase0.Slot, signers []uint64) {
key := hex.EncodeToString(pk[:])
entry := qbftstorage.ParticipantsRangeEntry{
entry := ibftstorage.ParticipantsRangeEntry{
Slot: slot,
PubKey: pk,
Signers: signers,
Expand All @@ -110,7 +108,7 @@ type errorAllRangeMockStore struct {
*mockParticipantStore
}

func (m *errorAllRangeMockStore) GetAllParticipantsInRange(phase0.Slot, phase0.Slot) ([]qbftstorage.ParticipantsRangeEntry, error) {
func (m *errorAllRangeMockStore) GetAllParticipantsInRange(phase0.Slot, phase0.Slot) ([]ibftstorage.ParticipantsRangeEntry, error) {
return nil, fmt.Errorf("forced error on GetAllParticipantsInRange")
}

Expand All @@ -119,7 +117,7 @@ type errorByPKMockStore struct {
*mockParticipantStore
}

func (m *errorByPKMockStore) GetParticipantsInRange(spectypes.ValidatorPK, phase0.Slot, phase0.Slot) ([]qbftstorage.ParticipantsRangeEntry, error) {
func (m *errorByPKMockStore) GetParticipantsInRange(spectypes.ValidatorPK, phase0.Slot, phase0.Slot) ([]ibftstorage.ParticipantsRangeEntry, error) {
return nil, fmt.Errorf("forced error on GetParticipantsInRange")
}

Expand All @@ -133,7 +131,7 @@ func TestTransformToParticipantResponse(t *testing.T) {
var pk spectypes.ValidatorPK
copy(pk[:], pkBytes)

entry := qbftstorage.ParticipantsRangeEntry{
entry := ibftstorage.ParticipantsRangeEntry{
Slot: phase0.Slot(123),
PubKey: pk,
Signers: []uint64{1, 2, 3, 4},
Expand Down Expand Up @@ -752,7 +750,7 @@ func TestExporterTraceDecideds(t *testing.T) {
"pubkeys": []string{"b24454393691331ee6eba4ffa2dbb2600b9859f908c3e648b6c6de9e1dea3e9329866015d08355c8d451427762b913d1"},
},
setupMock: func(store *mockTraceStore) {
entry := qbftstorage.ParticipantsRangeEntry{Signers: []uint64{}}
entry := ibftstorage.ParticipantsRangeEntry{Signers: []uint64{}}
store.AddValidatorDecided(spectypes.BNRoleProposer, phase0.Slot(150), entry.Signers)
},
expectedStatus: http.StatusOK,
Expand All @@ -772,7 +770,7 @@ func TestExporterTraceDecideds(t *testing.T) {
"pubkeys": []string{"b24454393691331ee6eba4ffa2dbb2600b9859f908c3e648b6c6de9e1dea3e9329866015d08355c8d451427762b913d1"},
},
setupMock: func(store *mockTraceStore) {
entry := qbftstorage.ParticipantsRangeEntry{Signers: []uint64{}}
entry := ibftstorage.ParticipantsRangeEntry{Signers: []uint64{}}
store.AddCommitteeDecided(phase0.Slot(150), entry.Signers)
},
expectedStatus: http.StatusOK,
Expand Down
6 changes: 3 additions & 3 deletions beacon/goclient/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (gc *GoClient) SubmitAggregateSelectionProof(
// As specified in spec, an aggregator should wait until two thirds of the way through slot
// to broadcast the best aggregate to the global aggregate channel.
// https://github.com/ethereum/consensus-specs/blob/v0.9.3/specs/validator/0_beacon-chain-validator.md#broadcast-aggregate
if err := gc.waitToSlotTwoThirds(ctx, slot); err != nil {
if err := gc.waitTwoThirdsIntoSlot(ctx, slot); err != nil {
return nil, 0, fmt.Errorf("wait for 2/3 of slot: %w", err)
}

Expand Down Expand Up @@ -153,8 +153,8 @@ func (gc *GoClient) SubmitSignedAggregateSelectionProof(
return nil
}

// waitToSlotTwoThirds waits until two-third of the slot has transpired (SECONDS_PER_SLOT * 2 / 3 seconds after the start of slot)
func (gc *GoClient) waitToSlotTwoThirds(ctx context.Context, slot phase0.Slot) error {
// waitTwoThirdsIntoSlot waits until two-third of the slot has transpired (SECONDS_PER_SLOT * 2 / 3 seconds after the start of slot)
func (gc *GoClient) waitTwoThirdsIntoSlot(ctx context.Context, slot phase0.Slot) error {
config := gc.getBeaconConfig()
oneInterval := config.IntervalDuration()
finalTime := config.SlotStartTime(slot).Add(2 * oneInterval)
Expand Down
8 changes: 4 additions & 4 deletions beacon/goclient/sync_committee_contribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (gc *GoClient) GetSyncCommitteeContribution(
return nil, DataVersionNil, fmt.Errorf("mismatching number of selection proofs and subnet IDs")
}

gc.waitForOneThirdSlotDuration(ctx, slot)
gc.waitOneThirdIntoSlot(ctx, slot)

scDataReqStart := time.Now()
beaconBlockRootResp, err := gc.multiClient.BeaconBlockRoot(ctx, &api.BeaconBlockRootOpts{
Expand All @@ -70,7 +70,7 @@ func (gc *GoClient) GetSyncCommitteeContribution(

blockRoot := beaconBlockRootResp.Data

if err := gc.waitToSlotTwoThirds(ctx, slot); err != nil {
if err := gc.waitTwoThirdsIntoSlot(ctx, slot); err != nil {
return nil, 0, fmt.Errorf("wait for 2/3 of slot: %w", err)
}

Expand Down Expand Up @@ -129,8 +129,8 @@ func (gc *GoClient) SubmitSignedContributionAndProof(
return nil
}

// waitForOneThirdSlotDuration waits until one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after slot start time)
func (gc *GoClient) waitForOneThirdSlotDuration(ctx context.Context, slot phase0.Slot) {
// waitOneThirdIntoSlot waits until one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after slot start time)
func (gc *GoClient) waitOneThirdIntoSlot(ctx context.Context, slot phase0.Slot) {
config := gc.getBeaconConfig()
delay := config.IntervalDuration()
finalTime := config.SlotStartTime(slot).Add(delay)
Expand Down
2 changes: 1 addition & 1 deletion beacon/goclient/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (gc *GoClient) SubmitValidatorRegistrations(ctx context.Context, registrati
if err != nil {
return errMultiClient(fmt.Errorf("submit validator registrations (chunk size = %d): %w", len(chunk), err), "SubmitValidatorRegistrations")
}
gc.log.Info("submitted validator registrations", fields.Count(len(chunk)), fields.Duration(reqStart))
gc.log.Info("submitted validator registrations", fields.Count(len(chunk)), fields.Took(time.Since(reqStart)))
}

return nil
Expand Down
14 changes: 7 additions & 7 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ import (
"github.com/ssvlabs/ssv/operator/validator"
"github.com/ssvlabs/ssv/operator/validator/metadata"
"github.com/ssvlabs/ssv/operator/validators"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
"github.com/ssvlabs/ssv/protocol/v2/ssv/runner"
"github.com/ssvlabs/ssv/protocol/v2/types"
registrystorage "github.com/ssvlabs/ssv/registry/storage"
Expand Down Expand Up @@ -889,18 +888,19 @@ func applyMigrations(

// If migrations were applied, we run a full garbage collection cycle
// to reclaim any space that may have been freed up.
start := time.Now()

logger.Debug("running full GC cycle...")

ctx, cancel := context.WithTimeout(cfg.DBOptions.Ctx, 6*time.Minute)
defer cancel()

logger.Debug("running full GC cycle...", fields.Duration(start))
start := time.Now()

if err := db.FullGC(ctx); err != nil {
return fmt.Errorf("failed to collect garbage: %w", err)
}

logger.Debug("post-migrations garbage collection completed", fields.Duration(start))
logger.Debug("post-migrations garbage collection completed", fields.Took(time.Since(start)))

return nil
}
Expand Down Expand Up @@ -1203,7 +1203,7 @@ func initSlotPruning(ctx context.Context, stores *ibftstorage.ParticipantStores,
threshold := slot - phase0.Slot(retain)

// async perform initial slot gc
_ = stores.Each(func(_ spectypes.BeaconRole, store qbftstorage.ParticipantStore) error {
_ = stores.Each(func(_ spectypes.BeaconRole, store ibftstorage.ParticipantStore) error {
wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -1215,8 +1215,8 @@ func initSlotPruning(ctx context.Context, stores *ibftstorage.ParticipantStores,
wg.Wait()

// start background job for removing old slots on every tick
_ = stores.Each(func(_ spectypes.BeaconRole, store qbftstorage.ParticipantStore) error {
go store.PruneContinously(ctx, slotTickerProvider, phase0.Slot(retain))
_ = stores.Each(func(_ spectypes.BeaconRole, store ibftstorage.ParticipantStore) error {
go store.PruneContinuously(ctx, slotTickerProvider, phase0.Slot(retain))
return nil
})
}
5 changes: 2 additions & 3 deletions exporter/api/decided/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"time"

"github.com/patrickmn/go-cache"
"go.uber.org/zap"

spectypes "github.com/ssvlabs/ssv-spec/types"
"go.uber.org/zap"

"github.com/ssvlabs/ssv/exporter/api"
qbftstorage "github.com/ssvlabs/ssv/ibft/storage"
"github.com/ssvlabs/ssv/observability/log/fields"
dutytracer "github.com/ssvlabs/ssv/operator/dutytracer"
"github.com/ssvlabs/ssv/protocol/v2/qbft/controller"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
registrystorage "github.com/ssvlabs/ssv/registry/storage"
)

Expand Down
2 changes: 1 addition & 1 deletion exporter/api/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
specqbft "github.com/ssvlabs/ssv-spec/qbft"
spectypes "github.com/ssvlabs/ssv-spec/types"

qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
qbftstorage "github.com/ssvlabs/ssv/ibft/storage"
)

// Message represents an exporter message
Expand Down
7 changes: 3 additions & 4 deletions exporter/api/query_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/ssvlabs/ssv/ibft/storage"
"github.com/ssvlabs/ssv/observability/log/fields"
"github.com/ssvlabs/ssv/protocol/v2/message"
qbftstorage "github.com/ssvlabs/ssv/protocol/v2/qbft/storage"
)

const (
Expand Down Expand Up @@ -114,10 +113,10 @@ func (h *Handler) HandleParticipantsQuery(store *storage.ParticipantStores, nm *
nm.Msg = res
}

func toParticipations(role spectypes.BeaconRole, pk spectypes.ValidatorPK, ee []qbftstorage.ParticipantsRangeEntry) []qbftstorage.Participation {
out := make([]qbftstorage.Participation, 0, len(ee))
func toParticipations(role spectypes.BeaconRole, pk spectypes.ValidatorPK, ee []storage.ParticipantsRangeEntry) []storage.Participation {
out := make([]storage.Participation, 0, len(ee))
for _, e := range ee {
p := qbftstorage.Participation{
p := storage.Participation{
ParticipantsRangeEntry: e,
Role: role,
PubKey: pk,
Expand Down
3 changes: 1 addition & 2 deletions exporter/api/query_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/ssvlabs/ssv/networkconfig"
"github.com/ssvlabs/ssv/observability/log"
"github.com/ssvlabs/ssv/operator/storage"
protocoltesting "github.com/ssvlabs/ssv/protocol/v2/testing"
kv "github.com/ssvlabs/ssv/storage/badger"
"github.com/ssvlabs/ssv/storage/basedb"
)
Expand Down Expand Up @@ -108,7 +107,7 @@ func TestHandleDecidedQuery(t *testing.T) {
for _, role := range roles {
pk := sks[1].GetPublicKey()
ssvConfig := networkconfig.TestNetwork.SSV
decided250Seq, err := protocoltesting.CreateMultipleStoredInstances(rsaKeys, specqbft.Height(0), specqbft.Height(250), func(height specqbft.Height) ([]spectypes.OperatorID, *specqbft.Message) {
decided250Seq, err := qbftstorage.CreateMultipleStoredInstances(rsaKeys, specqbft.Height(0), specqbft.Height(250), func(height specqbft.Height) ([]spectypes.OperatorID, *specqbft.Message) {
return oids, &specqbft.Message{
MsgType: specqbft.CommitMsgType,
Height: height,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package qbftstorage
package storage

import (
"context"
Expand Down Expand Up @@ -50,5 +50,5 @@ type ParticipantStore interface {
Prune(ctx context.Context, below phase0.Slot)

// SlotGC continuously removes old slots
PruneContinously(ctx context.Context, slotTickerProvider slotticker.Provider, retain phase0.Slot)
PruneContinuously(ctx context.Context, slotTickerProvider slotticker.Provider, retain phase0.Slot)
}
Loading