Skip to content
1 change: 1 addition & 0 deletions .changes/added/2579.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Clear expiration txs cache in transaction pool based on inserted transactions
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Version 0.41.5]

### Added
- [2579](https://github.com/FuelLabs/fuel-core/pull/2579): Clear expiration txs cache in transaction pool based on inserted transactions

### Changed
- [2387](https://github.com/FuelLabs/fuel-core/pull/2387): Update description `tx-max-depth` flag.
- [2630](https://github.com/FuelLabs/fuel-core/pull/2630): Removed some noisy `tracing::info!` logs
Expand Down
10 changes: 8 additions & 2 deletions crates/services/txpool_v2/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,11 @@ where

/// Remove transaction but keep its dependents.
/// The dependents become executables.
pub fn remove_transaction(&mut self, tx_ids: Vec<TxId>) {
pub fn remove_transactions(
&mut self,
tx_ids: impl Iterator<Item = TxId>,
) -> Vec<ArcPoolTx> {
let mut removed_transactions = Vec::with_capacity(tx_ids.size_hint().0);
for tx_id in tx_ids {
if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
let dependents: Vec<S::StorageIndex> =
Expand Down Expand Up @@ -383,10 +387,12 @@ where
.new_executable_transaction(dependent, storage_data);
}
self.update_components_and_caches_on_removal(iter::once(&transaction));
removed_transactions.push(transaction.transaction);
}
}

self.update_stats();
removed_transactions
}

/// Check if the pool has enough space to store a transaction.
Expand Down Expand Up @@ -525,7 +531,7 @@ where
/// Remove transaction and its dependents.
pub fn remove_transaction_and_dependents(
&mut self,
tx_ids: Vec<TxId>,
tx_ids: impl Iterator<Item = TxId>,
) -> Vec<ArcPoolTx> {
let mut removed_transactions = vec![];
for tx_id in tx_ids {
Expand Down
49 changes: 38 additions & 11 deletions crates/services/txpool_v2/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,16 +314,31 @@ where
{
fn import_block(&mut self, result: SharedImportResult) {
let new_height = *result.sealed_block.entity.header().height();
let executed_transaction = result.tx_status.iter().map(|s| s.id).collect();
let executed_transactions: Vec<TxId> =
result.tx_status.iter().map(|s| s.id).collect();
// We don't want block importer way for us to process the result.
drop(result);

{
let removed_transactions = {
let mut tx_pool = self.pool.write();
tx_pool.remove_transaction(executed_transaction);
let removed_transactions =
tx_pool.remove_transactions(executed_transactions.into_iter());
if !tx_pool.is_empty() {
self.shared_state.new_txs_notifier.send_replace(());
}
removed_transactions
};
if !removed_transactions.is_empty() {
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
for tx in removed_transactions.into_iter() {
let expiration = tx.expiration();
if expiration < u32::MAX.into() {
if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration)
{
expired_txs.remove(&tx.id());
}
}
}
}

{
Expand All @@ -344,8 +359,10 @@ where
let expired_txs = height_expiration_txs.remove(&height);
if let Some(expired_txs) = expired_txs {
let mut tx_pool = self.pool.write();
removed_txs
.extend(tx_pool.remove_transaction_and_dependents(expired_txs));
removed_txs.extend(
tx_pool
.remove_transaction_and_dependents(expired_txs.into_iter()),
);
}
}
}
Expand Down Expand Up @@ -485,7 +502,7 @@ where
if expiration < u32::MAX.into() {
let mut lock = height_expiration_txs.write();
let block_height_expiration = lock.entry(expiration).or_default();
block_height_expiration.push(tx_id);
block_height_expiration.insert(tx_id);
}

let duration = submitted_time
Expand Down Expand Up @@ -661,13 +678,23 @@ where
let removed;
{
let mut pool = self.pool.write();
removed = pool.remove_transaction_and_dependents(txs_to_remove);
removed = pool.remove_transaction_and_dependents(txs_to_remove.into_iter());
}

for tx in removed {
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
if !removed.is_empty() {
let mut height_expiration_txs = self.pruner.height_expiration_txs.write();
for tx in removed {
let expiration = tx.expiration();
if expiration < u32::MAX.into() {
if let Some(expired_txs) = height_expiration_txs.get_mut(&expiration)
{
expired_txs.remove(&tx.id());
}
}
self.shared_state
.tx_status_sender
.send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl));
}
}

{
Expand Down
3 changes: 2 additions & 1 deletion crates/services/txpool_v2/src/service/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use fuel_core_types::{
use std::{
collections::{
BTreeMap,
HashSet,
VecDeque,
},
time::SystemTime,
};

pub(super) struct TransactionPruner {
pub time_txs_submitted: Shared<VecDeque<(SystemTime, TxId)>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, Vec<TxId>>>,
pub height_expiration_txs: Shared<BTreeMap<BlockHeight, HashSet<TxId>>>,
pub ttl_timer: tokio::time::Interval,
pub txs_ttl: tokio::time::Duration,
}
Loading