Closed: emmanuelbertho closed this issue 4 months ago
Hey. Good catch. I just gave the code a look-through and it appears we don't actually check whether the connection to the database is open and healthy when configuring these SQL services. We log a WARN when we fail to execute a query against a DB, but the initial Connect to the DB fails silently, without returning an error.
Think this deserves some consideration. Will ask @jem-davies to give this a look too.
How we currently do things:
func (s *sqlInsertOutput) Connect(ctx context.Context) error {
    // Some code here...
    var err error
    // Internally calls sql.Open(driver, dsn) which does attempt a connect to the DB.
    if s.db, err = sqlOpenWithReworks(s.logger, s.driver, s.dsn); err != nil {
        return err
    }
    // Fails with a warning but doesn't actually trigger an error.
    s.connSettings.apply(ctx, s.db, s.logger)
    // Some more code here...
}
Instead, we could include a ping (with context) and mark the output as failed if it is unsuccessful:
if err := s.db.PingContext(ctx); err != nil {
    return err
}
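A minimal sketch of what that could look like, assuming the surrounding Connect code stays as above and that the context and time packages are imported; the 5 second timeout is purely illustrative:

    pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
    defer cancel()
    // PingContext forces a real round-trip to the DB, so an unreachable or
    // misconfigured database surfaces here rather than on the first query.
    if err := s.db.PingContext(pingCtx); err != nil {
        _ = s.db.Close() // optionally discard the pool so a later retry re-opens it
        return err
    }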
I wonder if we want this component to connect lazily, or if it's more desirable to check whether a connection is alive/possible when the pipeline/component is created.
Ok, great! Sorry, my Go knowledge isn't good enough for me to fix this myself :(
Have been giving this some more thought.
The output metric docs say the following about the output_connection_up metric:
output_connection_up: For continuous stream based outputs represents a count of the number of times the output has successfully established a connection to the target sink. For poll based outputs that do not retain an active connection this value will increment once.
With the caveat:
The behaviour of connection metrics may differ based on output type due to certain libraries and protocols obfuscating the concept of a single connection.
Technically, the Go SQL package opens a connection pool and opens/closes connections to the DB as needed, and I'm wondering whether we actually need to reconsider how this is tracked/managed by Bento.
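For reference, a small standalone sketch of that lazy behaviour, reusing the fake DSN from this issue and assuming the lib/pq driver is on the module path; sql.Open succeeds even though the host doesn't exist, and only the ping reports the failure:

package main

import (
    "context"
    "database/sql"
    "fmt"
    "time"

    _ "github.com/lib/pq" // registers the "postgres" driver
)

func main() {
    // sql.Open only validates its arguments and sets up a lazy connection pool;
    // per the Go docs it may not dial the database at all, so a bogus host still
    // returns a non-nil *sql.DB with no error here.
    db, err := sql.Open("postgres", "postgres://fakeuser:fakepasswd@myhost:1234/?database=fakedb&sslmode=disable")
    if err != nil {
        fmt.Println("open failed:", err) // only malformed DSNs/driver names end up here
        return
    }
    defer db.Close()

    // Only an actual round-trip (ping or query) reports that the host is unreachable.
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()
    if err := db.PingContext(ctx); err != nil {
        fmt.Println("ping failed:", err) // e.g. "dial tcp: lookup myhost ... no such host"
    }
}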
I agree that the metric indicating things are up is confusing. I'm unsure whether checking if the DB can be communicated with at Connect() time is desirable, since it means we can't "lazily load" the output sinks.
Will think on this some more and decide whether it's necessary to fix. Maybe adding a disable_lazy_loading option could be a viable workaround while still not introducing a breaking change.
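Roughly, such a flag could gate the start-up ping inside Connect. This is a hypothetical sketch; the disableLazyLoading field and its wiring are assumptions, not existing Bento code:

    // Hypothetical field: with the flag defaulting to false, existing configs keep
    // the current lazy behaviour, so adding the check wouldn't be a breaking change.
    if s.disableLazyLoading {
        if err := s.db.PingContext(ctx); err != nil {
            return err // surface bad hosts/credentials at start-time instead of on first write
        }
    }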
Ok, makes sense. My goal is to check that the connection parameters I'm using are correct when I create a stream, but if that's not possible I can probably find a workaround.
It's probably a bit of a stretch but you could just do a simple check against both your DBs in the input as follows:
sql_raw:
  driver: postgres
  dsn: postgres://postgres:admin@localhost:5432/?database=postgres&sslmode=disable
  query: SELECT 1;
This should fail if the SELECT 1; query cannot be executed. So you could do:
input:
  broker:
    inputs:
      - sql_raw:
          driver: postgres
          dsn: postgres://postgres:admin@localhost:5432/?database=postgres&sslmode=disable
          query: SELECT 1;
        processors:
          - mapping: root = deleted()
      - sql_raw:
          driver: postgres
          dsn: postgres://postgres:admin@localhost:5432/?database=timescale&sslmode=disable
          query: SELECT 1;
        processors:
          - mapping: root = deleted()
      - resource: foo # run the resource you actually care about here
Definitely undesirable but if you need to ensure the DBs are up and running at start-time, this could be a workaround (albeit an ugly one...).
Thanks for your help, I will keep this in mind if I really need this feature :)
Hey @emmanuelbertho. We've added some new functionality to account for this case. You can add init_verify_conn: true to your SQL components and this will check whether the Bento client can ping the DB at start-time :smile:
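For example, a sketch based on the sql_insert config from this issue; only the init_verify_conn line is new:

output:
  sql_insert:
    driver: postgres
    dsn: postgres://fakeuser:fakepasswd@myhost:1234/?database=fakedb&sslmode=disable
    init_verify_conn: true # fail at start-time if the DB can't be pinged
    table: aa
    columns: [ timestamp, data ]
    args_mapping: '[ this.timestamp, this.data.string() ]'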
Currently, this is unreleased, so you'd have to do a go install or a manual build from the HEAD of main. Otherwise, this will be included in the next release, v1.2, which should be out a bit later this week.
Hello,
I noticed a bug when using sql_insert with postgres as an output in streams mode. When I put fake connection parameters for postgres, the /stats endpoint for my stream shows that the connection is up though it is not.
Logs:
2024-07-10 09:49:57 {"@service":"bento","label":"","level":"warning","msg":"Failed to execute init_statement: dial tcp: lookup myhost on 127.0.0.11:53: no such host","path":"root.output","stream":"Emmanuel"}
2024-07-10 09:49:57 {"@service":"bento","label":"","level":"info","msg":"Output type sql_insert is now active","path":"root.output","stream":"Emmanuel"}
Stream:
{
  "active": true,
  "uptime": 6.814057709,
  "uptime_str": "6.814057788s",
  "config": {
    "buffer": { "none": {} },
    "input": {
      "redis_streams": {
        "body_key": "data",
        "streams": [ "value-change-Emmanuel" ],
        "url": "redis://redis:6379"
      }
    },
    "output": {
      "sql_insert": {
        "args_mapping": "[ this.timestamp, this.data.string() ]",
        "columns": [ "timestamp", "data" ],
        "driver": "postgres",
        "dsn": "postgres://fakeuser:fakepasswd@myhost:1234/?database=fakedb&sslmode=disable",
        "init_statement": "CREATE TABLE IF NOT EXISTS aa (\n timestamp TIMESTAMPTZ NOT NULL,\n data jsonb NOT NULL);\nSELECT create_hypertable('aa', by_range('timestamp'));",
        "table": "aa"
      }
    },
    "pipeline": {
      "processors": [
        { "mapping": "\r\n root.data = this\r\n root.data.\"\" = deleted()\r\n root.timestamp = now()" }
      ]
    }
  }
}
Stats:
{
  "input_connection_failed{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 0,
  "input_connection_lost{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 0,
  "input_connection_up{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 1,
  "input_latency_ns{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": { "p50": 0, "p90": 0, "p99": 0 },
  "input_received{label=\"\",path=\"root.input\",stream=\"Emmanuel\"}": 0,
  "output_batch_sent{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": 0,
  "output_connection_failed{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": 0,
  "output_connection_lost{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": 0,
  "output_connection_up{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": 1,
  "output_error{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": 0,
  "output_latency_ns{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": { "p50": 0, "p90": 0, "p99": 0 },
  "output_sent{label=\"\",path=\"root.output\",stream=\"Emmanuel\"}": 0,
  "processor_batch_received{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 0,
  "processor_batch_sent{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 0,
  "processor_error{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 0,
  "processor_latency_ns{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": { "p50": 0, "p90": 0, "p99": 0 },
  "processor_received{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 0,
  "processor_sent{label=\"\",path=\"root.pipeline.processors.0\",stream=\"Emmanuel\"}": 0,
  "uptime_ns": 123757643066
}