datacontract / datacontract-cli

CLI to manage your datacontract.yaml files
https://cli.datacontract.com

refact: add exporter factory #260

Closed teoria closed 1 week ago

teoria commented 2 weeks ago

This refactoring makes it easier to create new exporters. To add a new type of exporter, you only need to create a class that implements the interface and register it with the exporter factory. This follows the open/closed principle of SOLID and removes a lot of if/else branches from data_contract.py.

What do you think? The same idea can be used for the importers.
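
A minimal sketch of the idea, assuming a simple registry-based factory. The names `Exporter`, `ExporterFactory`, `register`, `create`, and `UpperCaseIdExporter` are illustrative only and are not necessarily the ones used in the branch:

```python
from abc import ABC, abstractmethod
from typing import Dict, Type


class Exporter(ABC):
    """Interface every exporter implements (hypothetical shape)."""

    @abstractmethod
    def export(self, data_contract: dict) -> str:
        ...


class ExporterFactory:
    """Maps a format name to an exporter class."""

    def __init__(self) -> None:
        self._registry: Dict[str, Type[Exporter]] = {}

    def register(self, name: str, exporter_cls: Type[Exporter]) -> None:
        self._registry[name] = exporter_cls

    def create(self, name: str) -> Exporter:
        # A lookup replaces the if/else chain over format names.
        if name not in self._registry:
            raise ValueError(f"Unknown export format: {name!r}")
        return self._registry[name]()


class UpperCaseIdExporter(Exporter):
    """Toy exporter used only for this illustration."""

    def export(self, data_contract: dict) -> str:
        return str(data_contract["id"]).upper()


factory = ExporterFactory()
factory.register("upper", UpperCaseIdExporter)
result = factory.create("upper").export({"id": "orders-latest"})
```

Adding another format in this sketch means writing one more class and one more `register` call; `create` itself stays untouched.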

jochenchrist commented 2 weeks ago

We have always been very cautious about introducing abstractions (KISS), partly because we were unsure about the exact requirements and parameters. However, I think the time has come to consider refactoring, especially for the exports.

That being said, what would the implementation look like if an exporter, such as the rdf_n3 exporter, requires additional parameters?

teoria commented 2 weeks ago

If there is an exporter with different arguments, we could create a DTO class for it. But we could also pass the DataContract reference as an argument, so the exporter class would have access to all attributes of the class.

data_contract.py

self.data_contract = 
self.model =
self.export_format =

exporter = factory_exporter.get_exporter(export_format) 
return exporter.export( self ) ## <<< passing datacontract instance

and inside exporter we can get (datacontract/export/avro_converter.py)


class AvroExporter(Exporter):
    def export(self, data_contract_ref: DataContract) -> dict:
        # Use the method and attributes exposed by the DataContract reference.
        model_name, model_value = data_contract_ref._check_models_for_export(
            data_contract_ref.data_contract,
            data_contract_ref.model,
            data_contract_ref.export_format,
        )
        return self.to_avro_schema_json(model_name, model_value)

This idea looks like the visitor design pattern.

jochenchrist commented 2 weeks ago

Or we use kwargs...

@abstractmethod
def export(self, data_contract, model_name, model, **kwargs) -> dict:
    pass
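
A rough sketch of how the kwargs variant could behave, assuming two toy exporters. `RdfSketchExporter` and `AvroSketchExporter` are hypothetical stand-ins, and `rdf_base` is used here only as an example option name:

```python
from abc import ABC, abstractmethod


class Exporter(ABC):
    @abstractmethod
    def export(self, data_contract, model_name, model, **kwargs) -> dict:
        ...


class RdfSketchExporter(Exporter):
    """Hypothetical exporter that reads one extra option from kwargs."""

    def export(self, data_contract, model_name, model, **kwargs) -> dict:
        # Falls back to None when the caller does not pass `rdf_base`.
        return {"model": model_name, "base": kwargs.get("rdf_base")}


class AvroSketchExporter(Exporter):
    """Hypothetical exporter that ignores any extra options."""

    def export(self, data_contract, model_name, model, **kwargs) -> dict:
        return {"type": "record", "name": model_name}


rdf_result = RdfSketchExporter().export({}, "orders", {}, rdf_base="https://example.com/")
avro_result = AvroSketchExporter().export({}, "orders", {}, rdf_base="ignored")
```

Both exporters share one signature, and an exporter that does not know an option simply never looks it up.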

teoria commented 2 weeks ago

Hmm, kwargs is a good one! Another option is a generic dict with all fields:

self.data_contract = 
self.model =
self.export_format =

dict_or_dto = { "data_contract" : data_contract ..... }

exporter = factory_exporter.get_exporter(export_format) 
return exporter.export( dict_or_dto ) ## <<< passing dict or dto

Google APIs use this approach because it doesn't require signature changes for new versions.

But then each new exporter requires a change to data_contract.py. If we pass the instance as an argument, we only need to change the __init__.py of the export module and create a new exporter class. That is much easier to grow.
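
For comparison, the DTO variant could look roughly like this. It is only a sketch: `ExportRequest`, its fields, the `options` dict, and the `describe` stand-in are all hypothetical names chosen for illustration:

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True)
class ExportRequest:
    """Hypothetical DTO bundling everything an exporter might need."""

    data_contract: Any
    model: str = "all"
    export_format: str = ""
    # Exporter-specific extras live here, so the signature never changes.
    options: Dict[str, Any] = field(default_factory=dict)


def describe(request: ExportRequest) -> str:
    # Stand-in for an exporter: it reads only the fields it cares about.
    suffix = request.options.get("suffix", "")
    return f"{request.export_format}:{request.model}{suffix}"


request = ExportRequest(
    data_contract={"id": "orders"},
    model="orders",
    export_format="avro",
    options={"suffix": "-v1"},
)
summary = describe(request)
```

New fields can be added to the DTO with defaults, so existing exporters keep working without a signature change.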

teoria commented 2 weeks ago

I think it would be better to pass the exporter in: datacontract.export( HERE ) <<<<<

The CLI creates the exporter using the factory and passes it to the export method.

I'll remove it from the DataContract class and move it to cli.py.

I'll try it tonight.

teoria commented 2 weeks ago

You were right! With kwargs it worked well, and now the CLI can pass additional parameters to different exporters.


def export(self, export_format: ExportFormat, model: str = "all", **kwargs) -> str:  # <<<< accepts extra arguments
    data_contract = resolve.resolve_data_contract(
        self._data_contract_file,
        self._data_contract_str,
        self._data_contract,
        inline_definitions=True,
        inline_quality=True,
    )
    print(kwargs)  # debug output of the additional arguments

    exporter = factory_exporter.get_exporter(export_format)
    model_name, model_value = self._check_models_for_export(data_contract, model, export_format)
    export_args = {
        "data_contract": data_contract,
        "model_name": model_name,
        "model_value": model_value,
    }
    export_args.update(kwargs)  # <<<<< merge default and additional args
    return exporter.export(export_args)  # <<<<< generic argument
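
One thing to watch in the merge step: `export_args.update(kwargs)` lets a caller-supplied keyword silently replace `data_contract`, `model_name`, or `model_value`. A minimal sketch of a guard, assuming a hypothetical helper `build_export_args` that is not part of the branch:

```python
from typing import Any, Dict

# Keys the export method fills in itself and callers should not override.
RESERVED_KEYS = frozenset({"data_contract", "model_name", "model_value"})


def build_export_args(defaults: Dict[str, Any], **kwargs: Any) -> Dict[str, Any]:
    """Merge default and additional arguments, rejecting reserved keys."""
    clashes = RESERVED_KEYS.intersection(kwargs)
    if clashes:
        raise ValueError(f"Reserved export arguments overridden: {sorted(clashes)}")
    return {**defaults, **kwargs}


defaults = {"data_contract": {"id": "orders"}, "model_name": "orders", "model_value": {}}
export_args = build_export_args(defaults, rdf_base="https://example.com/")
```

Whether overriding should be an error or allowed on purpose is a design choice; the sketch only makes the behaviour explicit.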

Exporters:


class RDFExporter(Exporter):
    def export(self, export_args) -> str:
        self.dict_args = export_args
        return f"RDF {self.dict_args.get('rdf_n3')} works !! "

class AvroExporter(Exporter):
    def export(self, export_args) -> dict:
        self.dict_args = export_args
        return self.to_avro_schema_json(
            self.dict_args.get('model_name'),
            self.dict_args.get('model_value'),
        )

datacontract_obj = DataContract(
                    data_contract_file="datacontract.yaml"
                    )

datacontract_obj.export(
                        export_format=ExportFormat.rdf, # <<< RDF
                        model='orders',    # <<< default arg
                        rdf_base='teoria',
                        rdf_n3='rdf_config_aditional_n3',  # <<< new arg 
                        teste=True,
                        teste2=False
                    )

Output:

RDF rdf_config_aditional_n3 works !! 

If I change it to Avro:


datacontract_obj.export(
                        export_format=ExportFormat.avro,
                        model='orders',   
                        teste=True,
                        teste2=False
                    )

Output:

{
  "type": "record",
  "name": "orders",
  "doc": "One record per order. Includes cancelled and deleted orders.",
  "fields": [
    {
      "name": "order_id",
      "doc": "An internal ID that identifies an order in the online shop.",
      "type": "string"
    },
    {
      "name": "order_timestamp",
      "doc": "The business timestamp in UTC when the order was successfully registered in the source system and the payment was successful.",
      "type": {
        "type": "long",
....

teoria commented 2 weeks ago

To implement new exporters we just need to add the new class to the factory.

from datacontract.export.exporter import ExportFormat, FactoryExporter
from datacontract.export.avro_converter import AvroExporter, RDFExporter

factory_exporter = FactoryExporter()
factory_exporter.add_exporter(ExportFormat.avro, AvroExporter)
factory_exporter.add_exporter(ExportFormat.rdf, RDFExporter)
# factory_exporter.add_exporter(ExportFormat.jsonschema, JsonExporter)
# factory_exporter.add_exporter(ExportFormat.pydantic_model, PydanticExporter)
# factory_exporter.add_exporter(ExportFormat.sodacl, SodaExporter)
# factory_exporter.add_exporter(ExportFormat.dbt, DBTExporter) 

__all__ = ['factory_exporter','ExportFormat']

This works without any 'if' chain such as if export_format == "jsonschema": ... if export_format == "sodacl": ... if export_format == "dbt": ...

teoria commented 1 week ago

@jochenchrist, would you take a look at this branch when you have some time?

jochenchrist commented 1 week ago

@teoria I added some review comments. Nice work so far :)

teoria commented 1 week ago

@jochenchrist the "server" argument is not clear: both the exporter and the DataContract class have an argument called server.

jochenchrist commented 1 week ago

> @jochenchrist the "server" argument is not clear: both the exporter and the DataContract class have an argument called server.

You can specify a server key (such as "production") that you want to use to read the connection details (e.g. to connect for the tests or to determine the server type for export).
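
As an illustration of what selecting a server by key could mean, here is a small sketch. How the CLI actually resolves the key is not shown in this thread; the `Server` class below is a simplified stand-in, and `resolve_server_type` and the "development" entry are hypothetical:

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Server:
    """Simplified stand-in for a server entry; only `type` is modelled."""

    type: str


def resolve_server_type(servers: Dict[str, Server], server_key: str) -> str:
    """Look up the server by key and return its type for the exporter."""
    if server_key not in servers:
        raise KeyError(f"Server {server_key!r} not found; available: {sorted(servers)}")
    return servers[server_key].type


servers = {"production": Server(type="s3"), "development": Server(type="postgres")}
server_type = resolve_server_type(servers, "production")
```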

teoria commented 1 week ago

@jochenchrist I got it.

teoria commented 1 week ago

Using the new datacontract exporter interface with a custom exporter, without changing anything in the core:

import pprint
from datacontract.data_contract import DataContract
from datacontract.export.exporter import Exporter
from datacontract.export.exporter_factory import exporter_factory

# Create a custom class implementing the export method
class CustomExporter(Exporter):
    def export(self, data_contract, model, server, sql_server_type, export_args) -> dict:
        result = {
            "data_contract_servers": data_contract.servers,
            "model": model,
            "server": server,
            "sql_server_type": sql_server_type,
            "export_args": export_args,
            "custom_args": export_args.get('custom_arg', ''),
        }
        return result

# Register the new exporter
exporter_factory.register_exporter('custom', CustomExporter)

if __name__ == "__main__":
    dc = DataContract(
        data_contract_file="/Users/C10017Q/estudos/datacontract-cli/datacontract/datacontract.yaml", server="production"
    )
    # Call the export method with custom args
    result = dc.export(export_format='custom', model="orders", server="production", custom_arg='my_custom_arg')
    pprint.pp(result)

Output:

{
     'data_contract_servers': {'production': Server(type='s3', format='json', project=None, dataset=None, path=None, delimiter='new_line', endpointUrl=None, location='s3://datacontract-example-orders-latest/data/{model}/*.json', account=None, database=None, schema_=None, host=None, port=None, catalog=None, topic=None, http_path=None, token=None, dataProductId=None, outputPortId=None, driver=None)},
     'model': 'orders',
     'server': 'production',
     'sql_server_type': 'auto',
     'export_args': {'server': 'production', 'custom_arg': 'my_custom_arg'},
     'custom_args': 'my_custom_arg'
}

teoria commented 1 week ago

@jochenchrist done! If you like this solution, I'll refactor the import process too.

jochenchrist commented 1 week ago

Thanks for your contribution and your effort. Happy to merge :)