diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs index 9068fdd393e9..0eff6da99130 100644 --- a/crates/polars-io/src/pl_async.rs +++ b/crates/polars-io/src/pl_async.rs @@ -331,34 +331,6 @@ impl RuntimeManager { { self.rt.spawn_blocking(f) } - - /// Run a task on the rayon threadpool. To avoid deadlocks, if the current thread is already a - /// rayon thread, the task is executed on the current thread after tokio's `block_in_place` is - /// used to spawn another thread to poll futures. - pub async fn spawn_rayon(&self, func: F) -> O - where - F: FnOnce() -> O + Send + Sync + 'static, - O: Send + Sync + 'static, - { - if POOL.current_thread_index().is_some() { - // We are a rayon thread, so we can't use POOL.spawn as it would mean we spawn a task and block until - // another rayon thread executes it - we would deadlock if all rayon threads did this. - // Safety: The tokio runtime flavor is multi-threaded. - tokio::task::block_in_place(func) - } else { - let (tx, rx) = tokio::sync::oneshot::channel(); - - let func = move || { - let out = func(); - // Don't unwrap send attempt - async task could be cancelled. - let _ = tx.send(out); - }; - - POOL.spawn(func); - - rx.await.unwrap() - } - } } static RUNTIME: LazyLock = LazyLock::new(RuntimeManager::new); diff --git a/crates/polars-io/src/utils/mod.rs b/crates/polars-io/src/utils/mod.rs index 3b211ef460cf..909c5682c742 100644 --- a/crates/polars-io/src/utils/mod.rs +++ b/crates/polars-io/src/utils/mod.rs @@ -15,3 +15,22 @@ pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding:: .add(b':') .add(b' ') .add(b'%'); + +/// Spawns a blocking task to a background thread. +/// +/// This uses `pl_async::get_runtime().spawn_blocking` if the `async` feature is enabled. It uses +/// `std::thread::spawn` otherwise. 
+pub fn spawn_blocking(func: F) +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + #[cfg(feature = "async")] + { + crate::pl_async::get_runtime().spawn_blocking(func); + } + #[cfg(not(feature = "async"))] + { + std::thread::spawn(func); + } +} diff --git a/crates/polars-lazy/src/frame/exitable.rs b/crates/polars-lazy/src/frame/exitable.rs index f6fb3b7584bb..1c0a9e18af14 100644 --- a/crates/polars-lazy/src/frame/exitable.rs +++ b/crates/polars-lazy/src/frame/exitable.rs @@ -14,20 +14,10 @@ impl LazyFrame { let token = state.cancel_token(); if physical_plan.is_cache_prefiller() { - #[cfg(feature = "async")] - { - polars_io::pl_async::get_runtime().spawn_blocking(move || { - let result = physical_plan.execute(&mut state); - tx.send(result).unwrap(); - }); - } - #[cfg(not(feature = "async"))] - { - std::thread::spawn(move || { - let result = physical_plan.execute(&mut state); - tx.send(result).unwrap(); - }); - } + polars_io::utils::spawn_blocking(move || { + let result = physical_plan.execute(&mut state); + tx.send(result).unwrap(); + }); } else { POOL.spawn_fifo(move || { let result = physical_plan.execute(&mut state); diff --git a/crates/polars-python/src/dataframe/mod.rs b/crates/polars-python/src/dataframe/mod.rs index e30a63c3058f..adedfd3991e4 100644 --- a/crates/polars-python/src/dataframe/mod.rs +++ b/crates/polars-python/src/dataframe/mod.rs @@ -14,7 +14,7 @@ use polars::prelude::DataFrame; use pyo3::pyclass; #[pyclass(frozen)] -#[repr(transparent)] +#[repr(C)] pub struct PyDataFrame { pub df: RwLock, } diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs index 771baa258981..84fd4d4eeeaf 100644 --- a/crates/polars-python/src/lazyframe/general.rs +++ b/crates/polars-python/src/lazyframe/general.rs @@ -679,7 +679,7 @@ impl PyLazyFrame { py.enter_polars_ok(|| { let ldf = self.ldf.read().clone(); - polars_core::POOL.spawn(move || { + polars_io::utils::spawn_blocking(move || { let result = 
ldf .collect_with_engine(engine.0) .map(PyDataFrame::new) diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py index e3445d4e9168..dc0b66baa18d 100644 --- a/py-polars/tests/unit/io/test_lazy_parquet.py +++ b/py-polars/tests/unit/io/test_lazy_parquet.py @@ -958,6 +958,7 @@ def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None: os.environ["POLARS_MAX_THREADS"] = "1" os.environ["POLARS_VERBOSE"] = "1" +import asyncio import io import sys from threading import Thread @@ -981,6 +982,7 @@ def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None: pl.DataFrame(), pl.DataFrame(), pl.DataFrame(), + pl.DataFrame(), ] @@ -1008,6 +1010,13 @@ def run(): results[4] = q.collect(background=True).fetch_blocking() + print("QUERY-FENCE", file=sys.stderr) + + async def collect_async(): + return await q.collect_async() + + results[5] = asyncio.run(collect_async()) + t = Thread(target=run, daemon=True) t.start() @@ -1019,6 +1028,7 @@ def run(): True, True, True, + True, ] print("OK", end="", file=sys.stderr)