BlazingDB / blazingsql

BlazingSQL is a lightweight, GPU-accelerated SQL engine for Python, built on RAPIDS cuDF.
https://blazingsql.com
Apache License 2.0

[BUG] Create table with Partitioned Columns returns empty Dataframe #1384

Open mlahir1 opened 3 years ago

mlahir1 commented 3 years ago

I am running into the following issue reading partitioned data from local disk: it reads an empty DataFrame. It is able to resolve the file paths just fine, but drops all the data. As you can see, it is even picking up all the columns and dtypes. I have read the files with cudf.read_orc and there is data present in these files.

>>> bc.create_table('repl_order_table','/root/repl_order/', file_format='orc', get_metadata=False,
...                 partitions = {'code':['ABC'], 'dt':['2020-03-01]']},
...                 partitions_schema = [('code','string'),('dt','date')])
>>> bc.tables['repl_order_table'].files
[b'/root/repl_order/code=ABC/dt=2020-03-03/000219_0']
>>> bc.sql('select * from repl_order_table')
See the attached image.
>>> bc.sql('select * from repl_order_table').compute()
See the attached image.

[image: screenshot showing an empty DataFrame result]
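
The sanity check mentioned in the report above (reading one of the resolved files directly with cudf.read_orc) would look roughly like the sketch below, using the file path returned by bc.tables['repl_order_table'].files:

import cudf

# Read one of the resolved ORC files directly; per the report it contains
# data, so the empty result must come from the engine, not the files.
gdf = cudf.read_orc('/root/repl_order/code=ABC/dt=2020-03-03/000219_0')
print(len(gdf), gdf.dtypes)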

Christian8491 commented 3 years ago

@mlahir1 I noticed you used partitions = {'code':['ABC'], 'dt':['2020-03-01]']}. Could you please try again with the single quote character at the end of the date value, i.e. 'dt':['2020-03-01']? So it would be partitions = {'code':['ABC'], 'dt':['2020-03-01']}. Or is the single quote character after the ] character intentional?

diegodfrf commented 3 years ago

For partitions on a date column, pass the value as pandas.Timestamp('2020-03-01') (for both the timestamp and date schema types), because the engine expects the date in milliseconds, not a string.
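
Applied to the snippet from the original report, that would look roughly like this (a sketch only; bc, the path, and the table name are taken from the report above):

import pandas as pd

# Pass the date partition value as a pandas.Timestamp instead of a string.
bc.create_table('repl_order_table', '/root/repl_order/',
                file_format='orc', get_metadata=False,
                partitions={'code': ['ABC'],
                            'dt': [pd.Timestamp('2020-03-01')]},
                partitions_schema=[('code', 'string'), ('dt', 'date')])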

Also, using partitions with Dask and CSV/PSV files does not load data.

Without Dask it works fine, but with Dask it fails.

To replicate, use 2 dask workers. To use utilityHive, export the environment variable: export PYTHONPATH=/<path_blazing>/tests/BlazingSQLTest
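
If a scheduler is not already running at 127.0.0.1:8786 with two workers, a minimal in-process alternative might look like the sketch below (an assumption on my part: LocalCUDACluster and its n_workers argument come from the separate dask-cuda package, not from this repo):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# Sketch: stand up a local cluster with 2 GPU workers instead of
# connecting to an externally launched scheduler at 127.0.0.1:8786.
cluster = LocalCUDACluster(n_workers=2)  # assumes 2 GPUs are available
client = Client(cluster)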


from dask.distributed import Client
from blazingsql import BlazingContext
from Utils import utilityHive
import dask_cudf

with_dask = True  # per the note above: works without Dask, fails with Dask
ext = 'psv'

dir_data = '/<path_tpch_files>/tpch'
output_path = '/tmp/partitions'  # delete existing data at this path if necessary

# This utility creates hive-style partitioned data from the TPC-H part files
utilityHive.test_hive_partition_data(input="%s/%s_[0-9]*.%s" % (dir_data, "part", ext),
                file_format=ext,
                table_name="part",
                partitions={
                    'p_container': ['JUMBO BAG', 'JUMBO BOX', 'JUMBO CAN', 'JUMBO CASE',
                            'JUMBO DRUM', 'JUMBO JAR', 'JUMBO PACK', 'JUMBO PKG',
                            'LG BAG', 'LG BOX', 'LG CAN', 'LG CASE', 'LG DRUM',
                            'LG JAR', 'LG PACK', 'LG PKG', 'MED BAG', 'MED BOX',
                            'MED CAN', 'MED CASE', 'MED DRUM', 'MED JAR', 'MED PACK',
                            'MED PKG', 'SM BAG', 'SM BOX', 'SM CAN', 'SM CASE',
                            'SM DRUM', 'SM JAR', 'SM PACK', 'SM PKG', 'WRAP BAG',
                            'WRAP BOX', 'WRAP CAN', 'WRAP CASE', 'WRAP DRUM',
                            'WRAP JAR', 'WRAP PACK', 'WRAP PKG']},
                partitions_schema=[('p_container', 'str')],
                output=output_path+"/part",
                num_files=1)
if with_dask:
    dask_sche_addrs = '127.0.0.1:8786'
    enable_progress_bar = True

    client = Client(dask_sche_addrs)
    bc = BlazingContext(dask_client=client)
else:
    bc = BlazingContext()

bc.create_table("part",
        output_path+"/part",
        file_format=ext,
        delimiter="|",
        dtype=[
            "int64",
            "str",
            "str",
            "str",
            "str",
            "int64",
            "str",
            "float32",
            "str"
        ],
        names=[
            "p_partkey",
            "p_name",
            "p_mfgr",
            "p_brand",
            "p_type",
            "p_size",
            "p_container",
            "p_retailprice",
            "p_comment",
        ])

query = """select * from part"""
df1 = bc.sql(query)

if isinstance(df1, dask_cudf.core.DataFrame):
    print(df1.compute())
else:
    print(df1)

mlahir1 commented 3 years ago

I see a new error now trying to create the table.

bc.create_table('repl_order_table','/root/traffic-data/', file_format='orc', get_metadata=False,
               partitions = {'event_dt':['2021-02-15'], 'cp_cd':['COM]'], 'cd_st':['US_ANDROID'], 'hr'=['0','1', '2']},
                        partitions_schema = [('event_dt','cp_cd', 'cd_st', 'hr'),('date', 'str', 'str', 'int')])
ERROR: The number of columns in 'partitions' should be the same as 'partitions_schema'
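
For reference, a hedged sketch of a call shape that would satisfy that check, using the (name, type) tuple format for partitions_schema seen earlier in this thread (values are illustrative only; see @wmalpica's note below about this snippet being hand-written):

bc.create_table('repl_order_table', '/root/traffic-data/',
                file_format='orc', get_metadata=False,
                partitions={'event_dt': ['2021-02-15'],  # per the earlier comment, this may need to be pd.Timestamp('2021-02-15')
                            'cp_cd': ['COM'],
                            'cd_st': ['US_ANDROID'],
                            'hr': ['0', '1', '2']},
                partitions_schema=[('event_dt', 'date'),
                                   ('cp_cd', 'str'),
                                   ('cd_st', 'str'),
                                   ('hr', 'int')])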

I remember you mentioning that this API will be upgraded; if you can send me the upgraded API, I can test that too.

wmalpica commented 3 years ago

Note: speaking with @mlahir1, we know that this last code snippet is not correct, but we also know that it is not the source of the issue. It was hand-written here just to show the type of issue.