googleapis / python-spanner-sqlalchemy


Table Object #285

Joselguti closed this issue 1 year ago

Joselguti commented 1 year ago

Hello,

I am trying to create a complete pipeline from the data I have on my computer to Spanner. The steps are the following:

1. I have the data as a Pandas DataFrame.
2. I create the table on Spanner "dynamically" from said DataFrame with the following code:

def spanner_create_table(df, table, database):
    import numpy as np

    # Make sure the DataFrame index is a regular column to use as primary key.
    if 'index' not in df.columns:
        df = df.reset_index()
    cols = list(df.columns)

    # Map pandas dtypes to Spanner column types.
    types_dict = {
        np.dtype('O'): 'STRING(50)',
        np.dtype('int64'): 'INT64',
        np.dtype('float64'): 'FLOAT64',
    }
    query_str = f"CREATE TABLE {table} ("
    for each in cols:
        query_str += f"{each} {types_dict[df[each].dtype]},"
    query_str = query_str[:-1] + ') PRIMARY KEY (index)'

    operation = database.update_ddl([query_str])
    print("Waiting for operation to complete...")
    operation.result()
    print(f"Created table {table}.")
    return "Database created for new client", str(table)

3. Now I would like to insert the data into the created table in Spanner.

I was looking at the following function in your files:

def insert_data(conn, table, data):
    conn.execute(table.insert(), data)

Q: I'm not sure what object "table" is, or how to make it reference the actual table that is already created in Spanner?

Q2: Is there not any df.to_sql() function available?

If you have any other suggestions for the table creation, I would also appreciate them.

Thank you in advance!

IlyaFaer commented 1 year ago

@Joselguti, I don't see any sqlalchemy_spanner code in your snippet; you're using only the pure python-spanner client. Are you going to use the sqlalchemy_spanner dialect?

Looking at your code, I think yes, you should use the dialect. Creating a table with it can be done like this: https://github.com/googleapis/python-spanner-sqlalchemy#create-a-table

Here is how you can reference an existing Spanner table with the dialect: https://github.com/googleapis/python-spanner-sqlalchemy#read

For your case, I would walk through the DataFrame columns with a for loop, generate Column objects, and pass them into Table(). The object will generate all the necessary SQL operations for you.

While using sqlalchemy_spanner, you don't need to deal with Database, Instance, and the other original Spanner client objects; just use SQLAlchemy ORM features.
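For illustration, here is a minimal sketch of that approach, assuming the pandas DataFrame df from your snippet. The connection URL, table name, and dtype mapping are example assumptions, not something prescribed by the dialect:

import numpy as np
from sqlalchemy import Column, Float, Integer, MetaData, String, Table, create_engine

# Assumed connection URL; substitute your own project/instance/database IDs.
engine = create_engine(
    "spanner+spanner:///projects/my-project/instances/my-instance/databases/my-db"
)
metadata = MetaData(bind=engine)

# Illustrative mapping of pandas dtypes to SQLAlchemy types.
types_map = {
    np.dtype("O"): String(50),
    np.dtype("int64"): Integer,
    np.dtype("float64"): Float,
}

df = df.reset_index()  # turn the index into a regular column to use as primary key
columns = [
    Column(name, types_map[dtype], primary_key=(name == "index"))
    for name, dtype in df.dtypes.items()
]
table = Table("my_table", metadata, *columns)
metadata.create_all(engine)  # emits the CREATE TABLE statement for you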

Joselguti commented 1 year ago

@IlyaFaer Thank you for your feedback; I was able to integrate your ideas into my code.

Another question: if I have an existing table on Spanner and I would like to add a new column, how could I achieve this via Python?

IlyaFaer commented 1 year ago

@Joselguti,

If you're using the pure python-spanner client, you can run the update_ddl() method with Spanner ALTER operations, as in the system tests, for example: https://github.com/googleapis/python-spanner/blob/57cbf4de90018c47825f4adff03d13aaeb8ca868/tests/system/test_database_api.py#L260-L263

With the SQLAlchemy dialect you can use the Table.append_column() method, which is the simplest variant: https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.Table.append_column

Most of the time, though, Alembic is used to run database migrations with a SQLAlchemy dialect: https://alembic.sqlalchemy.org/en/latest/tutorial.html#running-our-second-migration
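For illustration, an Alembic migration adding a column could look like this sketch (the table and column names are placeholders):

# versions/xxxx_add_marketing_budget.py -- created with `alembic revision`
import sqlalchemy as sa
from alembic import op


def upgrade():
    # The dialect turns this into an ALTER TABLE ... ADD COLUMN DDL;
    # Spanner requires new columns to be nullable.
    op.add_column("albums", sa.Column("marketing_budget", sa.Integer, nullable=True))


def downgrade():
    op.drop_column("albums", "marketing_budget")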

Joselguti commented 1 year ago

@IlyaFaer,

I have been doing several tests using the SQLAlchemy dialect, inserting and reading data in big chunks, and I have encountered the following error when inserting/reading more than 250,000 rows and 24 columns at once:

grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.ABORTED
        details = "Transaction was aborted."
        debug_error_string = "{"created":"@1667909603.922000000","description":"Error received from peer ipv4:142.251.129.202:443","file":"src/core/lib/surface/call.cc","file_line":904,"grpc_message":"Transaction was aborted.","grpc_status":10}"

To work around this error I am splitting the data into smaller sections and inserting/reading them into Spanner in a loop. Do you know of another solution?

On the other hand, I have been trying to add new columns to an already existing table in Spanner.

types_dict = {np.dtype('O'): String(50), np.dtype('int64'): Integer, np.dtype('float64'): Float,
              'TIMESTAMP': 'TIMESTAMP'}
metadata = MetaData(bind=spanner_engine)
# Bring in the table object
table = Table(holding_name, metadata, autoload=True)
# Compare the current df with the Spanner table; attach any new columns
data_columns = list(rows[0].keys())  # rows previously read from the table
new_df_columns = list(df.columns)
if new_df_columns != data_columns:
    new_cols = list(set(new_df_columns) - set(data_columns))
    for each in new_cols:
        table.append_column(Column(each, types_dict[df.dtypes[each]], nullable=True))
metadata.create_all(spanner_engine)

I don't get any kind of error when executing this code, however there is no change reflected in spanner.

IlyaFaer commented 1 year ago

To work around this error I am splitting the data into smaller sections and inserting/reading them into Spanner in a loop. Do you know of another solution?

No, I don't see another solution here. An Aborted error means a data conflict: the data read/written is no longer current, so it can't be written/read correctly. The longer a transaction takes and the more data it touches, the higher the chance of an Aborted error. We have an aborted-transaction auto-retry mechanism (see this), but in the case of big inserts it's unlikely the transaction will be successfully retried, as it's still a big amount of data.

You can try AUTOCOMMIT mode to commit smaller pieces of data without manipulating transactions too much. The docs have recommendations on measuring an optimal transaction size:
https://cloud.google.com/spanner/docs/commit-statistics
https://cloud.google.com/spanner/docs/reference/rest/v1/TransactionOptions
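A sketch of the chunked-insert idea combined with AUTOCOMMIT, assuming the engine, table object, and DataFrame from earlier in the thread (the chunk size here is an arbitrary starting point to tune with the commit-statistics docs):

# Each chunk is committed on its own, so no single transaction grows too large.
autocommit_engine = spanner_engine.execution_options(isolation_level="AUTOCOMMIT")

CHUNK_SIZE = 5000  # illustrative; tune it using the docs linked above

records = df.to_dict(orient="records")
with autocommit_engine.connect() as conn:
    for start in range(0, len(records), CHUNK_SIZE):
        conn.execute(table.insert(), records[start:start + CHUNK_SIZE])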

IlyaFaer commented 1 year ago

I don't get any kind of error when executing this code, however there is no change reflected in spanner.

DDL statements in Spanner are executed outside of normal transactions, and in the default mode they are lazy. That means that when you execute DDL, it isn't really executed, but gathered into a batch. On commit, or when another SQL operation is executed, all the batched DDL statements are run on the database. So, I guess, you just need to call commit in your program to make the DDL happen (or use AUTOCOMMIT mode, which will execute every DDL statement immediately).
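In practice that could look like this sketch, going through the raw DBAPI connection underneath the engine (table and column names are placeholders):

conn = spanner_engine.raw_connection()  # the dialect's underlying DBAPI connection
cur = conn.cursor()
cur.execute("ALTER TABLE my_table ADD COLUMN extra_info STRING(50)")
conn.commit()  # flushes the batched DDL to Spanner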

IlyaFaer commented 1 year ago

Seems like all the questions are answered for now. Closing the issue.

Joselguti commented 1 year ago

@IlyaFaer Sorry, I lost track of this issue.

What do you mean by commit? I tried using the following:

autocommit_engine = spanner_engine.execution_options(isolation_level="AUTOCOMMIT")

But it didn't work either; I see no changes in my Spanner table.

Basically what I want to achieve is the following

(Code taken from: https://cloud.google.com/spanner/docs/samples/spanner-add-column)

def add_column(instance_id, database_id):
    """Adds a new column to the Albums table in the example database."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(instance_id)
    database = instance.database(database_id)

    operation = database.update_ddl(
        ["ALTER TABLE Albums ADD COLUMN MarketingBudget INT64"]
    )

    print("Waiting for operation to complete...")
    operation.result(OPERATION_TIMEOUT_SECONDS)

    print("Added the MarketingBudget column.")`

Is there a way of doing this with python-spanner-sqlalchemy?

IlyaFaer commented 1 year ago

What do you mean by commit?

Well, commit, an explicit call, like:

connection.commit()

To add a column (or make another kind of schema change), migrations are usually used. For SQLAlchemy there is the Alembic tool, which organizes such things.

You can still do something like this:

engine.execute('ALTER TABLE %s ADD COLUMN %s %s' % (table_name, column_name, column_type))

to add a column manually, but it's not a common way.
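If you do go the manual route, pairing it with the AUTOCOMMIT mode mentioned earlier avoids the DDL batching issue; a sketch with placeholder names:

from sqlalchemy import text

# AUTOCOMMIT executes the DDL immediately instead of batching it.
autocommit_engine = spanner_engine.execution_options(isolation_level="AUTOCOMMIT")
with autocommit_engine.connect() as conn:
    conn.execute(text("ALTER TABLE my_table ADD COLUMN marketing_budget INT64"))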