MattTriano opened 1 year ago
The prior CLI workflow guided you through a series of prompts, offering options for the kind of data to connect to (filesystem or relational database) and then options for either the execution engine (for filesystem datasources) or the database backend (for SQL datasources), and then `gx` formatted those inputs into a yaml config string in a jupyter notebook. For the SQL branch, you would also enter the details needed to format a connection string so `gx` could connect to the database, and after `gx` tested the connection, it would save that information to the `great_expectations.yml` file under `data_connectors` (as shown in my `great_expectations.yml`, which I can safely include in version control because the "secret" credentials in that file just reference the env-vars defined in the dot-env files created by the setup process). This enables users to just invoke that `data_connector` when they need to connect to the data. Now, `gx` calls this a SQL database block-configuration and documents it as an advanced datasource configuration.
The new mode for connecting to a (postgres) datasource is described here, and points to this other documentation page for guidance on "securely" storing connection credentials.
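As I understand it, the "secure" storage option amounts to referencing environment variables (or `config_variables.yml` entries) with `${VAR}` syntax in config values, which GX substitutes at runtime. A rough sketch of the idea (my own illustration, not GX's actual implementation):

```python
import os
import re


def substitute_env_vars(config_value: str) -> str:
    """Replace ${VAR} references with environment variable values,
    leaving unmatched references untouched (illustrative sketch only)."""
    return re.sub(
        r"\$\{(\w+)\}",
        lambda m: os.environ.get(m.group(1), m.group(0)),
        config_value,
    )
```

This is why the yaml and fluent configs shown below can hold `${GX_DWH_DB_CONN}` rather than the actual credentials.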
I'll see how cumbersome it is to reference the cached `data_connector` using the new functionality. Based on preliminary experiments, it looks like I can access one of my `data_connector`s with a few lines of code like this:
```python
import great_expectations as gx

datasource_name = "where_house_source"
data_connector_name = "data_raw_inferred_data_connector_name"

context = gx.get_context()
datasource = context.get_datasource(datasource_name)
data_connector = datasource.data_connectors[data_connector_name]
```
but the new documentation seems largely focused on a `Datasource` -> `DataAsset` flow, and after playing with (and reading the source code of) the relevantly named methods of my `data_connector` object, I don't yet see a clean way to get from a `DataConnector` to a `DataAsset` without going through a pattern like:
```python
from great_expectations.core.batch import BatchRequest

expectation_suite_name = "data_raw.cook_county_parcel_sales.warning"
data_asset_name = "data_raw.cook_county_parcel_sales"

batch_request = {
    "datasource_name": datasource_name,
    "data_connector_name": data_connector_name,
    "data_asset_name": data_asset_name,
    "limit": 1000,
}
validator = context.get_validator(
    batch_request=BatchRequest(**batch_request),
    expectation_suite_name=expectation_suite_name,
)
```
I'll experiment more to see whether it's worth switching to the new flow or if I should maintain the block-config + `DataConnector` -> `Validator` flow.
I added an env-var to the airflow dot-env file (it was the same as `AIRFLOW_CONN_DWH_DB_CONN`, just with the `postgres://` prefix replaced by `postgresql+psycopg2://`; I used the `AIRFLOW_CONN_DWH_DB_CONN` version because it already escaped the special characters in my dwh_db password), and then it was straightforward to create a new `fluent.sql_datasource.Datasource` via:
```python
import great_expectations as gx
from great_expectations.exceptions import DataContextError

datasource_name = "fluent_dwh_source"

context = gx.get_context()
try:
    datasource = context.sources.add_sql(name=datasource_name, connection_string="${GX_DWH_DB_CONN}")
except DataContextError:
    datasource = context.get_datasource(datasource_name)
```
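The prefix swap described above is just a string substitution; as a sketch (hypothetical helper name, assuming a generic Airflow-style URL):

```python
def to_sqlalchemy_conn_string(airflow_conn_url: str) -> str:
    """Convert an Airflow-style postgres:// URL into the
    postgresql+psycopg2:// form SQLAlchemy expects (illustrative helper)."""
    prefix = "postgres://"
    if airflow_conn_url.startswith(prefix):
        return "postgresql+psycopg2://" + airflow_conn_url[len(prefix):]
    return airflow_conn_url
```

Reusing the Airflow connection string this way keeps the already-escaped password intact; only the dialect prefix changes.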
and that fluent-style `Datasource` instance has methods that make it easy to programmatically register database tables or queries as data assets. The latter will make it very easy to adapt data to work with existing expectations.
```python
table_asset = datasource.add_table_asset(
    name="data_raw.temp_chicago_food_inspections",
    schema_name="data_raw",
    table_name="temp_chicago_food_inspections",
)
query_asset = datasource.add_query_asset(
    name="food_inspection_results_by_zip_code",
    query="""
        SELECT count(*), results, zip
        FROM data_raw.temp_chicago_food_inspections
        GROUP BY results, zip
        ORDER BY count DESC
    """,
)
```
And after the table or query is registered, you can just reference it by name.
Sidenote: I should add tasks to the general pipeline that register tables as assets if they don't already exist. I could even set up a task to run every time but do nothing if the asset is already registered (`datasource.add_table_asset` throws a `ValueError` if a `DataAsset` with that name has already been registered; it would have been nice if they made a more specific error to catch here, like the `DataContextError` above).
```python
table_name = "temp_chicago_food_inspections"
schema_name = "data_raw"

if f"{schema_name}.{table_name}" not in datasource.get_asset_names():
    table_asset = datasource.add_table_asset(
        name=f"{schema_name}.{table_name}",
        schema_name=schema_name,
        table_name=table_name,
    )
else:
    print(f"A DataAsset named {schema_name}.{table_name} already exists.")
```
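Since `add_table_asset` raises a plain `ValueError` on a duplicate name, the existence check could also be written as a small try/except helper. This is a sketch of my own, not a GX API; it only assumes the datasource object exposes `add_table_asset` and `get_asset` as used above:

```python
def ensure_table_asset(datasource, schema_name: str, table_name: str):
    """Register a table asset, or fetch it if the name is already taken.

    Hypothetical helper: relies only on add_table_asset raising ValueError
    for duplicate names and on get_asset retrieving an asset by name.
    """
    asset_name = f"{schema_name}.{table_name}"
    try:
        return datasource.add_table_asset(
            name=asset_name,
            schema_name=schema_name,
            table_name=table_name,
        )
    except ValueError:
        return datasource.get_asset(asset_name)
```

A task built on this would be safe to run on every pipeline execution.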
**fluent-style `DataAsset`**

In the GX framework, a `BatchRequest` is the way to specify the data GX should validate, and the fluent-style `DataAsset` class provides methods and tools that help configure a `BatchRequest`.
```python
import great_expectations as gx

datasource_name = "fluent_dwh_source"
table_name = "temp_chicago_food_inspections"
schema_name = "data_raw"

context = gx.get_context()
data_asset = context.get_datasource(datasource_name).get_asset(f"{schema_name}.{table_name}")
batch_request = data_asset.build_batch_request()
```
Before you can validate a `DataAsset`, you have to create an `ExpectationSuite` and then add `Expectation`s to your suite:
```python
expectation_suite_name = f"{schema_name}.{table_name}_suite"
expectation_suite = context.add_or_update_expectation_suite(
    expectation_suite_name=expectation_suite_name
)
```
To set expectations, you can either define them manually (review the Expectation Gallery for options), or you can use a profiler to profile your data and generate expectations.
```python
exclude_column_names = []  # If there are columns that shouldn't be profiled, add their names to this list

data_assistant_result = context.assistants.onboarding.run(
    batch_request=batch_request,
    exclude_column_names=exclude_column_names,
)
```
then extract the generated expectations from the profiler's run
```python
expectation_suite = data_assistant_result.get_expectation_suite(
    expectation_suite_name=expectation_suite_name
)
```
The expectations set by the profiler should be reviewed and edited before saving them (by design, the profiler generates expectations that fit the profiled batches exactly).
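For example, a profiler-generated between-bounds expectation will use the exact observed min and max; one simple way to loosen such bounds before saving them is to pad by a relative margin (a hypothetical helper of my own, not a GX function):

```python
def widen_bounds(min_value: float, max_value: float, rel_margin: float = 0.10):
    """Pad exact profiler bounds by a relative margin so the expectation
    tolerates normal drift in future batches (hypothetical helper)."""
    pad = (max_value - min_value) * rel_margin
    return min_value - pad, max_value + pad
```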
These methods return expectations organized and ordered by grouping:
```python
expectation_suite.get_grouped_and_ordered_expectations_by_expectation_type()
expectation_suite.get_grouped_and_ordered_expectations_by_column()
expectation_suite.get_grouped_and_ordered_expectations_by_domain_type()  # not too useful
```
and these methods are helpful for removing or replacing expectations:
```python
expectation_suite.remove_all_expectations_of_type(expectation_types=[
    "expect_column_values_to_match_regex",
    "expect_column_proportion_of_unique_values_to_be_between",
    ...
])
expectation_suite.replace_expectation(
    new_expectation_configuration=ExpectColumnMedianToBeBetween(...),
    existing_expectation_configuration=ExpectationConfiguration_returned_in_last_codeblock,
)
```
When you're content with your suite of `Expectation`s, save it to your `DataContext` via:
```python
context.add_or_update_expectation_suite(expectation_suite=expectation_suite)
```
Then you can create a `Checkpoint` to run all of the table data against your expectations:
```python
from great_expectations.checkpoint import SimpleCheckpoint

checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name,
        }
    ],
}
checkpoint = SimpleCheckpoint(
    f"{schema_name}.{table_name}_checkpoint",
    context,
    **checkpoint_config,
)
checkpoint_result = checkpoint.run()
assert checkpoint_result["success"] is True
```
If any expectations fail, replace or edit them until your checkpoint passes. Then you can generate data_docs and/or save your checkpoint to your `DataContext`:
```python
context.build_data_docs()
context.add_checkpoint(checkpoint=checkpoint)
```
Three days ago, the Great Expectations team published a blog post titled "A fond farewell to the CLI", announcing that they've recently introduced new functionality (Fluent Datasources) that makes their CLI-to-jupyter-notebook workflow obsolete. I already built that CLI-to-notebook workflow into this system's developer workflow, but I agree that it's rather cumbersome, so I'm keen to explore the new functionality and hopefully improve the jupyter notebook workflow beyond what was generated by the `gx` CLI tools. I'll use this issue largely to document experiments, useful bits, and usage notes, and I'll close it when I've merged in documentation reflecting the new workflow.