Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions crates/polars-io/src/pl_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,34 +331,6 @@ impl RuntimeManager {
{
self.rt.spawn_blocking(f)
}

/// Executes `func` on the rayon threadpool (`POOL`) and resolves to its output.
///
/// When the calling thread is already a `POOL` worker the closure is not queued; it is run
/// directly on the calling thread through tokio's `block_in_place`. Queueing from a worker and
/// then waiting for the result could deadlock once every worker is waiting on a queued task.
///
/// # Panics
///
/// On the queued path the oneshot receiver is unwrapped, so this panics if the sender is dropped
/// without sending a value (presumably when `func` itself panics on the pool thread).
pub async fn spawn_rayon<F, O>(&self, func: F) -> O
where
    F: FnOnce() -> O + Send + Sync + 'static,
    O: Send + Sync + 'static,
{
    let on_pool_thread = POOL.current_thread_index().is_some();
    if on_pool_thread {
        // Handing the work to POOL from one of its own workers and then waiting for it would
        // deadlock if all workers did the same, so the closure runs right here instead.
        // `block_in_place` relies on the tokio runtime flavor being multi-threaded.
        return tokio::task::block_in_place(func);
    }

    let (result_tx, result_rx) = tokio::sync::oneshot::channel();

    POOL.spawn(move || {
        // The awaiting task may have been cancelled (receiver dropped), so a failed send is
        // deliberately ignored rather than unwrapped.
        let _ = result_tx.send(func());
    });

    result_rx.await.unwrap()
}
}

static RUNTIME: LazyLock<RuntimeManager> = LazyLock::new(RuntimeManager::new);
Expand Down
19 changes: 19 additions & 0 deletions crates/polars-io/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,22 @@ pub const URL_ENCODE_CHAR_SET: &percent_encoding::AsciiSet = &percent_encoding::
.add(b':')
.add(b' ')
.add(b'%');

/// Runs `func` on a background thread without waiting for it to finish.
///
/// If the `async` feature is enabled, the closure is handed to
/// `pl_async::get_runtime().spawn_blocking`; otherwise it is run on a new thread created with
/// `std::thread::spawn`.
///
/// Whatever the underlying spawner returns is discarded, so the closure's return value is not
/// available to the caller.
pub fn spawn_blocking<F, R>(func: F)
where
    F: FnOnce() -> R + Send + 'static,
    R: Send + 'static,
{
    // Exactly one of the two statements below survives conditional compilation.
    #[cfg(feature = "async")]
    let _detached = crate::pl_async::get_runtime().spawn_blocking(func);

    #[cfg(not(feature = "async"))]
    let _detached = std::thread::spawn(func);
}
18 changes: 4 additions & 14 deletions crates/polars-lazy/src/frame/exitable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,10 @@ impl LazyFrame {
let token = state.cancel_token();

if physical_plan.is_cache_prefiller() {
#[cfg(feature = "async")]
{
polars_io::pl_async::get_runtime().spawn_blocking(move || {
let result = physical_plan.execute(&mut state);
tx.send(result).unwrap();
});
}
#[cfg(not(feature = "async"))]
{
std::thread::spawn(move || {
let result = physical_plan.execute(&mut state);
tx.send(result).unwrap();
});
}
polars_io::utils::spawn_blocking(move || {
let result = physical_plan.execute(&mut state);
tx.send(result).unwrap();
});
} else {
POOL.spawn_fifo(move || {
let result = physical_plan.execute(&mut state);
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/dataframe/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use polars::prelude::DataFrame;
use pyo3::pyclass;

#[pyclass(frozen)]
#[repr(transparent)]
#[repr(C)]
pub struct PyDataFrame {
pub df: RwLock<DataFrame>,
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ impl PyLazyFrame {
py.enter_polars_ok(|| {
let ldf = self.ldf.read().clone();

polars_core::POOL.spawn(move || {
polars_io::utils::spawn_blocking(move || {
let result = ldf
.collect_with_engine(engine.0)
.map(PyDataFrame::new)
Expand Down
10 changes: 10 additions & 0 deletions py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -958,6 +958,7 @@ def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None:
os.environ["POLARS_MAX_THREADS"] = "1"
os.environ["POLARS_VERBOSE"] = "1"

import asyncio
import io
import sys
from threading import Thread
Expand All @@ -981,6 +982,7 @@ def test_scan_parquet_in_mem_to_streaming_dispatch_deadlock_22641() -> None:
pl.DataFrame(),
pl.DataFrame(),
pl.DataFrame(),
pl.DataFrame(),
]


Expand Down Expand Up @@ -1008,6 +1010,13 @@ def run():

results[4] = q.collect(background=True).fetch_blocking()

print("QUERY-FENCE", file=sys.stderr)

async def collect_async():
return await q.collect_async()

results[5] = asyncio.run(collect_async())


t = Thread(target=run, daemon=True)
t.start()
Expand All @@ -1019,6 +1028,7 @@ def run():
True,
True,
True,
True,
]

print("OK", end="", file=sys.stderr)
Expand Down
Loading