collect_async freezes. #24329

@Apsod

Description

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

import asyncio
import pathlib

import polars as pl
import numpy as np
import tqdm

async def async_iter(xs, f, max_in_flight=8):
    if hasattr(xs, '__len__'):
        N = len(xs)
    else:
        N = None
    coros = (f(x) for x in xs)
    futures = set()
    def push_task():
        try:
            coro = next(coros)
            task = asyncio.create_task(coro)
            futures.add(task)
        except StopIteration:
            pass
    
    for _ in range(max_in_flight):
        push_task()
    
    with tqdm.tqdm(desc='fetch', total=N, position=0) as pbar:
        pbar.set_description(f'fetch: {len(futures)} in flight')
        while futures:
            done, _ = await asyncio.wait(
                    futures,
                    return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                result = task.result()
                futures.discard(task)
                push_task()
                pbar.set_description(f'fetch ({len(futures)} in flight)')
                pbar.update()
                yield result

async def collect_all(xs, f, max_in_flight=8):
    ret = []
    async for x in async_iter(xs, f, max_in_flight=max_in_flight):
        ret.append(x)
    return ret

def main():
    for i in tqdm.trange(32):
        B = 1024*1024
        dst = pathlib.Path('mock') / f'{i:02d}.parquet'
        dst.parent.mkdir(exist_ok=True)
        if dst.exists():
            continue
        else:
            pl.DataFrame(data={
                'y': np.random.randn(B),
            }).write_parquet(dst, row_group_size=1024*32)

    files = list(pathlib.Path('mock').rglob('*.parquet'))

    print(files)

    N, engine = 1024, "in-memory"  # freezes
    # N, engine = 1024, "streaming"  # runs
    # N, engine = 1, "streaming"  # freezes
    # N, engine = 32, "streaming"  # freezes after a while, sometimes works

    async def fetch(source):
        lf = pl.scan_parquet(str(source))
        lf = lf.sort('y').head(N)
        df = await lf.collect_async(engine=engine)
        return df

    with asyncio.Runner() as runner:
        result = runner.run(collect_all(files, fetch))
    print(result)

if __name__ == '__main__':
    main()

Log output

fetch: 8 in flight:   0%|          | 0/32 [00:00<?, ?it/s]_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
async thread count: 4
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
polars-stream: updating graph state
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
polars-stream: updating graph state
polars-stream: updating graph state
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
polars-stream: updating graph state
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: updating graph state
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
polars-stream: updating graph state
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
polars-stream: updating graph state
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[ParquetFileReader]: ideal_morsel_size: 100000
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
PREFILL CACHES
CachePrefiller: concurrent streaming scan exec limit: 8
CachePrefiller: wait for 1 scan executors
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
polars-stream: updating graph state
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
[ReaderStarter]: Stopping (no more readers)
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScan]: Initialize source 0
[MultiScan]: Initialize source 0
[MultiScan]: Initialize source 0
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: Stopping (no more readers)
[MultiScan]: Initialize source 0
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[ReaderStarter]: scan_source_idx: 0
[MultiScanState]: Readers disconnected
[MultiScanState]: Readers disconnected
[ReaderStarter]: scan_source_idx: 0
[AttachReaderToBridge]: received reader (n_readers_received: 1)
polars-stream: done running graph phase
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
polars-stream: done running graph phase
polars-stream: updating graph state
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
polars-stream: updating graph state
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: project: 1 / 1, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: None 
[ParquetFileReader]: Config { num_pipelines: 8, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: Stopping (no more readers)
[ReaderStarter]: Stopping (no more readers)
[MultiScanState]: Readers disconnected
[ReaderStarter]: Stopping (no more readers)
polars-stream: done running graph phase
polars-stream: updating graph state
[MultiScanState]: Readers disconnected
[MultiScanState]: Readers disconnected
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
polars-stream: done running graph phase
polars-stream: done running graph phase
polars-stream: updating graph state
polars-stream: updating graph state
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state

Issue description

collect_async freezes unexpectedly; whether it freezes depends on the nature of the query and on the engine used.

    async def fetch(source):
        lf = pl.scan_parquet(str(source))
        lf = lf.sort('y').head(N)
        df = await lf.collect_async(engine=engine)
        return df

For the async function above, engine='in-memory' freezes. With the streaming engine the outcome depends on the value of N: for low N it freezes, for high N it (sometimes) completes. It also completes when max_in_flight < 8 (pl.thread_pool_size() is 8 on my machine).

Expected behavior

The freeze was unexpected.

The issue seems to be resolved either by reducing the maximum number of async collects in flight, or by not using collect_async and instead using asyncio.to_thread(lf.collect, ...), i.e.:

    async def fetch(source):
        lf = pl.scan_parquet(str(source))
        lf = lf.sort('y').head(N)
        df = await asyncio.to_thread(lf.collect, engine=engine)
        return df
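The first workaround (fewer collects in flight) can also be expressed with a plain asyncio.Semaphore instead of the hand-rolled async_iter above. A minimal, self-contained sketch, with asyncio.sleep standing in for collect_async (the idea being to keep limit below pl.thread_pool_size()):

```python
import asyncio
from functools import partial

async def bounded_gather(thunks, limit):
    # Bound the number of coroutines running at once with a semaphore;
    # keeping `limit` below the Polars thread pool size is the
    # "fewer collects in flight" workaround described above.
    sem = asyncio.Semaphore(limit)

    async def run(thunk):
        async with sem:
            return await thunk()

    # gather preserves input order regardless of completion order.
    return await asyncio.gather(*(run(t) for t in thunks))

async def demo():
    async def work(i):
        # Stand-in for `await lf.collect_async(...)`.
        await asyncio.sleep(0.01)
        return i * i

    return await bounded_gather([partial(work, i) for i in range(10)], limit=4)

print(asyncio.run(demo()))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

In the real repro this would amount to calling collect_all(files, fetch, max_in_flight=k) with k below pl.thread_pool_size().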

Installed versions

--------Version info---------
Polars:              1.33.0
Index type:          UInt32
Platform:            Linux-6.14.0-28-generic-x86_64-with-glibc2.39
Python:              3.13.2 (main, Feb  5 2025, 19:11:32) [Clang 19.1.6 ]
LTS CPU:             False

----Optional dependencies----
Azure CLI            <not installed>
adbc_driver_manager  <not installed>
altair               <not installed>
azure.identity       <not installed>
boto3                <not installed>
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               <not installed>
gevent               <not installed>
google.auth          <not installed>
great_tables         <not installed>
matplotlib           <not installed>
numpy                2.3.2
openpyxl             <not installed>
pandas               <not installed>
polars_cloud         <not installed>
pyarrow              21.0.0
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>

Metadata

Labels

P-medium (Priority: medium) · accepted (Ready for implementation) · bug (Something isn't working) · python (Related to Python Polars)
