databrickslabs / dlt-meta

Metadata driven Databricks Delta Live Tables framework for bronze/silver pipelines
https://databrickslabs.github.io/dlt-meta/

Add non-Delta as Sink #48

Open ravi-databricks opened 7 months ago

ravi-databricks commented 7 months ago

Support non-Delta sinks using the metadata-driven approach.

ravi-databricks commented 3 months ago

This feature can be implemented using DLT's sink API as described below:

API guide

Create a sink

To create a sink, use the new create_sink() API. It accepts three arguments:

- a string for the sink name
- a string specifying the format (either kafka or delta)
- a map of sink options, formatted as {string: string}

All of the sink options available in DBR are supported, e.g. all Kafka authentication options.

create_sink(<sink_name>, <format>, <options_string_string_map>)

Code examples

Create a Kafka sink:

create_sink(
  "my_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "host:port",
    # "topic" is the write-side option; "subscribe" applies only to Kafka sources
    "topic": "my_topic"
  }
)
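
Since all DBR sink options pass straight through the options map, Kafka authentication can be configured the same way. A minimal sketch, assuming a SASL_SSL broker; the broker address and credentials are placeholders, and the kafkashaded. class prefix is what DBR's shaded Kafka client typically expects:

create_sink(
  "my_secure_kafka_sink",
  "kafka",
  {
    "kafka.bootstrap.servers": "broker:9093",  # placeholder broker address
    "topic": "my_topic",
    # standard Spark-Kafka authentication options; values are illustrative only
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{user}' password='{password}';"
  }
)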

Create a Delta sink by giving the file system path to the table:

create_sink(
  "my_delta_sink",
  "delta",
  { "path": "//path/to/my/delta/table" }
)

Create a Delta sink by giving the table name in Unity Catalog (UC):

create_sink(
  "my_delta_sink",
  "delta",
  { "tableName": "my_catalog.my_schema.my_table" }
)

Use append_flow to write to a sink

Once the sink object is created, you can set up an append_flow that writes to the sink.

create_sink("my_sink", "delta", {"path": "/tmp/delta_sink"})

@append_flow(name = "flow", target = "my_sink")
def flowFunc():
  return read_stream("xxx")
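Note that when the target is a Kafka sink, the flow must return a DataFrame with the columns the Kafka writer expects (at minimum value, optionally key). A minimal sketch, assuming a hypothetical upstream table named customer_bronze:

import dlt
from pyspark.sql.functions import to_json, struct

dlt.create_sink(
    "my_kafka_sink",
    "kafka",
    {"kafka.bootstrap.servers": "host:port", "topic": "my_topic"}
)

@dlt.append_flow(name="kafka_flow", target="my_kafka_sink")
def kafka_flow():
    # serialize each row to JSON in the 'value' column, as the Kafka writer requires
    return dlt.read_stream("customer_bronze").select(
        to_json(struct("*")).alias("value")
    )
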
ravi-databricks commented 2 months ago

Added bronze_sinks and silver_sinks options to the onboarding file, as shown below:

[
   {
      "name":"sink_name1",
      "format":"delta",
      "options":{
         "tableName":"uc.tablename"
      }
   },
   {
      "name":"sink_name2",
      "format":"kafka",
      "options":{
         "kafka.bootstrap.servers":"{kafka_sink_broker}",
         "topic":"{kafka_sink_topic}"
      }
   }
]
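
For illustration, a minimal sketch of how these onboarding entries could be deserialized into sink objects before being handed to write_to_sinks. DLTSink here is a hypothetical stand-in for the dataclass dlt-meta uses, and substituting the {kafka_sink_broker}/{kafka_sink_topic} placeholders is assumed to happen upstream:

import json
from dataclasses import dataclass

@dataclass
class DLTSink:
    """Hypothetical container mirroring one onboarding sink entry."""
    name: str
    format: str
    options: dict

def parse_sinks(sinks_json: str) -> list[DLTSink]:
    # the onboarding value is a JSON array of {name, format, options} objects
    return [DLTSink(s["name"], s["format"], s["options"]) for s in json.loads(sinks_json)]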

Added write_to_sinks to dataflow_pipeline.py under AppendFlowWriter:

    @staticmethod
    def write_to_sinks(sinks: list[DLTSink], write_to_sink):
        """Create each sink and register an append flow that writes to it."""
        for sink in sinks:
            # declare the sink, then attach the flow function as its append flow
            dlt.create_sink(sink.name, sink.format, sink.options)
            dlt.append_flow(name=f"{sink.name}_flow", target=sink.name)(write_to_sink)

The code above can be invoked while performing the write:

        if self.dataflowSpec.sinks:
            dlt_sinks = DataflowSpecUtils.get_sinks(self.dataflowSpec.sinks, self.spark)
            AppendFlowWriter.write_to_sinks(dlt_sinks, self.write_to_delta)
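
The write_to_sink callable passed in (self.write_to_delta above) is what each generated flow executes, so it must return the streaming DataFrame to append. A hedged sketch of what such a method might look like; the method body and the targetDetails field layout are assumptions, not the actual dlt-meta implementation:

    def write_to_delta(self):
        """Return the streaming DataFrame that each sink flow appends."""
        # hypothetical: stream from the target table produced earlier in the pipeline
        return dlt.read_stream(self.dataflowSpec.targetDetails["table"])
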
ravi-databricks commented 2 months ago

Once DLT direct publishing mode reaches Public Preview (PuPr), this will be merged into the release branch.