zillow / ctds

Python DB-API 2.0 library for MS SQL Server
MIT License

Can you document why this is better than alternatives? #3

Open sontek opened 7 years ago

sontek commented 7 years ago

I just stumbled upon this. I would love to know the benefits of this over things like pymssql, which is what we use at SurveyMonkey.

Thanks!

joshuahlang commented 7 years ago

The biggest difference is the implementation of Cursor.fetchall() (and Cursor.fetchmany()). ctds reads all of the requested rows into memory inside the C extension, without repeatedly releasing and re-acquiring the GIL for each row. This makes a big difference in multi-threaded Python applications where multiple threads are using the database.

Aside from that, it includes some additional features, such as bulk insert support via Connection.bulk_insert().

We've been using it for over a year at Zillow, with various versions of Python and with SQL Server 2012 and 2014, with minimal issues. I would recommend using FreeTDS 1.00 or later, as it fixes some bad bugs that caused issues for us.
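
For reference, here is a minimal DB-API 2.0 usage sketch; the hostname, credentials, and query are placeholders, and the connect() keyword arguments and context-manager usage are assumptions based on the ctds documentation:

import ctds

# Connect and fetch rows; fetchall() pulls the whole result set in one C-level call.
with ctds.connect('sql-server-host', user='username', password='password',
                  database='mydb') as connection:
    with connection.cursor() as cursor:
        cursor.execute('SELECT TOP 10 * FROM SomeTable')
        rows = cursor.fetchall()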

joshuahlang commented 7 years ago

I'd also mention that most of the currently open issues on the pymssql GitHub repo are not present in ctds.

kafran commented 4 years ago

@joshuahlang I would like to use ctds in my ETL process, mainly because of the bulk insert feature. I'm relatively new to this area; I was working more with data analysis, and due to the lack of professionals dealing with data engineering I moved to another area in my company. Can you help me? Where can I get in contact with you? It doesn't need to be fast or live, I just need someone to help lead the way. For example, I'm having trouble using the bulk insert because I'm not able to wrap the textual columns with ctds.SqlNVarChar.

HuangRicky commented 4 years ago

@kafran this is what I did for SqlVarChar; it might be helpful:

import sys

import ctds

for column in df:
    # Wrap string columns in ctds.SqlVarChar so they are sent as VARCHAR.
    if any(type(value) is str for value in df[column]):
        if sys.version_info < (3, 0):
            # Python 2: str is bytes, so decode from utf-8 first, then re-encode.
            df[column] = df[column].apply(lambda x: ctds.SqlVarChar(
                x.decode('utf-8').encode('latin-1')) if x is not None else x)
        else:
            # Python 3: str is already unicode, so no decode is needed.
            df[column] = df[column].apply(lambda x: ctds.SqlVarChar(
                x.encode('latin-1')) if x is not None else x)

There may be a problem with the indentation; you can figure that out yourself.
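
For the original SqlNVarChar question, the same pattern should work without the encode step; this is only a sketch, assuming ctds.SqlNVarChar accepts Python 3 str values directly (df is the pandas dataframe from the snippet above):

import ctds

# Wrap string columns as NVARCHAR instead of VARCHAR (sketch, Python 3 only).
for column in df:
    if any(type(value) is str for value in df[column]):
        df[column] = df[column].apply(
            lambda x: ctds.SqlNVarChar(x) if isinstance(x, str) else x)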

kafran commented 4 years ago

@HuangRicky Thank you. This Python 3 decode tip is a great one.

Do you use Pandas for your ETL process? The tables I need to move are 200M~500M+ rows, and I don't know if Pandas can handle that. Checking for the str type doesn't seem to be a vectorized operation, so I think it will take a lot of time. For now I will probably create the tables on the destination and just truncate them every time before starting the ETL process, so I will probably just assume which columns are VarChar and NVarChar and hard-code this.

Wouldn't it be better to use an ORM for this kind of job? Or the DB-API itself: .fetchmany() the rows, write everything to a CSV in an in-memory StringIO, and bulk load it? We are trying to do ELT: we have a lot of data to move from a VDB to a SQL Server stage area, and then we are going to use SQL to transform the data into a DWH.
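
One way to avoid holding everything in memory would be to stream batches from the source cursor straight into ctds' bulk_insert; a rough sketch, where src_cursor and ctds_conn are placeholders for the source DB-API cursor and the destination ctds connection:

def iter_rows(src_cursor, batch_size=10000):
    # Yield rows from the source cursor in batches so the whole table
    # never has to fit in memory at once.
    while True:
        batch = src_cursor.fetchmany(batch_size)
        if not batch:
            break
        for row in batch:
            yield row

# bulk_insert accepts any iterable of row sequences.
ctds_conn.bulk_insert('StageTable', iter_rows(src_cursor))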

HuangRicky commented 4 years ago

200M~500M+ rows seems too large for a single in-memory dataframe. My thoughts: 1) write to CSV and then use the native MS SQL bcp tool, or 2) enhance what I did, probably using numpy or converting to numpy first. @joshuahlang, do you have a suggestion on how to upload a pandas dataframe? Thanks!
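
A rough sketch of option 1, assuming the native bcp command-line tool is installed and on the PATH; the server, credentials, and table names are placeholders, and the exact bcp flags should be checked against your client version:

import subprocess

# Dump the dataframe to a tab-delimited file (tabs avoid most CSV quoting issues).
df.to_csv('stage_table.tsv', sep='\t', index=False, header=False)

# Load it with the native bcp tool: -c for character mode, -t for the field terminator.
subprocess.run([
    'bcp', 'StageDB.dbo.StageTable', 'in', 'stage_table.tsv',
    '-S', 'sql-server-host', '-U', 'username', '-P', 'password',
    '-c', '-t', '\t',
], check=True)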

joshuahlang commented 4 years ago

I haven't worked with pandas, so I don't have much to add there. The suggestion to export to a CSV and then use the native bcp tool sounds like a good direction. Alternatively, the internet also has suggestions, e.g. https://www.sqlshack.com/six-different-methods-to-copy-tables-between-databases-in-sql-server/

kafran commented 4 years ago

@joshuahlang You wrote the SQLAlchemy dialect for ctds. Can't SQLAlchemy handle ctds' VarChar and NVarChar transparently?

HuangRicky commented 4 years ago

@joshuahlang in the documentation, you said:

By default ctds.SqlVarChar will encode str objects to utf-8, which is likely incorrect for most SQL Server configurations.

Can we have a global setting that can be modified, so that if we pass in a str object it is automatically encoded using ctds.DEFAULT_STR_ENCODING? I think this would improve pandas-related performance a lot. By the way, thank you for providing the ctds package; it is by far the fastest bcp uploading library in Python. If it gains pandas support it will become even more popular.
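
Until something like that exists, a user-side helper can approximate it; a sketch, where DEFAULT_STR_ENCODING stands in for the hypothetical global proposed above:

import ctds

DEFAULT_STR_ENCODING = 'latin-1'  # hypothetical module-level default

def to_sqlvarchar(value, encoding=DEFAULT_STR_ENCODING):
    # Encode str values explicitly so ctds.SqlVarChar does not apply its utf-8 default.
    if isinstance(value, str):
        return ctds.SqlVarChar(value.encode(encoding))
    return value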

@kafran Here are some useful transforms I did related to pandas:

import numpy as np
import pandas as pd

# Convert np.nan to None so missing values are sent as NULL:
df = df.where(pd.notnull(df), None)

# Convert a datetime column to object dtype for bcp usage:
your_df_column = pd.Series(pd.to_datetime(pdcol, errors=errors), dtype=np.dtype("object"))

# bulk_insert using the pandas itertuples generator; it uses less memory than df.to_records().
tdsconnection.bulk_insert(table, df.itertuples(index=False, name=None))

kafran commented 4 years ago

# bulk_insert using the pandas itertuples generator; it uses less memory than df.to_records().
tdsconnection.bulk_insert(table, df.itertuples(index=False, name=None))

@HuangRicky, since Pandas 0.24.0 there is a new to_sql parameter called "method". Here is an example of a bulk insert into a Postgres database; this is what I'm trying to do with ctds:

import csv
from io import StringIO

# Bulk load the table using Postgres COPY
def psql_bulk_insert(table, conn, keys, data_iter):
    # Gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection

    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ', '.join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = '{}.{}'.format(table.schema, table.name)
        else:
            table_name = table.name

        sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

df.to_sql('table_name',
          sql_alchemy_engine,
          index=False,
          if_exists='replace',
          method=psql_bulk_insert)
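
A ctds-flavored version of that callable might look like the sketch below; this is untested and assumes the SQLAlchemy engine uses the ctds dialect, so that conn.connection exposes a ctds connection with bulk_insert():

# Hypothetical method= callable for df.to_sql built on ctds' bulk_insert.
def ctds_bulk_insert(table, conn, keys, data_iter):
    dbapi_conn = conn.connection  # the underlying ctds connection (assumption)
    if table.schema:
        table_name = '{}.{}'.format(table.schema, table.name)
    else:
        table_name = table.name
    dbapi_conn.bulk_insert(table_name, data_iter)

df.to_sql('table_name', sql_alchemy_engine, index=False,
          if_exists='append', method=ctds_bulk_insert)

String values would still need the SqlVarChar/SqlNVarChar wrapping discussed above before they reach bulk_insert.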

HuangRicky commented 4 years ago

I suspect your approach might be slower than the in-memory generator approach. You could test both on some dataframes and compare the performance.

kafran commented 4 years ago

I suspect your approach might be slower than the in-memory generator approach. You could test both on some dataframes and compare the performance.

I'm trying to implement it with ctds. The difficulty is with VarChar and NVarChar.