Closed: skinkie closed this issue 10 months ago
There is currently a maximum of 2000 retained messages. This could be made configurable. There has to be some maximum, otherwise if you have a million and a client subscribes to '#', you get overloads. 2000 was chosen because it is higher than what some clients are willing to receive.
That last bit is important, because your client may also be the limiting factor.
What is your message volume?
Much higher than 2000. My rule of thumb: 5 public broadcasters × 24 hours × 100 days into the future, so on the order of 12,000 retained messages. The idea is that a connected client synchronizes state and then just follows the updates. So having the ability to configure this would certainly help.
https://github.com/halfgaar/FlashMQ/blob/master/subscriptionstore.cpp#L607
Sure, I'll get that done. However, clients may not take kindly to it either.
Also note that MQTT isn't event streaming. Instead, topics have a value. If you write to the same topic twice, the retained value is the last one.
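That last-value semantic can be sketched with a plain Python dict standing in for the broker's retained-message store (a toy model, not FlashMQ's implementation):

```python
# Toy model of MQTT retained messages: each topic holds at most one
# retained payload, and a later publish with retain=True replaces it.
retained: dict[str, bytes] = {}

def publish(topic: str, payload: bytes, retain: bool = False) -> None:
    if retain:
        if payload == b"":
            # Per MQTT, a retained publish with an empty payload
            # clears the retained message for that topic.
            retained.pop(topic, None)
        else:
            retained[topic] = payload

publish("portal/clocks/42", b"first", retain=True)
publish("portal/clocks/42", b"second", retain=True)
print(retained["portal/clocks/42"])  # b'second' -- only the last value is kept
```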
> Sure, I'll get that done. However, clients may not take kindly to it either.
Thanks as always :-)
> Also note that MQTT isn't event streaming. Instead, topics have a value. If you write to the same topic twice, the retained value is the last one.
Exactly, what happens is that a playlist update replaces the topic.
On another note, I have been experimenting with CRDT's over MQTT in FlashMQ. That is fun too :-)
I made a branch: https://github.com/halfgaar/FlashMQ/tree/retained-delivery-limit
I think this suits you?
> On another note, I have been experimenting with CRDT's over MQTT in FlashMQ. That is fun too :-)
I had to look up what that is :). Sounds interesting.
> I made a branch: https://github.com/halfgaar/FlashMQ/tree/retained-delivery-limit
> I think this suits you?
Source looks good, but I did not compile it yet.
> > On another note, I have been experimenting with CRDT's over MQTT in FlashMQ. That is fun too :-)
>
> I had to look up what that is :). Sounds interesting.
As opposed to running specialized websocket instances without the ability to do any authentication, it remains an interesting use case for persistence of documents (and the decoupling).
I'll probably make a release today then.
Version 1.6.7 with the change has been released.
@halfgaar I am now running FlashMQ version 1.6.7 with SSE4.2 support (Ubuntu), but I still end up with only a few retained messages when testing with mosquitto_sub: about 334 messages. The configuration contains retained_messages_delivery_limit 65535.
I am publishing via:
```python
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

clock = Clock.objects.get(clock_id=clock_id)
now = datetime.datetime.now(tz=datetime.timezone.utc)
properties = Properties(PacketTypes.PUBLISH)
# total_seconds(), not .seconds: .seconds drops whole days from the interval
properties.MessageExpiryInterval = int((clock.end_time - now).total_seconds()) + 600  # retain for x seconds
if settings.MQTT_USER != '':
    client.publish(topic=f"portal/clocks/{clock.clock_id}",
                   payload=json.dumps(render_clock(clock, radios, broadcasters),
                                      default=json_serial, indent=4),
                   retain=True, qos=0, properties=properties)
```
Can you also try setting client_max_write_buffer_size to something like 16777216?
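Putting the two options from this thread together, a sketch of the relevant flashmq.conf lines (the values are the ones tried in this thread, not general recommendations):

```
# flashmq.conf -- illustrative values from this thread
retained_messages_delivery_limit 65535
client_max_write_buffer_size 16777216
```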
And how many 'clocks' do you have?
You may be running into what I said though; clients have limits too. When I tested with millions of retained messages, mosquitto_sub only gave me 1500 or so.
@halfgaar Incrementing client_max_write_buffer_size gives me 4819 topics via mosquitto_sub, so that certainly improves the situation. I cannot tell you exactly how many I would expect, but 4819 / 5 / 24 suggests about 40 days of data. Doubling client_max_write_buffer_size again gives me 9285 topics, and adding some more brings me to 10k. Hence I guess this limit was the other culprit.
You're kind of operating on the fringes for MQTT, if your total volume is that big. You may benefit from some segmentation. Software often does this trick on disk storage, to avoid very large directory sizes.
For instance, you could modulo your clock_id with 32 and have 32 trees:
portal/clocks/0/{clock.clock_id}
portal/clocks/1/{clock.clock_id}
...
portal/clocks/31/{clock.clock_id}
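A minimal sketch of that segmentation, assuming numeric clock IDs (the helper name is made up for illustration):

```python
BUCKETS = 32

def clock_topic(clock_id: int) -> str:
    # Segment the topic tree by clock_id modulo 32, so no single
    # subtree grows unboundedly large.
    return f"portal/clocks/{clock_id % BUCKETS}/{clock_id}"

print(clock_topic(42))    # portal/clocks/10/42
print(clock_topic(1000))  # portal/clocks/8/1000
```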
When you subscribe to a path, you get a SUBACK and the messages. On receiving either of those, subscribe to the next, etc.
This will change the order, but retained messages don't have an order anyway.
@halfgaar order is not of my interest. But take for example the subscription portal/clocks/#: with or without segmentation, would this lead to smaller packages? Or does this only work with smart clients taking 0-31 individually? In my case I could segment on radio, but if it only works with individual subscriptions, and certainly not via portal/radio/+/clocks/#, then it does not feel like an added benefit.
That won't help, no. The effect is the same: the message tree is walked and matching messages are deposited in the client's buffer until it is full. By using the technique I described, the buffers are flushed periodically.
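A toy model of that buffer behaviour (capacities and counts invented; FlashMQ's real buffering is more involved): one wildcard subscription fills a fixed write buffer once and truncates, while subscribing per subtree lets the buffer drain between batches.

```python
BUFFER_CAPACITY = 100  # messages the client buffer holds before it is "full"
MESSAGES = [f"portal/clocks/{i % 32}/{i}" for i in range(1000)]

def deliver(matching: list[str], capacity: int) -> list[str]:
    # The tree walk deposits matches into the buffer until it is full;
    # in this toy model the rest are simply dropped for this delivery.
    return matching[:capacity]

# One subscription to '#': a single walk, a single buffer fill.
got_single = deliver(MESSAGES, BUFFER_CAPACITY)

# One subscription per subtree: the buffer drains between walks.
got_chunked: list[str] = []
for bucket in range(32):
    subtree = [m for m in MESSAGES if m.startswith(f"portal/clocks/{bucket}/")]
    got_chunked += deliver(subtree, BUFFER_CAPACITY)

print(len(got_single), len(got_chunked))  # 100 1000
```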
Your requirements sound more like you need Kafka, which is pull-based.
It seems that I am not getting all the (expected) retained messages. Is there a configurable maximum for storage?