
Deadlock when using async parquet reader #18086

Open
2 tasks done
kszlim opened this issue Aug 7, 2024 · 5 comments
Assignee: coastalwhite
Labels: A-io-parquet (Area: reading/writing Parquet files), bug (Something isn't working), P-medium (Priority: medium), python (Related to Python Polars)

Comments


kszlim commented Aug 7, 2024

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

First, install the dependencies:

pip install -U polars numpy psutil
import os
os.environ["POLARS_CONCURRENCY_BUDGET"] = "1500" # not sure if this is needed or even does anything with the async local reader
os.environ["POLARS_FORCE_ASYNC"] = "1"

import polars as pl
import numpy as np
import psutil
import polars.selectors as cs
import shutil

from collections.abc import Generator, Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def create_random_dataframe(num_rows=250000, num_cols=50):
    """Create a random dataframe with specified number of rows and columns."""
    data = {f'col_{i}': np.random.randn(num_rows).astype(np.float32) for i in range(num_cols)}
    return pl.DataFrame(data)

def write_hive_partitioned_dataframes(base_path, num_partitions=2000):
    """Write random dataframes to Hive-partitioned directory, creating the Parquet file only once."""
    # Create the base directory if it doesn't exist
    os.makedirs(base_path, exist_ok=True)
    
    # Create a single random dataframe
    df = create_random_dataframe()
    
    # Write the dataframe to a temporary Parquet file
    temp_file = os.path.join(base_path, "temp.parquet")
    df.write_parquet(temp_file)
    
    for i in range(num_partitions):
        # Create the partition directory
        if i % 10 == 0:
            print(f"Writing {i}")
        partition_path = os.path.join(base_path, f"id={i}")
        os.makedirs(partition_path, exist_ok=True)
        
        # Copy the temporary Parquet file to the partition
        output_path = os.path.join(partition_path, f"part-{i}.parquet")
        if os.path.exists(output_path):
            continue
        shutil.copy(temp_file, output_path)

    # Remove the temporary file
    os.remove(temp_file)

    print(f"Created {num_partitions} Hive-partitioned dataframes in {base_path}")

def batched(iterable: Iterable[T], n: int) -> Generator[tuple[T, ...], None, None]:
    """Yield successive n-sized chunks from iterable."""
    if n < 1:
        msg = "n must be at least one"
        raise ValueError(msg)
    it = iter(iterable)
    while batch := tuple(islice(it, n)):
        yield batch

def get_ldf(base_path):
    ldf = pl.scan_parquet(
        f"{base_path}/**/*.parquet",
        retries=10,
        hive_partitioning=True,
    )
    return ldf

base_path = "hive_partitioned_data"
NUM_IDS = 1500
write_hive_partitioned_dataframes(base_path, NUM_IDS)


base_ldf = get_ldf(base_path)

IDS = list(range(NUM_IDS))

ID_COL = 'id'

exprs = []
for column in base_ldf.columns:
    exprs.append(pl.col(column).over(ID_COL))  # Don't think the window expression is actually required, but it seems to make the deadlock happen more frequently.

for i in range(200):
    ldfs = []
    for batch in batched(IDS, NUM_IDS // psutil.cpu_count()):
        ldf = base_ldf.filter(pl.col(ID_COL).is_in(batch))
        ldf = ldf.with_columns(exprs)
        ldf = ldf.group_by(ID_COL).agg(
            cs.float().mean().name.suffix("_mean"),
            cs.float().median().name.suffix("_median"),
            cs.float().min().name.suffix("_min"),
            cs.float().max().name.suffix("_max"),
            cs.float().std().name.suffix("_std"),
        )
        ldfs.append(ldf)
    print(f"On iteration: {i}")
    dfs = pl.collect_all(ldfs)
    df = pl.concat(dfs)
    print(df)

Log output

The script stops running after some number of iterations and produces no further output.
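
If it helps, verbose logging can be enabled to see what the reader is doing right before the hang. A minimal sketch, assuming the async reader emits anything useful under this setting:

import os
# Set before importing polars so the setting is picked up at startup.
os.environ["POLARS_VERBOSE"] = "1"

import polars as pl
# Or toggle it at runtime through the config API.
pl.Config.set_verbose(True)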

Issue description

This code deadlocks after a while; it took a few minutes for me. How long it takes probably depends heavily on your machine, likely as a function of core count (I suspect more cores make it more likely to occur, but I can't confirm that), and I think more files may also help it reproduce more reliably. I'm running this on a c6a.24xlarge in AWS.
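
To confirm the process is genuinely stuck rather than just slow, the standard library's faulthandler can periodically dump the Python-level tracebacks. A sketch that could be added to the top of the reproducer; it won't show the Rust reader threads, but it does show whether collect_all is still blocked:

import faulthandler
import sys

# Dump every Python thread's traceback to stderr every 60 seconds.
# If the main thread stays parked inside pl.collect_all(...) while no new
# "On iteration: ..." lines appear, the process has deadlocked.
faulthandler.dump_traceback_later(60, repeat=True, file=sys.stderr)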

Expected behavior

It shouldn't deadlock.

Installed versions

--------Version info---------
Polars:               1.4.1
Index type:           UInt32
Platform:             Linux-5.10.220-188.869.x86_64-x86_64-with-glibc2.26
Python:               3.11.7 (main, Dec  5 2023, 22:00:36) [GCC 7.3.1 20180712 (Red Hat 7.3.1-17)]

----Optional dependencies----
adbc_driver_manager:  <not installed>
cloudpickle:          <not installed>
connectorx:           <not installed>
deltalake:            <not installed>
fastexcel:            <not installed>
fsspec:               <not installed>
gevent:               <not installed>
great_tables:         <not installed>
hvplot:               <not installed>
matplotlib:           <not installed>
nest_asyncio:         <not installed>
numpy:                2.0.1
openpyxl:             <not installed>
pandas:               <not installed>
pyarrow:              <not installed>
pydantic:             <not installed>
pyiceberg:            <not installed>
sqlalchemy:           <not installed>
torch:                <not installed>
xlsx2csv:             <not installed>
xlsxwriter:           <not installed>
kszlim added the bug (Something isn't working), needs triage (Awaiting prioritization by a maintainer), and python (Related to Python Polars) labels on Aug 7, 2024

kszlim commented Aug 7, 2024

Closing #14939, as it contains a lot of irrelevant information and there is now an MRE here.


kszlim commented Aug 7, 2024

I don't believe the collect_all is strictly necessary to induce a deadlock, but it does make the deadlock far easier to reproduce.
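
To be concrete, here is a sketch of the kind of variant I mean, reusing the helpers from the reproducer above but collecting each query on its own instead of through pl.collect_all (aggregations trimmed for brevity):

for i in range(200):
    dfs = []
    for batch in batched(IDS, NUM_IDS // psutil.cpu_count()):
        ldf = base_ldf.filter(pl.col(ID_COL).is_in(batch))
        ldf = ldf.with_columns(exprs)
        ldf = ldf.group_by(ID_COL).agg(cs.float().mean().name.suffix("_mean"))
        # Collect each query individually rather than batching them
        # through pl.collect_all.
        dfs.append(ldf.collect())
    print(f"On iteration: {i}")
    print(pl.concat(dfs))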

coastalwhite added the P-medium (Priority: medium) and A-io-parquet (Area: reading/writing Parquet files) labels, removed the needs triage (Awaiting prioritization by a maintainer) label, and self-assigned this on Aug 8, 2024

kszlim commented Aug 13, 2024

@coastalwhite curious if you managed to reproduce this?

coastalwhite (Collaborator) commented

I have not really looked at this yet, sorry.


kszlim commented Aug 13, 2024

No worries! I just want to make sure you have everything you need; happy to help with anything.
