Rare S3 scan_parquet deadlock #25762

@gusostow

Description

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

#!/usr/bin/env python3
import argparse
import logging
import os
import signal
import sys
import time
import uuid
 
import polars as pl
 
pl.Config.set_verbose(True)
 
 
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s",
    datefmt="%H:%M:%S",
    stream=sys.stdout,
)
logger = logging.getLogger(__name__)
 
 
def create_test_files(base_path: str, num_sources: int = 10):
    run_id = uuid.uuid4().hex[:8]
    all_files = []
 
    logger.info(f"Creating {num_sources} sources in {base_path}/{run_id}")
    for i in range(num_sources):
        source_path = f"{base_path}/{run_id}/source_{i:02d}"
        files = []
 
        for j in range(3):
            df = pl.DataFrame(
                {
                    "id": range(j * 100, (j + 1) * 100),
                    "value": range(j * 100, (j + 1) * 100),
                }
            )
            file_path = f"{source_path}/data_{j}.parquet"
            df.write_parquet(file_path)
            files.append(file_path)
 
        all_files.append(files)
        if i == 0 or (i + 1) % 5 == 0:
            logger.info(f"  Created {i + 1}/{num_sources} sources")
 
    return all_files
 
 
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--description", default="[NO DESCRIPTION]")
    parser.add_argument(
        "--s3-base-url",
        default=os.environ.get("S3_BASE_URL", "s3://my-test-bucket/polars_repro"),
        help="S3 base URL to use (default: $S3_BASE_URL or 's3://my-test-bucket/polars_repro')",
    )
 
    args = parser.parse_args()
 
    logger.info(f"PID: {os.getpid()}")
    logger.info(f"Polars version: {pl.__version__}")
    logger.info(f"Polars path: {pl.__path__}")
 
    base_path = args.s3_base_url
    all_files = create_test_files(base_path, num_sources=10)
 
    iteration = 0
    while True:
 
        def timeout_handler(*_):
            logger.error(
                f"DEADLOCK DETECTED: description={args.description}, pid={os.getpid()}"
            )
            # Exit with error code to signal deadlock
            sys.exit(1)
 
        # Set up signal handler for timeout
        signal.signal(signal.SIGALRM, timeout_handler)
 
        iteration += 1
        start_time = time.time()
 
        # Set 30-second alarm for this iteration
        signal.alarm(30)
 
        if iteration == 10:
            time.sleep(10)
 
        logger.debug(f"Iteration {iteration:4d}: Starting cache fill")
 
        # I haven't confirmed whether filling the object store cache is necessary to
        # reproduce this, but it's something I was experimenting with
        for i in range(8):
            files = all_files[i % len(all_files)]
            _ = (
                pl.scan_parquet(files[0])
                .with_columns(pl.lit(None).alias("dummy"))
                .select("id")
                .head(1)
                .collect()
            )
 
        logger.debug(
            f"Iteration {iteration:4d}: Cache filled, starting concurrent scan"
        )
 
        source1 = (
            pl.scan_parquet(all_files[0])
            .filter(pl.col("value") > 50)
            .with_columns((pl.col("value") * 2).alias("value_2x"))
            .select(["id", "value", "value_2x"])
        )
        source2 = (
            pl.scan_parquet(all_files[1])
            .filter(pl.col("value") > 50)
            .with_columns((pl.col("value") * 2).alias("value_2x"))
            .select(["id", "value", "value_2x"])
        )
        source3 = (
            pl.scan_parquet(all_files[2])
            .filter(pl.col("value") > 50)
            .with_columns((pl.col("value") * 2).alias("value_2x"))
            .select(["id", "value", "value_2x"])
        )
 
        combined = pl.concat([source1, source2, source3])
 
        logger.debug(f"Iteration {iteration:4d}: Calling collect()")
 
        df = combined.collect()
 
        elapsed = time.time() - start_time
 
        # Cancel the alarm since iteration completed successfully
        signal.alarm(0)
 
        logger.info(f"Iteration {iteration:4d}: OK ({elapsed:.2f}s, {len(df)} rows)")
 
 
if __name__ == "__main__":
    main()

Log output

16:00:27.687 [INFO] Iteration   98: OK (1.69s, 747 rows)
_init_credential_provider_builder(): credential_provider_init = CredentialProviderBuilder(CredentialProviderAWS @ AutoInit)
[CredentialProviderBuilder]: Begin initialize CredentialProviderAWS @ AutoInit clear_cached_credentials = False
CredentialProviderAWS @ AutoInit: AutoInit cache key: bddc7cb02a7d151958448f4d6ec8c2df
Loaded credential provider from cache: CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] cache_key = b'\xbd\xdc|\xb0*}\x15\x19XD\x8fMn\xc8\xc2\xdf'
[CredentialProviderBuilder]: Initialized CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] from CredentialProviderAWS @ AutoInit
sourcing parquet scan file schema from: 's3://asd-tmp-10d/anadella/simple_sequential_repro/43d8cecf/source_00/data_0.parquet'
[FetchedCredentialsCache]: Call update_func: current_time = 1765227627, last_fetched_expiry = 0
[FetchedCredentialsCache]: Finish update_func: new expiry = 1765229396 (in 1769 seconds)
[FetchedCredentialsCache]: Using cached credentials: current_time = 1765227627, expiry = 1765229396 (in 1769 seconds)
run sink_mem
polars-stream: updating graph state
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: Some(Positive { offset: 0, len: 1 }), include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: current_row_position: Some(RowCounter { physical_rows: 0, deleted_rows: 0 }), pre_slice_this_file: Some(Positive { offset: 0, len: 1 })
[MultiScan]: Initialize source 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: Some(Positive { offset: 0, len: 1 }), external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 2, pre_slice: Some(Positive { offset: 0, len: 1 }), resolved_pre_slice: Some(Positive { offset: 0, len: 1 }), row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 80, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: Slice pushdown: reading 1 / 1 row groups
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: current_row_position: Some(RowCounter { physical_rows: 100, deleted_rows: 0 }), pre_slice_this_file: Some(Positive { offset: 0, len: 0 })
[ReaderStarter]: Stopping (pre_slice)
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
_init_credential_provider_builder(): credential_provider_init = CredentialProviderBuilder(CredentialProviderAWS @ AutoInit)
[CredentialProviderBuilder]: Begin initialize CredentialProviderAWS @ AutoInit clear_cached_credentials = False
CredentialProviderAWS @ AutoInit: AutoInit cache key: bddc7cb02a7d151958448f4d6ec8c2df
Loaded credential provider from cache: CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] cache_key = b'\xbd\xdc|\xb0*}\x15\x19XD\x8fMn\xc8\xc2\xdf'
[CredentialProviderBuilder]: Initialized CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] from CredentialProviderAWS @ AutoInit
sourcing parquet scan file schema from: 's3://asd-tmp-10d/anadella/simple_sequential_repro/43d8cecf/source_01/data_0.parquet'
[FetchedCredentialsCache]: Call update_func: current_time = 1765227627, last_fetched_expiry = 0
[FetchedCredentialsCache]: Finish update_func: new expiry = 1765229396 (in 1769 seconds)
[FetchedCredentialsCache]: Using cached credentials: current_time = 1765227627, expiry = 1765229396 (in 1769 seconds)
run sink_mem
polars-stream: updating graph state
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: Some(Positive { offset: 0, len: 1 }), include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: current_row_position: Some(RowCounter { physical_rows: 0, deleted_rows: 0 }), pre_slice_this_file: Some(Positive { offset: 0, len: 1 })
[MultiScan]: Initialize source 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: Some(Positive { offset: 0, len: 1 }), external_filter_mask: None, file_iceberg_schema: None
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 2, pre_slice: Some(Positive { offset: 0, len: 1 }), resolved_pre_slice: Some(Positive { offset: 0, len: 1 }), row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 80, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: Slice pushdown: reading 1 / 1 row groups
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: current_row_position: Some(RowCounter { physical_rows: 100, deleted_rows: 0 }), pre_slice_this_file: Some(Positive { offset: 0, len: 0 })
[ReaderStarter]: Stopping (pre_slice)
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
_init_credential_provider_builder(): credential_provider_init = CredentialProviderBuilder(CredentialProviderAWS @ AutoInit)
[CredentialProviderBuilder]: Begin initialize CredentialProviderAWS @ AutoInit clear_cached_credentials = False
CredentialProviderAWS @ AutoInit: AutoInit cache key: bddc7cb02a7d151958448f4d6ec8c2df
Loaded credential provider from cache: CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] cache_key = b'\xbd\xdc|\xb0*}\x15\x19XD\x8fMn\xc8\xc2\xdf'
[CredentialProviderBuilder]: Initialized CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] from CredentialProviderAWS @ AutoInit
build_object_store: clearing store cache (cache.len(): 8)
sourcing parquet scan file schema from: 's3://asd-tmp-10d/anadella/simple_sequential_repro/43d8cecf/source_02/data_0.parquet'
[FetchedCredentialsCache]: Call update_func: current_time = 1765227628, last_fetched_expiry = 0
[FetchedCredentialsCache]: Finish update_func: new expiry = 1765229396 (in 1768 seconds)
[FetchedCredentialsCache]: Using cached credentials: current_time = 1765227628, expiry = 1765229396 (in 1768 seconds)
run sink_mem
polars-stream: updating graph state
polars-stream: running in-memory-sink in subgraph
polars-stream: running multi-scan[parquet] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER | MAPPED_COLUMN_PROJECTION), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: Some(Positive { offset: 0, len: 1 }), include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[ReaderStarter]: current_row_position: Some(RowCounter { physical_rows: 0, deleted_rows: 0 }), pre_slice_this_file: Some(Positive { offset: 0, len: 1 })
[MultiScan]: Initialize source 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: Some(Positive { offset: 0, len: 1 }), external_filter_mask: None, file_iceberg_schema: None
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 1 / 2, pre_slice: Some(Positive { offset: 0, len: 1 }), resolved_pre_slice: Some(Positive { offset: 0, len: 1 }), row_index: None, predicate: None
[ParquetFileReader]: Config { num_pipelines: 80, row_group_prefetch_size: 128, target_values_per_thread: 16777216 }
[ParquetFileReader]: ideal_morsel_size: 100000
[ParquetFileReader]: Slice pushdown: reading 1 / 1 row groups
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: current_row_position: Some(RowCounter { physical_rows: 100, deleted_rows: 0 }), pre_slice_this_file: Some(Positive { offset: 0, len: 0 })
[ReaderStarter]: Stopping (pre_slice)
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
_init_credential_provider_builder(): credential_provider_init = CredentialProviderBuilder(CredentialProviderAWS @ AutoInit)
[CredentialProviderBuilder]: Begin initialize CredentialProviderAWS @ AutoInit clear_cached_credentials = False
CredentialProviderAWS @ AutoInit: AutoInit cache key: bddc7cb02a7d151958448f4d6ec8c2df
Loaded credential provider from cache: CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] cache_key = b'\xbd\xdc|\xb0*}\x15\x19XD\x8fMn\xc8\xc2\xdf'
[CredentialProviderBuilder]: Initialized CachingCredentialProvider[CredentialProviderAWS @ 0x7f61beeeb370] from CredentialProviderAWS @ AutoInit
sourcing parquet scan file schema from: 's3://asd-tmp-10d/anadella/simple_sequential_repro/43d8cecf/source_03/data_0.parquet'
[FetchedCredentialsCache]: Call update_func: current_time = 1765227628, last_fetched_expiry = 0
[FetchedCredentialsCache]: Finish update_func: new expiry = 1765229396 (in 1768 seconds)


(deadlocked here)

Issue description

I've noticed a rare deadlock when scanning Parquet from S3. We hit it a few times a month in production. The script above reproduces it, but it can take hours or days to trigger.

I haven't been able to reproduce it when reading Parquet files from local disk.
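
Since it can take hours or days for the script to hit the deadlock, it may help to capture the native backtrace automatically at hang time instead of attaching a debugger by hand. Below is a minimal sketch of that idea (the file name and timeout are arbitrary). It assumes gdb is installed and ptrace attach to the process is permitted, and it uses a watchdog thread rather than SIGALRM because a Python-level signal handler only runs once the main thread re-enters the interpreter loop, which may never happen while collect() is blocked in native code; a watchdog thread can still run as long as the hung collect() has released the GIL.

import os
import subprocess
import threading


def dump_native_backtrace() -> None:
    # Capture "thread apply all bt" for the current process, then hard-exit.
    pid = os.getpid()
    with open(f"deadlock_{pid}.txt", "w") as f:
        subprocess.run(
            ["gdb", "--batch", "-p", str(pid), "-ex", "thread apply all bt"],
            stdout=f,
            stderr=subprocess.STDOUT,
            check=False,
        )
    os._exit(1)  # the main thread may be unable to unwind, so exit hard


def start_watchdog(timeout_s: float = 30.0) -> threading.Timer:
    timer = threading.Timer(timeout_s, dump_native_backtrace)
    timer.daemon = True
    timer.start()
    return timer


# Per-iteration usage, mirroring the signal.alarm(30) in the repro:
#   timer = start_watchdog(30.0)
#   df = combined.collect()
#   timer.cancel()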

I've hit this deadlock on these builds:

  • polars-lts-cpu 1.33.1 (conda-forge)
  • polars 1.36.1 (pypi)
  • polars main debug build

Attached is a GDB backtrace from a deadlocked debug build: deadlock.txt

Notice that four tokio threads (94, 104, 113, 114) are stuck in pl_async::RuntimeManager::block_in_place_on(...). I believe these are all four of the tokio runtime's worker threads and that they are what is deadlocked, but I'm not very familiar with async Rust.
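
Until the root cause is understood, a possible (if heavy-handed) mitigation for production is to isolate the collect() in a child process with a timeout, so a hung scan can be killed and retried. A rough sketch, assuming the collected result is small enough to pickle back to the parent and that the helper is defined at module level (the spawn context needs to import it):

import multiprocessing as mp
from queue import Empty

import polars as pl


def _collect_in_child(files, out):
    # Runs in the child process; any hang stays confined to that process.
    df = pl.scan_parquet(files).filter(pl.col("value") > 50).collect()
    out.put(df)


def collect_with_timeout(files, timeout_s=60.0):
    ctx = mp.get_context("spawn")
    out = ctx.Queue()
    proc = ctx.Process(target=_collect_in_child, args=(files, out))
    proc.start()
    try:
        # Wait on the result rather than on process exit: a large result keeps
        # the child alive until the parent drains the queue.
        df = out.get(timeout=timeout_s)
    except Empty:
        # Presume the scan deadlocked: kill the child and let the caller retry.
        proc.terminate()
        proc.join()
        raise TimeoutError(f"scan_parquet did not finish within {timeout_s}s")
    proc.join()
    return df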

Expected behavior

Not to deadlock!

Installed versions

Details
$ python -c 'import polars as pl; print(pl.show_versions())'
--------Version info---------
Polars:              1.36.1
Index type:          UInt32
Platform:            Linux-5.14.0-570.24.1.el9_6.x86_64-x86_64-with-glibc2.34
Python:              3.10.18 | packaged by conda-forge | (main, Jun  4 2025, 14:45:41) [GCC 13.3.0]
Runtime:             rt32
 
----Optional dependencies----
Azure CLI            <not installed>
adbc_driver_manager  1.9.0
altair               6.0.0
azure.identity       1.25.1
boto3                1.41.5
cloudpickle          3.1.2
connectorx           0.4.4
deltalake            1.2.1
fastexcel            0.18.0
fsspec               2025.12.0
gevent               25.9.1
google.auth          2.43.0
great_tables         0.20.0
matplotlib           3.10.8
numpy                1.26.4
openpyxl             3.1.5
pandas               2.3.3
polars_cloud         0.4.2
pyarrow              22.0.0
pydantic             2.12.5
pyiceberg            0.10.0
sqlalchemy           2.0.45
torch                2.0.0+cu117
xlsx2csv             0.8.4

Labels

  • A-io-cloud (Area: reading/writing to cloud storage)
  • A-io-parquet (Area: reading/writing Parquet files)
  • bug (Something isn't working)
  • needs triage (Awaiting prioritization by a maintainer)
  • python (Related to Python Polars)
