Skip to content
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 108 additions & 94 deletions validator_client/validator_services/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,23 +180,64 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// For each each required attestation, spawn a new task that downloads, signs and uploads the
/// attestation to the beacon node.
/// Spawn only one new task for attestation post-Electra
/// For each required aggregates, spawn a new task that downloads, signs and uploads the
/// aggregates to the beacon node.
fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> {
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
let duration_to_next_slot = self
.slot_clock
.duration_to_next_slot()
.ok_or("Unable to determine duration to next slot")?;

// Create and publish an `Attestation` for all validators only once
// as the committee_index is the same (index 0) post-Electra
let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect();

let attestation_data_cache = Arc::new(tokio::sync::RwLock::new(None));
let attestation_data_cache_clone = attestation_data_cache.clone();

let attestation_service = self.clone();
self.inner.executor.spawn_ignoring_error(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use spawn_handle and then await the handle to get the attestation data from inside the future (rather than using the attestation_data_cache RwLock).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case the handle doesn't return the result, it's an error and we probably need to just stop the attestation process for this slot (return Err(..))

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the comment, I revised in 0967696

async move {
let attestation_data = attestation_service
.beacon_nodes
.first_success(|beacon_node| async move {
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await;

match attestation_data {
Ok(attestation_data) => {
*attestation_data_cache_clone.write().await =
Some(attestation_data.clone());

attestation_service
.publish_attestations(slot, &attestation_duties, attestation_data)
.await
.ok();
}
Err(e) => {
error!(error = %e, slot = slot.as_u64(), "Failed to get attestation data from beacon nodes");
}
}
Ok(())
},
"attestation publish",
);

// If a validator needs to publish an aggregate attestation, they must do so at 2/3
// through the slot. This delay triggers at this time
let aggregate_production_instant = Instant::now()
+ duration_to_next_slot
.checked_sub(slot_duration / 3)
.unwrap_or_else(|| Duration::from_secs(0));

let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self
.duties_service
.attesters(slot)
.into_iter()
Expand All @@ -208,23 +249,23 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
});

// For each committee index for this slot:
//
// - Create and publish an `Attestation` for all required validators.
// - Create and publish `SignedAggregateAndProof` for all aggregating validators.
duties_by_committee_index
.into_iter()
.for_each(|(committee_index, validator_duties)| {
// Create and publish `SignedAggregateAndProof` for all aggregating validators.
aggregate_duties_by_committee_index.into_iter().for_each(
|(committee_index, validator_duties)| {
let attestation_data = attestation_data_cache.clone();
// Spawn a separate task for each attestation.
self.inner.executor.spawn_ignoring_error(
self.clone().publish_attestations_and_aggregates(
self.clone().handle_aggregates(
slot,
committee_index,
validator_duties,
aggregate_production_instant,
attestation_data,
),
"attestation publish",
"aggregate publish",
);
});
},
);

// Schedule pruning of the slashing protection database once all unaggregated
// attestations have (hopefully) been signed, i.e. at the same time as aggregate
Expand All @@ -234,84 +275,70 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
Ok(())
}

/// Performs the first step of the attesting process: downloading `Attestation` objects,
/// signing them and returning them to the validator.
///
/// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting
///
/// ## Detail
///
/// The given `validator_duties` should already be filtered to only contain those that match
/// `slot` and `committee_index`. Critical errors will be logged if this is not the case.
async fn publish_attestations_and_aggregates(
/// If an attestation was produced, make an aggregate
async fn handle_aggregates(
self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: Vec<DutyAndProof>,
aggregate_production_instant: Instant,
attestation_data: Arc<tokio::sync::RwLock<Option<AttestationData>>>,
) -> Result<(), ()> {
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);

// There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have
// There's not need to produce `SignedAggregateAndProof` if we do not have
// any validators for the given `slot` and `committee_index`.
if validator_duties.is_empty() {
return Ok(());
}

// Step 1.
//
// Download, sign and publish an `Attestation` for each validator.
let attestation_opt = self
.produce_and_publish_attestations(slot, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
)
})?;
// Wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;

drop(attestations_timer);
// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);

let attestation_data = {
let attestation_data_opt = attestation_data.read().await;
if let Some(attestation_data) = attestation_data_opt.as_ref() {
attestation_data.clone()
} else {
self.beacon_nodes
.first_success(|beacon_node| async move {
beacon_node
.get_validator_attestation_data(slot, 0)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| {
error!(
error = %e,
slot = slot.as_u64(),
"Failed to get attestation data from beacon nodes"
);
})?
}
};

// Step 2.
//
// If an attestation was produced, make an aggregate.
if let Some(attestation_data) = attestation_opt {
// First, wait until the `aggregation_production_instant` (2/3rds
// of the way though the slot). As verified in the
// `delay_triggers_when_in_the_past` test, this code will still run
// even if the instant has already elapsed.
sleep_until(aggregate_production_instant).await;

// Start the metrics timer *after* we've done the delay.
let _aggregates_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::AGGREGATES],
);

// Then download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(
&attestation_data,
committee_index,
&validator_duties,
)
// Download, sign and publish a `SignedAggregateAndProof` for each
// validator that is elected to aggregate for this `slot` and
// `committee_index`.
self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties)
.await
.map_err(move |e| {
crit!(
error = format!("{:?}", e),
committee_index,
slot = slot.as_u64(),
"Error during attestation routine"
"Error during aggregate attestation routine"
)
})?;
}

Ok(())
}
Expand All @@ -328,14 +355,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
///
/// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each
/// validator and the list of individually-signed `Attestation` objects is returned to the BN.
async fn produce_and_publish_attestations(
async fn publish_attestations(
&self,
slot: Slot,
committee_index: CommitteeIndex,
validator_duties: &[DutyAndProof],
) -> Result<Option<AttestationData>, String> {
attestation_data: AttestationData,
) -> Result<(), String> {
let attestations_timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS],
);

if validator_duties.is_empty() {
return Ok(None);
return Ok(());
}

let current_epoch = self
Expand All @@ -344,22 +376,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
.ok_or("Unable to determine current slot from clock")?
.epoch(S::E::slots_per_epoch());

let attestation_data = self
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_GET],
);
beacon_node
.get_validator_attestation_data(slot, committee_index)
.await
.map_err(|e| format!("Failed to produce attestation data: {:?}", e))
.map(|result| result.data)
})
.await
.map_err(|e| e.to_string())?;

// Create futures to produce signed `Attestation` objects.
let attestation_data_ref = &attestation_data;
let signing_futures = validator_duties.iter().map(|duty_and_proof| async move {
Expand All @@ -372,7 +388,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
validator = ?duty.pubkey,
duty_slot = %duty.slot,
attestation_slot = %attestation_data.slot,
duty_index = duty.committee_index,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could keep these logs (these won't be 0, because the duty still contains the real committee index).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, revised in 0967696

attestation_index = attestation_data.index,
"Inconsistent validator duties during signing"
);
Expand Down Expand Up @@ -418,7 +433,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
validator = ?duty.pubkey,
committee_index = committee_index,
slot = slot.as_u64(),
"Missing pubkey for attestation"
);
Expand All @@ -428,7 +442,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
crit!(
error = ?e,
validator = ?duty.pubkey,
committee_index,
slot = slot.as_u64(),
"Failed to sign attestation"
);
Expand All @@ -446,7 +459,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,

if attestations.is_empty() {
warn!("No attestations were published");
return Ok(None);
return Ok(());
}
let fork_name = self
.chain_spec
Expand Down Expand Up @@ -507,7 +520,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
),
}

Ok(Some(attestation_data))
drop(attestations_timer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This drop probably not needed (can just rename the timer to _attestations_timer and it will drop when it goes out of scope, when the function finishes)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put a underscore before the timer and removed this line: 0967696

Ok(())
}

/// Performs the second step of the attesting process: downloading an aggregated `Attestation`,
Expand Down
Loading