benthosdev / benthos

Fancy stream processing made operationally mundane
https://www.benthos.dev
MIT License

How to use sql_select or sql_raw with a dynamic query or table #2553

Closed NewSun1999 closed 2 weeks ago

NewSun1999 commented 2 weeks ago
http:
  enabled: false

input:
  label: "input_once"
  generate:
    mapping: |
      root = {"table":"demo","id":uuid_v4()}
    interval: 1s
    count: 1
pipeline:
  processors:
  - branch:
      processors:
      - gen_time:
          select: month
      - mapping: |
          root = this
          root.table = "%v%v%v".format(this.table , this.year , this.month)
          root.query = "SELECT * FROM %v%v%v WHERE create_time > ? and  create_time < ?;".format(this.table , this.year , this.month)
      - log:
          level: INFO
          message: hello ${! json("query") }
      - sql_raw:
          driver: mysql
          dsn: root:123456@tcp(127.0.0.1:3306)/test
          query: ${! json("query") }
          args_mapping: '[ this.time_start, this.time_end ]'
      result_map: 'root.test = this'

output:
  stdout:
    codec: lines

Error log:

level=info msg="Running main config from specified file" @service=benthos path=".\\demo.yaml"
level=info msg="Launching a benthos instance, use CTRL+C to close" @service=benthos
level=info msg="hello SELECT * FROM demo202404 WHERE create_time > ? and  create_time < ?;" @service=benthos label="" path=root.pipeline.processors.0.branch.processors.2
level=error msg="Branch error: processors failed: Error 1064: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '${! json(\"query\") }' at line 1" @service=benthos label="" path=root.pipeline.processors.0
{"id":"a5b4252b-8275-4b77-9613-9eb45f82ac6b","month":"04","table":"demo","time_end":"2024-04-30 16:00:00","time_start":"2024-04-30 15:00:00","week":"","year":"2024"}
level=info msg="Pipeline has terminated. Shutting down the service" @service=benthos

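Remark: gen_time is a plugin I wrote myself; it adds the other fields (year, month, week, time_start, time_end) to the message.

How can I pass an interpolated value such as ${! json("query") } as a parameter, either to the query field of sql_raw or to the table field of sql_select?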

mihaitodor commented 2 weeks ago

Hey @NewSun1999 👋 I think the issue is that you haven't set unsafe_dynamic_query: true under the sql_raw processor. See the docs here.

PS: Converting to a discussion as per #2026.
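
For reference, here is a minimal sketch of how the suggestion above might be applied to the sql_raw step from the original config. It assumes a Benthos version in which sql_raw has the unsafe_dynamic_query field; the driver, DSN, query and args_mapping are copied from the question, and nothing here has been run against the reporter's database or the custom gen_time plugin.

```yaml
pipeline:
  processors:
    - branch:
        processors:
          # ... gen_time, mapping and log steps unchanged from the question ...
          - sql_raw:
              driver: mysql
              dsn: root:123456@tcp(127.0.0.1:3306)/test
              # Without this flag the query string is sent to MySQL verbatim,
              # which is why the server complained about the literal
              # '${! json("query") }' text in the error log.
              unsafe_dynamic_query: true
              query: '${! json("query") }'
              # Values are still passed as placeholders (?), not interpolated.
              args_mapping: '[ this.time_start, this.time_end ]'
        result_map: 'root.test = this'
```

With interpolation enabled, the table name ends up inside the SQL text itself, so it should only ever be built from trusted values (here the fixed "demo" prefix plus the year and month). Anything user-supplied is better passed through args_mapping, as is already done for time_start and time_end.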