nsqio / nsq

A realtime distributed messaging platform
https://nsq.io
MIT License

nsqd: changes to support IoT usage, avoid data loss #855

Open mcorb opened 7 years ago

mcorb commented 7 years ago

I am using nsq as a store-and-forward agent for IoT devices that report metrics to a central HTTPS API.

The small devices are offline for periods of time, and need to send back data opportunistically. The power supply is also intermittent.

You can think of the use case as instrumentation on a passenger bus: metrics are collected between stops, wifi is only available at the stops, and the power supply goes up and down in between.

As with most IoT devices, the flash storage has a very limited write lifetime.

I've identified the following nsqd options relevant to this use-case:

  -mem-queue-size int
        number of messages to keep in memory (per topic/channel) (default 10000)
  -sync-timeout duration
        duration of time per diskqueue fsync (default 2s)
  -max-output-buffer-timeout duration
        maximum client configurable duration of time between flushing to a client (default 1s)

Our first approach was to disable the memory queue entirely with -mem-queue-size 0 and set -sync-timeout 20s.
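Concretely, that first attempt amounts to running nsqd with (flag names as in the help output above):

```shell
# First attempt: no memory queue, fsync the disk queue at most every 20s
nsqd -mem-queue-size=0 -sync-timeout=20s
```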

However this isn't working out for a couple of reasons:

  1. The OS is likely to flush writes sooner than 20s, so we are still wearing down the flash storage. We want to avoid writing to disk more often than every 20s, something that can't be guaranteed by disabling the memory queue.
  2. "in-flight" and "rescheduled" messages seem to remain solely in memory, sometimes for several minutes or longer, even when the memory queue is disabled as above. I get the impression these messages are needlessly lost when the power supply is cut. I may be wrong - is it possible to sync in-flight and deferred messages to disk aggressively (say every 20s)?

My next attempt was to implement flushing of the memory queue to the disk queue as below (based on the first part of Channel.flush()), called by a new 20s ticker:

+++ b/nsqd/channel.go
+func (c *Channel) flushToDisk() error { ... }
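In case it helps discussion, here is a self-contained sketch of what the body might look like. Message, Channel, memoryMsgChan, and backend are simplified stand-ins for the real nsqd types, not the actual internals:

```go
package main

import "fmt"

// Message and Channel are simplified stand-ins for the nsqd types;
// backend is a slice where the real code would write to the disk queue.
type Message struct {
	Body []byte
}

type Channel struct {
	memoryMsgChan chan *Message
	backend       [][]byte
}

// flushToDisk drains whatever is currently buffered in the memory
// queue into the disk-backed queue, mirroring the first loop of
// Channel.flush() but leaving the channel open for further use.
func (c *Channel) flushToDisk() error {
	for {
		select {
		case msg := <-c.memoryMsgChan:
			c.backend = append(c.backend, msg.Body)
		default:
			// memory queue is empty; nothing left to persist
			return nil
		}
	}
}

func main() {
	c := &Channel{memoryMsgChan: make(chan *Message, 10)}
	c.memoryMsgChan <- &Message{Body: []byte("a")}
	c.memoryMsgChan <- &Message{Body: []byte("b")}
	_ = c.flushToDisk()
	fmt.Println(len(c.backend), len(c.memoryMsgChan)) // 2 0
}
```

The non-blocking select with a default case is what lets the ticker drain only what is buffered right now, without ever waiting on producers.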

This way we can enable the memory queue, allowing good performance and avoiding disk wear, while still achieving data safety guarantees.

Two problems with this:

  1. It now flushes the entire memory queue to disk every 20s. Ideally we would like to avoid committing short-lived messages to disk that happen to be in the memory queue at that moment.
  2. This still doesn't seem to solve in-flight and deferred messages stuck in memory. We could commit them to disk too as Channel.flush() does but that would break the semantics of operation.

I'd like to get a recommendation from the nsqd developer community on how to proceed. It looks like any such change would only be a few lines of code, but I'm not sure of the best way to solve these requirements.

I'll be glad to submit patches and documentation for IoT use once we hear the best approach from an experienced developer.

mcorb commented 7 years ago

I found #34 (nsqd: disk-backed deferred queue) and #376 related to this issue from 2012 and 2014 respectively. Both were closed in favour of #510 and #625. The WAL feature looks like it's on the back-burner.

So it looks like persisting the deferred queue is not trivial with the current codebase.

In contrast I'm looking for something simple, and I don't have strong requirements for the semantics of deferred messages to be preserved.

So here's my spec for a third attempt at poor-man's resilience:

  1. Adapt the existing Channel.flush() which is currently called only during process shutdown and moves everything to the disk queue. Let's call this flushPartial()
  2. Provide user option, tentatively disk-flush-interval
  3. The new flushPartial() is called every disk-flush-interval (say 20s)
  4. It removes from the deferred queue all messages that are not scheduled to go out within the next disk-flush-interval, plus all in-flight messages older than, say, disk-flush-interval, and saves them to the disk queue.
  5. Maybe require that -max-req-timeout < -disk-flush-interval and -max-msg-timeout < -disk-flush-interval on the nsqd command-line. Thus the wire protocol will reflect the semantics.

This fairly simple change (maybe 100 LoC) would provide a lifeline "resilience" mode where users can elect to restrict requeue times in exchange for a degree of safety from power cuts.

When the WAL feature @mreiferson is working on lands, the stopgap above can be removed easily and users will get improved functionality with a natural upgrade path.

mreiferson commented 7 years ago

Hi @mcorb, thanks for the feedback and suggestions.

As you've correctly discovered, the ultimate solution to this problem would be to complete #510 (as implemented by #625). While your proposal might address the issues, the changes don't feel like generally useful additions and would serve as a rather specific stop-gap for this particular use case.

I'm curious about the system's architecture though — have you considered not putting nsqd on the devices but rather at the collection edge? You could then write a much simpler service for the device itself (perhaps using https://github.com/nsqio/go-diskqueue, which we pulled out of this repo in #847) and still benefit from NSQ in your backend (where it was designed to operate).
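For illustration, the device-side service could be little more than a buffer-and-forward loop. The queue type here is an in-memory stand-in with the same Put/ReadChan shape as go-diskqueue, purely a sketch:

```go
package main

import "fmt"

// queue mirrors the Put/ReadChan shape of go-diskqueue's interface,
// backed here by a plain channel purely for illustration.
type queue struct{ ch chan []byte }

func (q *queue) Put(b []byte) error      { q.ch <- b; return nil }
func (q *queue) ReadChan() <-chan []byte { return q.ch }

// forward drains up to n buffered readings, uploading each via send
// (e.g. an HTTPS POST to the central API once wifi is available).
func forward(q *queue, n int, send func([]byte) error) int {
	sent := 0
	for sent < n {
		select {
		case b := <-q.ReadChan():
			if err := send(b); err != nil {
				_ = q.Put(b) // connectivity lost: keep the reading for next time
				return sent
			}
			sent++
		default:
			return sent // queue drained
		}
	}
	return sent
}

func main() {
	q := &queue{ch: make(chan []byte, 100)}
	_ = q.Put([]byte(`{"speed": 42}`))
	_ = q.Put([]byte(`{"speed": 43}`))
	fmt.Println(forward(q, 10, func(b []byte) error { return nil })) // 2
}
```

Swapping the channel-backed stand-in for a real go-diskqueue instance would give durable buffering on the device without running a full nsqd there.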

dimaqq commented 7 years ago

@mcorb ~5 years ago I dealt with the exact same problem; the user-mode code used sqlite.

Ultimately, it turned out that no production-ready (reliable) file systems were good enough: sqlite (with or without WAL) writes to the same place on each transaction (typically 3 or 4 times), and standard filesystems (ext, xfs, etc.) map that to the exact same block on the disk.

Flash cards advertise wear-levelling, but tests of both commercial and consumer cards showed that it's a joke. In fact, consumer cards were more reliable.

The system solution would be to use a log-structured file system on the data (rw) volume. YMMV w.r.t. recovering the volume after a power outage; do your own tests 😄

The ad-hoc solution was to buffer data in RAM in the user code for up to N events or up to M minutes, whichever came first. This gives a statistical bound on how much data may be lost by the system.

In any case, I would advise against placing this functionality into nsqd itself.