redpanda-data / connect


[Feature Request] Support inserting UUID from string in cassandra #2587

Open mmayantz opened 4 months ago

mmayantz commented 4 months ago
  - output:
      cassandra:
        addresses: [ "localhost:9042" ]
        query: "insert into tableX (property1, property2, property3, property4, property5) VALUES (?, ?, ?, ?, ?)"
        args_mapping: |
          root = [
            this.prop1,
            this.prop2,
            this.prop3,
            this.prop4,
            this.prop5
          ]

If the table I'm inserting into has uuid columns, can this type be supported in args_mapping? Currently that does not appear to be the case.

If I manually insert this into ScyllaDB:

VALUES (00000000-0000-0000-0000-000000000000, etc..

it inserts correctly, but with quotes:

VALUES ('00000000-0000-0000-0000-000000000000'

we get this error:

message="Invalid STRING constant (00000000-0000-0000-0000-000000000000) for "object_id" of type uuid"

Mizaro commented 4 months ago

@mmayantz, can you please provide a docker-compose file and a working Benthos config that reproduce this problem? I am willing to try to solve it if I can reproduce it reliably.

It's important to note that I am not affiliated with the Benthos maintainers and am new to Benthos. However, I am eager to contribute to the project and learn from this experience 😇

avardbacon commented 4 months ago

Here is a docker-compose.yaml and benthos_config.yaml to help you reproduce the issue. After you run docker-compose up, you should see the error message level=warning msg="Failed to send message: can not marshal int64 into uuid" @service=benthos label=fan path=root.output. It's actually the insert using the args_mapping array that's faulty.

docker-compose.yaml

version: '3.8'

services:
  scylladb:
    image: scylladb/scylla:latest
    ports:
      - "9042:9042"
    volumes:
      - scylla_data:/var/lib/scylla
    environment:
      - SCYLLA_RPC_ADDRESS=0.0.0.0
      - SCYLLA_LISTEN_ADDRESS=0.0.0.0
      - SCYLLA_BROADCAST_ADDRESS=scylladb

  benthos:
    image: jeffail/benthos
    depends_on:
      - scylladb
    volumes:
      - ./benthos_config.yaml:/benthos.yaml
    command: ["-c", "/benthos.yaml"]

volumes:
  scylla_data:

benthos_config.yaml

input:
  generate:
    count: 1
    mapping: |
      root = {
        "id": "test_id",
        "message": "test_content"
      }

output:
  label: "fan"
  broker:
    pattern: fan_out
    outputs:
      - cassandra:
          addresses:
            - scylladb:9042
          query: "CREATE KEYSPACE IF NOT EXISTS mykeyspace WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' };"
          max_retries: 1
          batching:
            count: 500
            period: 1s
      - cassandra:
          addresses:
            - scylladb:9042
          query: "CREATE TABLE IF NOT EXISTS mykeyspace.mytable ( id uuid PRIMARY KEY );"
          max_retries: 1
          batching:
            count: 500
            period: 1s
      - cassandra:
          addresses:
            - scylladb:9042
          query: 'INSERT INTO mykeyspace.mytable (id) VALUES (?);'
          args_mapping: |
            root = [
              00000000-0000-0000-0000-000000000000
            ]
          max_retries: 1
          batching:
            count: 500
            period: 1s
      - cassandra:
          addresses:
            - scylladb:9042
          query: 'INSERT INTO mykeyspace.mytable (id) VALUES (12341234-1234-5678-1234-567801010101);'
          args_mapping: |
            root = [

            ]
          max_retries: 1
          batching:
            count: 500
            period: 1s

mihaitodor commented 4 months ago

I think the libraries expect an object of this type https://pkg.go.dev/github.com/gocql/gocql#UUID which, currently, can't be created using Benthos. One way to do it would be to add a new Bloblang function which does the conversion so then you'd get an instance of that type that you can pass into the args_mapping. I'd try to PoC that and see if it works. Then maybe send a PR to add it here in a bloblang.go file similar to this.
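
For readers following along, here is a minimal sketch of the string-to-UUID conversion such a function would need to perform. It is not the Benthos or gocql implementation: `uuid16` and `parseUUID` are hypothetical names, and `uuid16` is only a local stand-in for a 16-byte UUID type so the example runs with the standard library alone. A real plugin would more likely call the library's own `gocql.ParseUUID`.

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
)

// uuid16 is a hypothetical stand-in for a 16-byte UUID type.
type uuid16 [16]byte

// parseUUID converts the canonical 8-4-4-4-12 text form into 16 raw bytes.
func parseUUID(s string) (uuid16, error) {
	var u uuid16
	// Check the overall length and the position of the four dashes.
	if len(s) != 36 || s[8] != '-' || s[13] != '-' || s[18] != '-' || s[23] != '-' {
		return u, fmt.Errorf("invalid UUID %q: want 8-4-4-4-12 hex groups", s)
	}
	raw, err := hex.DecodeString(strings.ReplaceAll(s, "-", ""))
	if err != nil {
		return u, fmt.Errorf("invalid UUID %q: %w", s, err)
	}
	// Guard against extra dashes that would leave fewer than 16 bytes.
	if len(raw) != 16 {
		return u, fmt.Errorf("invalid UUID %q: decoded %d bytes, want 16", s, len(raw))
	}
	copy(u[:], raw)
	return u, nil
}

func main() {
	u, err := parseUUID("12341234-1234-5678-1234-567801010101")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("%x\n", u[:])

	// A non-UUID string such as the "test_id" value from the config is rejected.
	_, err = parseUUID("test_id")
	fmt.Println(err != nil)
}
```

The point of the sketch is that the value handed to the driver is a typed 16-byte value rather than arbitrary text, so malformed input fails at mapping time instead of at the database.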

Mizaro commented 4 months ago

@mihaitodor I didn't fully understand what you meant there, but both google/uuid and cassandra/uuid are [16]byte arrays. Maybe we can do an unsafe cast there to fix the problem.
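
A short sketch of the point about both types being 16-byte arrays: when two named Go types share the same underlying type, an ordinary type conversion is enough and no unsafe cast is needed. `googleUUID`, `cqlUUID`, and `toCQL` are hypothetical local stand-ins declared here so the example has no external dependencies. It assumes the real types are both declared as `[16]byte`.

```go
package main

import "fmt"

// googleUUID and cqlUUID are hypothetical stand-ins for two UUID types
// from different packages that are both declared as [16]byte.
type googleUUID [16]byte
type cqlUUID [16]byte

// toCQL converts between the two types with a plain conversion.
// This compiles because both share the underlying type [16]byte.
func toCQL(g googleUUID) cqlUUID {
	return cqlUUID(g)
}

func main() {
	g := googleUUID{0x12, 0x34}
	c := toCQL(g)
	// The bytes are copied unchanged; only the static type differs.
	fmt.Println(c == cqlUUID{0x12, 0x34})
}
```

The conversion copies the array by value, so the result is independent of the source.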

mihaitodor commented 4 months ago

It seems to me that the gocql library I linked above exposes a custom type called UUID. Since the current implementation in Benthos returns plain strings, my expectation is that gocql sends them as-is and it probably requires an implementation which returns this custom UUID type instead of a plain string. Hope that helps.