pinax / pinax-stripe-light

a payments Django app for Stripe
MIT License
684 stars 285 forks source link

Reprocess missed events #621

Closed blueyed closed 5 years ago

blueyed commented 5 years ago

In case your event consumer might not have been running for a while (longer than some queue that would have hold the messages), it would be helpful to have a management command / helper method to reprocess missing event.

For this stripe.Event.list / stripe.Event.auto_paging_iter could be used, given some starting event (the last processed one).

  1. It should check if the event does not exist yet, of course.
  2. It should handle Stripe accounts, i.e. iterate over all your connected accounts.

I might end up writing this myself, but it might not become a full blown management command, but only a script / snippet to post here then.

blueyed commented 5 years ago

This is the script I've used for reference, closing.

# Reprocess events that have been missed due to stopped stripe-events-consumer.
import datetime as dt
import stripe

from configurations import importer
importer.install()
import django  # noqa
django.setup()

from pinax.stripe.models import Account, Event  # noqa
from velodrome.lock8.utils import ingest_stripe_event

stripe.api_key = "XXX"

for account in Account.objects.all():
    print("=== %s ===" % account)
    stripe_account = account.stripe_id

    events = Event.objects.filter(
        stripe_account=account,
    )
    existing_events = set(x[0] for x in events.values_list("stripe_id"))

    process = []
    for e in stripe.Event.auto_paging_iter(
            stripe_account=stripe_account,
            limit=500,
    ):
        created = dt.datetime.fromtimestamp(e.created).replace(
            tzinfo=dt.timezone.utc)
        if created < dt.datetime(2018, 10, 11, tzinfo=dt.timezone.utc):
            break
        if e.stripe_id in existing_events:
            continue
        process += [e]

    for e in reversed(process):
        msg = e
        msg['account'] = stripe_account
        created = dt.datetime.fromtimestamp(e.created).replace(
            tzinfo=dt.timezone.utc)
        print("process: %s: %s" % (e.type, created))
        ingest_stripe_event(e)