LapoElisacci / Ksql

ksqlDB Client for Ruby
MIT License

Closing stream / rebuild stream handler #17

Closed (ook closed 1 year ago)

ook commented 2 years ago

I'm using the Ksql gem to stream changes into a Rails view. It works great so far.

I'm having trouble closing the stream: the ksqlDB stream subscription happens in one Rails action, and I'm looking for a way to close that stream from another Rails action.

It seems I can't "rebuild" a stream handler from stream = Ksql::Client.stream("SELECT * FROM my_stream EMIT CHANGES;") in order to call stream.close on it. (BTW, there's a typo at https://github.com/LapoElisacci/Ksql/blob/main/lib/ksql/stream.rb#L18: s/stared/started/.)

The ksqlDB REST API seems to offer no option other than closing the connection. How would you close a stream handler in this situation? Thank you.

LapoElisacci commented 2 years ago

Hi @ook, thanks for your feedback!

If you know the stream ID, you should be able to close the stream by calling the close_query endpoint through the client: https://github.com/LapoElisacci/Ksql/blob/9f2ba15f812c7890a0e88773ab36887327666bcf/lib/ksql/client.rb#L14
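If the ID isn't known in the second action, one way to recover it might be to list the running queries and match on the query text. The following is only a sketch under assumptions: it expects an array of hashes with "id", "queryString", "queryType" and "state" keys, and both find_push_query_ids and normalize_sql are hypothetical helpers, not part of the gem.

```ruby
# Hypothetical helper (not part of the Ksql gem): return the IDs of running
# push queries whose text matches the statement that was subscribed to.
def find_push_query_ids(queries, query_string)
  wanted = normalize_sql(query_string)
  queries
    .select { |q| q["queryType"] == "PUSH" && q["state"] == "RUNNING" }
    .select { |q| normalize_sql(q["queryString"]) == wanted }
    .map { |q| q["id"] }
end

# Loose comparison: ignores case and whitespace differences. Note that this
# also lowercases string literals inside the statement.
def normalize_sql(sql)
  sql.to_s.strip.gsub(/\s+/, " ").downcase
end

# Assumed usage (the `queries` accessor on the result is an assumption):
#   sql = "SELECT * FROM my_stream EMIT CHANGES;"
#   ids = find_push_query_ids(Ksql::Client.ksql('show queries;').queries, sql)
#   ids.each { |id| Ksql::Client.close_query(id) }
```

Keep in mind that if several clients subscribed with the same statement, this matches all of their queries, so closing every returned ID would stop them all.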

Please let me know if that works.

I'll fix the typo asap.

Thanks

ook commented 2 years ago

Thank you again for the quick response, @LapoElisacci! Hmm, I see.

It seems to close the query successfully, but it returns a generic error:

Loading development environment (Rails 6.1.4.7)
[1] pry(main)> Ksql::Client.ksql('show queries;')
=> Ksql::Queries {
  :queries       => [
    [0] {
      "id"              => "transient_S_GROW_EVENTS_2830845903553965693",
      "queryString"     => "SELECT * FROM s_grow_events EMIT CHANGES;",
      "queryType"       => "PUSH",
      "sinkKafkaTopics" => [],
      "sinks"           => [],
      "state"           => "RUNNING",
      "statusCount"     => {
        "RUNNING" => 1
      }
    }
  ],
  :statementText => "show queries;",
  :warnings      => []
}
[2] pry(main)> Ksql::Client.close_query("transient_S_GROW_EVENTS_2830845903553965693")
=> Ksql::Error {
  :@type      => "generic_error",
  :error_code => 50000,
  :message    => "On wrong context or worker"
}
[3] pry(main)> Ksql::Client.ksql('show queries;')
=> Ksql::Queries {
  :queries       => [],
  :statementText => "show queries;",
  :warnings      => []
}

I noticed that this doesn't trigger the stream.on_close block set up in the "initial" action. I'll look for a better way to achieve my goal.
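Since the close call can return an error even though the query disappears from the list, and the on_close block isn't triggered, one possible workaround is to verify the result by listing the queries again and to run any cleanup explicitly. This is a minimal sketch, not the gem's behavior: close_and_verify is a hypothetical helper, and the close, list_ids and after_close callables are supplied by the caller.

```ruby
# Hypothetical helper: close a query, then decide whether it worked by
# checking the list of query IDs instead of trusting the close response.
# `close`, `list_ids` and `after_close` are callables passed in by the
# caller, so nothing here depends on a particular client API.
def close_and_verify(query_id, close:, list_ids:, after_close: nil)
  response = close.call(query_id)
  closed = !list_ids.call.include?(query_id)
  # Run the cleanup that on_close would have done, but only if the query
  # is really gone.
  after_close&.call(query_id) if closed
  { closed: closed, response: response }
end

# Assumed wiring with the client (the `queries` accessor is an assumption):
#   close_and_verify(
#     id,
#     close:       ->(qid) { Ksql::Client.close_query(qid) },
#     list_ids:    -> { Ksql::Client.ksql('show queries;').queries.map { |q| q["id"] } },
#     after_close: ->(qid) { Rails.logger.info("closed #{qid}") }
#   )
```

The listing is checked right after the close call, so if the server needs a moment to drop the query this could report it as still running; a short retry may be needed in that case.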

LapoElisacci commented 2 years ago

Hey @ook, that's interesting, thanks for sharing!

If you find a better way to handle this scenario, please feel free to open a PR or suggest some changes here.

Thanks!

LapoElisacci commented 2 years ago

@ook Version 0.1.1 released, I hope you don't mind that I mentioned you as a contributor 😄