usedatabrew / blink

OpenSource data platform to build event-driven systems. It's like Deebezium for golang :)
https://docs.databrew.tech/open-source/prerequisites
MIT License
26 stars 1 forks source link

Postgres Sink requires at least one PK column to execute statements #14

Closed le-vlad closed 10 months ago

le-vlad commented 10 months ago

Problem

func getColumnNames(columns []schema.Column) string {
    var columnNames []string
    var pkColumn string
    for _, column := range columns {
        if column.PK {
            pkColumn = column.Name
        } else {
            columnNames = append(columnNames, column.Name)
        }
    }

    columnNames = append(columnNames, pkColumn)
    return strings.Join(columnNames, ", ")
}

Works incorrectly when there are no PK columns, which results in an invalid SQL statement being generated.

Steps to reproduce

The following config will result in creating a table successfully, but no insert statements will be executed due to broken SQL generated.

service:
  id: 132
  pipeline_id: 132
  # {"coin":"BTC","price":916}  27
  stream_schema:
    - stream: coin_price
      columns:
        - name: coin
          databrewType: String
          pk: false
          nullable: false
        - name: price
          databrewType: Int64
          pk: false
          nullable: false

processors:
  - driver: sql
    config:
      query: "select * from stream.coin_price where price > 800"
#      query: "select price from stream.coin_price where price > 800"
  - driver: log
    config:
      stream: "*"

source:
  driver: websocket
  config:
    url: wss://databrew-ws-gateway.fly.dev/ws

sink:
  driver: postgres
  config:
    host: postgres
    port: 5432
    user: postgres
    password: lorem123
    database: postgres
    schema: public
    ssl_required: false