MeltanoLabs / meltano-map-transform

A map transformer which implements the `Stream Maps` capability from Meltano's tap and target SDK: https://sdk.meltano.com/
Apache License 2.0

Support named functions #288

Open gruckion opened 2 months ago

gruckion commented 2 months ago

It would be very useful if we could define reusable functions in the mapper config, for example:

version: 1
default_environment: dev
project_id: ee2b1be3-e8f2-4962-a6c8-0af0952c60a5
environments:
  - name: dev
  - name: staging
  - name: prod
send_anonymous_usage_stats: false

plugins:
  extractors:
    - name: tap-mysql
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/tap-mysql.git
      config:
        user: root
        port: 3306
        host: 127.0.0.1
        database: mydatabase
        filter_schemas:
          - mydatabase
        ssh_tunnel:
          enable: false
          host: your_ssh_host
          port: 22
          username: your_ssh_username
          private_key: |
            -----BEGIN OPENSSH PRIVATE KEY-----
            Your private key content here
            -----END OPENSSH PRIVATE KEY-----
      select:
        # Core Business Entities
        - "mydatabase-companies.*"
        - "mydatabase-sites.*"
        - "mydatabase-site_details.*"
        - "mydatabase-contacts.*"
        - "mydatabase-contacts_sites.*"

  loaders:
    - name: target-postgres
      variant: meltanolabs
      pip_url: meltanolabs-target-postgres
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        database: postgres

  utilities:
    - name: dbt-postgres
      variant: dbt-labs
      pip_url: dbt-core dbt-postgres git+https://github.com/meltano/dbt-ext.git@main
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        schema: tap_mysql

  mappers:
    - name: meltano-map-transformer
      variant: meltano
      pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
      mappings:
        - name: transform_dates
          config:
            stream_map_config:
              clean_string: |
                lambda s: s.replace('\0', '') if s else None
              clean_date: |
                lambda d: d if d and '0000' not in d and '-00' not in d else None
              clean_timestamp: |
                lambda t: t if t and '0000' not in t and '-00' not in t else None
            stream_maps:
              "*": # Apply to all tables
                created_at: clean_timestamp(record['created_at'])
                updated_at: clean_timestamp(record['updated_at'])
              mydatabase-contacts:
                dob: clean_date(record['dob'])
              mydatabase-sites:
                last_viewed: clean_timestamp(record['last_viewed'])
                date_moved_in: clean_date(record['date_moved_in'])
                address_postcode: clean_string(record['address_postcode'])
                password: clean_string(record['password'])
                address_1: clean_string(record['address_1'])
                address_2: clean_string(record['address_2'])
                address_3: clean_string(record['address_3'])
                address_county: clean_string(record['address_county'])
                tag: clean_string(record['tag'])
                company_name: clean_string(record['company_name'])
                office_number: clean_string(record['office_number'])
                office_number_2: clean_string(record['office_number_2'])
                sector: clean_string(record['sector'])
                address_town: clean_string(record['address_town'])
              mydatabase-site_details:
                date_incorporated: clean_date(record['date_incorporated'])
              mydatabase-site_sources:
                date_moved_in: clean_date(record['date_moved_in'])

jobs:
  - name: run_etl_with_transform
    tasks:
      - tap-mysql transform_dates target-postgres

That would save us from repeating the same expression over and over. As things stand, we have to write this instead:

version: 1
default_environment: dev
project_id: XXX
environments:
  - name: dev
  - name: staging
  - name: prod
send_anonymous_usage_stats: false

plugins:
  extractors:
    - name: tap-mysql
      variant: meltanolabs
      pip_url: git+https://github.com/MeltanoLabs/tap-mysql.git
      config:
        user: root
        port: 3306
        host: 127.0.0.1
        database: mydatabase
        filter_schemas:
          - mydatabase
        ssh_tunnel:
          enable: false
          host: your_ssh_host
          port: 22
          username: your_ssh_username
          private_key: |
            -----BEGIN OPENSSH PRIVATE KEY-----
            Your private key content here
            -----END OPENSSH PRIVATE KEY-----
      select:
        # Core Business Entities
        - "mydatabase-companies.*"
        - "mydatabase-sites.*"
        - "mydatabase-site_details.*"
        - "mydatabase-contacts.*"
        - "mydatabase-contacts_sites.*"

  loaders:
    - name: target-postgres
      variant: meltanolabs
      pip_url: meltanolabs-target-postgres
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        database: postgres

  utilities:
    - name: dbt-postgres
      variant: dbt-labs
      pip_url: dbt-core dbt-postgres git+https://github.com/meltano/dbt-ext.git@main
      config:
        host: 127.0.0.1
        port: 5432
        user: postgres
        dbname: postgres
        schema: tap_mysql

  mappers:
    - name: meltano-map-transformer
      variant: meltano
      pip_url: git+https://github.com/MeltanoLabs/meltano-map-transform.git
      mappings:
        - name: transform_dates
          config:
            stream_maps:
              "*": # Apply to all tables
                created_at:
                  record['created_at'] if record.get('created_at') and '0000'
                  not in record['created_at'] and '-00' not in record['created_at'] else None
                updated_at:
                  record['updated_at'] if record.get('updated_at') and '0000'
                  not in record['updated_at'] and '-00' not in record['updated_at'] else None
              mydatabase-contacts:
                dob:
                  record['dob'] if record.get('dob') and '0000' not in record['dob']
                  and '-00' not in record['dob'] else None
              mydatabase-sites:
                last_viewed:
                  record['last_viewed'] if record.get('last_viewed') and '0000'
                  not in record['last_viewed'] and '-00' not in record['last_viewed'] else None
                date_moved_in:
                  record['date_moved_in'] if record.get('date_moved_in') and
                  '0000' not in record['date_moved_in'] and '-00' not in record['date_moved_in'] else None
                address_postcode: record['address_postcode'].replace('\0', '') if record.get('address_postcode') else None
                password: record['password'].replace('\0', '') if record.get('password') else None
                address_1: record['address_1'].replace('\0', '') if record.get('address_1') else None
                address_2: record['address_2'].replace('\0', '') if record.get('address_2') else None
                address_3: record['address_3'].replace('\0', '') if record.get('address_3') else None
                address_county: record['address_county'].replace('\0', '') if record.get('address_county') else None
                tag: record['tag'].replace('\0', '') if record.get('tag') else None
                company_name: record['company_name'].replace('\0', '') if record.get('company_name') else None
                office_number: record['office_number'].replace('\0', '') if record.get('office_number') else None
                office_number_2: record['office_number_2'].replace('\0', '') if record.get('office_number_2') else None
                sector: record['sector'].replace('\0', '') if record.get('sector') else None
                address_town: record['address_town'].replace('\0', '') if record.get('address_town') else None
              mydatabase-site_details:
                date_incorporated:
                  record['date_incorporated'] if record.get('date_incorporated')
                  and '0000' not in record['date_incorporated'] and '-00' not in record['date_incorporated'] else None
              mydatabase-site_sources:
                date_moved_in:
                  record['date_moved_in'] if record.get('date_moved_in') and
                  '0000' not in record['date_moved_in'] and '-00' not in record['date_moved_in'] else None

jobs:
  - name: run_etl_with_transform
    tasks:
      - tap-mysql transform_dates target-postgres
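
For reference, here is what the three helpers from the first config would do if written as plain Python. This is only a sketch to show the intended behaviour; the sample values are made up, and nothing here is an existing feature of the mapper.

```python
# Plain-Python equivalents of the helpers proposed in the first config.
# The sample values at the bottom are invented for illustration.


def clean_string(s):
    """Strip NUL characters; return None for empty or missing values."""
    return s.replace("\0", "") if s else None


def clean_date(d):
    """Return None for zero dates such as '0000-00-00' or '2021-00-15'."""
    return d if d and "0000" not in d and "-00" not in d else None


# In the proposal, clean_timestamp has exactly the same body as clean_date.
clean_timestamp = clean_date

print(clean_string("AB1\0 2CD"))               # NUL character removed
print(clean_date("0000-00-00"))                # zero date becomes None
print(clean_timestamp("2021-03-15 10:30:00"))  # valid value passes through
```

Each helper is defined once, so the per-column entries in `stream_maps` shrink to a single call such as `clean_date(record['dob'])`.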

edgarrmondragon commented 2 months ago

We could support a new key (e.g. user_functions) under stream_map_config, then parse and cache each function definition and pass it to the simpleeval context.

EDIT: That should be done in https://github.com/meltano/sdk.
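
A rough sketch of that idea, to make the parse-and-cache step concrete. Several things here are assumptions: `user_functions` is only the key name floated above, `compile_function`, `build_functions` and `evaluate` are hypothetical helpers, and Python's built-in `eval` stands in for the simpleeval context so the example runs with the standard library alone. `eval` is not a sandbox, so a real implementation would need a safer way to parse the definitions.

```python
from functools import lru_cache

# Hypothetical config shape; `user_functions` is not an existing SDK setting.
STREAM_MAP_CONFIG = {
    "user_functions": {
        "clean_string": "lambda s: s.replace('\\0', '') if s else None",
        "clean_date": "lambda d: d if d and '0000' not in d and '-00' not in d else None",
    }
}


@lru_cache(maxsize=None)
def compile_function(source: str):
    """Parse one definition and cache the resulting callable.

    Built-in eval() with empty builtins is a stand-in for a safe parser.
    It is NOT a sandbox and must not be used on untrusted config.
    """
    func = eval(source, {"__builtins__": {}})
    if not callable(func):
        raise TypeError(f"not a function definition: {source!r}")
    return func


def build_functions(config: dict) -> dict:
    """Turn the user_functions mapping into {name: callable}."""
    return {
        name: compile_function(src)
        for name, src in config.get("user_functions", {}).items()
    }


def evaluate(expression: str, record: dict, functions: dict):
    """Evaluate a stream map expression with `record` and the helpers in scope.

    In the SDK, `functions` would be passed to the simpleeval context instead.
    """
    return eval(expression, {"__builtins__": {}, **functions}, {"record": record})


functions = build_functions(STREAM_MAP_CONFIG)
record = {"dob": "0000-00-00", "tag": "vip\0"}
print(evaluate("clean_date(record['dob'])", record, functions))    # zero date
print(evaluate("clean_string(record['tag'])", record, functions))  # NUL stripped
```

Caching on the source string means each definition is parsed once no matter how many streams or records reference it.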