warpstreamlabs / bento

Fancy stream processing made operationally mundane. This repository is a fork of the original project before the license was changed.
https://warpstreamlabs.github.io/bento/
Other
1.07k stars 71 forks source link

Template not found after call to `RegisterTemplateYAML(outputTemplate)` #126

Closed jem-davies closed 1 month ago

jem-davies commented 1 month ago

https://warpstreamlabs.github.io/bento/docs/configuration/templating/

Code:

// bento holds the contents of bento.yaml, embedded at build time; main passes
// it to run as the base stream config.
//
//go:embed bento.yaml
var bento string

// inputTemplate holds the embedded trackstar/input_template.yaml, registered
// as a template in run.
//
//go:embed trackstar/input_template.yaml
var inputTemplate string

// outputTemplate holds the embedded trackstar/output_template.yaml, registered
// as a template in run.
//
//go:embed trackstar/output_template.yaml
var outputTemplate string

// deadLetterTemplate holds the embedded trackstar/dead_letter_template.yaml,
// registered as a template in run.
//
//go:embed trackstar/dead_letter_template.yaml
var deadLetterTemplate string

// processorResources holds the embedded trackstar/processor_resources.yaml,
// added to the stream builder as resources in run.
//
//go:embed trackstar/processor_resources.yaml
var processorResources string

// main loads environment variables via godotenv and then runs the stream
// described by the embedded bento config.
//
// Both failure paths go through the log package's Fatal functions, so the
// underlying error is reported and the process exits with a non-zero status
// instead of printing the error and exiting successfully.
func main() {
	if err := godotenv.Load(); err != nil {
		// Include the cause; the bare message hid why the load failed.
		log.Fatalf("Error loading .env file: %v", err)
	}
	if err := run(bento); err != nil {
		log.Fatal(err)
	}
}

// run registers the embedded input, output and dead-letter templates on a new
// environment, builds a stream from config plus the embedded processor
// resources, and runs it with a background context.
//
// Every failure is returned wrapped with the step that produced it, so the
// caller can tell a template problem from a config, build or runtime problem.
func run(config string) error {
	env := service.NewEnvironment()

	// Templates are registered before the builder is created, presumably so
	// that the YAML parsed below can refer to them by name.
	templates := []struct {
		label string
		yaml  string
	}{
		{"input template", inputTemplate},
		{"output template", outputTemplate},
		{"dead letter template", deadLetterTemplate},
	}
	for _, t := range templates {
		if err := env.RegisterTemplateYAML(t.yaml); err != nil {
			return fmt.Errorf("%s yaml: %w", t.label, err)
		}
	}

	builder := env.NewStreamBuilder()

	// Base config first, then the shared processor resources.
	if err := builder.SetYAML(config); err != nil {
		return fmt.Errorf("base yaml: %w", err)
	}
	if err := builder.AddResourcesYAML(processorResources); err != nil {
		return fmt.Errorf("processor resources yaml: %w", err)
	}

	stream, err := builder.Build()
	if err != nil {
		return fmt.Errorf("build stream: %w", err)
	}
	if err := stream.Run(context.Background()); err != nil {
		return fmt.Errorf("run stream: %w", err)
	}
	return nil
}

template.yaml:

# Template that exposes an output component named `postgres`.
name: postgres
type: output

# Fields the template accepts; `columns` and `args_mapping` are declared as
# lists of strings, `dsn` and `table` as plain strings.
fields:
  - name: dsn
    type: string
  - name: columns
    type: string
    kind: list
  - name: args_mapping
    type: string
    kind: list
  - name: table
    type: string

# Maps the fields above onto a `sql_insert` config using the postgres driver.
# NOTE(review): the result is nested under `root.output`; confirm against the
# templating docs whether the component should be assigned at `root` directly
# (e.g. `root.sql_insert = ...`).
mapping: |
  root.output = {
    "sql_insert": {
      "driver": "postgres",
      "dsn": this.dsn,
      "columns": this.columns,
      "args_mapping": this.args_mapping,
      "table": this.table,
      "init_verify_conn": true
    }
  }

config.yaml:

# Routes messages by `@event_type` to the templated `postgres` output for the
# matching table; messages matching neither check go to the `print_output`
# resource.
output:
  switch:
    cases:
      # The templated output has to sit under `output`, as in the fallback
      # case below. Placed directly beside `check`, it is read as an unknown
      # field of the case itself, which is what the linter reports as
      # "field postgres not recognised".
      # NOTE(review): the template declares `columns` and `args_mapping` as
      # string lists, while a single interpolated string is passed here;
      # confirm the env values resolve to the expected shape.
      - check: '@event_type == "test1"'
        output:
          postgres:
            dsn: "${POSTGRES_DSN}"
            columns: "${TEST_COLUMNS}"
            args_mapping: "${TEST_ARGS_MAPPING}"
            table: test1
      - check: '@event_type == "test2"'
        output:
          postgres:
            dsn: "${POSTGRES_DSN}"
            columns: "${TEST_COLUMNS}"
            args_mapping: "${TEST_ARGS_MAPPING}"
            table: test2
      - output:
          resource: print_output

Linting this config fails with:

field postgres not recognised
jem-davies commented 1 month ago

This wasn't an issue.