ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0

feat: Add NATS source/sink connector #577

Closed gbto closed 3 months ago

gbto commented 3 months ago

Opening the PR to get some feedback. This is a refactored version of https://github.com/gbto/arroyo/tree/feat/add-nats-source-connector, which I've been using to run 6 pipelines in development for a month now on arrow = { version = "0.9.0" }. The connector only supports consuming from NATS Streams (source) and publishing to NATS Subjects (sink). Both the source and the sink use connection profiles and table configurations, with environment variable substitution for secrets.

Tests are not implemented yet, and neither are features that were not absolutely required to get it working within our architecture (e.g. autocomplete, schema validation and registry support, different semantics, or deduplication).

There are still a few flawed features, in particular the way optional extra client configuration is parsed and the checkpointing in the sink connector. Comments on these two points would be much appreciated, and I will act on them as soon as possible.

-- source configuration
create table demo_source (
    value text
) with (
    type = 'source',
    connector = 'nats',
    servers = 'nats-1:4222,nats-2:4222',
    'nats.stream' = 'demo-source',
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json',
    'json.unstructured' = 'true'
);
-- sink configuration
create table demo_sink (
    value text
) with (
    type = 'sink',
    connector = 'nats',
    servers = 'nats-1:4222,nats-2:4222',
    'nats.subject' = 'demo.topic',
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json',
    'json.unstructured' = 'true'
);
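
To make the secret handling easier to review, here is a minimal sketch of how `{{ NATS_USER }}`-style placeholders could be resolved from the environment. This is an illustration only, not the code in this PR or Arroyo's actual implementation: the function name `substitute_placeholders` and its closure-based lookup are hypothetical, and the behaviour on a missing variable (returning an error rather than leaving the placeholder untouched) is an assumption.

```rust
/// Hypothetical helper: replaces every `{{ NAME }}` placeholder in `input`
/// with the value returned by `lookup(NAME)`.
/// Returns an error for an unterminated placeholder or an unknown name.
fn substitute_placeholders(
    input: &str,
    lookup: impl Fn(&str) -> Option<String>,
) -> Result<String, String> {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;

    while let Some(start) = rest.find("{{") {
        // Copy the literal text that precedes the placeholder.
        out.push_str(&rest[..start]);

        let after_open = &rest[start + 2..];
        let end = after_open
            .find("}}")
            .ok_or_else(|| format!("unterminated placeholder in {input:?}"))?;

        // Whitespace inside the braces is ignored: `{{NAME}}` == `{{ NAME }}`.
        let name = after_open[..end].trim();
        let value = lookup(name)
            .ok_or_else(|| format!("no value for placeholder {name}"))?;
        out.push_str(&value);

        // Continue scanning after the closing braces.
        rest = &after_open[end + 2..];
    }

    // Copy whatever literal text remains after the last placeholder.
    out.push_str(rest);
    Ok(out)
}

fn main() {
    // Resolve names against the process environment.
    let from_env = |name: &str| std::env::var(name).ok();

    match substitute_placeholders("{{ NATS_USER }}", from_env) {
        Ok(value) => println!("auth.username = {value}"),
        Err(err) => eprintln!("error: {err}"),
    }
}
```

Passing the lookup as a closure keeps the substitution logic independent of the process environment, so it can be unit-tested with a fixed set of names.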