pola-rs / polars

Dataframes powered by a multithreaded, vectorized query engine, written in Rust
https://docs.pola.rs

Expression/context evaluation bug #17057

Open Ge0rges opened 2 weeks ago

Ge0rges commented 2 weeks ago

Reproducible example

Unable to produce a minimal example yet.

Log output

Traceback (most recent call last):
  File "/Users/GeorgesKanaan/Documents/Development/Methylation/code/my_dmr_analysis.py", line 49, in <module>
    run_analysis("polaribacter_r-contigs", "dmr_by_gene", data_dir, fig_savepath="../plots/plots_5")
  File "/Users/GeorgesKanaan/Documents/Development/Methylation/code/my_dmr_analysis.py", line 28, in run_analysis
    df = group_methyl_data_by_genes(combined_methyl_data, genes)
  File "/Users/GeorgesKanaan/Documents/Development/Methylation/code/utilities/utils.py", line 238, in group_methyl_data_by_genes
    aggregation_dict = {col: aggregate for col in df_filtered.columns[1:-7]}
  File "/Users/GeorgesKanaan/micromamba/envs/jupyter/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 421, in columns
    return self._ldf.columns()
polars.exceptions.ColumnNotFoundError: name

This error occurred with the following context stack:
    [1] 'with_columns' failed
    [2] 'join left' input failed to resolve
    [3] 'filter' input failed to resolve

Issue description

Please forgive me as I was unable to design a minimal reproducible example. I am still working on it.

I have a LazyFrame on which I am trying to run the following:

df = df.with_columns(
    contig=pl.col('name').str.split(by='|').list.get(0),
    start=pl.col('name').str.split(by='|').list.get(2).cast(pl.UInt32),
    stop=pl.col('name').str.split(by='|').list.get(3).cast(pl.UInt32)
)

This fails with the error pasted above, even though df.collect() completes successfully when run immediately beforehand and produces a column called name (I double-checked the string value by printing df.collect().columns: ['name', ...]).

A minimal example where this works:

import polars as pl

# Create a DataFrame with a column 'name' which contains the strings to be split
data = {
    'name': [
        'contig_100089|-|4012|4013',
        'contig_100090|-|4014|4015',
        'contig_100091|-|4016|4017'
    ]
}

# Create a LazyFrame from the data
df = pl.DataFrame(data).lazy()

# Display the LazyFrame
print(df.collect())

# Perform the string split and create new columns
df = df.with_columns(
    contig=pl.col('name').str.split(by='|').list.get(0),
    start=pl.col('name').str.split(by='|').list.get(2).cast(pl.UInt32),
    stop=pl.col('name').str.split(by='|').list.get(3).cast(pl.UInt32)
)

# Collect the result to trigger the LazyFrame computation
result = df.collect()

# Display the result
print(result)

I've tried running my code with a single row and with the beta version of polars; the issue persists.

Here are the function calls involving this dataframe since the last collect(). I hope this is reasonably clear and helpful:

def func1(df_from_csv):
    ...
    pivot_df = methyl_data.collect(streaming=True).pivot(index='name', columns='modified base code and motif', values='Nmod', aggregate_function='first').lazy()

    pivot_df = pivot_df.join(methyl_data.select(['name', 'Ncanonical']), on='name', how='left').unique().fill_null(0)

    return pivot_df.select('name', '21839', 'a', 'm', 'Ncanonical')

def func2():
    ...
    combined_methyl_data = pl.LazyFrame()
    for I in range(10):
        ...
        df = func1(df_from_csv)

        # Add column
        df = df.with_columns(sample=pl.lit("a"))

    return df

df = func2()

# If df.collect() is called here in console it works. If df = df.collect().lazy() is done, the following code doesn't crash

df = df.with_columns(
    contig=pl.col('name').str.split(by='|').list.get(0),
    start=pl.col('name').str.split(by='|').list.get(2).cast(pl.UInt32),
    stop=pl.col('name').str.split(by='|').list.get(3).cast(pl.UInt32)
)

# Crash occurs here when calling df.collect()

I am reporting this as a bug because everything works if df = df.collect().lazy() is called immediately before my with_columns operations.

Expected behavior

The code should complete successfully and split the strings in name into 3 columns as indicated.

Installed versions

```
--------Version info---------
Polars: 0.20.31
Index type: UInt32
Platform: macOS-14.5-arm64-arm-64bit
Python: 3.10.14 | packaged by conda-forge | (main, Mar 20 2024, 12:51:49) [Clang 16.0.6 ]

----Optional dependencies----
adbc_driver_manager:
cloudpickle: 3.0.0
connectorx:
deltalake:
fastexcel:
fsspec: 2024.6.0
gevent:
hvplot:
matplotlib: 3.8.4
nest_asyncio: 1.5.8
numpy: 1.26.4
openpyxl: 3.1.2
pandas: 2.2.2
pyarrow: 15.0.0
pydantic: 1.10.8
pyiceberg:
pyxlsb:
sqlalchemy:
torch:
xlsx2csv:
xlsxwriter:
```
Ge0rges commented 2 weeks ago

Prior to the traceback the tail of the log file shows:

RUN STREAMING PIPELINE
[csv -> filter -> hstack -> generic-group_by -> callback -> filter -> ordered_sink, csv -> filter -> hstack -> generic_join_build]
STREAMING CHUNK SIZE: 3571 rows
STREAMING CHUNK SIZE: 7142 rows
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
process partition 0 during generic-group_by-source
process partition 1 during generic-group_by-source
process partition 2 during generic-group_by-source
process partition 3 during generic-group_by-source
process partition 4 during generic-group_by-source
process partition 5 during generic-group_by-source
process partition 6 during generic-group_by-source
process partition 7 during generic-group_by-source
process partition 8 during generic-group_by-source
process partition 9 during generic-group_by-source
process partition 10 during generic-group_by-source
process partition 11 during generic-group_by-source
process partition 12 during generic-group_by-source
process partition 13 during generic-group_by-source
process partition 14 during generic-group_by-source
process partition 15 during generic-group_by-source
process partition 16 during generic-group_by-source
process partition 17 during generic-group_by-source
process partition 18 during generic-group_by-source
process partition 19 during generic-group_by-source
process partition 20 during generic-group_by-source
process partition 21 during generic-group_by-source
process partition 22 during generic-group_by-source
process partition 23 during generic-group_by-source
process partition 24 during generic-group_by-source
process partition 25 during generic-group_by-source
process partition 26 during generic-group_by-source
process partition 27 during generic-group_by-source
process partition 28 during generic-group_by-source
process partition 29 during generic-group_by-source
process partition 30 during generic-group_by-source
process partition 31 during generic-group_by-source
process partition 32 during generic-group_by-source
process partition 33 during generic-group_by-source
process partition 34 during generic-group_by-source
process partition 35 during generic-group_by-source
process partition 36 during generic-group_by-source
process partition 37 during generic-group_by-source
process partition 38 during generic-group_by-source
process partition 39 during generic-group_by-source
process partition 40 during generic-group_by-source
process partition 41 during generic-group_by-source
process partition 42 during generic-group_by-source
process partition 43 during generic-group_by-source
process partition 44 during generic-group_by-source
process partition 45 during generic-group_by-source
process partition 46 during generic-group_by-source
process partition 47 during generic-group_by-source
process partition 48 during generic-group_by-source
process partition 49 during generic-group_by-source
process partition 50 during generic-group_by-source
process partition 51 during generic-group_by-source
process partition 52 during generic-group_by-source
process partition 53 during generic-group_by-source
process partition 54 during generic-group_by-source
process partition 55 during generic-group_by-source
process partition 56 during generic-group_by-source
process partition 57 during generic-group_by-source
process partition 58 during generic-group_by-source
process partition 59 during generic-group_by-source
process partition 60 during generic-group_by-source
process partition 61 during generic-group_by-source
process partition 62 during generic-group_by-source
process partition 63 during generic-group_by-source
RUN STREAMING PIPELINE
[csv -> filter -> hstack -> generic-group_by -> callback -> filter -> ordered_sink, csv -> filter -> hstack -> generic_join_build]
STREAMING CHUNK SIZE: 3571 rows
STREAMING CHUNK SIZE: 7142 rows
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
OOC group_by started
process partition 0 during generic-group_by-source
process partition 1 during generic-group_by-source
process partition 2 during generic-group_by-source
process partition 3 during generic-group_by-source
process partition 4 during generic-group_by-source
process partition 5 during generic-group_by-source
process partition 6 during generic-group_by-source
process partition 7 during generic-group_by-source
process partition 8 during generic-group_by-source
process partition 9 during generic-group_by-source
process partition 10 during generic-group_by-source
process partition 11 during generic-group_by-source
process partition 12 during generic-group_by-source
process partition 13 during generic-group_by-source
process partition 14 during generic-group_by-source
process partition 15 during generic-group_by-source
process partition 16 during generic-group_by-source
process partition 17 during generic-group_by-source
process partition 18 during generic-group_by-source
process partition 19 during generic-group_by-source
process partition 20 during generic-group_by-source
process partition 21 during generic-group_by-source
process partition 22 during generic-group_by-source
process partition 23 during generic-group_by-source
process partition 24 during generic-group_by-source
process partition 25 during generic-group_by-source
process partition 26 during generic-group_by-source
process partition 27 during generic-group_by-source
process partition 28 during generic-group_by-source
process partition 29 during generic-group_by-source
process partition 30 during generic-group_by-source
process partition 31 during generic-group_by-source
process partition 32 during generic-group_by-source
process partition 33 during generic-group_by-source
process partition 34 during generic-group_by-source
process partition 35 during generic-group_by-source
process partition 36 during generic-group_by-source
process partition 37 during generic-group_by-source
process partition 38 during generic-group_by-source
process partition 39 during generic-group_by-source
process partition 40 during generic-group_by-source
process partition 41 during generic-group_by-source
process partition 42 during generic-group_by-source
process partition 43 during generic-group_by-source
process partition 44 during generic-group_by-source
process partition 45 during generic-group_by-source
process partition 46 during generic-group_by-source
process partition 47 during generic-group_by-source
process partition 48 during generic-group_by-source
process partition 49 during generic-group_by-source
process partition 50 during generic-group_by-source
process partition 51 during generic-group_by-source
process partition 52 during generic-group_by-source
process partition 53 during generic-group_by-source
process partition 54 during generic-group_by-source
process partition 55 during generic-group_by-source
process partition 56 during generic-group_by-source
process partition 57 during generic-group_by-source
process partition 58 during generic-group_by-source
process partition 59 during generic-group_by-source
process partition 60 during generic-group_by-source
process partition 61 during generic-group_by-source
process partition 62 during generic-group_by-source
process partition 63 during generic-group_by-source
Ge0rges commented 2 weeks ago

When df = df.collect().lazy() is called prior to the problematic code, the log file (ending immediately after the call to with_columns) shows:

found multiple sources; run comm_subplan_elim
UNION: `parallel=false` union is run sequentially
join parallel: false
join parallel: false
read files in parallel
avg line length: 67.58008
std. dev. line length: 6.988767
initial row estimate: 2131851
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 0
CACHE HIT: cache id: 0
estimated unique values: 770990
estimated unique count: 770990 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
join parallel: false
join parallel: false
read files in parallel
avg line length: 71.887695
std. dev. line length: 3.7583435
initial row estimate: 2079664
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 1
CACHE HIT: cache id: 1
estimated unique values: 512530
estimated unique count: 512530 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
join parallel: false
join parallel: false
read files in parallel
avg line length: 71.74707
std. dev. line length: 4.2742543
initial row estimate: 2028932
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 2
CACHE HIT: cache id: 2
estimated unique values: 637402
estimated unique count: 637402 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
join parallel: false
join parallel: false
read files in parallel
avg line length: 73.89453
std. dev. line length: 2.0176597
initial row estimate: 1954295
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 3
CACHE HIT: cache id: 3
estimated unique values: 736951
estimated unique count: 736951 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
join parallel: false
join parallel: false
read files in parallel
avg line length: 72.50195
std. dev. line length: 2.815622
initial row estimate: 2055499
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 4
CACHE HIT: cache id: 4
estimated unique values: 403573
estimated unique count: 403573 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
join parallel: false
join parallel: false
read files in parallel
avg line length: 65.9375
std. dev. line length: 4.904956
initial row estimate: 2201427
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 5
CACHE HIT: cache id: 5
estimated unique values: 764336
estimated unique count: 764336 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
join parallel: false
join parallel: false
read files in parallel
avg line length: 70.17578
std. dev. line length: 4.5201983
initial row estimate: 2064369
no. of chunks: 8 processed by: 8 threads.
CACHE SET: cache id: 6
CACHE HIT: cache id: 6
estimated unique values: 465196
estimated unique count: 465196 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join triggered a rechunk of the right DataFrame: 3 columns are affected
INNER join dataframes finished
dataframe filtered
LEFT join dataframes finished
Ge0rges commented 2 weeks ago

On polars==0.20.0 the log is as follows, with the same error:

join parallel: false
avg line length: 67.58008
std. dev. line length: 6.988767
initial row estimate: 2131851
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 770990
estimated unique count: 770990 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
join parallel: false
avg line length: 71.887695
std. dev. line length: 3.7583435
initial row estimate: 2079664
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 512530
estimated unique count: 512530 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
join parallel: false
avg line length: 71.74707
std. dev. line length: 4.2742543
initial row estimate: 2028932
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 637402
estimated unique count: 637402 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
join parallel: false
avg line length: 73.89453
std. dev. line length: 2.0176597
initial row estimate: 1954295
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 736951
estimated unique count: 736951 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
join parallel: false
avg line length: 72.50195
std. dev. line length: 2.815622
initial row estimate: 2055499
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 403573
estimated unique count: 403573 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
join parallel: false
avg line length: 65.9375
std. dev. line length: 4.904956
initial row estimate: 2201427
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 764336
estimated unique count: 764336 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
join parallel: false
avg line length: 70.17578
std. dev. line length: 4.5201983
initial row estimate: 2064369
no. of chunks: 8 processed by: 8 threads.
estimated unique values: 465196
estimated unique count: 465196 exceeded the boundary: 1000, running default HASH AGGREGATION
INNER join dataframes finished
dataframe filtered
Traceback (most recent call last):
  File "/Users/GeorgesKanaan/Documents/Development/Methylation/code/my_dmr_analysis.py", line 49, in <module>
    run_analysis("polaribacter_r-contigs", "dmr_by_gene", data_dir, fig_savepath="../plots/plots_5")
  File "/Users/GeorgesKanaan/Documents/Development/Methylation/code/my_dmr_analysis.py", line 28, in run_analysis
    df = group_methyl_data_by_genes(combined_methyl_data, genes)
  File "/Users/GeorgesKanaan/Documents/Development/Methylation/code/utilities/utils.py", line 228, in group_methyl_data_by_genes
    df.collect()
  File "/Users/GeorgesKanaan/micromamba/envs/jupyter/lib/python3.10/site-packages/polars/lazyframe/frame.py", line 1706, in collect
    return wrap_df(ldf.collect())
polars.exceptions.ColumnNotFoundError: name

Error originated just after this operation:
UNION
  PLAN 0:
    DF []; PROJECT */0 COLUMNS; SELECTION: "None"
  PLAN 1:
     WITH_COLUMNS:
     [Utf8(bottom).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("m").fill_null([0]), col("21839").fill_null([0]), col("a").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "m", "21839", "a"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/bottom.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/bottom.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
  PLAN 2:
     WITH_COLUMNS:
     [Utf8(barcode11).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("a").fill_null([0]), col("m").fill_null([0]), col("21839").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "a", "m", "21839"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode11.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode11.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
  PLAN 3:
     WITH_COLUMNS:
     [Utf8(barcode13).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("a").fill_null([0]), col("m").fill_null([0]), col("21839").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "a", "m", "21839"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode13.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode13.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
  PLAN 4:
     WITH_COLUMNS:
     [Utf8(barcode12).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("m").fill_null([0]), col("21839").fill_null([0]), col("a").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "m", "21839", "a"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode12.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode12.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
  PLAN 5:
     WITH_COLUMNS:
     [Utf8(barcode14).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("m").fill_null([0]), col("21839").fill_null([0]), col("a").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "m", "21839", "a"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode14.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/barcode14.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
  PLAN 6:
     WITH_COLUMNS:
     [Utf8(middle).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("m").fill_null([0]), col("21839").fill_null([0]), col("a").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "m", "21839", "a"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/middle.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/middle.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
  PLAN 7:
     WITH_COLUMNS:
     [Utf8(top).alias("sample")]
       SELECT [col("name"), col("21839"), col("a"), col("m"), col("Ncanonical")] FROM
         WITH_COLUMNS:
         [col("m").fill_null([0]), col("21839").fill_null([0]), col("a").fill_null([0]), col("Ncanonical").fill_null([0])]
          UNIQUE BY None
            LEFT JOIN:
            LEFT PLAN ON: [col("name")]
              DF ["name", "m", "21839", "a"]; PROJECT */4 COLUMNS; SELECTION: "None"
            RIGHT PLAN ON: [col("name")]
               SELECT [col("name"), col("Ncanonical")] FROM
                FILTER [(col("Nvalid_cov")) == (col("max_valid_cov"))] FROM

                INNER JOIN:
                LEFT PLAN ON: [col("name"), col("mod_group")]
                   WITH_COLUMNS:
                   [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                    FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                     WITH_COLUMNS:
                     [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                       SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                          Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/top.bed
                          PROJECT */18 COLUMNS
                RIGHT PLAN ON: [col("name"), col("mod_group")]
                  AGGREGATE
                    [col("Nvalid_cov").max().alias("max_valid_cov")] BY [col("name"), col("mod_group")] FROM
                     WITH_COLUMNS:
                     [col("modified base code and motif").replace([Series, Series, col("modified base code and motif")]).alias("mod_group")]
                      FILTER [(col("Ndiff")) < (col("Nvalid_cov"))] FROM

                       WITH_COLUMNS:
                       [[([([([([([(col("chrom")) + (Utf8(|))]) + (col("strand"))]) + (Utf8(|))]) + (col("inclusive start position").strict_cast(Utf8))]) + (Utf8(|))]) + (col("exclusive end position").strict_cast(Utf8))].alias("name")]
                         SELECT [col("chrom"), col("inclusive start position"), col("exclusive end position"), col("modified base code and motif"), col("strand"), col("Nvalid_cov"), col("fraction modified"), col("Nmod"), col("Ncanonical"), col("Nother_mod"), col("Ndelete"), col("Nfail"), col("Ndiff"), col("Nnocall")] FROM

                            Csv SCAN /Users/GeorgesKanaan/Documents/Development/Methylation/code/../data/methylation_5/polaribacter_r-contigs/top.bed
                            PROJECT */18 COLUMNS
                END INNER JOIN
            END LEFT JOIN
END UNION