danthegoodman1 opened 2 years ago
One thing to note: for this to be reliable I would want to bring in the pgx package, as it has a very elegant way of handling changefeeds and cancellable queries. It's not currently a dependency.
Hey @danthegoodman1, that'd be awesome. I've been hoping to get some CDC support eventually (https://github.com/Jeffail/benthos/issues/82) but that's been mostly blocked on getting dedicated help from someone with experience so you're very welcome here.
Bringing in new dependencies is fine. The way that components are defined is such that they're included in the main project (at https://github.com/Jeffail/benthos/tree/master/internal/impl) but they're optional imports (https://github.com/Jeffail/benthos/blob/master/public/components/all/package.go), so it'd still be possible to build leaner versions of Benthos without it. I would imagine pgx was likely coming in at some point anyway.
For persisting offsets it'd be good to support our concept of caches (https://www.benthos.dev/docs/components/caches/about), which would allow users to choose remote destinations if needed. That would mean adding a string field `cache` to the config spec of the input you define and using the `*Resources` provided to the constructor you register (https://pkg.go.dev/github.com/Jeffail/benthos/v3@v3.59.0/public/service#InputConstructor) to access it with https://pkg.go.dev/github.com/Jeffail/benthos/v3@v3.59.0/public/service#Resources.AccessCache, so you don't need to build a cache client yourself.
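To sketch what that might look like from a user's perspective (the `cache` field on the input is hypothetical here; `cache_resources` is the existing resource mechanism):

```yaml
cache_resources:
  - label: changefeed_offsets
    redis:
      url: redis://localhost:6379

input:
  crdb_changefeed:
    dsn: postgresql://user:pass@localhost:26257/defaultdb
    # Hypothetical field: name of the cache resource used to persist offsets.
    cache: changefeed_offsets
```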
Let me know if you need any support.
Great to hear @Jeffail , I'm sure I'll need plenty of help getting going and I'll know more once I get started :)
As for existing examples using the new `impl` API for writing new integrations, which one would you suggest as something to follow? It seems maybe pulsar might be a good one to look at? Or is there an interface with specific methods that these integrations need to implement?
@danthegoodman1 the `sql_select` input might be worth copying from as there's some overlap: https://github.com/Jeffail/benthos/blob/master/internal/impl/sql/input_sql_select.go. The implementation is exactly the same as for public plugins, so the documentation is all here: https://pkg.go.dev/github.com/Jeffail/benthos/v3@v3.59.0/public/service.
There's also a video walkthrough for processor plugins that covers how the config fields are defined and used: https://youtu.be/uH6mKw-Ly0g
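For reference on the interface question: plugins written against the `public/service` package implement a small interface along these lines (paraphrased from the godoc linked above; check it for the authoritative signatures):

```go
// Paraphrased sketch of the public/service Input interface; see the
// linked godoc for the authoritative definition.
type Input interface {
    // Connect establishes a connection to the upstream source.
    Connect(ctx context.Context) error
    // Read blocks until a message is ready, the connection is lost,
    // or the provided context is cancelled.
    Read(ctx context.Context) (*service.Message, service.AckFunc, error)
    // Close ends the connection and releases any resources.
    Close(ctx context.Context) error
}
```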
@Jeffail Thanks, I'll take a look and try to get started today
Hey @danthegoodman1 I have been playing with cockroach CDC also.
The open and closed versions have a slightly different way of doing CDC: https://www.cockroachlabs.com/docs/stable/stream-data-out-of-cockroachdb-using-changefeeds.html
which are you using ?
@gedw99 Core changefeeds
@Jeffail So I think I've gotten far enough to where I have some questions, see https://github.com/danthegoodman1/benthos/blob/crdb-changefeed-input/internal/impl/crdb/input_changefeed.go

1. For the `Read()` function, should I expect that it keeps getting called such that I process one row per read? The way CDC works is that there is basically a single query with a never-ending stream of rows (it blocks forever), so I am not sure how to handle returning single messages (rows). I tried to base this off what I saw from the NATS JetStream input.
2. For the `Close()` function, all that needs to be done for graceful shutdown of the CDC is to cancel the query with the cancel func I have in the struct. Just want to make sure I am handling the order of closing correctly (again based on the JetStream input).

Note: I still need to figure out caching and overriding of the `CURSOR`, but I have some docs in place warning about its usage.
sounds awesome @danthegoodman1
Alright, so I think the docs in the public input service answer my question (1), and it seems I am doing it correctly.
As for the caches: because the core changefeeds stream a single row at a time, lots of big changing queries can make rows stream in extremely fast. There are a couple of ways we can handle committing offsets. A `file` cache could work, but it would have to support super high velocity writes.

I'd like to get the changefeeds working without the cache first, but just laying down some thoughts on the matter.
@danthegoodman1 yeah, a ticker-based commit would probably be the best option here, with a final flush on `Close`. Benthos is quite strict about graceful termination so you can be pretty confident that `Close` will be called and waited on before shutting down (within certain configurable timeouts).
I'm not 100% opposed to using a specific cache implementation as it's what we do with the kinesis input and some other unique cases, but ideally I'd want to be able to point to either a local filestore for testing or a remote destination for production.
@Jeffail Have there been any performance tests regarding the local filestore? I imagine Redis would be quite good as a remote cache for this regardless. Maybe I'm prematurely optimizing.
I think the ticker based commit would be something configurable so it can be tuned based on the cache.
Maybe something like the badger KV store might be a good addition down the road for built-in caching KV style... but that's another story :)
Regarding data / file stores: NATS now has a KV store that is clustered and synchronised. It was introduced a month ago and really changes the game with NATS.

---

Also, local caches using genji with NATS is a proven approach. https://github.com/simpleiot/simpleiot has NATS and genji synchronised, explained here: https://github.com/simpleiot/simpleiot/blob/master/docs/architecture.md

With genji you get a MongoDB-like Golang store with no cgo and a SQL API on top.
@Jeffail A bit stuck and not sure how to go about debugging. I added lots of debug `fmt.Println()` statements and observe the following output:
dangoodman: ~/clusterSpaceCode/benthos git:(crdb-changefeed-input) ✗ docker run --rm -v /Users/dangoodman/clusterSpaceCode/benthos/config.yml:/config.yml -v /tmp/data:/data -p 4195:4195 jeffail/benthos -c /config.yml
## REGISTERED INPUT
## MAKING CRDB
## FINISHING INIT
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Launching a benthos instance, use CTRL+C to close."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
## CONNECTING
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Pending acks resolved."}
## CLOSING
## CANCELLED CONTEXT
## SHUTDOWN
(see the code for where these debug statements lie)
3 interesting things:
Not sure if you know why this might be the case. I would expect everything to work, and I can see the `crdb_changefeed` input in the docs, and it is imported and created properly from my config:
```yaml
input:
  crdb_changefeed:
    dsn: postgresql://dan:xxxxxx@free-tier.gcp-us-central1.cockroachlabs.cloud:26257/defaultdb?sslmode=require&options=--cluster%3Dportly-impala-2852
    tables:
      - strm_2
    options:
      - UPDATED

logger:
  prefix: benthos
  level: DEBUG
  format: json
  add_timestamp: true
  static_fields:
    '@service': benthos

output:
  label: ""
  stdout:
    codec: lines
```
(That DSN is something I use successfully in my local testing of core changefeeds with pgx)
The full output of the logs:
dangoodman: ~/clusterSpaceCode/benthos git:(crdb-changefeed-input) ✗ docker run --rm -v /Users/dangoodman/clusterSpaceCode/benthos/config.yml:/config.yml -v /tmp/data:/data -p 4195:4195 jeffail/benthos -c /config.yml
## REGISTERED INPUT
## MAKING CRDB
## FINISHING INIT
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Launching a benthos instance, use CTRL+C to close."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos","level":"INFO","message":"Listening for HTTP requests at: http://0.0.0.0:4195"}
## CONNECTING
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2021-11-25T13:29:34Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Pending acks resolved."}
## CLOSING
## CANCELLED CONTEXT
## SHUTDOWN
^C{"@timestamp":"2021-11-25T13:29:39Z","@service":"benthos","component":"benthos","level":"INFO","message":"Received SIGTERM, the service is closing."}
{"@timestamp":"2021-11-25T13:29:54Z","@service":"benthos","component":"benthos","level":"INFO","message":"Unable to fully drain buffered messages within target time."}
{"@timestamp":"2021-11-25T13:29:59Z","@service":"benthos","component":"benthos","level":"ERROR","message":"Failed to stop stream gracefully within target time."}
{"@timestamp":"2021-11-25T13:29:59Z","@service":"benthos","component":"benthos","level":"DEBUG","message":"goroutine profile: total 14\n6 @ 0x4380d6 0x40640c 0x405e78 0xe3a774 0x468c21\n#\t0xe3a773\tgithub.com/klauspost/compress/zstd.(*blockDec).startDecoder+0x93\t/go/src/github.com/Jeffail/benthos/vendor/github.com/klauspost/compress/zstd/blockdec.go:212\n\n1 @ 0x40b8f4 0x4651b8 0x1d9bd39 0x468c21\n#\t0x4651b7\tos/signal.signal_recv+0x97\t/usr/local/go/src/runtime/sigqueue.go:169\n#\t0x1d9bd38\tos/signal.loop+0x18\t\t/usr/local/go/src/os/signal/signal_unix.go:24\n\n1 @ 0x4380d6 0x40640c 0x405e38 0x1170c2d 0x468c21\n#\t0x1170c2c\tgithub.com/ClickHouse/clickhouse-go.init.0.func1+0x2c\t/go/src/github.com/Jeffail/benthos/vendor/github.com/ClickHouse/clickhouse-go/bootstrap.go:48\n\n1 @ 0x4380d6 0x40640c 0x405e38 0x1dbc465 0x468c21\n#\t0x1dbc464\tgithub.com/Jeffail/benthos/v3/lib/service.cmdService.func3.2+0x44\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:437\n\n1 @ 0x4380d6 0x40640c 0x405e78 0x13d74ea 0x468c21\n#\t0x13d74e9\tgithub.com/golang/glog.(*loggingT).flushDaemon+0x69\t/go/src/github.com/Jeffail/benthos/vendor/github.com/golang/glog/glog.go:882\n\n1 @ 0x4380d6 0x447e72 0x154a359 0x468c21\n#\t0x154a358\tgo.opencensus.io/stats/view.(*worker).start+0xb8\t/go/src/github.com/Jeffail/benthos/vendor/go.opencensus.io/stats/view/worker.go:276\n\n1 @ 0x4380d6 0x447e72 0x1dcbcef 0x199a11f 0x435035 0x44ba47 0x44ba17 0x1ec2be9 0x1ec7f51 0x1dcbe6c 0x1dcb82b 0x1999d96 0x1998fe2 
0x468c21\n#\t0x1dcbcee\tgithub.com/Jeffail/benthos/v3/public/service.(*airGapReader).WaitForClose+0x6e\t\t/go/src/github.com/Jeffail/benthos/public/service/input.go:147\n#\t0x199a11e\tgithub.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop.func1+0x5e\t\t\t/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:84\n#\t0x435034\truntime.gopanic+0x214\t\t\t\t\t\t\t\t\t/usr/local/go/src/runtime/panic.go:1038\n#\t0x44ba46\truntime.panicmem+0x326\t\t\t\t\t\t\t\t\t/usr/local/go/src/runtime/panic.go:221\n#\t0x44ba16\truntime.sigpanic+0x2f6\t\t\t\t\t\t\t\t\t/usr/local/go/src/runtime/signal_unix.go:735\n#\t0x1ec2be8\tgithub.com/jackc/pgx/v4/pgxpool.ConnectConfig+0x28\t\t\t\t\t/go/src/github.com/Jeffail/benthos/vendor/github.com/jackc/pgx/v4/pgxpool/pool.go:162\n#\t0x1ec7f50\tgithub.com/Jeffail/benthos/v3/internal/impl/crdb.(*crdbChangefeedInput).Connect+0x90\t/go/src/github.com/Jeffail/benthos/internal/impl/crdb/input_changefeed.go:138\n#\t0x1dcbe6b\tgithub.com/Jeffail/benthos/v3/public/service.(*autoRetryInput).Connect+0x2b\t\t/go/src/github.com/Jeffail/benthos/public/service/input_auto_retry.go:52\n#\t0x1dcb82a\tgithub.com/Jeffail/benthos/v3/public/service.(*airGapReader).ConnectWithContext+0x2a\t/go/src/github.com/Jeffail/benthos/public/service/input.go:114\n#\t0x1999d95\tgithub.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop.func3+0xb5\t\t\t/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:105\n#\t0x1998fe1\tgithub.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop+0x301\t\t\t/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:122\n\n1 @ 0x4380d6 0x447e72 0x1ec8925 0x1dcc6ec 0x1dcbc58 
0x468c21\n#\t0x1ec8924\tgithub.com/Jeffail/benthos/v3/internal/impl/crdb.(*crdbChangefeedInput).Close+0x1a4\t/go/src/github.com/Jeffail/benthos/internal/impl/crdb/input_changefeed.go:201\n#\t0x1dcc6eb\tgithub.com/Jeffail/benthos/v3/public/service.(*autoRetryInput).Close+0x2b\t\t/go/src/github.com/Jeffail/benthos/public/service/input_auto_retry.go:111\n#\t0x1dcbc57\tgithub.com/Jeffail/benthos/v3/public/service.(*airGapReader).CloseAsync.func1+0x37\t/go/src/github.com/Jeffail/benthos/public/service/input.go:140\n\n1 @ 0x462e25 0xad1d75 0xad1b8d 0xaced0b 0x1d4f25b 0x1db9df3 0x1dbc2e3 0x1dbbcb3 0x1dbe93c 0x1d6cc68 0x1db884e 0x1db8827 0x2146697 0x437d07 0x468c21\n#\t0x462e24\truntime/pprof.runtime_goroutineProfileWithLabels+0x24\t\t\t/usr/local/go/src/runtime/mprof.go:746\n#\t0xad1d74\truntime/pprof.writeRuntimeProfile+0xb4\t\t\t\t\t/usr/local/go/src/runtime/pprof/pprof.go:724\n#\t0xad1b8c\truntime/pprof.writeGoroutine+0x4c\t\t\t\t\t/usr/local/go/src/runtime/pprof/pprof.go:684\n#\t0xaced0a\truntime/pprof.(*Profile).WriteTo+0x14a\t\t\t\t\t/usr/local/go/src/runtime/pprof/pprof.go:331\n#\t0x1d4f25a\tgithub.com/Jeffail/benthos/v3/lib/stream.(*Type).Stop+0x23a\t\t/go/src/github.com/Jeffail/benthos/lib/stream/type.go:369\n#\t0x1db9df2\tgithub.com/Jeffail/benthos/v3/lib/service.(*swappableStopper).Stop+0xd2\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:207\n#\t0x1dbc2e2\tgithub.com/Jeffail/benthos/v3/lib/service.cmdService.func3+0x262\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:452\n#\t0x1dbbcb2\tgithub.com/Jeffail/benthos/v3/lib/service.cmdService+0x1012\t\t/go/src/github.com/Jeffail/benthos/lib/service/service.go:480\n#\t0x1dbe93b\tgithub.com/Jeffail/benthos/v3/lib/service.Run.func2+0x2db\t\t/go/src/github.com/Jeffail/benthos/lib/service/run.go:218\n#\t0x1d6cc67\tgithub.com/urfave/cli/v2.(*App).RunContext+0x7a7\t\t\t/go/src/github.com/Jeffail/benthos/vendor/github.com/urfave/cli/v2/app.go:322\n#\t0x1db884d\tgithub.com/urfave/cli/v2.(*App).Run+0xd6d
\t\t\t\t/go/src/github.com/Jeffail/benthos/vendor/github.com/urfave/cli/v2/app.go:224\n#\t0x1db8826\tgithub.com/Jeffail/benthos/v3/lib/service.Run+0xd46\t\t\t/go/src/github.com/Jeffail/benthos/lib/service/run.go:359\n#\t0x2146696\tmain.main+0x16\t\t\t\t\t\t\t\t/go/src/github.com/Jeffail/benthos/cmd/benthos/main.go:12\n#\t0x437d06\truntime.main+0x226\t\t\t\t\t\t\t/usr/local/go/src/runtime/proc.go:255\n"}
Sneaking suspicion it has something to do with those ACKs, but I have no idea why because there are messages in the changefeed it should read immediately.
The stack trace looks as though `Connect` is being blocked on `pgxpool.ConnectConfig`. Not sure why the service would shut down immediately. Maybe try without your plugin, with something like `http_server`, to make sure it's not unrelated, and maybe also try removing `service.AutoRetryNacks` to see if there's some odd interaction going on.
Didn't think about the nacks, will try that.
@Jeffail http seems to be fine as an input, and removing the nack did not seem to help
My guess for the immediate shutdown would be this, however I never get that READING log:

```go
fmt.Println("## READING")
if c.pgPool == nil && c.rows == nil {
    return nil, nil, service.ErrNotConnected
}
if c.rows == nil {
    return nil, nil, service.ErrEndOfInput
}
```
Hmm, when I change the line to `fmt.Println("## CONNECTING to", c.pgConfig.ConnString())` I don't get the log... that tells me a lot.
Lmao, I figured it out: I had everything under an `if err != nil {}` :)
There it is:
## READING
{"primary_key":"[\"414b8f21-9b1c-4a97-b2c8-eb27295431d9\", 1]","row":"{\"after\": {\"k\": \"414b8f21-9b1c-4a97-b2c8-eb27295431d9\", \"v\": 1}, \"updated\": \"1637874650209908152.0000000000\"}","table":"strm_2"}
I'll get this cleaned up and try to write some tests either today or tomorrow :)
Still having issues with closing. Looking for similar bugs, but it does not seem like the `Close()` function is being called on ^C:
## SENDING NEW MESSAGE
## READING
{"primary_key":"[\"0ab7abb5-94ca-4cbe-9a59-0065f39ecc23\", 2]","row":"{\"after\": {\"k\": \"0ab7abb5-94ca-4cbe-9a59-0065f39ecc23\", \"v\": 2}, \"updated\": \"1637874693016437872.0000000000\"}","table":"strm_2"}
^C{"@timestamp":"2021-11-25T21:12:02Z","@service":"benthos","component":"benthos","level":"INFO","message":"Received SIGTERM, the service is closing."}
{"@timestamp":"2021-11-25T21:12:17Z","@service":"benthos","component":"benthos","level":"INFO","message":"Unable to fully drain buffered messages within target time."}
{"@timestamp":"2021-11-25T21:12:22Z","@service":"benthos","component":"benthos","level":"ERROR","message":"Failed to stop stream gracefully within target time."}
I think the issue here could be that the `if c.rows.Next() {` line will block, since it will keep waiting for a row. My guess is that this prevents closure, since Benthos is still waiting for `Read()` to return.
@Jeffail Is there any way that I can tell benthos it is allowed to abort an in-progress read without waiting for the timeout?
I wonder if what you're looking for is `shutdown_timeout`. See the "Full" section here. The default value is `20s`.
@mihaitodor Sorry if I have not been clear enough. The issue is that the `Close()` function never gets called. Shutdown happens fine after the timeout, but I'd rather get closure working properly instead of reducing the timeout.

My guess as to why it is never called is that the `Read()` function (in particular the `c.rows.Next()` call inside) blocks forever until there is a message. I haven't found it in the code, but my guess is that this blocks the `Close()` function from being called since there is still a read in progress; Benthos will let the read gracefully finish.
I looked at some of the Pulsar code, saw some `ReadWithContext()` and `CloseAsync()` functions, and wonder if I might need to implement it that way. Immediately changing `Close()` to `CloseAsync()` did not seem to invoke it either.
Even if I had a close signal I could listen for just to cancel the query context, that would finish the `Read()` with `ErrEndOfInput` and would trigger a close by itself.
I see:

> Input is an interface implemented by Benthos inputs. Calls to Read should block until either a message has been received, the connection is lost, or the provided context is cancelled.
Hmm, yeah, that's pretty awkward. Ideally, if `rows.Next` is blocking, it would have a context argument. Since it doesn't, you're kind of forced to run that in a goroutine and maybe push rows over a channel which is consumed by `Read`; that way you can use the context to cancel the read.

Also, ignore the `CloseAsync` etc. methods as they're old.
@Jeffail The context is used in the `.Query()` call in the `Connect()` method. This is the weirdness of CRDB changefeeds: it is basically a never-ending query.

If I can somehow cancel that, it would work... Let me try using the passed-in context for the query instead to see if that works. Otherwise I will do the suggested method with a goroutine and channel.
Does the `Read()` context get cancelled when close is called? I assume so.
You need to be a bit careful, as the context provided to `Read` will be cancelled when the service is shutting down, and also when the read is finished, so if you use that context to cancel the long-running query it'll end after the first read.
Hmm, that is tricky. I'll see what I can do. I am thinking I still cancel the query in the close function, but I listen for that `Read()` cancel to tell when to return an end-of-input error.
Alright, that seems to have worked well. Still having some issues with the `Close()` function blocking at the `select`:

```go
func (c *crdbChangefeedInput) Close(ctx context.Context) error {
    fmt.Println("## CLOSING")
    c.cancelFunc()
    fmt.Println("## CANCELLED CONTEXT")
    c.pgPool.Close()
    fmt.Println("## CLOSED POOL")
    c.shutSig.CloseNow()
    fmt.Println("## SHUTDOWN")
    select {
    case <-c.shutSig.HasClosedChan():
    case <-ctx.Done():
        return ctx.Err()
    }
    fmt.Println("## EXITING")
    return nil
}
```
Output:
## CLOSING
## CANCELLED CONTEXT
{"@timestamp":"2021-11-26T15:58:53Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Read context cancelled, ending"}
{"@timestamp":"2021-11-26T15:58:53Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Waiting for pending acks to resolve before shutting down."}
{"@timestamp":"2021-11-26T15:58:53Z","@service":"benthos","component":"benthos.input","level":"DEBUG","message":"Pending acks resolved."}
## CLOSED POOL
## SHUTDOWN
{"@timestamp":"2021-11-26T15:59:08Z","@service":"benthos","component":"benthos","level":"INFO","message":"Unable to fully drain buffered messages within target time."}
{"@timestamp":"2021-11-26T15:59:13Z","@service":"benthos","component":"benthos","level":"ERROR","message":"Failed to stop stream gracefully within target time."}
{"@timestamp":"2021-11-26T15:59:13Z","@service":"benthos","component":"benthos","level":"DEBUG","message":"goroutine profile: total 14\n6 @ 0x4380d6 0x40640c 0x40
Changing to `c.shutSig.ShutdownComplete()` worked awesome, but I'm getting a panic on exit:
## EXITING
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x1dcb992]
goroutine 22 [running]:
github.com/Jeffail/benthos/v3/public/service.(*airGapReader).ReadWithContext(0xc000a525a0, {0x31dfc78, 0xc001245040})
/go/src/github.com/Jeffail/benthos/public/service/input.go:132 +0xf2
github.com/Jeffail/benthos/v3/lib/input.(*AsyncReader).loop(0xc0003db0a0)
/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:130 +0x3da
created by github.com/Jeffail/benthos/v3/lib/input.NewAsyncReader
/go/src/github.com/Jeffail/benthos/lib/input/async_reader.go:63 +0x2b5
Another hurdle is that the `.Query()` call blocks until there is a row available. I've moved it into the `Read()` function; however, through this I have found that `pool.Close()` blocks if the first row has not become available.
I've found that in this exit scenario, `c.rows` is `nil` in `Close()`, but not after the `.Query()` cancels and returns. Found that the rows needed to be closed in `Read()` if we cancelled before finding a row. Back to the exiting panic, but otherwise everything works well.
One time it did not panic on exit, not sure if that indicates something.
I think the `msg` is `nil` at that `input.go` line. There are places where I am returning `nil` for everything on close, so instead I am now returning `service.ErrEndOfInput` as the `err`, so it knows not to try to read the `nil` msg. This seems to have done the trick.
The last thing to do is write some tests, then it should be ready for PR.
I will do the caching in another PR but getting an initial version working is the goal of this one.
First PR is up.
Once this goes in I will make more issues for the other features listed in the PR
This was addressed in ab0a2bc and released in v4.25.0.
I've been working on my own CRDB changefeeds input as a standalone repo, but I think it might fit really well in here. I would want support for many of the options, such as `WITH updated`, as well as tracking the offset timestamp to disk for restart durability (to prevent re-emitting as much as possible).