mrpowers-io / levi

Delta Lake helper methods. No Spark dependency.
MIT License

Create a Delta table doctor that analyzes the health and wellness of a Delta table #7

Open MrPowers opened 1 year ago

MrPowers commented 1 year ago

A levi.delta_doctor(delta_table) command could be a nice way for users to help identify issues in their Delta table that could cause slow query performance.

There are several known problems that can cause poor performance of Delta tables:

The levi.delta_doctor(delta_table) could return a string with the following warnings:

We should make it really easy for users to see if there are any obvious problems in their Delta table. We will ideally give them really easy solutions to fix these problems as well!

puneetsharma04 commented 1 year ago

Hello @MrPowers: As I understand it, we could write something like the following. The DeltaDoctor class takes a delta_table argument and has three methods (check_small_files, check_large_files, and check_file_stats), each of which checks for a specific problem and returns a warning message if the problem is found. The diagnose method runs all three checks and returns a string with all the warning messages, or "No issues found" if there are none. Let me know if that matches your expectation. The code would look something like this:

import os
from pyspark.sql import SparkSession

class DeltaDoctor:
    def __init__(self, delta_table):
        self.spark = SparkSession.builder.getOrCreate()
        self.delta_table = delta_table

    def check_small_files(self, threshold=1):
        small_files = self.spark.sql(f"DESCRIBE DETAIL {self.delta_table}").filter("size < 1000000")
        count = small_files.count()
        if count > 0:
            return f"SmallFileWarning: Your table contains {count} files with less than {threshold}MB of data and you could consider optimizing to compact the small files"
        else:
            return None

    def check_large_files(self, threshold=1500):
        large_files = self.spark.sql(f"DESCRIBE DETAIL {self.delta_table}").filter("size > 1500000000")
        count = large_files.count()
        if count > 0:
            return f"LargeFileWarning: Your table contains {count} files with more than {threshold}MB of data. You should split up these files."
        else:
            return None

    def check_file_stats(self, columns=["col1", "col2"]):
        file_stats = self.spark.sql(f"DESCRIBE DETAIL {self.delta_table}").select("stats")
        num_files = file_stats.count()
        bad_files = file_stats.filter((file_stats.stats.isNull()) | (~file_stats.stats.isSet(columns)))
        num_bad_files = bad_files.count()
        if num_bad_files > 0:
            return f"FileStatsWarning: You are only collecting stats for {columns} in {num_bad_files} out of {num_files} files."
        else:
            return None

    def diagnose(self):
        results = []
        small_files_result = self.check_small_files()
        if small_files_result:
            results.append(small_files_result)
        large_files_result = self.check_large_files()
        if large_files_result:
            results.append(large_files_result)
        file_stats_result = self.check_file_stats()
        if file_stats_result:
            results.append(file_stats_result)
        if len(results) == 0:
            return "No issues found"
        else:
            return "\n".join(results)

delta_table = "my_delta_table"
doctor = DeltaDoctor(delta_table)
warnings = doctor.diagnose()
print(warnings)

MrPowers commented 1 year ago

@puneetsharma04 - thanks for the suggestion, but this repo does not depend on Spark, so we'll need a solution that does not use PySpark. We'll need to use these APIs.

puneetsharma04 commented 1 year ago

@MrPowers: So you mean we have to use the delta-rs Python API to access the Delta Lake table and perform the necessary checks? If so, code like the following might work.

from deltalake import DeltaTable

MB = 1_000_000  # bytes per megabyte (decimal), matching the thresholds used below


class DeltaDoctor:
    def __init__(self, delta_table):
        # delta_table is the path (or URI) of the Delta table
        self.delta_table = delta_table

    def _add_actions(self):
        # One entry per active data file. Assumes a deltalake release where
        # get_add_actions() returns a pyarrow RecordBatch; flatten=True exposes
        # columns such as "path", "size_bytes" and "null_count.<column>".
        table = DeltaTable(self.delta_table)
        return table.get_add_actions(flatten=True).to_pydict()

    def check_small_files(self, threshold=1):
        # threshold is in MB
        sizes = self._add_actions()["size_bytes"]
        count = sum(1 for size in sizes if size < threshold * MB)
        if count > 0:
            return f"SmallFileWarning: Your table contains {count} files with less than {threshold}MB of data and you could consider optimizing to compact the small files"
        return None

    def check_large_files(self, threshold=1500):
        # threshold is in MB
        sizes = self._add_actions()["size_bytes"]
        count = sum(1 for size in sizes if size > threshold * MB)
        if count > 0:
            return f"LargeFileWarning: Your table contains {count} files with more than {threshold}MB of data. You should split up these files."
        return None

    def check_file_stats(self, columns=("col1", "col2")):
        actions = self._add_actions()
        num_files = len(actions["path"])
        # A file lacks stats for a column when its null_count entry is absent or null
        missing = [actions.get(f"null_count.{column}", [None] * num_files) for column in columns]
        num_bad_files = sum(1 for per_file in zip(*missing) if None in per_file)
        if num_bad_files > 0:
            return f"FileStatsWarning: Stats for {list(columns)} are missing in {num_bad_files} out of {num_files} files."
        return None

    def diagnose(self):
        # Run every check and keep only the ones that produced a warning
        checks = (self.check_small_files, self.check_large_files, self.check_file_stats)
        results = [message for message in (check() for check in checks) if message]
        return "\n".join(results) if results else "No issues found"


delta_table = "my_delta_table"
doctor = DeltaDoctor(delta_table)
warnings = doctor.diagnose()
print(warnings)

MrPowers commented 1 year ago

@puneetsharma04 - this looks great. Really good work. Can you submit a PR?

jeremyjordan commented 1 year ago

One note on the reference implementation above: would it be possible to return the information in a little more structured format? Even just a list of warnings would be better than a single string IMO. For example, I might be interested in periodically running diagnose() on a set of Delta Tables and would want to have some ability to do something like:

doctor = DeltaDoctor(delta_table)
warnings = doctor.diagnose()
if len(warnings) > 0:
  send_alert(warnings)  # pseudocode example

And if we made classes for these warnings (SmallFileWarning, LargeFileWarning, etc.), we could also make it easy to do something like:

warnings = doctor.diagnose(ignore=[SmallFileWarning,])

to filter out warnings that we may want to explicitly ignore as the list of health checks grows over time.
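
A minimal sketch of what that structured output might look like, assuming warning classes built on a hypothetical DeltaWarning base class and a standalone diagnose function that takes a plain list of file sizes in bytes (in levi these would come from the table's metadata). The 1MB and 1500MB defaults are borrowed from the code earlier in this thread; none of these names exist in levi today.

```python
from dataclasses import dataclass
from typing import Iterable, List, Sequence, Type

MB = 1_000_000  # bytes per megabyte (decimal)


@dataclass(frozen=True)
class DeltaWarning:
    """Hypothetical base class for every health-check finding."""
    message: str


class SmallFileWarning(DeltaWarning):
    pass


class LargeFileWarning(DeltaWarning):
    pass


def diagnose(
    file_sizes: Iterable[int],
    small_mb: int = 1,
    large_mb: int = 1500,
    ignore: Sequence[Type[DeltaWarning]] = (),
) -> List[DeltaWarning]:
    """Return a list of warning objects instead of one joined string."""
    sizes = list(file_sizes)
    found: List[DeltaWarning] = []

    small = sum(1 for size in sizes if size < small_mb * MB)
    if small:
        found.append(SmallFileWarning(f"{small} files are smaller than {small_mb}MB"))

    large = sum(1 for size in sizes if size > large_mb * MB)
    if large:
        found.append(LargeFileWarning(f"{large} files are larger than {large_mb}MB"))

    # Drop any warning whose class the caller asked to ignore
    return [w for w in found if not isinstance(w, tuple(ignore))]
```

With this shape, the alerting example above becomes a plain truthiness check on the returned list, and ignore=[SmallFileWarning] removes that class of finding without touching the others.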

puneetsharma04 commented 1 year ago

@jeremyjordan: Thanks for the suggestion. @MrPowers: I was trying to test the existing code in PyCharm, but I ran into the error below. Is this an existing issue with the deltalake API, or is there another dependency I need to install?

deltalake.PyDeltaTableError: Not a Delta table: No snapshot or version 0 found, perhaps /Users/puneet_sharma1/Documents/GitHub/levi/tests/tests/reader_tests/generated/basic_append/delta/ is an empty dir?

MrPowers commented 1 year ago

@jeremyjordan - yes, I like the idea of returning results in a structured format. That's pretty much always my preference. Thanks for calling that out.

MrPowers commented 1 year ago

@puneetsharma04 - looks like you're using this path: /Users/puneet_sharma1/Documents/GitHub/levi/tests/tests/reader_tests/generated/basic_append/delta/

You should be using this path: /Users/puneet_sharma1/Documents/GitHub/levi/tests/reader_tests/generated/basic_append/delta/

Let me know if that fixes your issue!

puneetsharma04 commented 1 year ago

@MrPowers: You are right. However, I haven't made any changes to the code. Should I provide the full path and change the code, or is there another workaround? It looks like it creates a tests folder (/Users/puneet_sharma1/Documents/GitHub/levi/tests/tests/reader_tests/generated/basic_append/delta/) on its own on this path. I am not sure why it behaves this way. It fails on delta_table = DeltaTable("./tests/reader_tests/generated/basic_append/delta"), but delta_table = DeltaTable("../tests/reader_tests/generated/basic_append/delta") succeeds.
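
The doubled tests/tests segment is what a relative path produces when the working directory is already the tests folder, which is likely the case for this PyCharm run configuration. A small sketch, using only the standard library, of resolving the table path against a fixed base directory instead of the working directory; resolve_table_path and looks_like_delta_table are hypothetical helper names, not part of levi or deltalake.

```python
from pathlib import Path


def resolve_table_path(relative_path: str, base_dir: Path) -> Path:
    """Resolve relative_path against base_dir instead of the current working directory."""
    return (base_dir / relative_path).resolve()


def looks_like_delta_table(path: Path) -> bool:
    """A Delta table directory contains a _delta_log subdirectory."""
    return (path / "_delta_log").is_dir()
```

In a test module that lives directly under tests/, base_dir could be Path(__file__).resolve().parent.parent (the repository root, assuming that layout), so "./tests/reader_tests/generated/basic_append/delta" resolves to the same place no matter where the tests are launched from.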