atc-net / atc-dataplatform

A common set of Python libraries for Databricks
https://atc-net.github.io/repository/atc-dataplatform
MIT License

AutoLoader framework #130

Open LauJohansson opened 2 years ago

LauJohansson commented 2 years ago

Since full loading is very time-consuming and often expensive, it is necessary to introduce Auto Loader.

LauJohansson commented 1 year ago

See e.g. Databricks Autoloader: https://docs.databricks.com/ingestion/auto-loader/index.html

LauJohansson commented 1 year ago

Use Trigger.AvailableNow https://docs.databricks.com/ingestion/auto-loader/production.html:

"Auto Loader can be scheduled to run in Databricks Jobs as a batch job by using Trigger.AvailableNow. The AvailableNow trigger will instruct Auto Loader to process all files that arrived before the query start time. New files that are uploaded after the stream has started will be ignored until the next trigger.

With Trigger.AvailableNow, file discovery will happen asynchronously with data processing and data can be processed across multiple micro-batches with rate limiting."
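
A minimal sketch of this pattern (paths are placeholders, and cloudFiles.schemaLocation is just one way to let Auto Loader infer the schema; availableNow requires DBR 10.1+):

# Sketch: run Auto Loader as a scheduled batch job with Trigger.AvailableNow.
df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    # schema inference state is stored here (alternatively pass .schema(...))
    .option("cloudFiles.schemaLocation", "<path_to_schema>")
    .load("<path_to_source_data>")
)

(
    df.writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    # process all files that arrived before query start, then stop
    .trigger(availableNow=True)
    .start("<path_to_target>")
)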

LauJohansson commented 1 year ago
(
    spark.readStream.format("cloudFiles")
    .schema(expected_schema)
    .option("cloudFiles.format", "json")
    # "rescue" will collect all new fields as well as data type mismatches in _rescued_data
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load("<path_to_source_data>")
    .writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    .start("<path_to_target>")
)

The expected_schema could be taken from the existing target table, e.g. DeltaHandle.from_tc("something").read().schema.
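
As a sketch, assuming the usual atc-dataplatform pattern where the table id is registered with the configurator and DeltaHandle.read() returns a DataFrame ("something" is a placeholder id):

from atc.delta import DeltaHandle

# Reuse the schema of the existing target Delta table, so that anything
# not matching it ends up in _rescued_data instead of breaking the stream.
expected_schema = DeltaHandle.from_tc("something").read().schema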

LauJohansson commented 1 year ago

https://docs.databricks.com/ingestion/auto-loader/directory-listing-mode.html:

Auto Loader uses directory listing mode by default. In directory listing mode, Auto Loader identifies new files by listing the input directory. Directory listing mode allows you to quickly start Auto Loader streams without any permission configurations other than access to your data on cloud storage.

By default, Auto Loader automatically detects whether a given directory is applicable for incremental listing by checking and comparing file paths of previously completed directory listings.

You can explicitly enable or disable incremental listing by setting cloudFiles.useIncrementalListing to "true" or "false" (default "auto"). When explicitly enabled, Auto Loader does not trigger full directory lists unless a backfill interval is set.
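
A sketch of those settings (values are illustrative; forcing incremental listing is typically paired with cloudFiles.backfillInterval so a full listing still runs periodically):

(
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "<path_to_schema>")
    # "auto" (default) lets Auto Loader decide per directory;
    # "true" forces incremental listing, "false" forces full listing
    .option("cloudFiles.useIncrementalListing", "true")
    # with incremental listing forced on, trigger a full listing this often
    .option("cloudFiles.backfillInterval", "1 day")
    .load("<path_to_source_data>")
)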

LauJohansson commented 1 year ago

https://docs.databricks.com/ingestion/auto-loader/production.html

"To reduce compute costs, Databricks recommends using Databricks Jobs to schedule Auto Loader as batch jobs using Trigger.AvailableNow (in Databricks Runtime 10.1 and later) or Trigger.Once instead of running it continuously as long as you don’t have low latency requirements."
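
As a sketch (df as in the snippet above; availableNow requires DBR 10.1+, and to my understanding Trigger.Once processes everything as a single micro-batch and ignores rate-limiting options, which is why AvailableNow is the recommended replacement):

# Preferred: splits the backlog across micro-batches and honours
# rate limits such as cloudFiles.maxFilesPerTrigger.
(
    df.writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    .trigger(availableNow=True)
    .start("<path_to_target>")
)

# Older alternative: one single micro-batch, rate limits are ignored.
(
    df.writeStream
    .option("checkpointLocation", "<path_to_checkpoint>")
    .trigger(once=True)
    .start("<path_to_target>")
)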