redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

How to execute sql statement in a loop? #1468

Closed: Counterflowwind closed this issue 1 year ago

Counterflowwind commented 1 year ago

I want to loop over an array and insert its data into the database, like this:

input:
  generate:
    mapping: |
      root = {"sqldata":["delete from b.tax where month_of_salary='202208'","delete from a.tax where month_of_salary='202208'"]}
    interval: 0s
    count: 1
pipeline:
  processors:
    - while:
        at_least_once: false
        max_loops: 0
        check: this.sqldata.length() > 0
        processors:
          - sql_raw:
              driver: clickhouse
              dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
              query: this.sqldata[i]

output:
  drop: {}

The while processor doesn't seem appropriate, and I can't get the index of the array. How do I loop through an array and execute each SQL statement?

peczenyj commented 1 year ago

Hey, did you try using the unarchive processor?

pipeline:
  processors:
    - mapping: |
        root = this.sqldata
    - unarchive:
        format: json_array

Then use query: this.

Or use the mapping root = this.explode("sqldata") and then query: this.sqldata.
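A sketch of the explode variant, assuming the same generate input as the question. In Bloblang, this.explode("sqldata") turns {"sqldata":["a","b"]} into [{"sqldata":"a"},{"sqldata":"b"}], so an unarchive step is still needed to split that array into one message per statement. As comes out later in the thread, query is not a Bloblang mapping, so the field is pulled out with an interpolation, which in turn needs unsafe_dynamic_query: true. The DSN is the same placeholder used throughout the thread.

```yaml
pipeline:
  processors:
    # {"sqldata":["s1","s2"]} -> [{"sqldata":"s1"},{"sqldata":"s2"}]
    - mapping: |
        root = this.explode("sqldata")
    # Split the array into one message per element
    - unarchive:
        format: json_array
    - sql_raw:
        driver: clickhouse
        dsn: "clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]"
        # Interpolated per message, which is only allowed with unsafe_dynamic_query
        query: "${! this.sqldata }"
        unsafe_dynamic_query: true
        # No placeholders in the statement, so no arguments
        args_mapping: "root = []"
```

The explode form is mainly useful when each message carries other fields you want to keep alongside the statement; with only sqldata present, root = this.sqldata followed by unarchive is simpler.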

Counterflowwind commented 1 year ago

@peczenyj Thanks for your reply. Unfortunately, the data is not inserted into the database when I run it, and no error is reported. This is my config.yaml:

input:
  generate:
    mapping: |
      root = {"sqldata":["insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values ('ddd00987733','message_dataware','sync_log',now(),'DML');","insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values ('a22111122c','message_dataware','sync_log',now(),'DDL');"]}
    interval: 0s
    count: 1
pipeline:
  processors:
    - mapping: |
        root = this.sqldata
    - unarchive:
        format: json_array
    - sql_raw:
       driver: clickhouse
       dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
       query: this

output:
  stdout: {}

peczenyj commented 1 year ago

Did you try adding the SQL query to the output section?

Counterflowwind commented 1 year ago

> Did you try adding the SQL query to the output section?

input:
  generate:
    mapping: |
      root = {"sqldata":["insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values ('ddd00987733','message_dataware','sync_log',now(),'DML');","insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values ('a22111122c','message_dataware','sync_log',now(),'DDL');"]}
    interval: 0s
    count: 1
pipeline:
  processors:
    - mapping: |
        root = this.sqldata
    - unarchive:
        format: json_array
output:
  sql_raw:
    driver: clickhouse
    dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
    query: this

It panics when I run this config.yaml. The error info is:

level=info msg="Running main config from specified file" @service=benthos path=/benthos.yaml
level=info msg="Launching a benthos instance, use CTRL+C to close" @service=benthos
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xee77c7]

goroutine 123 [running]:
github.com/benthosdev/benthos/v4/public/service.MessageBatch.BloblangQuery({0xc000eee950, 0x2, 0xc000f39601?}, 0xc000746ce8?, 0xee8585?)
    /go/src/github.com/benthosdev/benthos/public/service/message.go:276 +0x27
github.com/benthosdev/benthos/v4/internal/impl/sql.(*sqlRawOutput).WriteBatch(0xc000e2e000, {0x3c98f20, 0xc000dba200}, {0xc000eee950, 0x2, 0x2})
    /go/src/github.com/benthosdev/benthos/internal/impl/sql/output_sql_raw.go:177 +0x154
github.com/benthosdev/benthos/v4/public/service.(*airGapBatchWriter).WriteBatch(0xc000eee3d0, {0x3c98f20, 0xc000dba200}, {0xc000942400, 0x2, 0x2})
    /go/src/github.com/benthosdev/benthos/public/service/output.go:120 +0xd7
github.com/benthosdev/benthos/v4/internal/component/output.(*AsyncWriter).latencyMeasuringWrite(0xc0004c7280, {0x3c98f20, 0xc000dba200}, {0xc000942400, 0x2, 0x2})
    /go/src/github.com/benthosdev/benthos/internal/component/output/async_writer.go:86 +0x90
github.com/benthosdev/benthos/v4/internal/component/output.(*AsyncWriter).loop.func4()
    /go/src/github.com/benthosdev/benthos/internal/component/output/async_writer.go:228 +0x3c6
created by github.com/benthosdev/benthos/v4/internal/component/output.(*AsyncWriter).loop
    /go/src/github.com/benthosdev/benthos/internal/component/output/async_writer.go:266 +0x6c5

peczenyj commented 1 year ago

The root cause of this panic is the missing args_mapping.

What happens is:

  1. it seems args_mapping is a mandatory field;
  2. the query field must be a plain string (in this case it tries to execute the literal SQL "this");
  3. if you want to use interpolation like ${! this }, you must set the field unsafe_dynamic_query to true (see here: https://www.benthos.dev/docs/components/processors/sql_raw/#unsafe_dynamic_query), but perhaps it is not necessary.

A better approach may be this:

First, define the query with placeholders:

insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values (?,?,?,?,?);

Then use the generate input to build the arguments:

input:
  generate:
    mapping: |
      root = [
        ["ddd00987733","message_dataware","sync_log",now(),"DML"],
        ["a22111122c","message_dataware","sync_log",now(),"DDL"]
      ]
    interval: 0s
    count: 1

Then use only the unarchive processor to split the arrays:

pipeline:
  processors:
    - unarchive:
        format: json_array

Then try this (I haven't tested it, but it seems OK; any minor bug should be easy to spot):

output:
  sql_raw:
    driver: clickhouse
    dsn: "clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]"
    query: "insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values (?,?,?,?,?);"
    args_mapping: root = this
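Assembled into a single file, the pieces above would look roughly like this. It is as untested as the snippets it is built from, and it only fits data that can be expressed as arguments to one fixed statement. Whether the value produced by Bloblang's now() is accepted for create_time depends on the column type and the ClickHouse driver, which is not verified here.

```yaml
input:
  generate:
    # One message holding an array of rows; each row is one argument list
    mapping: |
      root = [
        ["ddd00987733","message_dataware","sync_log",now(),"DML"],
        ["a22111122c","message_dataware","sync_log",now(),"DDL"]
      ]
    interval: 0s
    count: 1

pipeline:
  processors:
    # Split the outer array: one message per row
    - unarchive:
        format: json_array

output:
  sql_raw:
    driver: clickhouse
    dsn: "clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]"
    # Static query; each ? is bound positionally from args_mapping
    query: "insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values (?,?,?,?,?);"
    # The row array itself becomes the argument list
    args_mapping: root = this
```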

Counterflowwind commented 1 year ago

@peczenyj The SQL statement I need to execute is the complete SQL, with no placeholders, like this:

insert into `message_datawarehouse`.`syn_log` values ('aaabbbccc','message_dataware','sync_log',now(),'DML');

The data I receive has no placeholders, and the SQL statement executed each time is different: there are inserts, updates and deletes. Each message I receive is also a batch of statements. Besides, args_mapping is optional according to the documentation (see here: https://www.benthos.dev/docs/components/processors/sql_raw/#args_mapping).

peczenyj commented 1 year ago

OK, consider using unsafe_dynamic_query: true, like I said.

If you get any panics, try adding something bogus to args_mapping, like root = 1, as a workaround.

Counterflowwind commented 1 year ago

> OK, consider using unsafe_dynamic_query: true, like I said.
>
> If you get any panics, try adding something bogus to args_mapping, like root = 1, as a workaround.

unsafe_dynamic_query is a field of the sql_raw processor; the sql_raw output does not support it. I set unsafe_dynamic_query: false and args_mapping: root = 1 in the sql_raw processor, but the data was not inserted. Setting args_mapping: root = 1 in the sql_raw output reports an error instead. The error info is:

level=error msg="Failed to send message to sql_raw: mapping returned non-array result: int64" @service=benthos label="" path=root.output

Jeffail commented 1 year ago

@Counterflowwind Close, but what you actually need is args_mapping: [], since in your case there are no arguments, so just an empty array. This is actually a bug, as leaving the args_mapping field empty should result in the same thing (no arguments). I'm going to mark this issue as a bug and will hopefully include a fix in the next release; for now args_mapping: [] should work, so let us know if it doesn't.

Counterflowwind commented 1 year ago

@Jeffail I can't use args_mapping: [] because the field's type must be a string (see here: https://www.benthos.dev/docs/components/processors/sql_raw#args_mapping). If I use args_mapping: [], it reports an error:

level=error msg="Config lint error" @service=benthos lint="/benthos.yaml(18,1) expected string value"

I also can't insert data into the database using the sql_raw processor, and no error is reported. Is it a bug? My config.yaml is:

input:
  generate:
    mapping: |
      root = {"sqldata":["insert into `message_datawarehouse`.`syn_log` values ('aaabbbccc','message_dataware','sync_log',now(),'DML');","insert into `message_datawarehouse`.`syn_log` values ('EEEEEEFFF','message_dataware','sync_log',now(),'DDL');","insert into `message_datawarehouse`.`syn_log` values ('degdddddd','message_dataware','sync_log',now(),'DDL');"]}
    interval: 0s
    count: 1
pipeline:
  processors:
    - try:
      - mapping: |
          root = this.sqldata
      - unarchive:
          format: json_array
      - sql_raw:
          driver: clickhouse
          dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
          query: this
output:
  stdout: {}

Jeffail commented 1 year ago

Sorry, my bad: it should be args_mapping: "root = []", as the value needs to be a Bloblang mapping. The fix is in and will be in the next release: https://github.com/benthosdev/benthos/commit/674cf386cce7716f47a94ee66f0781f1b26e71e4

As for your original question, you need to change query: this to query: "${! this }", as the value is an interpolated string, not a raw Bloblang mapping. You also need to make sure unsafe_dynamic_query is set to true.
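Putting both corrections together, the processor section of the config would look roughly like this sketch, keeping the generate input and stdout output from the config above. The DSN is the thread's placeholder, and args_mapping: "root = []" is the workaround for the args_mapping bug, which should become optional once the fix is released. The try wrapper is left out since it is not needed for this.

```yaml
pipeline:
  processors:
    # Pull out the array of SQL statements
    - mapping: |
        root = this.sqldata
    # One message per statement; each message is a JSON string
    - unarchive:
        format: json_array
    - sql_raw:
        driver: clickhouse
        dsn: "clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]"
        # Interpolation: the message's string value becomes the statement text
        query: "${! this }"
        # Required for query to contain interpolations
        unsafe_dynamic_query: true
        # No placeholders, so an empty argument list
        args_mapping: "root = []"
```

Because unsafe_dynamic_query executes whatever text the message contains, this is only reasonable when the statements come from a trusted source.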

Counterflowwind commented 1 year ago

@Jeffail Thanks for your reply. My original problem has been solved. But if I add args_mapping: "root = []" to the sql_raw output, an error is reported. The error info is:

level=error msg="Failed to send message to sql_raw: code: 62, message: Syntax error: failed at position 1 ('this'): this. Expected one of: ALTER query, Query with output, ALTER PROFILE, RENAME DATABASE, SHOW PRIVILEGES query, TRUNCATE, KILL, KILL QUERY query, SELECT query, possibly with UNION, list of union elements, ALTER ROLE, SELECT subquery, DESCRIBE query, SELECT query, subquery, possibly with UNION, SHOW GRANTS, SHOW CREATE, WATCH, CREATE SETTINGS PROFILE or ALTER SETTINGS PROFILE query, SHOW PROCESSLIST query, ALTER POLICY, ALTER USER, CREATE VIEW query, CHECK TABLE, SET ROLE, SHOW CONNECTIONS query, SELECT query, SELECT, REVOKE, CREATE USER, CREATE DICTIONARY, CREATE PROFILE, SET ROLE DEFAULT, EXPLAIN, ALTER SETTINGS PROFILE, SYSTEM, ALTER LIVE VIEW, RENAME TABLE, DROP query, SHOW ACCESS, OPTIMIZE query, USE, DROP access entity query, RENAME DICTIONARY, DETACH, SET, SHOW, DESC, OPTIMIZE TABLE, CREATE ROW POLICY, SET DEFAULT ROLE, CREATE POLICY, ALTER ROW POLICY, INSERT INTO, INSERT query, SHOW [TEMPORARY] TABLES|DATABASES|CLUSTERS|CLUSTER 'name' [[NOT] [I]LIKE 'str'] [LIMIT expr], GRANT, RENAME query, SHOW GRANTS query, SHOW PRIVILEGES, EXISTS, DROP, SYSTEM query, CREATE LIVE VIEW query, CREATE ROW POLICY or ALTER ROW POLICY query, CREATE QUOTA or ALTER QUOTA query, SHOW PROCESSLIST, ALTER QUOTA, CREATE QUOTA, CREATE DATABASE query, SHOW CONNECTIONS, SET query, Query, CREATE, WITH, CREATE ROLE or ALTER ROLE query, EXTERNAL DDL FROM, EXCHANGE TABLES, EXISTS or SHOW CREATE query, WATCH query, REPLACE, CREATE ROLE, CREATE SETTINGS PROFILE, SET ROLE or SET DEFAULT ROLE query, CREATE USER or ALTER USER query, EXTERNAL DDL query, SHOW ACCESS query, SHOW CREATE QUOTA query, USE query, ATTACH, DESCRIBE, ALTER TABLE, ShowAccessEntitiesQuery, GRANT or REVOKE query, CREATE TABLE or ATTACH TABLE query" @service=benthos label="" path=root.output

My config.yaml is:

input:
  generate:
    mapping: |
      root = {"sqldata":["insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values ('ddd00987733','message_dataware','sync_log',now(),'DML');","insert into `message_datawarehouse`.`syn_log` (sql,schema_name,table_name,create_time,message_type) values ('a22111122c','message_dataware','sync_log',now(),'DDL');"]}
    interval: 0s
    count: 1
pipeline:
  processors:
    - mapping: |
        root = this.sqldata
    - unarchive:
        format: json_array
output:
  sql_raw:
    driver: clickhouse
    dsn: clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]
    query: this
    args_mapping: "root = []" 

If I change query: this to query: "${! this }", the error info is:

level=error msg="Failed to send message to sql_raw: code: 62, message: Syntax error: failed at position 1 ('$'): ${! this }. Expected one of: ALTER query, Query with output, ALTER PROFILE, RENAME DATABASE, SHOW PRIVILEGES query, TRUNCATE, KILL, KILL QUERY query, SELECT query, possibly with UNION, list of union elements, ALTER ROLE, SELECT subquery, DESCRIBE query, SELECT query, subquery, possibly with UNION, SHOW GRANTS, SHOW CREATE, WATCH, CREATE SETTINGS PROFILE or ALTER SETTINGS PROFILE query, SHOW PROCESSLIST query, ALTER POLICY, ALTER USER, CREATE VIEW query, CHECK TABLE, SET ROLE, SHOW CONNECTIONS query, SELECT query, SELECT, REVOKE, CREATE USER, CREATE DICTIONARY, CREATE PROFILE, SET ROLE DEFAULT, EXPLAIN, ALTER SETTINGS PROFILE, SYSTEM, ALTER LIVE VIEW, RENAME TABLE, DROP query, SHOW ACCESS, OPTIMIZE query, USE, DROP access entity query, RENAME DICTIONARY, DETACH, SET, SHOW, DESC, OPTIMIZE TABLE, CREATE ROW POLICY, SET DEFAULT ROLE, CREATE POLICY, ALTER ROW POLICY, INSERT INTO, INSERT query, SHOW [TEMPORARY] TABLES|DATABASES|CLUSTERS|CLUSTER 'name' [[NOT] [I]LIKE 'str'] [LIMIT expr], GRANT, RENAME query, SHOW GRANTS query, SHOW PRIVILEGES, EXISTS, DROP, SYSTEM query, CREATE LIVE VIEW query, CREATE ROW POLICY or ALTER ROW POLICY query, CREATE QUOTA or ALTER QUOTA query, SHOW PROCESSLIST, ALTER QUOTA, CREATE QUOTA, CREATE DATABASE query, SHOW CONNECTIONS, SET query, Query, CREATE, WITH, CREATE ROLE or ALTER ROLE query, EXTERNAL DDL FROM, EXCHANGE TABLES, EXISTS or SHOW CREATE query, WATCH query, REPLACE, CREATE ROLE, CREATE SETTINGS PROFILE, SET ROLE or SET DEFAULT ROLE query, CREATE USER or ALTER USER query, EXTERNAL DDL query, SHOW ACCESS query, SHOW CREATE QUOTA query, USE query, ATTACH, DESCRIBE, ALTER TABLE, ShowAccessEntitiesQuery, GRANT or REVOKE query, CREATE TABLE or ATTACH TABLE query" @service=benthos label="" path=root.output

Jeffail commented 1 year ago

@Counterflowwind You forgot to set unsafe_dynamic_query to true: https://www.benthos.dev/docs/components/processors/sql_raw#unsafe_dynamic_query

Counterflowwind commented 1 year ago

@Jeffail But unsafe_dynamic_query is not supported by the sql_raw output: https://www.benthos.dev/docs/components/outputs/sql_raw

Jeffail commented 1 year ago

@Counterflowwind Ahhhhhhhh, I'm really sorry, I've been all over the place with this issue 😅. I've added unsafe_dynamic_query to the sql_raw output: https://github.com/benthosdev/benthos/commit/074663d9e54eab1460ab4aec5d1624dff250d22b, and it'll be in the next release. For now you could use the sql_raw processor as an output like this: https://www.benthos.dev/docs/components/processors/about#using-processors-as-outputs, but ideally, once the next release is out, you can use the sql_raw output directly.
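A sketch of the processors-as-outputs pattern from the linked page, written from our understanding of it (check the page for the exact form in your version). The sql_raw processor runs inside the output's processors: a successful run deletes the message so it is acknowledged without being sent anywhere, while a failed run skips the deletion and the message falls through to a reject output, which nacks it so it can be retried. The reject message text is illustrative.

```yaml
pipeline:
  processors:
    - mapping: |
        root = this.sqldata
    - unarchive:
        format: json_array

output:
  # Only reached when the SQL step failed; rejecting nacks the message
  reject: 'failed to execute SQL: ${! error() }'
  processors:
    - try:
        - sql_raw:
            driver: clickhouse
            dsn: "clickhouse://[username[:password]@][netloc][:port]/dbname[?param1=value1&...&paramN=valueN]"
            query: "${! this }"
            unsafe_dynamic_query: true
            args_mapping: "root = []"
        # Success: drop the message so it is acked and never reaches reject
        - mapping: root = deleted()
```

Once the release containing that commit is out, the same query, unsafe_dynamic_query and args_mapping settings should be usable directly on the sql_raw output instead.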