Labels: P-medium (Priority: medium), accepted (Ready for implementation), bug (Something isn't working), python (Related to Python Polars)
Description
Checks
- I have checked that this issue has not already been reported.
- I have confirmed this bug exists on the latest version of Polars.
Reproducible example
import asyncio
import pathlib

import polars as pl
import numpy as np
import tqdm


async def async_iter(xs, f, max_in_flight=8):
    if hasattr(xs, '__len__'):
        N = len(xs)
    else:
        N = None
    coros = (f(x) for x in xs)
    futures = set()

    def push_task():
        try:
            coro = next(coros)
            task = asyncio.create_task(coro)
            futures.add(task)
        except StopIteration:
            pass

    for _ in range(max_in_flight):
        push_task()

    with tqdm.tqdm(desc='fetch', total=N, position=0) as pbar:
        pbar.set_description(f'fetch: {len(futures)} in flight')
        while futures:
            done, _ = await asyncio.wait(
                futures,
                return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                result = task.result()
                futures.discard(task)
                push_task()
                pbar.set_description(f'fetch ({len(futures)} in flight)')
                pbar.update()
                yield result


async def collect_all(xs, f, max_in_flight=8):
    ret = []
    async for x in async_iter(xs, f, max_in_flight=max_in_flight):
        ret.append(x)
    return ret


def main():
    for i in tqdm.trange(32):
        B = 1024 * 1024
        dst = pathlib.Path('mock') / f'{i:02d}.parquet'
        dst.parent.mkdir(exist_ok=True)
        if dst.exists():
            continue
        pl.DataFrame(data={
            'y': np.random.randn(B),
        }).write_parquet(dst, row_group_size=1024 * 32)

    files = list(pathlib.Path('mock').rglob('*.parquet'))
    print(files)

    N, engine = 1024, "in-memory"  # freezes
    # N, engine = 1024, "streaming"  # runs
    # N, engine = 1, "streaming"  # freezes
    # N, engine = 32, "streaming"  # freezes after a while, sometimes works.

    async def fetch(source):
        lf = pl.scan_parquet(str(source))
        lf = lf.sort('y').head(N)
        df = await lf.collect_async(engine=engine)
        return df

    with asyncio.Runner() as runner:
        result = runner.run(collect_all(files, fetch))
    print(result)


if __name__ == '__main__':
    main()
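As a sanity check on the harness itself (not part of the original report), the same bounded-concurrency scheduling pattern completes normally when the work is plain asyncio. This is a minimal sketch with a stand-in coroutine in place of `collect_async` and the progress bar omitted, suggesting the freeze is specific to the Polars side rather than the scheduling loop:

```python
import asyncio

async def async_iter(xs, f, max_in_flight=8):
    # Same scheduling pattern as the reproducer, minus the progress bar.
    coros = (f(x) for x in xs)
    futures = set()

    def push_task():
        # Schedule the next coroutine, if any remain.
        try:
            futures.add(asyncio.create_task(next(coros)))
        except StopIteration:
            pass

    for _ in range(max_in_flight):
        push_task()
    while futures:
        done, _ = await asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            futures.discard(task)
            push_task()
            yield task.result()

async def work(x):
    await asyncio.sleep(0)  # stand-in for collect_async
    return x * x

async def run_all():
    out = []
    async for r in async_iter(range(10), work):
        out.append(r)
    return out

results = asyncio.run(run_all())
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```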
Log output
fetch: 8 in flight: 0%| | 0/32 [00:00<?, ?it/s]_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
async thread count: 4
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
polars-stream: updating graph state
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
polars-stream: updating graph state
polars-stream: updating graph state
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
polars-stream: updating graph state
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: updating graph state
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
polars-stream: updating graph state
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
polars-stream: updating graph state
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[ParquetFileReader]: ideal_morsel_size: 100000
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
polars-stream: updating graph state
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
[ReaderStarter]: Stopping (no more readers)
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScan]: Initialize source 0
[MultiScan]: Initialize source 0
[MultiScan]: Initialize source 0
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: Stopping (no more readers)
[MultiScan]: Initialize source 0
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[ReaderStarter]: scan_source_idx: 0
[MultiScanState]: Readers disconnected
[MultiScanState]: Readers disconnected
[ReaderStarter]: scan_source_idx: 0
[AttachReaderToBridge]: received reader (n_readers_received: 1)
polars-stream: done running graph phase
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
polars-stream: done running graph phase
polars-stream: updating graph state
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
polars-stream: updating graph state
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: Stopping (no more readers)
[MultiScanState]: Readers disconnected
[ReaderStarter]: Stopping (no more readers)
polars-stream: done running graph phase
polars-stream: updating graph state
[MultiScanState]: Readers disconnected
[MultiScanState]: Readers disconnected
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
polars-stream: done running graph phase
polars-stream: done running graph phase
polars-stream: updating graph state
polars-stream: updating graph state
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
Issue description
collect_async freezes unexpectedly; whether it freezes depends on the query and on the engine used.
async def fetch(source):
    lf = pl.scan_parquet(str(source))
    lf = lf.sort('y').head(N)
    df = await lf.collect_async(engine=engine)
    return df
For the fetch function above, engine='in-memory' always freezes, whereas with the streaming engine it depends on the value of N: for low N it freezes, for high N it (sometimes) works. It also works when max_in_flight < 8 (on my machine, pl.thread_pool_size() == 8).
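Until the underlying issue is fixed, a defensive cap can be derived at runtime. This sketch assumes the freeze only occurs once the number of in-flight collects reaches the Polars thread pool size, which matches the observation above; the ImportError fallback is only there to keep the snippet self-contained:

```python
import os

# Keep strictly fewer concurrent collect_async calls than Polars worker
# threads; the freeze was only observed once max_in_flight reached
# pl.thread_pool_size() (an assumption based on the report above).
try:
    import polars as pl
    pool_size = pl.thread_pool_size()
except ImportError:
    pool_size = os.cpu_count() or 1  # fallback when polars is unavailable

max_in_flight = max(1, pool_size - 1)
print(max_in_flight)
```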
Expected behavior
The freeze was unexpected.
The issue seems to be resolved either by reducing the maximum number of async collects in flight, or by replacing collect_async with asyncio.to_thread(lf.collect, ...), i.e.:
async def fetch(source):
    lf = pl.scan_parquet(str(source))
    lf = lf.sort('y').head(N)
    df = await asyncio.to_thread(lf.collect, engine=engine)
    return df
Installed versions
--------Version info---------
Polars: 1.33.0
Index type: UInt32
Platform: Linux-6.14.0-28-generic-x86_64-with-glibc2.39
Python: 3.13.2 (main, Feb 5 2025, 19:11:32) [Clang 19.1.6 ]
LTS CPU: False
----Optional dependencies----
Azure CLI <not installed>
adbc_driver_manager <not installed>
altair <not installed>
azure.identity <not installed>
boto3 <not installed>
cloudpickle <not installed>
connectorx <not installed>
deltalake <not installed>
fastexcel <not installed>
fsspec <not installed>
gevent <not installed>
google.auth <not installed>
great_tables <not installed>
matplotlib <not installed>
numpy 2.3.2
openpyxl <not installed>
pandas <not installed>
polars_cloud <not installed>
pyarrow 21.0.0
pydantic <not installed>
pyiceberg <not installed>
sqlalchemy <not installed>
torch <not installed>
xlsx2csv <not installed>
xlsxwriter <not installed>