chdb-io / chdb

chDB is an in-process OLAP SQL Engine 🚀 powered by ClickHouse
https://clickhouse.com/docs/en/chdb
Apache License 2.0

Run a query on a pandas dataframe directly #34

Closed · schalekamp closed this issue 1 year ago

schalekamp commented 1 year ago

Is there a plan to add the ability to run a SQL query directly on a pandas dataframe? Similar to https://pypi.org/project/sqldf/

import pandas as pd
import numpy as np
import chdb

# Create a dummy pd.DataFrame
df = pd.DataFrame({'col1': ['A', 'B', np.nan, 'C', 'D'], 'col2': ['F', np.nan, 'G', 'H', 'I']})

# Define a SQL (ClickHouse) query
query = '''
SELECT *
FROM df
WHERE col1 IS NOT NULL;
'''

# Run the query
df_view = chdb.query(query, "Dataframe")

My current workaround is to 1) save the dataframe to a Parquet file and then 2) run chdb.query on that file. I could wrap this in a function that creates a temporary Parquet file, but being able to query dataframes directly would be very convenient.
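
For reference, such a temp-file wrapper could look roughly like the sketch below (query_df_via_parquet and the {df} placeholder are made-up names; it assumes pyarrow is available for to_parquet and that chdb can read the temporary path through the file() table function):

import tempfile
import pandas as pd
import chdb

def query_df_via_parquet(df: pd.DataFrame, sql: str) -> str:
    # Dump the dataframe to a throwaway Parquet file, then point the query at it
    # via ClickHouse's file() table function; the file is removed on exit.
    with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp:
        df.to_parquet(tmp.name)
        return chdb.query(sql.format(df=f"file('{tmp.name}', Parquet)"), "Dataframe").data()

df = pd.DataFrame({'col1': ['A', 'B', None, 'C', 'D'], 'col2': ['F', None, 'G', 'H', 'I']})
print(query_df_via_parquet(df, "SELECT * FROM {df} WHERE col1 IS NOT NULL"))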

lmangani commented 1 year ago

Not a solution per se, but one possible way to approach this could be latching onto the existing stdin/pipe feature of the underlying code: a chdb Python object carrying arbitrary data, with additional flags to define the format, or even the schema, of such objects. Here's a working example of the raw stdin/pipe functionality, purely for reference:

echo -e "1,2\n3,4" | python3 -m chdb "CREATE TABLE table (a Int64, b Int64) ENGINE = File(CSV, stdin); SELECT a + b as sum FROM table; DROP TABLE table" "CSV"

Until a better method is available, piping the desired data into a chdb subprocess and capturing the output should work for chaining:

import pandas as pd
import chdb
import subprocess

df = pd.DataFrame({'a': [2, 2], 'b': [4, 4]})
csv_string = df.to_csv(index=False, header=False)
query = '''
CREATE TABLE table (a Int64, b Int64) ENGINE = File(CSV, stdin); SELECT a + b as sum FROM table;
'''
def spawn_chdb(query, stdin, fmt):
    # Run chdb as a subprocess, feed it the CSV on stdin, and capture its stdout
    proc = subprocess.Popen(['python3', '-m', 'chdb', query, fmt],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout = proc.communicate(input=stdin.encode())[0].decode('utf-8')
    return stdout

# Call chdb through subprocess with custom stdin input and Dataframe output
res = spawn_chdb(query, csv_string, "Dataframe")

For smaller data sets, the following workaround could be used as an ugly hack (no memory optimizations accounted for):

import pandas as pd
import chdb
# Assuming you have a Pandas DataFrame or Series with the values
data = pd.DataFrame({'col1': [2, 4], 'col2': [5, 5]})
# Create a memory table with the correct schema
query = "CREATE TABLE memory_table (col1 Int64, col2 Int64) ENGINE = Memory;"
# Import the values from Pandas (choose the fastest method)
values = ", ".join(f"('{row['col1']}', {row['col2']})" for _, row in data.iterrows())
query += f"INSERT INTO memory_table (col1, col2) VALUES {values};"
# Select the inserted data using any function
query += 'SELECT col1 + col2 as sum FROM memory_table;'
# Print the results
print(chdb.query(query,"Dataframe").data())
auxten commented 1 year ago

Maybe the best implementation is adding a new storage engine that supports dataframes in Python memory, but that would be very complex.

There is an interesting project called pandahouse doing something similar against a ClickHouse server. Inspired by it, I think a quick way to get this working is to write some Python code that would:

  1. create a temp table
  2. convert the dataframe into some INSERT SQL for that temp table
  3. run the query against the temp table

It's pretty similar to the way @lmangani proposed; a rough sketch of such a helper follows below. I am not sure whether that will be better than the "Save to Parquet" way.
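
A rough sketch of those three steps wrapped in a helper (query_on_df and tmp_df are hypothetical names, all columns are assumed to be Int64 for brevity, and the multi-statement chdb.query call relies on the same behavior as the hack above):

import pandas as pd
import chdb

def query_on_df(df: pd.DataFrame, select_sql: str, table: str = "tmp_df") -> str:
    # 1) create a temp Memory table (all columns assumed Int64 here)
    cols = ", ".join(f"{c} Int64" for c in df.columns)
    stmts = [f"CREATE TABLE {table} ({cols}) ENGINE = Memory;"]
    # 2) turn the dataframe rows into a single INSERT statement
    rows = ", ".join("(" + ", ".join(str(v) for v in row) + ")"
                     for row in df.itertuples(index=False))
    stmts.append(f"INSERT INTO {table} VALUES {rows};")
    # 3) run the caller's query against the temp table
    stmts.append(select_sql)
    return chdb.query(" ".join(stmts), "Dataframe").data()

df = pd.DataFrame({'col1': [2, 4], 'col2': [5, 5]})
print(query_on_df(df, "SELECT col1 + col2 as sum FROM tmp_df;"))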

auxten commented 1 year ago

@alexey-milovidov would you please give us some advice on implementing that?

lmangani commented 1 year ago

@auxten I updated the above example using the temp-table approach and piping the data to a subprocess. I only tested with CSV, but I guess it could work with more complex interim data formats too, if anyone wants to try it out and profile usage.

import pandas as pd
import chdb
import subprocess

df = pd.DataFrame({'a': [2, 2], 'b': [4, 4]})
csv_string = df.to_csv(index=False, header=False)
query = '''
CREATE TABLE table (a Int64, b Int64) ENGINE = File(CSV, stdin); SELECT a + b as sum FROM table;
'''
def spawn_chdb(query, stdin, fmt):
    # Run chdb as a subprocess, feed it the CSV on stdin, and capture its stdout
    proc = subprocess.Popen(['python3', '-m', 'chdb', query, fmt],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout = proc.communicate(input=stdin.encode())[0].decode('utf-8')
    return stdout

# Call chdb through subprocess with custom stdin input and Dataframe output
res = spawn_chdb(query, csv_string, "Dataframe")

auxten commented 1 year ago

If we want chdb on a dataframe to run as fast as native SQL-on-dataframe solutions, we need to let the ClickHouse engine run on the raw memory of the dataframe's BlockManager. I am not yet familiar with the storage engine part of ClickHouse, so this will take some time. I am also wondering whether this feature really needs to be implemented the "hard way"; votes needed.

nmreadelf commented 1 year ago

I propose converting a pandas DataFrame into an Apache Arrow table and subsequently reading the Arrow table into ClickHouse.

lmangani commented 1 year ago

@nmreadelf here's an ArrowStream pipe version

import pandas as pd
import pyarrow as pa
import subprocess

df = pd.DataFrame({'a': [2, 2], 'b': [4, 4]})
# convert the DataFrame to an Arrow stream
table = pa.Table.from_pandas(df)
data = pa.BufferOutputStream()
with pa.RecordBatchStreamWriter(data, table.schema) as writer:
    writer.write_table(table)
data = data.getvalue()
query = '''
CREATE TABLE table ENGINE = File(ArrowStream, stdin); SELECT a + b as sum FROM table;
'''
# Spawn the subprocess and pass custom input from the parent process
def spawn_chdb(query, stdin, fmt):
    # Run chdb as a subprocess, feed it the Arrow stream on stdin, and capture its stdout
    proc = subprocess.Popen(['python3', '-m', 'chdb', query, fmt],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout = proc.communicate(input=stdin)[0].decode('utf-8')
    return stdout

# Call the function to spawn the subprocess with custom input
res = spawn_chdb(query, data, "Dataframe")
# Print the result
print(res)

Perhaps someone more experienced than me could write a wrapper for piping data into queries in a prettier way 😉

auxten commented 1 year ago

A friend also asked me how to keep using chdb on a generated dataframe. It seems important for chdb Python users.

jialeiwang commented 1 year ago

As a machine learning algorithm engineer, I need this feature. Here is my scenario:

In data analysis, we often work through a step-by-step iterative process that generates intermediate data tables. These tables allow us to refer back to previous steps and modify them or generate new tables as needed. However, the final table we need is often a subset of the intermediate tables. To make things easier, we want to be able to reference the intermediate tables directly by their variable names. Writing out one large SQL query would not be intuitive, would be difficult to read, and would make future changes problematic.

If it is necessary to write the intermediate tables to disk, that would be acceptable as long as the user does not perceive it. For example, the intermediate tables could be stored in a tmpdir and automatically destroyed after use.
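
As an illustration, that "invisible tmpdir" idea could look roughly like this (query_on_dfs is a made-up helper, and the textual name substitution is deliberately naive, just to show referencing dataframes by variable name):

import os
import tempfile
import pandas as pd
import chdb

def query_on_dfs(sql: str, **dfs: pd.DataFrame) -> str:
    # Write each named dataframe to a Parquet file in a throwaway directory and
    # rewrite the SQL to read from it; everything is deleted when the block exits.
    with tempfile.TemporaryDirectory() as tmpdir:
        for name, df in dfs.items():
            path = os.path.join(tmpdir, f"{name}.parquet")
            df.to_parquet(path)
            sql = sql.replace(name, f"file('{path}', Parquet)")
        return chdb.query(sql, "Dataframe").data()

# step1, step2 are intermediate dataframes from earlier steps
# res = query_on_dfs("SELECT * FROM step1 JOIN step2 USING (id)", step1=step1, step2=step2)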

auxten commented 1 year ago

> @nmreadelf here's an ArrowStream pipe version […]

It seems the pandas maintainers are going to rewrite the pandas BlockManager. Arrow will be a better intermediate data format for the long term. There are 2 improvements to do:

  1. We can use INSERT INTO sometable FORMAT ArrowStream to skip writing the table schema.
  2. I will find a way to let the chdb engine read data from a given fd directly, to avoid forking an extra process.

lmangani commented 1 year ago

@auxten you're right: with Arrow we don't need to define the table schema, so the definition can be bypassed. Example updated!

auxten commented 1 year ago

I tried different implementations of running chdb on a dataframe; the best one uses memfd, something like:

import os
import pyarrow as pa
import chdb

# df is an existing pandas DataFrame
table = pa.Table.from_pandas(df)
# create memfd
fd = os.memfd_create("./tmp_mem_fd", flags=os.MFD_CLOEXEC)

# create writer from file descriptor
with pa.RecordBatchStreamWriter(os.fdopen(fd, "wb"), table.schema) as writer:
    writer.write_table(table)

os.lseek(fd, 0, os.SEEK_SET)

# Query with Arrow file descriptor.
query = f'''CREATE TABLE table ENGINE = File(ArrowStream, {fd});
SELECT RegionID, SUM(AdvEngineID), COUNT(*) AS c, AVG(ResolutionWidth), COUNT(DISTINCT UserID) 
    FROM table GROUP BY RegionID ORDER BY c DESC LIMIT 10;'''
ret = chdb.query(query, output_format="Dataframe")
os.close(fd)

See: https://gist.github.com/auxten/5dd72eeaf5f76728d17531deb59a6dd2

nmreadelf commented 1 year ago

> I tried different implementations of running chdb on dataframe, the best one is using memfd […]

There is an implementation that converts a Pandas object to PyArrow within the C++ code: https://github.com/duckdb/duckdb/blob/v0.8.0/tools/pythonpkg/src/pyconnection.cpp#L100

auxten commented 1 year ago

We will try to implement a Barely Usable version.

  1. Try to use the pandas 2.0 Arrow backend as much as possible.
  2. Try to use memfd instead of a temp file on Linux, and give the user the option (see the sketch after this list).
  3. For more detailed optimization points, see https://gist.github.com/auxten/5dd72eeaf5f76728d17531deb59a6dd2
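
A minimal sketch of what point 2 could look like from the Python side (open_scratch_fd is a hypothetical helper; the fallback assumes a POSIX system where an open file can be unlinked):

import os
import tempfile

def open_scratch_fd() -> int:
    # Prefer an anonymous in-memory file on Linux (os.memfd_create, kernel 3.17+);
    # fall back to an unlinked temp file on platforms without memfd.
    if hasattr(os, "memfd_create"):
        return os.memfd_create("chdb_arrow", flags=os.MFD_CLOEXEC)
    fd, path = tempfile.mkstemp(prefix="chdb_arrow_")
    os.unlink(path)  # keep the fd, drop the directory entry
    return fd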

There will be more points to optimize later:

  1. Convert the Arrow query result from the ClickHouse engine to pandas with pyarrow dtypes; this needs support from the Apache Arrow team. I have raised an issue about pyarrow to_pandas(dtype_backend=pyarrow): https://github.com/apache/arrow/issues/35802 (see the sketch after this list)
  2. Avoid copying Arrow data from pandas to the memfd
  3. Maybe use the memfd-allocated memory in the chdb query directly
  4. Try our best to keep only one copy of the pandas Arrow memory
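
Regarding point 1, until the linked Arrow issue lands, pyarrow's existing types_mapper hook may already get close on pandas 2.0 (a small sketch under that assumption; arrow_table stands in for the Arrow result produced by the engine):

import pandas as pd
import pyarrow as pa

arrow_table = pa.table({'col1': [2, 4], 'col2': [5, 5]})

# Map every Arrow column to a pandas ArrowDtype instead of a NumPy dtype
df = arrow_table.to_pandas(types_mapper=pd.ArrowDtype)
print(df.dtypes)  # int64[pyarrow] for both columns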