
Custom Data Sources #13

Open · dominikpeter opened this issue 2 months ago

dominikpeter commented 2 months ago

Similar to how we register UDFs in the Fabricks.Runtime, we could register custom data sources: https://docs.databricks.com/en/pyspark/datasources.html

This could be nice for reading from and writing to Excel or SharePoint, to name a few examples.
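
For reference, the API from the linked docs boils down to subclassing DataSource and DataSourceReader and registering the class on the session. A minimal sketch (the dummy source and its values are just an illustration, not part of Fabricks):

from pyspark.sql.datasource import DataSource, DataSourceReader

class DummyDataSource(DataSource):
    @classmethod
    def name(cls):
        # short name used with spark.read.format(...)
        return "dummy"

    def schema(self):
        return "id INT, value STRING"

    def reader(self, schema):
        return DummyReader()

class DummyReader(DataSourceReader):
    def read(self, partition):
        # yield rows as plain tuples matching the schema
        yield (1, "a")
        yield (2, "b")

spark.dataSource.register(DummyDataSource)
spark.read.format("dummy").load().show()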

dominikpeter commented 2 months ago

Example of how an Excel reader could look:

import json
import traceback
from typing import Iterator, Tuple, Union

import polars as pl
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import (
    DateType,
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
)


def _parse_read_args(options) -> dict:
    # Spark passes every option value as a string, so cast the values
    # back to the types polars expects before calling read_excel.
    columns = options.get("columns")
    return {
        "source": options.get("path"),
        "sheet_name": options.get("sheet_name"),
        "sheet_id": int(options["sheet_id"]) if options.get("sheet_id") else None,
        "has_header": str(options.get("has_header", "true")).lower() == "true",
        "columns": [c.strip() for c in columns.split(",")] if columns else None,
        "infer_schema_length": int(options.get("infer_schema_length", 100)),
        # read_options can be passed as a JSON string and is forwarded
        # to pl.read_excel as extra keyword arguments
        **json.loads(options.get("read_options", "{}")),
    }

class ExcelDataSource(DataSource):
    @classmethod
    def name(cls):
        return "excel"

    def schema(self) -> Union[StructType, str]:
        try:
            # Read the sheet once on the driver to infer a Spark schema
            # from the polars dtypes.
            df = pl.read_excel(**_parse_read_args(self.options))
            return self._infer_schema(df)
        except Exception:
            print("Error inferring schema:")
            print(traceback.format_exc())
            # Fall back to a single string column if inference fails
            return "value STRING"

    def reader(self, schema: StructType):
        return ExcelDataSourceReader(schema, self.options)

    def _infer_schema(self, df: pl.DataFrame) -> StructType:
        # Map the most common polars dtypes to Spark types; anything
        # unmapped falls back to a string column.
        polars_to_spark = {
            pl.Utf8: StringType(),
            pl.Int64: LongType(),  # 64-bit ints need LongType, not IntegerType
            pl.Float64: DoubleType(),  # Float64 is double precision
            pl.Date: DateType(),
        }
        return StructType([
            StructField(name, polars_to_spark.get(dtype, StringType()))
            for name, dtype in zip(df.columns, df.dtypes)
        ])

class ExcelDataSourceReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def read(self, partition) -> Iterator[Tuple]:
        try:
            df = pl.read_excel(**_parse_read_args(self.options))
            # Yield plain tuples matching the schema, converting each
            # cell to the Python type Spark expects.
            for row in df.iter_rows():
                yield tuple(
                    self._convert_value(value, field.dataType)
                    for value, field in zip(row, self.schema.fields)
                )
        except Exception:
            print("Error reading Excel file:")
            print(traceback.format_exc())
            raise

    def _convert_value(self, value, data_type):
        if value is None:
            return None
        if isinstance(data_type, StringType):
            return str(value)
        if isinstance(data_type, LongType):
            return int(value)
        if isinstance(data_type, DoubleType):
            return float(value)
        if isinstance(data_type, DateType):
            return value.date() if hasattr(value, "date") else value
        return str(value)

# Register the data source and read an Excel sheet with it
spark.dataSource.register(ExcelDataSource)

df = (
    spark.read.format("excel")
    .option("path", "file_path/file_name.xlsx")
    .option("sheet_name", "Sheet1")
    .load()
)

aersam commented 2 months ago

To use it, we also need a simple way to configure this in a bronze step.

dominikpeter commented 2 months ago

Similar to this:

- job:
    step: bronze
    topic: foo
    item: bar
    options:
      mode: append
      keys: [id]
      parser: csv
      uri: abfss://raw@$datastore.dfs.core.windows.net/path
    parser_options:
      read_options:
        delimiter: ;
        header: true
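
An excel parser could then slot into the same shape. A hypothetical variant, assuming the data source above is registered (the option names under read_options just mirror the reader sketch and are not an existing Fabricks contract):

- job:
    step: bronze
    topic: foo
    item: bar
    options:
      mode: append
      keys: [id]
      parser: excel
      uri: abfss://raw@$datastore.dfs.core.windows.net/path
    parser_options:
      read_options:
        sheet_name: Sheet1
        has_header: true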