SNEWS2 / SNEWS_Publishing_Tools

Publishing Tool for SNEWS
BSD 3-Clause "New" or "Revised" License

Size of message problems with hopskotch? #87

Open habig opened 8 months ago

habig commented 8 months ago

When trying to push through the timing-tier messages, we got some Kafka warnings suggesting that we were trying to jam through too much data at once.

We need to replicate that with a well-specified test case, collect the exact error message, and take it to SCIMMA to see what they think is happening.

justinvasel commented 6 months ago

It would be good to know which error was thrown. Does anyone have a log file?

I suspect this is unrelated, but just in case: scimma included a bug fix in hop-client v0.9.0 (snews_pt uses v0.8.0) for when hopskotch expects JSON data but receives binary data instead and chokes. https://github.com/scimma/hop-client/pull/198

justinvasel commented 3 months ago

I have an idea for a solution.

According to the Kafka documentation, the default maximum message size is 1,048,588 bytes (~1 MB), but it is configurable.

For the example that @KaraMelih mentioned in PR #98 of 100,000 integers, the size of those integers is 800,056 bytes, which is less than the threshold above, but we're sending multiple SNEWS messages in one go, so it quickly adds up.
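For a quick sanity check before sending, something like the following would estimate the serialized payload size against that default. This is only a rough sketch: the field names are made up, and the real size depends on how snews_pt actually serializes the message.

import json

# Kafka's default message.max.bytes (~1 MB)
KAFKA_DEFAULT_MAX_BYTES = 1_048_588

def serialized_size(message: dict) -> int:
    """Size in bytes of the JSON-serialized message."""
    return len(json.dumps(message).encode("utf-8"))

# Illustrative timing-tier-like payload with a long list of integers
payload = {"detector_name": "XENONnT", "timing_series": list(range(100_000))}
size = serialized_size(payload)
print(f"{size:,} bytes; fits in one Kafka message: {size < KAFKA_DEFAULT_MAX_BYTES}")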

There are multiple layers between SNEWSMessageBuilder and Kafka; the call chain looks like this:

snews_pt.messages.SNEWSMessageBuilder.send_messages
└── snews_pt.messages.Publisher.send
    └── hop.io.Stream.open
        └── hop.io.Stream.Producer
            └── adc.producer.ProducerConfig

That last class, adc.producer.ProducerConfig, takes message_max_bytes as a keyword argument, and the methods in the chain above, starting at hop.io.Stream.open, pass **kwargs the rest of the way down.

So... We should be able to increase the maximum message size by passing the kwarg into here: https://github.com/SNEWS2/SNEWS_Publishing_Tools/blob/f9b990aea4a63d989b46307febbf9c2f1a354574/snews_pt/messages.py#L51-L53

This would be the appropriate change (for 5x max message size):

self.stream = Stream(until_eos=True, auth=self.auth).open(
    self.obs_broker, 
    'w', 
    message_max_bytes=5242940  # <--- New
)

I have not tested this.
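One caveat worth flagging (also untested): message_max_bytes should only raise the client-side producer limit; the Kafka broker enforces its own message.max.bytes (and topics can override it with max.message.bytes), so anything larger than what hopskotch itself is configured to accept would still be rejected on the server side.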

habig commented 3 months ago

The scimma people report that the default max message size is 1 MB. They're leery of increasing it, since they want to head off future scaling problems, and are working on a large-file offload service. But now that we know this, we can plan for it, with efficiencies like this PR helping. We could imagine daisy-chaining messages together, the way SMS handles texts that run over 140 characters, too.

justinvasel commented 3 months ago

@habig: and if that default is something we can override, like in the example above, then problem solved! No need for them to change the default on their end.

habig commented 1 week ago

We were just discussing this now.

If raising the infrastructure limit is the answer (we need to test this), then can we raise it enough to solve the problem? If not, then we'd need to "packetize" things anyway. And if we have to write that code, then living within the 1 MB-per-packet limit is probably the way to go.

Marta says about 1 s of data in the last test fit within the 1 MB limit, so we'd need an order of magnitude more headroom to send ~10 s of light curve in one shot.
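If we do go the packetizing route, here is a minimal sketch of the splitting side, assuming a JSON-serialized payload. The chunk_index/chunk_total envelope fields are made up for illustration and are not part of the current schema.

import json

MAX_CHUNK_BYTES = 1_000_000  # stay safely under the ~1 MB Kafka default

def chunk_message(message: dict, key: str = "timing_series") -> list:
    """Split one oversized message into several sub-1 MB messages.

    Each piece carries hypothetical chunk_index/chunk_total fields so the
    receiving end can reassemble the series in order.
    """
    series = message[key]
    # Estimate entries per chunk from the average serialized entry size.
    entry_bytes = max(1, len(json.dumps(series).encode()) // max(1, len(series)))
    overhead = len(json.dumps({**message, key: []}).encode()) + 100
    per_chunk = max(1, (MAX_CHUNK_BYTES - overhead) // entry_bytes)

    pieces = [series[i:i + per_chunk] for i in range(0, len(series), per_chunk)]
    return [
        {**message, key: piece, "chunk_index": i, "chunk_total": len(pieces)}
        for i, piece in enumerate(pieces)
    ]

Each piece would then go through the normal Publisher.send path, and the receiving end would need matching reassembly logic.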

KaraMelih commented 1 week ago

In any case, we should warn the user about the large message size and what we do to handle it. Something like: "Input data is larger than 1 MB; we will chunk it into pieces and send several messages." That way, if something goes wrong, it is more obvious where it might have gone wrong.
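A minimal sketch of what that check could look like; the warn_if_oversized helper, the threshold, and the wording are all placeholders.

import json
import warnings

MAX_SINGLE_MESSAGE_BYTES = 1_048_588  # Kafka's default message.max.bytes

def warn_if_oversized(message: dict) -> bool:
    """Warn the user when the input will be chunked; return True if it will be."""
    size = len(json.dumps(message).encode("utf-8"))
    if size > MAX_SINGLE_MESSAGE_BYTES:
        warnings.warn(
            f"Input data is {size:,} bytes (> ~1 MB); it will be chunked "
            "into pieces and sent as several messages.",
            stacklevel=2,
        )
        return True
    return False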

As an alternative to chunking, @justinvasel looked into compression options. We should probably check whether Kafka offers any of these internally. We also need to validate that whatever is chunked/compressed gets unchunked/decompressed properly on the other end.
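For what it's worth, Kafka producers generally support transparent compression via a compression.type setting (gzip/snappy/lz4/zstd); whether hop/adc exposes that to us would need checking. As a rough feel for what application-level compression might buy, here is a quick zlib comparison on a made-up payload (real timing data may compress very differently).

import json
import zlib

# Made-up timing-tier-like payload: 100,000 integer offsets
payload = {"timing_series": list(range(100_000))}
raw = json.dumps(payload).encode("utf-8")
compressed = zlib.compress(raw, level=6)

print(f"raw:        {len(raw):,} bytes")
print(f"compressed: {len(compressed):,} bytes "
      f"({len(compressed) / len(raw):.1%} of original)")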