scramjetorg / transform-hub

Scramjet Transform Hub (STH) is a runtime supervisor that can run data processing programs called Sequences and manage local resources on any Linux server, Docker on small edge servers, and even large-scale Kubernetes clusters in the cloud or datacenters. It connects to Scramjet Spaces in Scramjet Cloud Platform.
GNU Affero General Public License v3.0

fix/topic streaming bug #932

Closed MichalCz closed 1 year ago

MichalCz commented 1 year ago

What?

Why?

Topics were wrongly redirected to new instances on every request. This PR fixes that.

It also makes topic writers queue up one after another: the first writer's data must be consumed before the others can start writing.

Audit stream logs were also turned off, because they caused a loop of messages.
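
The writer queueing described above can be pictured with a small in-memory model. This is only a sketch under assumptions: `QueuedTopic` and its methods are hypothetical names invented for illustration and are not taken from the STH codebase, which works on streams rather than arrays.

```typescript
// Hypothetical model of "writers queue up behind each other"; not the STH implementation.
class QueuedTopic {
    private writers: string[] = [];   // writer ids in arrival order; index 0 is the active writer
    private pending: string[] = [];   // chunks written by the active writer, not yet consumed
    private activeEnded = false;      // true once the active writer has finished writing

    // A writer connects and takes its place at the end of the queue.
    join(writerId: string): void {
        this.writers.push(writerId);
    }

    // Only the writer at the head of the queue may write; everyone else is refused.
    write(writerId: string, chunk: string): boolean {
        if (this.writers[0] !== writerId || this.activeEnded) return false;
        this.pending.push(chunk);
        return true;
    }

    // The active writer signals that it has nothing more to send.
    end(writerId: string): void {
        if (this.writers[0] === writerId) this.activeEnded = true;
    }

    // A consumer drains the pending chunks; a finished writer then gives way to the next one.
    consume(): string[] {
        const chunks = this.pending;
        this.pending = [];
        if (this.activeEnded) {
            this.writers.shift();
            this.activeEnded = false;
        }
        return chunks;
    }
}

const topic = new QueuedTopic();
topic.join("first");
topic.join("second");
topic.write("first", "a");    // accepted
topic.write("second", "b");   // refused: "first" has not been consumed yet
topic.end("first");
topic.consume();              // drains "first"; "second" becomes the active writer
topic.write("second", "b");   // accepted now
```

In this model a refused write simply returns `false`; in the usage scenario below, the equivalent is the second `si topic send` blocking until a reader appears.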

Usage:

In separate terminals:

yarn start:dev
cat /dev/random | hexdump | head -n 10 | si topic send mytopic
cat /dev/random | hexdump | head -n 10 | si topic send mytopic # yes, again
# wait a little: both commands above should block, and nothing should be consumed yet
si topic get mytopic
# the topic data should be consumed only after this command


a-tylenda commented 1 year ago

This PR additionally fixes two issues:

but:

image

In GHA, error 415 is sometimes returned and sometimes not, even though both jobs ran on the same commit.

Moreover, the same error is returned when the commands are run in the opposite order:

  1. First, si topic get pets (the default content type, text/plain, is used)
  2. Then, si topic send pets -t application/x-ndjson (the content type is overridden)
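
One plausible reading of the two steps above is that the topic keeps the content type of whichever request created it, and a later request with a different content type is rejected with 415 (Unsupported Media Type). The sketch below models only that reading; `TopicRegistry` and `request` are hypothetical names, and it does not explain why the error appears only intermittently in GHA.

```typescript
// Hypothetical model of a per-topic content-type check; not taken from the STH codebase.
const DEFAULT_CONTENT_TYPE = "text/plain";

class TopicRegistry {
    // Content type recorded for each topic by the first request that touched it.
    private contentTypes = new Map<string, string>();

    // Returns an HTTP-like status code for a get or send request on a topic.
    request(topic: string, contentType: string = DEFAULT_CONTENT_TYPE): number {
        const existing = this.contentTypes.get(topic);
        if (existing === undefined) {
            // First request creates the topic and fixes its content type.
            this.contentTypes.set(topic, contentType);
            return 200;
        }
        // Any later request must match the recorded content type.
        return existing === contentType ? 200 : 415;
    }
}

const registry = new TopicRegistry();
registry.request("pets");                          // topic created with text/plain
registry.request("pets", "application/x-ndjson");  // rejected with 415: content type differs
```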


a-tylenda commented 1 year ago

Also, the signature of getNamedData() has changed: when you call it on hostClient, you now need to pass more arguments than just the topic name, which used to be enough.

hostClient.getNamedData(topic, {}, "text/plain")

Otherwise, you will get a 400 Bad Request error.
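
To avoid repeating the extra arguments at every call site, callers could wrap the call in a small helper. This is a minimal sketch: the `NamedDataClient` interface is an assumption inferred only from the call shown above (the real hostClient type, its parameter names, and its return type may differ), and `getTopic` is a hypothetical helper, not part of any Scramjet package.

```typescript
// Minimal shape assumed from the call `hostClient.getNamedData(topic, {}, "text/plain")`.
// The meaning of the second argument is not stated in this thread; it is treated as an options object here.
interface NamedDataClient<T> {
    getNamedData(topic: string, options: object, contentType: string): T;
}

// Hypothetical helper: always passes all three arguments, defaulting to text/plain.
function getTopic<T>(
    client: NamedDataClient<T>,
    topic: string,
    contentType: string = "text/plain"
): T {
    return client.getNamedData(topic, {}, contentType);
}
```

With a real client this would be called as getTopic(hostClient, "mytopic"), provided hostClient matches the assumed shape.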