@@ -25,11 +25,14 @@ use lightning_block_sync::rpc::RpcClient;
2525use lightning_block_sync:: { AsyncBlockSourceResult , BlockData , BlockHeaderData , BlockSource } ;
2626use serde_json;
2727use std:: collections:: HashMap ;
28+ use std:: future:: Future ;
2829use std:: str:: FromStr ;
2930use std:: sync:: atomic:: { AtomicU32 , Ordering } ;
3031use std:: sync:: Arc ;
3132use std:: time:: Duration ;
3233
34+ use tokio:: runtime:: { self , Runtime } ;
35+
3336pub struct BitcoindClient {
3437 pub ( crate ) bitcoind_rpc_client : Arc < RpcClient > ,
3538 network : Network ,
@@ -38,7 +41,8 @@ pub struct BitcoindClient {
3841 rpc_user : String ,
3942 rpc_password : String ,
4043 fees : Arc < HashMap < ConfirmationTarget , AtomicU32 > > ,
41- handle : tokio:: runtime:: Handle ,
44+ main_runtime_handle : runtime:: Handle ,
45+ inner_runtime : Arc < Runtime > ,
4246 logger : Arc < FilesystemLogger > ,
4347}
4448
@@ -66,7 +70,7 @@ const MIN_FEERATE: u32 = 253;
6670impl BitcoindClient {
6771 pub ( crate ) async fn new (
6872 host : String , port : u16 , rpc_user : String , rpc_password : String , network : Network ,
69- handle : tokio :: runtime:: Handle , logger : Arc < FilesystemLogger > ,
73+ handle : runtime:: Handle , logger : Arc < FilesystemLogger > ,
7074 ) -> std:: io:: Result < Self > {
7175 let http_endpoint = HttpEndpoint :: for_host ( host. clone ( ) ) . with_port ( port) ;
7276 let rpc_credentials =
@@ -95,6 +99,15 @@ impl BitcoindClient {
9599 fees. insert ( ConfirmationTarget :: ChannelCloseMinimum , AtomicU32 :: new ( MIN_FEERATE ) ) ;
96100 fees. insert ( ConfirmationTarget :: OutputSpendingFee , AtomicU32 :: new ( MIN_FEERATE ) ) ;
97101
102+ let mut builder = runtime:: Builder :: new_multi_thread ( ) ;
103+ let runtime =
104+ builder. enable_all ( ) . worker_threads ( 1 ) . thread_name ( "rpc-worker" ) . build ( ) . unwrap ( ) ;
105+ let inner_runtime = Arc :: new ( runtime) ;
106+ // Tokio will panic if we drop a runtime while in another runtime. Because the entire
107+ // application runs inside a tokio runtime, we have to ensure this runtime is never
108+ // `drop`'d, which we do by leaking an Arc reference.
109+ std:: mem:: forget ( Arc :: clone ( & inner_runtime) ) ;
110+
98111 let client = Self {
99112 bitcoind_rpc_client : Arc :: new ( bitcoind_rpc_client) ,
100113 host,
@@ -103,7 +116,8 @@ impl BitcoindClient {
103116 rpc_password,
104117 network,
105118 fees : Arc :: new ( fees) ,
106- handle : handle. clone ( ) ,
119+ main_runtime_handle : handle. clone ( ) ,
120+ inner_runtime,
107121 logger,
108122 } ;
109123 BitcoindClient :: poll_for_fee_estimates (
@@ -226,10 +240,42 @@ impl BitcoindClient {
226240 } ) ;
227241 }
228242
243+ fn run_future_in_blocking_context < F : Future + Send + ' static > ( & self , future : F ) -> F :: Output
244+ where
245+ F :: Output : Send + ' static ,
246+ {
247+ // Tokio deliberately makes it nigh impossible to block on a future in a sync context that
248+ // is running in an async task (which makes it really hard to interact with sync code that
249+ // has callbacks in an async project).
250+ //
251+ // Reading the docs, it *seems* like
252+ // `tokio::task::block_in_place(tokio::runtime::Handle::spawn(future))` should do the
253+ // trick, and 99.999% of the time it does! But tokio has a "non-stealable I/O driver" - if
254+ // the task we're running happens to, by sheer luck, be holding the "I/O driver" when we go
255+ // into a `block_in_place` call, and the inner future requires I/O (which of course it
256+ // does, its a future!), the whole thing will come to a grinding halt as no other thread is
257+ // allowed to poll I/O until the blocked one finishes.
258+ //
259+ // This is, of course, nuts, and an almost trivial performance penalty of occasional
260+ // additional wakeups would solve this, but tokio refuses to do so because any performance
261+ // penalty at all would be too much (tokio issue #4730).
262+ //
263+ // Instead, we have to do a rather insane dance - we have to spawn the `future` we want to
264+ // run on a *different* (threaded) tokio runtime (doing the `block_in_place` dance to avoid
265+ // blocking too many threads on the main runtime). We want to block on that `future` being
266+ // run on the other runtime's threads, but tokio only provides `block_on` to do so, which
267+ // runs the `future` itself on the current thread, panicing if this thread is already a
268+ // part of a tokio runtime (which in this case it is - the main tokio runtime). Thus, we
269+ // have to `spawn` the `future` on the secondary runtime and then `block_on` the resulting
270+ // `JoinHandle` on the main runtime.
271+ tokio:: task:: block_in_place ( move || {
272+ self . main_runtime_handle . block_on ( self . inner_runtime . spawn ( future) ) . unwrap ( )
273+ } )
274+ }
275+
229276 pub fn get_new_rpc_client ( & self ) -> RpcClient {
230277 let http_endpoint = HttpEndpoint :: for_host ( self . host . clone ( ) ) . with_port ( self . port ) ;
231- let rpc_credentials =
232- base64:: encode ( format ! ( "{}:{}" , self . rpc_user. clone( ) , self . rpc_password. clone( ) ) ) ;
278+ let rpc_credentials = base64:: encode ( format ! ( "{}:{}" , self . rpc_user, self . rpc_password) ) ;
233279 RpcClient :: new ( & rpc_credentials, http_endpoint)
234280 }
235281
@@ -273,22 +319,28 @@ impl BitcoindClient {
273319 . unwrap ( ) ;
274320 }
275321
276- pub async fn sign_raw_transaction_with_wallet ( & self , tx_hex : String ) -> SignedTx {
322+ pub fn sign_raw_transaction_with_wallet (
323+ & self , tx_hex : String ,
324+ ) -> impl Future < Output = SignedTx > {
277325 let tx_hex_json = serde_json:: json!( tx_hex) ;
278- self . bitcoind_rpc_client
279- . call_method ( "signrawtransactionwithwallet" , & vec ! [ tx_hex_json] )
280- . await
281- . unwrap ( )
326+ let rpc_client = self . get_new_rpc_client ( ) ;
327+ async move {
328+ rpc_client
329+ . call_method ( "signrawtransactionwithwallet" , & vec ! [ tx_hex_json] )
330+ . await
331+ . unwrap ( )
332+ }
282333 }
283334
284- pub async fn get_new_address ( & self ) -> Address {
335+ pub fn get_new_address ( & self ) -> impl Future < Output = Address > {
285336 let addr_args = vec ! [ serde_json:: json!( "LDK output address" ) ] ;
286- let addr = self
287- . bitcoind_rpc_client
288- . call_method :: < NewAddress > ( "getnewaddress" , & addr_args)
289- . await
290- . unwrap ( ) ;
291- Address :: from_str ( addr. 0 . as_str ( ) ) . unwrap ( ) . require_network ( self . network ) . unwrap ( )
337+ let network = self . network ;
338+ let rpc_client = self . get_new_rpc_client ( ) ;
339+ async move {
340+ let addr =
341+ rpc_client. call_method :: < NewAddress > ( "getnewaddress" , & addr_args) . await . unwrap ( ) ;
342+ Address :: from_str ( addr. 0 . as_str ( ) ) . unwrap ( ) . require_network ( network) . unwrap ( )
343+ }
292344 }
293345
294346 pub async fn get_blockchain_info ( & self ) -> BlockchainInfo {
@@ -298,11 +350,11 @@ impl BitcoindClient {
298350 . unwrap ( )
299351 }
300352
301- pub async fn list_unspent ( & self ) -> ListUnspentResponse {
302- self . bitcoind_rpc_client
303- . call_method :: < ListUnspentResponse > ( "listunspent" , & vec ! [ ] )
304- . await
305- . unwrap ( )
353+ pub fn list_unspent ( & self ) -> impl Future < Output = ListUnspentResponse > {
354+ let rpc_client = self . get_new_rpc_client ( ) ;
355+ async move {
356+ rpc_client . call_method :: < ListUnspentResponse > ( "listunspent" , & vec ! [ ] ) . await . unwrap ( )
357+ }
306358 }
307359}
308360
@@ -324,7 +376,7 @@ impl BroadcasterInterface for BitcoindClient {
324376 let txn = txs. iter ( ) . map ( |tx| encode:: serialize_hex ( tx) ) . collect :: < Vec < _ > > ( ) ;
325377 let bitcoind_rpc_client = Arc :: clone ( & self . bitcoind_rpc_client ) ;
326378 let logger = Arc :: clone ( & self . logger ) ;
327- self . handle . spawn ( async move {
379+ self . main_runtime_handle . spawn ( async move {
328380 let res = if txn. len ( ) == 1 {
329381 let tx_json = serde_json:: json!( txn[ 0 ] ) ;
330382 bitcoind_rpc_client
@@ -355,17 +407,15 @@ impl BroadcasterInterface for BitcoindClient {
355407
356408impl ChangeDestinationSource for BitcoindClient {
357409 fn get_change_destination_script ( & self ) -> Result < ScriptBuf , ( ) > {
358- tokio:: task:: block_in_place ( move || {
359- Ok ( self . handle . block_on ( async move { self . get_new_address ( ) . await . script_pubkey ( ) } ) )
360- } )
410+ let future = self . get_new_address ( ) ;
411+ Ok ( self . run_future_in_blocking_context ( async move { future. await . script_pubkey ( ) } ) )
361412 }
362413}
363414
364415impl WalletSource for BitcoindClient {
365416 fn list_confirmed_utxos ( & self ) -> Result < Vec < Utxo > , ( ) > {
366- let utxos = tokio:: task:: block_in_place ( move || {
367- self . handle . block_on ( async move { self . list_unspent ( ) . await } ) . 0
368- } ) ;
417+ let future = self . list_unspent ( ) ;
418+ let utxos = self . run_future_in_blocking_context ( async move { future. await . 0 } ) ;
369419 Ok ( utxos
370420 . into_iter ( )
371421 . filter_map ( |utxo| {
@@ -398,18 +448,16 @@ impl WalletSource for BitcoindClient {
398448 }
399449
400450 fn get_change_script ( & self ) -> Result < ScriptBuf , ( ) > {
401- tokio:: task:: block_in_place ( move || {
402- Ok ( self . handle . block_on ( async move { self . get_new_address ( ) . await . script_pubkey ( ) } ) )
403- } )
451+ let future = self . get_new_address ( ) ;
452+ Ok ( self . run_future_in_blocking_context ( async move { future. await . script_pubkey ( ) } ) )
404453 }
405454
406455 fn sign_psbt ( & self , tx : Psbt ) -> Result < Transaction , ( ) > {
407456 let mut tx_bytes = Vec :: new ( ) ;
408457 let _ = tx. unsigned_tx . consensus_encode ( & mut tx_bytes) . map_err ( |_| ( ) ) ;
409458 let tx_hex = hex_utils:: hex_str ( & tx_bytes) ;
410- let signed_tx = tokio:: task:: block_in_place ( move || {
411- self . handle . block_on ( async move { self . sign_raw_transaction_with_wallet ( tx_hex) . await } )
412- } ) ;
459+ let future = self . sign_raw_transaction_with_wallet ( tx_hex) ;
460+ let signed_tx = self . run_future_in_blocking_context ( async move { future. await } ) ;
413461 let signed_tx_bytes = hex_utils:: to_vec ( & signed_tx. hex ) . ok_or ( ( ) ) ?;
414462 Transaction :: consensus_decode ( & mut signed_tx_bytes. as_slice ( ) ) . map_err ( |_| ( ) )
415463 }
0 commit comments