great-expectations / great_expectations

Always know what to expect from your data.
https://docs.greatexpectations.io/
Apache License 2.0

Memory leak when using GX in Azure Databricks #7971

Closed gerardperezbismart closed 1 year ago

gerardperezbismart commented 1 year ago

Describe the bug We are using GX to validate a significant number of dataframes in parallel on Azure Databricks. Every time a new table is validated, memory usage increases and is never released. Eventually, when the driver's memory fills up, the process fails (even if the workers have free memory). At the moment, the only temporary solution we have found is to reset the cluster from time to time.

To Reproduce Steps to reproduce the behavior:

  1. Execute multiple validations simultaneously (e.g. 1k dataframes in batches of 30, with 2 validations for each dataframe). The code below is a parametrized notebook executed N times (N dataframes), in our particular case launched from Azure Data Factory. It validates whether the new data has some similarity with the previous day's.
  dbutils.widgets.text("table_name",'None')
  dbutils.widgets.text('execution_date', 'None')

  table_name = dbutils.widgets.get("table_name")
  execution_date = dbutils.widgets.get('execution_date')

  import datetime as dt
  import json
  import datetime

  import great_expectations as gx
  from great_expectations.checkpoint import SimpleCheckpoint

  context = gx.get_context()

  dataframe_datasource = context.sources.add_or_update_spark(
      name="Spark_in_memory_datasource",
  )

  date = dt.datetime.strptime(execution_date, '%Y-%m-%d').date()
  yesterday_date = date - dt.timedelta(days=1)
  yesterday_date_str = yesterday_date.strftime('%Y-%m-%d')

  year, month, day = execution_date.split('-')
  df = spark.read.parquet(f"/mnt/data/landing/{table_name}/{year}/{month}/{day}/*") #read parquet

  yesterdayYear, yesterdayMonth, yesterdayDay = yesterday_date_str.split('-')
  previousDf = spark.read.parquet(f"/mnt/data/landing/{table_name}/{yesterdayYear}/{yesterdayMonth}/{yesterdayDay}/*") #read parquet

  dataframe_asset = dataframe_datasource.add_dataframe_asset(
      name=getRunId(),
      dataframe=df,
  )
  batch_request = dataframe_asset.build_batch_request()

  expectation_suite_name = f"{datetime.datetime.utcnow().strftime('%d%m%Y%H%M%S')}_valTest_test"
  context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

  df_ge = context.get_validator(
      batch_request=batch_request,
      expectation_suite_name=expectation_suite_name,
  )

  YesterdayRows = previousDf.count()

  df_ge.expect_table_row_count_to_be_between(min_value = YesterdayRows * 0.9, max_value = YesterdayRows * 1.1)

  yesterdayColumns = previousDf.columns

  df_ge.expect_table_columns_to_match_set(column_set = yesterdayColumns, exact_match = True)

  df_ge.save_expectation_suite(discard_failed_expectations=False)

  my_checkpoint_name = expectation_suite_name + "_checkpoint"
  checkpoint = SimpleCheckpoint(
      name = my_checkpoint_name,
      config_version=1.0,
      class_name="SimpleCheckpoint",
      run_name_template= "%Y%m%d-%H%M%S-" + my_checkpoint_name,
      data_context=context,
  )
  context.add_or_update_checkpoint(checkpoint=checkpoint)

  checkpoint_result = context.run_checkpoint(
      checkpoint_name=my_checkpoint_name,
      validations=[
          {
              "batch_request": batch_request,
              "expectation_suite_name": expectation_suite_name,
          }
      ],  
  )
  2. Check the Ganglia UI and see how memory is never released: https://learn.microsoft.com/es-es/azure/databricks/clusters/clusters-manage#ganglia-metrics

Expected behavior Memory should not increase indefinitely until validations are completed (which is what seems to be happening when running expectations). Memory should be released as validations complete, so that memory usage remains constant or decreases.

Environment (please complete the following information):

Additional context Current behavior: As we can see in the image shown below, memory is not released when validations are completed (i.e., when CPU usage is lower). image

If the allocated memory in the driver is not enough, the process fails (swap). When the cluster is idle, memory is not released. image

Expected behavior: As we can see in the image shown below (a sample from a standard data ingestion process), memory is released or stays constant. If extra CPU/memory is needed due to extra workload, data and computation are distributed across the workers. image

Solution Attempts We have enabled the garbage collector and run it manually, deleted variables such as context and dataframe_datasource as well as the loaded dataframes, cleared the cache, tried a sequential execution, and simplified the notebook (the code provided is already the simplified version).
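In code, those cleanup attempts looked roughly like this (a minimal sketch at the end of the notebook, using the variable names from the code above; none of it released the driver memory):

  import gc

  spark.catalog.clearCache()   # drop any cached/persisted DataFrames

  # drop all references to GX objects and the DataFrames
  del df_ge
  del batch_request
  del dataframe_asset
  del dataframe_datasource
  del context
  del df
  del previousDf

  gc.collect()                 # force a full garbage collection pass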

Nothing worked; only restarting the cluster helps.

jacovg91 commented 1 year ago

Exact same behaviour in our Databricks environment. Memory goes up to 240GB. Didn't find time to look for a resolution yet. Our Databricks code is also executed (in parallel) from an ADF pipeline.

gerardperezbismart commented 1 year ago

Hi @jacovg91, if you find any solution while waiting for GX to fix it, we would appreciate it if you shared it with us. Thank you.

jacovg91 commented 1 year ago

@gerardperezbismart Unfortunately, no resolution here yet. It's not as pressing for us, since this is the last step in our batch before the cluster gets turned off.

tjholsman commented 1 year ago

Hi @gerardperezbismart, thanks for reporting this! We'll take a look and be in touch.

alexsherstinsky commented 1 year ago

Hello, @gerardperezbismart -- I am now looking into this issue, and I would like to troubleshoot it in partnership with you.

In order to get started, I wanted to let you know that as part of the Great Expectations release "0.16.15" last Thursday (June 1, 2023), the DataFrame add_asset() API has changed. The new behavior is that dataframe_datasource.add_dataframe_asset() does not include the dataframe; the dataframe is specified in the call to dataframe_asset.build_batch_request() -- always. This change ensures that there is a single, unambiguous place in the API, where the actual DataFrame is specified, and it is consistent with creating the DataFrameAsset, reloading DataContext, and reading this DataAsset from configuration again. (Please see "https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/fluent/in_memory/how_to_connect_to_in_memory_data_using_pandas" for the new behavior.)
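In terms of your snippet, the change looks roughly like this (asset name shortened for illustration):

  # Before 0.16.15: the DataFrame was attached when the asset was created
  dataframe_asset = dataframe_datasource.add_dataframe_asset(
      name="my_asset",
      dataframe=df,
  )
  batch_request = dataframe_asset.build_batch_request()

  # From 0.16.15 on: the asset is created without data, and the DataFrame
  # is passed when the batch request is built
  dataframe_asset = dataframe_datasource.add_dataframe_asset(name="my_asset")
  batch_request = dataframe_asset.build_batch_request(dataframe=df)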

It would really help the troubleshooting process if we could learn whether or not the same loss of memory occurs under the updated API -- would it be possible for you to run it and provide the same detailed report as your original one?

Thank you very much in advance!

gerardperezbismart commented 1 year ago

Hi @alexsherstinsky, thanks for replying. I have updated our test environment to the GX 0.16.15 release and modified the code as you mentioned:

- dataframe_asset = dataframe_datasource.add_dataframe_asset(name=getRunId())
- batch_request = dataframe_asset.build_batch_request(dataframe = df)

Keep in mind that we are not using Pandas DataFrames but Spark DataFrames, so our dataframe_datasource looks like: dataframe_datasource = context.sources.add_or_update_spark(name="Spark_in_memory_datasource")

Full code:

  dbutils.widgets.text("table_name",'None')
  dbutils.widgets.text('execution_date', 'None')

  table_name = dbutils.widgets.get("table_name")
  execution_date = dbutils.widgets.get('execution_date')

  import datetime as dt
  import json
  import datetime

  import great_expectations as gx
  from great_expectations.checkpoint import SimpleCheckpoint

  context = gx.get_context()

  dataframe_datasource = context.sources.add_or_update_spark(
      name="Spark_in_memory_datasource",
  )

  date = dt.datetime.strptime(execution_date, '%Y-%m-%d').date()
  yesterday_date = date - dt.timedelta(days=1)
  yesterday_date_str = yesterday_date.strftime('%Y-%m-%d')

  year, month, day = execution_date.split('-')
  df = spark.read.parquet(f"/mnt/data/landing/{table_name}/{year}/{month}/{day}/*") #read parquet

  yesterdayYear, yesterdayMonth, yesterdayDay = yesterday_date_str.split('-')
  previousDf = spark.read.parquet(f"/mnt/data/landing/{table_name}/{yesterdayYear}/{yesterdayMonth}/{yesterdayDay}/*") #read parquet

  dataframe_asset = dataframe_datasource.add_dataframe_asset(
      name=getRunId()
  )
  batch_request = dataframe_asset.build_batch_request(dataframe = df)

  expectation_suite_name = f"{datetime.datetime.utcnow().strftime('%d%m%Y%H%M%S')}_valTest_test"
  context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

  df_ge = context.get_validator(
      batch_request=batch_request,
      expectation_suite_name=expectation_suite_name,
  )

  YesterdayRows = previousDf.count()

  df_ge.expect_table_row_count_to_be_between(min_value = YesterdayRows * 0.9, max_value = YesterdayRows * 1.1)

  yesterdayColumns = previousDf.columns

  df_ge.expect_table_columns_to_match_set(column_set = yesterdayColumns, exact_match = True)

  df_ge.save_expectation_suite(discard_failed_expectations=False)

  my_checkpoint_name = expectation_suite_name + "_checkpoint"
  checkpoint = SimpleCheckpoint(
      name = my_checkpoint_name,
      config_version=1.0,
      class_name="SimpleCheckpoint",
      run_name_template= "%Y%m%d-%H%M%S-" + my_checkpoint_name,
      data_context=context,
  )
  context.add_or_update_checkpoint(checkpoint=checkpoint)

  checkpoint_result = context.run_checkpoint(
      checkpoint_name=my_checkpoint_name,
      validations=[
          {
              "batch_request": batch_request,
              "expectation_suite_name": expectation_suite_name,
          }
      ],  
  )

Results We are experiencing the same results mentioned above. As we can see in the image shown below, memory is still not released when validations are completed (or the cluster is idle). When memory usage exceeds the driver's allocated memory (currently 128 GB), it still fails (swap). image CPU usage comparison: image

Environment:

If you need more information, just let me know; I'll try to answer asap. Could someone check whether the Spark session is properly closed? If it is not, this could point to a possible memory problem (not sure at all). In Databricks we don't "manually" create any Spark session, unlike in raw Python.
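For reference, a quick way to see which Spark session is active from the notebook (a sketch assuming Spark 3.x, where SparkSession.getActiveSession() is available):

  from pyspark.sql import SparkSession

  # the session Databricks provides to the notebook
  print(id(spark), spark)

  # the session currently registered as active; if GX reuses the existing
  # session (as one would expect on Databricks), both ids should match
  active_session = SparkSession.getActiveSession()
  print(id(active_session), active_session)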

alexsherstinsky commented 1 year ago

Hello, @gerardperezbismart -- this is great information! Thank you so much for providing it and the code. Other than the fact that saving the Checkpoint is not needed for the present experiment, the code should be ready for trying in Databricks. Is there any way for you to share your Parquet data with us so that we can reproduce your environment as closely as possible? I am also wondering if we should test in both Azure- and AWS-backed Databricks environments. Thanks a lot!

gerardperezbismart commented 1 year ago

Hi, @alexsherstinsky. We tried to run GX without any validation/expectation, just instantiating the context, datasource, and dataframe_asset... Unfortunately, we are experiencing the same results mentioned above (memory accumulates constantly and is never released).

image

Code:

import great_expectations as gx
import datetime as dt
import gc  

context = gx.get_context()

dataframe_datasource = context.sources.add_or_update_spark(
    name="Spark_in_memory_datasource"
)

dataframe_asset = dataframe_datasource.add_dataframe_asset(
    name=getRunId())  # getRunId(): helper defined elsewhere in our notebook (unique name per run)

expectation_suite_name = f"{dt.datetime.utcnow().strftime('%d%m%Y%H%M%S')}_valTest_test"
context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)

del context
del dataframe_datasource
gc.collect() 

As we have been able to verify, you don't really need any Parquet data to reproduce our environment; just instantiate the library in multiple jobs. To do so, you can use a For Each activity in Data Factory that executes the test notebook with a Batch Count setting of up to 50.

image

As we are a Microsoft partner company, we only work with Azure. If you want to try it in AWS, we cannot help you. Thanks for answering.

alexsherstinsky commented 1 year ago

@gerardperezbismart Thank you very much for the code examples -- it looks like the memory problem happens even though we are not passing the dataframe at all. That tells me that we should be able to reproduce locally. If we cannot reproduce locally and/or in Databricks with AWS, then it is possible that the problem rests with the Azure backing of Databricks -- we do not know at this point, but we will start diagnosing by experimenting/troubleshooting locally. We will keep you posted. Thanks again.

alexsherstinsky commented 1 year ago

@gerardperezbismart We are still working on this; thank you very much for your patience.

gerardperezbismart commented 1 year ago

Thank you @alexsherstinsky!

gerardperezbismart commented 1 year ago

Hi @alexsherstinsky, how long will it take to have a solution? We are stuck in the validation part of our projects. Let me know if we can help.

alexsherstinsky commented 1 year ago

Hello, @gerardperezbismart -- please give us a few days; today is a Holiday, and I will resume working on this tomorrow and keep you posted. Thank you for your patience.

gerardperezbismart commented 1 year ago

Thank you @alexsherstinsky!

alexsherstinsky commented 1 year ago

Hello, @gerardperezbismart -- I have conducted some experiments locally and in Databricks/AWS environments. Here is the code I used:

import gc
import uuid
import datetime

import great_expectations as gx 

gx.__version__

def getRunId():
    return f"run_id_{str(uuid.uuid4())[:8]}"

def memory_usage_check():
    # Mimic the reported notebook: create a context, a Spark datasource,
    # a DataFrame asset, and an expectation suite, then drop the references.
    context = gx.get_context()
    datasource = context.sources.add_or_update_spark(
        name="SparkInMemoryDatasource"
    )
    data_asset = datasource.add_dataframe_asset(
        name=getRunId(),
    )
    expectation_suite_name = f"{datetime.datetime.utcnow().strftime('%d%m%Y%H%M%S')}_validation_Test_test"
    expectation_suite = context.add_or_update_expectation_suite(expectation_suite_name=expectation_suite_name)
    del context
    del datasource

for counter in range(40):
    memory_usage_check()
    num_unreachable_objects = gc.collect()
    print(num_unreachable_objects)

When I run it, the output of the print statement is always nearly the same (approximately 800 -- 1600 objects) and does not grow.
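If it helps, a small sketch that would also track the driver's resident memory rather than only the unreachable-object count (this assumes psutil is available on the driver, which is typically the case on Databricks, and reuses memory_usage_check() from the snippet above):

import os
import gc
import psutil

# report the driver process's resident set size alongside the gc count
process = psutil.Process(os.getpid())
for counter in range(40):
    memory_usage_check()
    num_unreachable_objects = gc.collect()
    print(num_unreachable_objects, f"rss={process.memory_info().rss / 1024**2:.1f} MiB")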

Is it possible that this could have something to do with the Azure environment (e.g., a specific aspect of how Data Factory works)? We do not have an Azure-backed Databricks environment set up, so perhaps we can troubleshoot this in collaboration with you. In order to do that, we would need some information:

1) What is the meaning of "Batch Count"? Does this refer to the GX Batch concept, or something pertaining to Databricks and/or Azure?
2) What is your cluster configuration?
3) What is the meaning of "jobs"? How does one set up the above Python script to run in multiple jobs? (If this is a Databricks notion, then perhaps we can try to reproduce it in our Databricks/AWS environment.) If you take a look at the above "for" loop in my code snippet, is this the way to exercise the code, or are you doing it differently in your environment?
4) Are there other computational tasks running in your cluster that could obscure the problem we are observing?

Thank you very much for your collaboration. /cc @tyler-hoffman

gerardperezbismart commented 1 year ago

I have executed your code in our environment and memory does not increase indefinitely; it remains stable. This makes me think that the problem could arise when GX is executed through Databricks jobs. You should check whether the problem is due to the execution of jobs created from Data Factory (the current issue) or whether there are also memory problems when executing jobs directly from Databricks. You can try to create them through the API (https://docs.databricks.com/api/workspace/jobs) or create them manually in the portal. I will update you as soon as I try it too.
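A rough sketch of what launching the test notebook as independent job runs through that API could look like (the endpoint and payload shape follow the linked Jobs API docs; the host, token, cluster id, and notebook path are placeholders):

import requests

HOST = "https://<databricks-instance>"     # placeholder
TOKEN = "<personal-access-token>"          # placeholder
CLUSTER_ID = "<existing-cluster-id>"       # placeholder

# submit each notebook execution as an independent one-time job run
for i in range(50):
    payload = {
        "run_name": f"gx_memory_test_{i}",
        "tasks": [
            {
                "task_key": "validate",
                "existing_cluster_id": CLUSTER_ID,
                "notebook_task": {
                    "notebook_path": "/path/to/test_notebook",  # placeholder
                    "base_parameters": {
                        "table_name": f"table_{i}",
                        "execution_date": "2023-06-01",
                    },
                },
            }
        ],
    }
    response = requests.post(
        f"{HOST}/api/2.1/jobs/runs/submit",
        headers={"Authorization": f"Bearer {TOKEN}"},
        json=payload,
    )
    response.raise_for_status()
    print(response.json())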

Apart from these assumptions, let me answer your questions:

1. What is the meaning of "Batch Count"? Does this refer to the GX Batch concept, or something pertaining to Databricks and/or Azure? It refers to the number of parallel runs inside the "For Each" activity of Azure Data Factory. In this case, it is the maximum number of parallel executions of the validation notebook (aka the parameterized notebook). It has no relation to GX; it is just the way we parallelize our Databricks processes.

2. What is your cluster configuration? image Regardless of the memory of the workers, if the memory consumption exceeds the capacity of the driver, the process fails.

GX version: great_expectations==0.16.15

3. What is the meaning of "jobs"? How does one set up the above Python script to run in multiple jobs? (If this is a Databricks notion, then perhaps we can try to reproduce it in our Databricks/AWS environment.) If you take a look at the above "for" loop in my code snippet, is this the way to exercise the code, or are you doing it differently in your environment?

Databricks jobs are found there (Workflows/Jobs): image We don't manually define any job here; Data Factory is what creates the jobs that execute our Databricks notebook. Every job is an independent execution of the parameterized notebook. Refer to the following link for more information about how we execute Databricks notebooks from Data Factory: https://learn.microsoft.com/en-us/azure/data-factory/transform-data-using-databricks-notebook

As I mentioned above, we can create jobs in these ways: via the Jobs API (https://docs.databricks.com/api/workspace/jobs) or through the portal: image

About Data Factory: our "for" loop (aka the For Each activity) loops as many times as there are tables to validate. Inside the "for" loop, the Databricks notebook is executed with the parameters needed for each table. Data Factory automatically creates one Databricks job for every parameterized notebook and executes it. image As the tables are validated, the memory problems appear.

EDIT: We have used the same methodology in Data Factory with both our main notebook and the test one (the ones we shared here).

4. Are there other computational tasks running in your cluster that could obscure the problem we are observing?

No. This cluster is dedicated to the validation process. Other tasks such as integration, transformation, etc. are executed in a separate cluster.

Thank you

tyler-hoffman commented 1 year ago

Hey @gerardperezbismart, thanks for the detailed response(s). It looks like the main gap we have right now is that our team is running AWS-backed Databricks, so we don't have an environment with Data Factory.

Would you mind clarifying one point?

I have executed your code in our environment and memory does not increase indefinitely and it remains stable.

When you say you executed the code in your environment, how was it invoked? Via Data Factory? I've invoked it by running Databricks jobs and seen stable memory consumption.

Thanks for your ongoing help troubleshooting this.

gerardperezbismart commented 1 year ago

Would you mind clarifying one point? Sure @tyler-hoffman, I have executed the code you shared with me (the one with the for loop) without any modification directly in Databricks (no Data Factory) and it works as expected; memory usage remains constant. But this is not the way we execute our notebooks; we use jobs. To run it properly as Databricks jobs, every element of the loop should be an independent job. Have you been able to try it like this? I think the problem lies in the number of jobs that have GX instantiated. Thanks!

tyler-hoffman commented 1 year ago

Hey @gerardperezbismart , thanks for the info. I've been able to reproduce now and will be taking it back to the team to prioritize. I'm not sure how quickly we'll get a fix in, but I'll keep you updated.

gerardperezbismart commented 1 year ago

Thanks @tyler-hoffman.

tyler-hoffman commented 1 year ago

Hi @gerardperezbismart, I discussed this with the team today. We are likely not going to be able to prioritize this for an internal fix in the immediate future. Because of that, we’ve closed the Issue, but we’ve added this to our product backlog for future consideration. We invite contributions for this issue, and are happy to support you to get this across the line if you’re interested in working on this. If so, please feel free to open a draft PR at any time, and reach out on Slack if you have any questions about the contribution process.

gerardperezbismart commented 1 year ago

We are very disappointed with this reply. This issue means that you have no support for Databricks. What is your proposed workaround for this problem?

tyler-hoffman commented 1 year ago

Hey @gerardperezbismart, I apologize for not being able to prioritize a fix. I think the workaround right now is to continue restarting the cluster periodically. Like I mentioned, we would love to get a community contribution on this. We are happy to help get this across the finish line if you are able to put up a PR.