ryanshrott closed this issue 4 months ago.
def get_df_from_singlestore(query, database='ai'):
    connection_details = connection_details_shared if database == 'shared' else connection_details_ai
    connection = s2.connect(**connection_details)
    cursor = connection.cursor()
    cursor.execute(query)
    if cursor.description is not None:
        columns = [desc[0] for desc in cursor.description]
        chunk_size = 10000  # Adjust based on your memory constraints
        data_chunks = []
        while True:
            rows = cursor.fetchmany(chunk_size)
            if not rows:
                break
            df_chunk = pd.DataFrame(rows, columns=columns)
            df_chunk = df_chunk.drop(columns=unimportant_cols, errors='ignore')
            data_chunks.append(df_chunk)
        if data_chunks:
            if len(data_chunks) == 1 and data_chunks[0].shape[0] <= chunk_size:
                cursor.close()
                connection.close()
                return data_chunks[0]
            # Concatenate chunks with consistent columns and data types
            data_chunks = [chunk.dropna(axis=1, how='all') for chunk in data_chunks]
            # Concatenate the data chunks
            df = pd.concat(data_chunks, ignore_index=True)
            print(df)
            cursor.close()
            connection.close()
            return df
        else:
            cursor.close()
            connection.close()
            return pd.DataFrame()
Example above.
We are experiencing a similar issue.
The memory constantly increases every time you execute a SingleStore query and is never properly released. This memory leak forced me to migrate to SQLAlchemy, which immediately resolved the issue.
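For context, this is roughly how the growth can be observed. A minimal sketch, assuming the get_df_from_singlestore function above is importable, that the query and table name below are placeholders, and that psutil is available purely to read the process RSS (which also captures allocations made by the C extension):

import gc
import os

import psutil  # assumed available; only used to read the process RSS

process = psutil.Process(os.getpid())

# Each iteration discards the result and forces a garbage collection;
# with the leak, the resident set size keeps climbing anyway.
for i in range(20):
    df = get_df_from_singlestore('SELECT * FROM some_table LIMIT 50000')  # placeholder query
    del df
    gc.collect()
    print(f'iteration {i}: rss={process.memory_info().rss / 1e6:.1f} MB')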
By SQLAlchemy, I assume you mean SQLAlchemy using a mysql:// prefix? That uses the PyMySQL driver, which is what the SingleStoreDB driver is based on. You can pass a pure_python= option to s2.connect to switch to essentially the equivalent of PyMySQL (with extra SingleStoreDB-specific features). The memory leak is likely just in the C extension. I'll look into it. Thanks for the report.
@kesmit13 - I added the pure_python=True option to the connection details as follows:

connection_details_shared = {
    'host': os.getenv('HOST'),
    'port': os.getenv('PORT'),
    'user': os.getenv('USER'),
    'password': os.getenv('PASSWORD'),
    'database': os.getenv('DATABASE'),
    'pure_python': True,
}

but that did NOT solve the issue. Memory is still not released.
It appears as though there may be a circular reference between cursors and connections somewhere. If I reuse the same connection (in pure Python mode) instead of creating a new one each time, the memory does not accumulate. I'm doing more investigation.
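As a sketch of that workaround (my illustration, not something from the maintainers), a single module-level connection can be kept and reused, with only the cursor opened per call; the helper names below are made up and the fetch logic is simplified:

import pandas as pd
import singlestoredb as s2

_connection = None  # one connection reused across calls instead of a new one per query

def _get_connection(details):
    # Lazily open a single connection and hand the same object back each time.
    global _connection
    if _connection is None:
        _connection = s2.connect(**details)
    return _connection

def get_df_from_singlestore_reused(query, details):
    # Same idea as the function above, but only the cursor is per-call;
    # the connection itself stays open for the life of the process.
    connection = _get_connection(details)
    with connection.cursor() as cursor:
        cursor.execute(query)
        if cursor.description is None:
            return pd.DataFrame()
        columns = [desc[0] for desc in cursor.description]
        return pd.DataFrame(cursor.fetchall(), columns=columns)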
There were a couple of cyclical references in the connection object. These were fixed in 0ed670c03b5309bacbe63705b34038bd0554bb92, and released in v1.6.1. Let me know if you have any other issues.
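If you want to confirm that the installed driver already contains that change, checking the package version is enough. A small sketch using only the standard library (the PyPI package name is assumed to be singlestoredb):

from importlib.metadata import version

# The cyclical-reference fix described above was released in v1.6.1,
# so any version at or above that should include it.
print(version('singlestoredb'))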
@kesmit13 thanks. Has it been tested that it resolves the memory leak?
Yes. I used your test case to verify it.
@ryanshrott BTW, both connection objects and cursor objects are context managers, so you can change your function to have an outer layer like this and they will both automatically close.
with s2.connect(...) as connection:
    with connection.cursor() as cursor:
        ...
@kesmit13 thanks, I modified it like this:
def get_df_from_singlestore(query, database='ai'):
    connection_details = connection_details_shared if database == 'shared' else connection_details_ai
    with s2.connect(**connection_details) as connection:
        with connection.cursor() as cursor:
            cursor.execute(query)
            if cursor.description is not None:
                columns = [desc[0] for desc in cursor.description]
                chunk_size = 10000  # Adjust based on your memory constraints
                data_chunks = []
                while True:
                    rows = cursor.fetchmany(chunk_size)
                    if not rows:
                        break
                    df_chunk = pd.DataFrame(rows, columns=columns)
                    df_chunk = df_chunk.drop(columns=unimportant_cols, errors='ignore')
                    data_chunks.append(df_chunk)
                if data_chunks:
                    if len(data_chunks) == 1 and data_chunks[0].shape[0] <= chunk_size:
                        return data_chunks[0]
                    # Drop columns with all NaN values
                    data_chunks = [chunk.dropna(axis=1, how='all') for chunk in data_chunks]
                    # Concatenate the data chunks
                    df = pd.concat(data_chunks, ignore_index=True)
                    return df
            return pd.DataFrame()
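For completeness, a usage sketch under the assumptions made earlier in the thread (connection_details_ai, connection_details_shared, and unimportant_cols defined at module level); the query and table name below are placeholders:

# Placeholder query; database='shared' selects the connection_details_shared
# dict shown in the earlier comment.
df = get_df_from_singlestore('SELECT * FROM listings LIMIT 100000', database='shared')
print(df.shape)

# Statements that return no result set (cursor.description is None) come back
# as an empty DataFrame.
empty = get_df_from_singlestore('DELETE FROM listings WHERE 1 = 0', database='shared')
print(empty.empty)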