- 
                Notifications
    
You must be signed in to change notification settings  - Fork 931
 
Always use committee index 0 when getting attestation data #8171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: unstable
Are you sure you want to change the base?
Changes from 3 commits
d0ac24d
              7b6def0
              a11bca0
              0967696
              9990122
              ed9ef19
              5f9ff2d
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -180,23 +180,64 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| Ok(()) | ||
| } | ||
| 
     | 
||
| /// For each each required attestation, spawn a new task that downloads, signs and uploads the | ||
| /// attestation to the beacon node. | ||
| /// Spawn only one new task for attestation post-Electra | ||
| /// For each required aggregates, spawn a new task that downloads, signs and uploads the | ||
| /// aggregates to the beacon node. | ||
| fn spawn_attestation_tasks(&self, slot_duration: Duration) -> Result<(), String> { | ||
| let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?; | ||
| let duration_to_next_slot = self | ||
| .slot_clock | ||
| .duration_to_next_slot() | ||
| .ok_or("Unable to determine duration to next slot")?; | ||
| 
     | 
||
| // Create and publish an `Attestation` for all validators only once | ||
| // as the committee_index is the same (index 0) post-Electra | ||
| let attestation_duties: Vec<_> = self.duties_service.attesters(slot).into_iter().collect(); | ||
| 
     | 
||
| let attestation_data_cache = Arc::new(tokio::sync::RwLock::new(None)); | ||
| let attestation_data_cache_clone = attestation_data_cache.clone(); | ||
| 
     | 
||
| let attestation_service = self.clone(); | ||
| self.inner.executor.spawn_ignoring_error( | ||
| async move { | ||
| let attestation_data = attestation_service | ||
| .beacon_nodes | ||
| .first_success(|beacon_node| async move { | ||
| beacon_node | ||
| .get_validator_attestation_data(slot, 0) | ||
| .await | ||
| .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) | ||
| .map(|result| result.data) | ||
| }) | ||
| .await; | ||
| 
     | 
||
| match attestation_data { | ||
| Ok(attestation_data) => { | ||
| *attestation_data_cache_clone.write().await = | ||
| Some(attestation_data.clone()); | ||
| 
     | 
||
| attestation_service | ||
| .publish_attestations(slot, &attestation_duties, attestation_data) | ||
| .await | ||
| .ok(); | ||
| } | ||
| Err(e) => { | ||
| error!(error = %e, slot = slot.as_u64(), "Failed to get attestation data from beacon nodes"); | ||
| } | ||
| } | ||
| Ok(()) | ||
| }, | ||
| "attestation publish", | ||
| ); | ||
| 
     | 
||
| // If a validator needs to publish an aggregate attestation, they must do so at 2/3 | ||
| // through the slot. This delay triggers at this time | ||
| let aggregate_production_instant = Instant::now() | ||
| + duration_to_next_slot | ||
| .checked_sub(slot_duration / 3) | ||
| .unwrap_or_else(|| Duration::from_secs(0)); | ||
| 
     | 
||
| let duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self | ||
| let aggregate_duties_by_committee_index: HashMap<CommitteeIndex, Vec<DutyAndProof>> = self | ||
| .duties_service | ||
| .attesters(slot) | ||
| .into_iter() | ||
| 
        
          
        
         | 
    @@ -208,23 +249,23 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| }); | ||
| 
     | 
||
| // For each committee index for this slot: | ||
| // | ||
| // - Create and publish an `Attestation` for all required validators. | ||
| // - Create and publish `SignedAggregateAndProof` for all aggregating validators. | ||
| duties_by_committee_index | ||
| .into_iter() | ||
| .for_each(|(committee_index, validator_duties)| { | ||
| // Create and publish `SignedAggregateAndProof` for all aggregating validators. | ||
| aggregate_duties_by_committee_index.into_iter().for_each( | ||
| |(committee_index, validator_duties)| { | ||
| let attestation_data = attestation_data_cache.clone(); | ||
| // Spawn a separate task for each attestation. | ||
| self.inner.executor.spawn_ignoring_error( | ||
| self.clone().publish_attestations_and_aggregates( | ||
| self.clone().handle_aggregates( | ||
| slot, | ||
| committee_index, | ||
| validator_duties, | ||
| aggregate_production_instant, | ||
| attestation_data, | ||
| ), | ||
| "attestation publish", | ||
| "aggregate publish", | ||
| ); | ||
| }); | ||
| }, | ||
| ); | ||
| 
     | 
||
| // Schedule pruning of the slashing protection database once all unaggregated | ||
| // attestations have (hopefully) been signed, i.e. at the same time as aggregate | ||
| 
        
          
        
         | 
    @@ -234,84 +275,70 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| Ok(()) | ||
| } | ||
| 
     | 
||
| /// Performs the first step of the attesting process: downloading `Attestation` objects, | ||
| /// signing them and returning them to the validator. | ||
| /// | ||
| /// https://github.com/ethereum/eth2.0-specs/blob/v0.12.1/specs/phase0/validator.md#attesting | ||
| /// | ||
| /// ## Detail | ||
| /// | ||
| /// The given `validator_duties` should already be filtered to only contain those that match | ||
| /// `slot` and `committee_index`. Critical errors will be logged if this is not the case. | ||
| async fn publish_attestations_and_aggregates( | ||
| /// If an attestation was produced, make an aggregate | ||
| async fn handle_aggregates( | ||
| self, | ||
| slot: Slot, | ||
| committee_index: CommitteeIndex, | ||
| validator_duties: Vec<DutyAndProof>, | ||
| aggregate_production_instant: Instant, | ||
| attestation_data: Arc<tokio::sync::RwLock<Option<AttestationData>>>, | ||
| ) -> Result<(), ()> { | ||
| let attestations_timer = validator_metrics::start_timer_vec( | ||
| &validator_metrics::ATTESTATION_SERVICE_TIMES, | ||
| &[validator_metrics::ATTESTATIONS], | ||
| ); | ||
| 
     | 
||
| // There's not need to produce `Attestation` or `SignedAggregateAndProof` if we do not have | ||
| // There's not need to produce `SignedAggregateAndProof` if we do not have | ||
| // any validators for the given `slot` and `committee_index`. | ||
| if validator_duties.is_empty() { | ||
| return Ok(()); | ||
| } | ||
| 
     | 
||
| // Step 1. | ||
| // | ||
| // Download, sign and publish an `Attestation` for each validator. | ||
| let attestation_opt = self | ||
| .produce_and_publish_attestations(slot, committee_index, &validator_duties) | ||
| .await | ||
| .map_err(move |e| { | ||
| crit!( | ||
| error = format!("{:?}", e), | ||
| committee_index, | ||
| slot = slot.as_u64(), | ||
| "Error during attestation routine" | ||
| ) | ||
| })?; | ||
| // Wait until the `aggregation_production_instant` (2/3rds | ||
| // of the way though the slot). As verified in the | ||
| // `delay_triggers_when_in_the_past` test, this code will still run | ||
| // even if the instant has already elapsed. | ||
| sleep_until(aggregate_production_instant).await; | ||
| 
     | 
||
| drop(attestations_timer); | ||
| // Start the metrics timer *after* we've done the delay. | ||
| let _aggregates_timer = validator_metrics::start_timer_vec( | ||
| &validator_metrics::ATTESTATION_SERVICE_TIMES, | ||
| &[validator_metrics::AGGREGATES], | ||
| ); | ||
| 
     | 
||
| let attestation_data = { | ||
| let attestation_data_opt = attestation_data.read().await; | ||
| if let Some(attestation_data) = attestation_data_opt.as_ref() { | ||
| attestation_data.clone() | ||
| } else { | ||
| self.beacon_nodes | ||
| .first_success(|beacon_node| async move { | ||
| beacon_node | ||
| .get_validator_attestation_data(slot, 0) | ||
| .await | ||
| .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) | ||
| .map(|result| result.data) | ||
| }) | ||
| .await | ||
| .map_err(|e| { | ||
| error!( | ||
| error = %e, | ||
| slot = slot.as_u64(), | ||
| "Failed to get attestation data from beacon nodes" | ||
| ); | ||
| })? | ||
| } | ||
| }; | ||
| 
     | 
||
| // Step 2. | ||
| // | ||
| // If an attestation was produced, make an aggregate. | ||
| if let Some(attestation_data) = attestation_opt { | ||
| // First, wait until the `aggregation_production_instant` (2/3rds | ||
| // of the way though the slot). As verified in the | ||
| // `delay_triggers_when_in_the_past` test, this code will still run | ||
| // even if the instant has already elapsed. | ||
| sleep_until(aggregate_production_instant).await; | ||
| 
     | 
||
| // Start the metrics timer *after* we've done the delay. | ||
| let _aggregates_timer = validator_metrics::start_timer_vec( | ||
| &validator_metrics::ATTESTATION_SERVICE_TIMES, | ||
| &[validator_metrics::AGGREGATES], | ||
| ); | ||
| 
     | 
||
| // Then download, sign and publish a `SignedAggregateAndProof` for each | ||
| // validator that is elected to aggregate for this `slot` and | ||
| // `committee_index`. | ||
| self.produce_and_publish_aggregates( | ||
| &attestation_data, | ||
| committee_index, | ||
| &validator_duties, | ||
| ) | ||
| // Download, sign and publish a `SignedAggregateAndProof` for each | ||
| // validator that is elected to aggregate for this `slot` and | ||
| // `committee_index`. | ||
| self.produce_and_publish_aggregates(&attestation_data, committee_index, &validator_duties) | ||
| .await | ||
| .map_err(move |e| { | ||
| crit!( | ||
| error = format!("{:?}", e), | ||
| committee_index, | ||
| slot = slot.as_u64(), | ||
| "Error during attestation routine" | ||
| "Error during aggregate attestation routine" | ||
| ) | ||
| })?; | ||
| } | ||
| 
     | 
||
| Ok(()) | ||
| } | ||
| 
        
          
        
         | 
    @@ -328,14 +355,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| /// | ||
| /// Only one `Attestation` is downloaded from the BN. It is then cloned and signed by each | ||
| /// validator and the list of individually-signed `Attestation` objects is returned to the BN. | ||
| async fn produce_and_publish_attestations( | ||
| async fn publish_attestations( | ||
| &self, | ||
| slot: Slot, | ||
| committee_index: CommitteeIndex, | ||
| validator_duties: &[DutyAndProof], | ||
| ) -> Result<Option<AttestationData>, String> { | ||
| attestation_data: AttestationData, | ||
| ) -> Result<(), String> { | ||
| let attestations_timer = validator_metrics::start_timer_vec( | ||
| &validator_metrics::ATTESTATION_SERVICE_TIMES, | ||
| &[validator_metrics::ATTESTATIONS], | ||
| ); | ||
| 
     | 
||
| if validator_duties.is_empty() { | ||
| return Ok(None); | ||
| return Ok(()); | ||
| } | ||
| 
     | 
||
| let current_epoch = self | ||
| 
        
          
        
         | 
    @@ -344,22 +376,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| .ok_or("Unable to determine current slot from clock")? | ||
| .epoch(S::E::slots_per_epoch()); | ||
| 
     | 
||
| let attestation_data = self | ||
| .beacon_nodes | ||
| .first_success(|beacon_node| async move { | ||
| let _timer = validator_metrics::start_timer_vec( | ||
| &validator_metrics::ATTESTATION_SERVICE_TIMES, | ||
| &[validator_metrics::ATTESTATIONS_HTTP_GET], | ||
| ); | ||
| beacon_node | ||
| .get_validator_attestation_data(slot, committee_index) | ||
| .await | ||
| .map_err(|e| format!("Failed to produce attestation data: {:?}", e)) | ||
| .map(|result| result.data) | ||
| }) | ||
| .await | ||
| .map_err(|e| e.to_string())?; | ||
| 
     | 
||
| // Create futures to produce signed `Attestation` objects. | ||
| let attestation_data_ref = &attestation_data; | ||
| let signing_futures = validator_duties.iter().map(|duty_and_proof| async move { | ||
| 
        
          
        
         | 
    @@ -372,7 +388,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| validator = ?duty.pubkey, | ||
| duty_slot = %duty.slot, | ||
| attestation_slot = %attestation_data.slot, | ||
| duty_index = duty.committee_index, | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could keep these logs (these won't be 0, because the duty still contains the real committee index). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point, revised in 0967696  | 
||
| attestation_index = attestation_data.index, | ||
| "Inconsistent validator duties during signing" | ||
| ); | ||
| 
          
            
          
           | 
    @@ -418,7 +433,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| info = "a validator may have recently been removed from this VC", | ||
| pubkey = ?pubkey, | ||
| validator = ?duty.pubkey, | ||
| committee_index = committee_index, | ||
| slot = slot.as_u64(), | ||
| "Missing pubkey for attestation" | ||
| ); | ||
| 
        
          
        
         | 
    @@ -428,7 +442,6 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| crit!( | ||
| error = ?e, | ||
| validator = ?duty.pubkey, | ||
| committee_index, | ||
| slot = slot.as_u64(), | ||
| "Failed to sign attestation" | ||
| ); | ||
| 
        
          
        
         | 
    @@ -446,7 +459,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| 
     | 
||
| if attestations.is_empty() { | ||
| warn!("No attestations were published"); | ||
| return Ok(None); | ||
| return Ok(()); | ||
| } | ||
| let fork_name = self | ||
| .chain_spec | ||
| 
          
            
          
           | 
    @@ -507,7 +520,8 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S, | |
| ), | ||
| } | ||
| 
     | 
||
| Ok(Some(attestation_data)) | ||
| drop(attestations_timer); | ||
                
       | 
||
| Ok(()) | ||
| } | ||
| 
     | 
||
| /// Performs the second step of the attesting process: downloading an aggregated `Attestation`, | ||
| 
          
            
          
           | 
    ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could use
spawn_handleand thenawaitthe handle to get the attestation data from inside the future (rather than using theattestation_data_cacheRwLock).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case the handle doesn't return the result, it's an error and we probably need to just stop the attestation process for this slot (return
Err(..))There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the comment, I revised in 0967696