segmentio / analytics-next

Segment Analytics.js 2.0
https://segment.com/docs/connections/sources/catalog/libraries/website/javascript
MIT License
403 stars 136 forks source link

Node: add flush method #1010

Closed silesky closed 8 months ago

silesky commented 10 months ago

Support analytics.flush

Example

analytics.track('foo')
await analytics.flush()

analytics.track('bar')
await analytics.flush()

analytics.closeAndFlush({ timeout: 3000 }) simply calls analytics.flush({ close: true, timeout: 3000 }).

Behavior

  1. Require awaiting analytics.flush() before calling flush again.
  2. Ignore additional flush calls while a flush is pending, and log warning to console.

Recap of internal discussion

changeset-bot[bot] commented 10 months ago

🦋 Changeset detected

Latest commit: 3c3bdace821e185884ca9be7fd8ca954e9cfd678

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 1 package | Name | Type | | ----------------------- | ----- | | @segment/analytics-node | Minor |

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

chrisradek commented 9 months ago

Just wanting to confirm I'm reading the code correctly:

With this change, am I understanding correctly that if you call flush() multiple times with track() calls interspersed, all flush() calls will wait to resolve until all the track() calls completed? This is assuming that track() calls were made before any of the flush() calls completed.

silesky commented 9 months ago

@chrisradek

Just wanting to confirm I'm reading the code correctly:

With this change, am I understanding correctly that if you call flush() multiple times with track() calls interspersed, all flush() calls will wait to resolve until all the track() calls completed? This is assuming that track() calls were made before any of the flush() calls completed.

Not quite -- all calls should not wait. My intention here is that if any events enter the pipeline after .flush is called() (not necessarily awaited), those events are not flushed, but rather, included with the next batch whenever that gets flushed. Any of these late arriving events are treated normally, i.e. maxEventsInBatch is still in effect.

That way, the flush promise can't get held up in a pending state indefinitely if new events continue to come in after .flush is called.

I think the best way to illustrate the behavior is with this test: I added some additional code comments. https://github.com/segmentio/analytics-next/pull/1010/files#diff-a58dd0e72b28d53cee2b402d434904f47d91d318d2c4816b5331d6a67f2bbeafR283

Let me know if you think anything is amiss with the implementation of the logic I described.

chrisradek commented 9 months ago

I think there might be a bug then in this scenario:

  1. Have default batch sizes (or let's say 5, has to be more than 1)
  2. Call analytics.track() 3 times.
  3. Call analytics.flush() without awaiting
  4. Call analytics.track() 1 time.
  5. Call analytics.flush() with or without awaiting.

That flush call ends up overwriting the publisher's pendingFlushCount value, so initially it is set to 3, but the 2nd flush call resets it to 1. This ends up causing the 2nd flush to not flush properly - it waits the full flushInterval (10s default).

I was testing with this logic - the test times out at 5 seconds.

test('what happens here', async () => {
    ajs = new Analytics({
      writeKey: 'abc123',
      maxEventsInBatch: 5,
      httpClient: testClient,
    })
    let drainedCalls = 0
    ajs.on('drained', () => {
      drainedCalls++
    })
    let trackCallCount = 0
    ajs.on('track', () => {
      trackCallCount += 1
    })

    // make regular calls
    _helpers.makeTrackCall()
    _helpers.makeTrackCall()
    _helpers.makeTrackCall()
    const flushed = ajs.flush()

    // add another event to the queue to simulate late-arriving track call. flush should not wait for this event.
    await sleep(100)
    _helpers.makeTrackCall()

    const flushed2 = ajs.flush()

    await flushed
    expect(trackCallCount).toBe(3)
    expect(_helpers.getFetchCalls().length).toBe(1)
    expect(drainedCalls).toBe(1)

    // should be one event left in the queue (the late-arriving track call). This will be included in the next flush.
    // add a fourth event to the queue.
    _helpers.makeTrackCall()

    await flushed2
    expect(drainedCalls).toBe(2)
    expect(_helpers.getFetchCalls().length).toBe(2)
    expect(trackCallCount).toBe(4)
  })