Safecast / safecastapi

The app that powers api.safecast.org

Dedupe ingest measurements #352

Open matschaffer opened 7 years ago

matschaffer commented 7 years ago

The way queuing happens upstream of ingest (ttserve and "PCN") can cause duplicate messages.

We should have some de-duping at the ingest layer.

Should be something like "a message whose secure hash of its JSON matches that of any message received within the last N seconds is a duplicate", where the time window N is configurable.

So a bit different than the plain md5-ing we have in api today.
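
A minimal sketch of that check, assuming a hypothetical ingest_dedupe table keyed by a sha256 of the payload JSON (the 60-second window and all names here are placeholders, not existing schema):

    -- Hypothetical dedupe check at ingest: the same payload hash seen
    -- within the last N seconds (here 60) is treated as a duplicate.
    -- sha256() needs Postgres 11+; use pgcrypto's digest() on older versions.
    SELECT EXISTS (
        SELECT 1
        FROM ingest_dedupe
        WHERE payload_hash = encode(sha256(convert_to(:payload_json, 'UTF8')), 'hex')
          AND seen_at > now() - interval '60 seconds'
    ) AS is_duplicate;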

See https://safecast.slack.com/archives/api/p1485620246001357 for points from @rayozzie

matschaffer commented 7 years ago

@eitoball when working through this keep #356 in mind where writes to the measurements table may get buffered.

Thinking maybe Redis, or at least an isolated table used only for deduping, will be in order here.
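
A sketch of the isolated-table option, reusing the hypothetical ingest_dedupe table from the earlier comment; the insert only succeeds for a first-seen hash, so the ingest path can dedupe without touching the (possibly buffered) measurements writes:

    -- Hypothetical dedupe table kept out of the measurements write path.
    CREATE TABLE ingest_dedupe (
        payload_hash text PRIMARY KEY,
        seen_at      timestamptz NOT NULL DEFAULT now()
    );

    -- First-seen hashes insert cleanly; duplicates are silently dropped.
    -- Rows older than the dedupe window would need periodic cleanup.
    INSERT INTO ingest_dedupe (payload_hash) VALUES (:payload_hash)
    ON CONFLICT (payload_hash) DO NOTHING;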

Frangible commented 7 years ago

Per Ray:

Yes, you’ve likely noticed that “when_captured” will be identical while “when_uploaded” will likely differ, as will net_transport - but that is only true for devices that have a GPS and can thus reasonably compute when_captured.

I believe that if you isolated all of the duplicate checking into a single method, or created a unique ID field from a hash, it would have the following logic (sketched below the list):

  • if the when_captured is supplied, it is sufficient to hash as a unique id. No two samples will ever realistically occur within a single second, at least for solarcast
  • if when_captured is not supplied, then create a unique id out of the hash of all fields except when_uploaded and net_transport.
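
A hypothetical expression for that two-case rule, assuming a jsonb payload column (including the device id on the when_captured branch is an added assumption, so that different devices can never collide):

    -- Hypothetical unique-id derivation following the two cases above.
    SELECT CASE
        WHEN payload ? 'when_captured' THEN
            -- device id plus capture time is enough when the device has a GPS
            md5((payload->>'device') || '@' || (payload->>'when_captured'))
        ELSE
            -- otherwise hash everything except the fields that differ per upload
            md5((payload - 'when_uploaded' - 'net_transport')::text)
    END AS dedupe_id
    FROM measurements;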

Thus, a theoretical table constraint (needs testing, I'm not 100% on the syntax) might be:

ALTER TABLE measurements
    ADD CONSTRAINT no_dupes
    EXCLUDE USING gist (
        (payload::jsonb - 'net_transport' - 'when_uploaded') WITH =
    );

With this approach, the hash of the payload minus the problematic fields should effectively be stored as an indexed column, which would keep it reasonably performant.
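
One way to realize that indexed hash column (a sketch assuming a jsonb payload column; generated columns need Postgres 12+, so on older versions a trigger would maintain the same value):

    -- Hypothetical stored hash of the payload minus the volatile fields,
    -- with a plain btree index for fast equality lookups.
    ALTER TABLE measurements
        ADD COLUMN payload_hash text
        GENERATED ALWAYS AS (
            md5((payload - 'net_transport' - 'when_uploaded')::text)
        ) STORED;

    CREATE INDEX measurements_payload_hash_idx
        ON measurements (payload_hash);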

If we want to limit the time range that is compared, that should also be possible, though messier:

ALTER TABLE measurements
    ADD CONSTRAINT no_dupes
    EXCLUDE USING gist (
        (payload::jsonb - 'net_transport' - 'when_uploaded') WITH =,
        tsrange(
            immutable_tstamp(payload->>'when_uploaded') - interval '60 seconds',
            immutable_tstamp(payload->>'when_uploaded') + interval '60 seconds'
        ) WITH &&
    );
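
immutable_tstamp above isn't built in; a common sketch is a thin wrapper that declares the text-to-timestamp cast immutable so it can appear in a constraint (sound here only because when_uploaded always carries an explicit UTC offset). As noted, the whole constraint still needs testing: gist equality on jsonb may also require hashing down to a scalar plus the btree_gist extension.

    -- Hypothetical helper: the plain text::timestamptz cast is only
    -- STABLE (it depends on the session timezone), so wrap it and mark
    -- it IMMUTABLE for use in the exclusion constraint.
    CREATE FUNCTION immutable_tstamp(t text) RETURNS timestamp AS $$
        SELECT t::timestamptz AT TIME ZONE 'UTC';
    $$ LANGUAGE sql IMMUTABLE;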

Sample dupe rows:

{"device": 1188192954, "loc_lat": 42.565, "loc_lon": -70.784, "lnd_7318u": 32, "lnd_7128ec": 14, "net_transport": "lora:0004A30B001BC6B9", "when_uploaded": "2017-03-02T00:54:37Z"}
{"device": 1188192954, "loc_lat": 42.565, "loc_lon": -70.784, "lnd_7318u": 32, "lnd_7128ec": 14, "net_transport": "lora:0004A30B001AE51D", "when_uploaded": "2017-03-02T00:54:37Z"}
{"device": 1188192954, "loc_lat": 42.565, "loc_lon": -70.784, "env_temp": 10.67224, "env_humid": 99.9302, "lnd_7318u": 34, "bat_charge": 91.96484, "lnd_7128ec": 13, "bat_voltage": 4.12125, "net_transport": "lora:0004A30B001AE51D", "when_uploaded": "2017-03-02T00:39:36Z"}
{"device": 1188192954, "loc_lat": 42.565, "loc_lon": -70.784, "env_temp": 10.67224, "env_humid": 99.9302, "lnd_7318u": 34, "bat_charge": 91.96484, "lnd_7128ec": 13, "bat_voltage": 4.12125, "net_transport": "lora:0004A30B001BC6B9", "when_uploaded": "2017-03-02T00:39:36Z"}
{"device": 2565454211, "pms_csecs": 100, "pms_c00_30": 45177, "pms_c00_50": 12819, "pms_c01_00": 1439, "pms_c02_50": 121, "pms_c05_00": 17, "pms_c10_00": 0, "pms_pm01_0": 5, "pms_pm02_5": 7, "pms_pm10_0": 8, "net_transport": "lora:0004A30B001AE51D", "when_uploaded": "2017-03-02T00:39:17Z"}
{"device": 2565454211, "pms_csecs": 100, "pms_c00_30": 45177, "pms_c00_50": 12819, "pms_c01_00": 1439, "pms_c02_50": 121, "pms_c05_00": 17, "pms_c10_00": 0, "pms_pm01_0": 5, "pms_pm02_5": 7, "pms_pm10_0": 8, "net_transport": "lora:0004A30B001BC6B9", "when_uploaded": "2017-03-02T00:39:17Z"}
{"device": 217634580, "loc_alt": 26, "loc_lat": 42.564976, "loc_lon": -70.78405, "lnd_7318u": 27, "opc_csecs": 100, "opc_c00_38": 1023, "opc_c00_54": 276, "opc_c01_00": 50, "opc_c02_10": 3, "opc_c05_00": 0, "opc_c10_00": 0, "opc_pm01_0": 1.4122342, "opc_pm02_5": 1.6872364, "opc_pm10_0": 1.7094944, "net_transport": "lora:0004A30B001AE51D", "when_captured": "2017-03-01T23:51:53Z", "when_uploaded": "2017-03-01T23:52:03Z"}
{"device": 217634580, "loc_alt": 26, "loc_lat": 42.564976, "loc_lon": -70.78405, "lnd_7318u": 27, "opc_csecs": 100, "opc_c00_38": 1023, "opc_c00_54": 276, "opc_c01_00": 50, "opc_c02_10": 3, "opc_c05_00": 0, "opc_c10_00": 0, "opc_pm01_0": 1.4122342, "opc_pm02_5": 1.6872364, "opc_pm10_0": 1.7094944, "net_transport": "lora:0004A30B001BC6B9", "when_captured": "2017-03-01T23:51:53Z", "when_uploaded": "2017-03-01T23:52:03Z"}
matschaffer commented 6 years ago

Seems like this is less of a concern at this stage, so moving to the backlog for now.