Hold off on merging -- tests run fine locally, I swear :P -- seems like I had a lucky race on my machine. I think it's indicative that the "afterFilterWindow" hook might not make any sense due to the order in which the producer/consumer coroutines may produce and consume values. Gonna see if that's true or not.
Ok, so I ended up nuking the afterFilterWindow hook and adding beforeEvent and afterEvent. This should let you create checkpoints for each individual event processor -- which you can use to determine a starting point for your whole pipeline.
what is an example of beforeFilterWindow that would be used in a real scenario?
at this juncture, after getting rid of afterFilterWindow -- it's a roundabout way of notifying you that the chain has made progress.
so for example, if you know how to reconcile chain re-orgs, you can use it as a notification that the chain has progressed -- triggering your logic that reconciles any potential chain reorganization and commits any "pending" events. so you can run an event' against Latest with trailBy: 0, computing "uncommitted"/"pending" events, and then use beforeFilterWindow to trigger your reconciliation/commit-to-db logic when your actual finalization window has elapsed.
You can't just do this using the event hooks, because if you have sparse events then you'd be "pending" until the next event comes in, which is possibly never or way after your finalization window.
for a real example, let's say you're running an indexer to generate NFT icons. there's a pretty good chance those icons won't change in the event of a chain reorg but you want the icon to be available ASAP. let's say that even if a mint event does get re-orged, only a subset of your render data changes and needs to be updated. for the sake of argument, you want a 5-block window before a block is final.
if you just use trailBy, the end user has to wait 5 blocks before the icon of their minted token is visible which sucks for UX.
instead, you can run trailBy: 0, and any mint that happens gets rendered and made available immediately -- just flagged as "pending". every time you get a beforeFilterWindow hook, you check your pending queue to see if anything changed -- reconcile those changes -- and strip the pending flag on icons that are finalized.
your alternative is to run one event' against Latest, and a second thread that polls for block progress and triggers an event' { from: chainHead - 5, to: chainHead } to do the reconciliation logic. this can lead to some very nasty race conditions (node is single threaded, so there's no guarantee that your two "threads" will run in "parallel" -- parallel meaning interleaved here), which is obviously less than ideal...
the race condition is that your second thread might NEVER run for whatever reason. or it runs before the first event' has had a chance to render the images at chain head. so the only way to really make it "reliable" is to have the second thread run the whole pipeline all over again over the last 5 blocks, which is wasteful, can potentially be interfered with by the first thread, and again, there's no guarantee it'll actually even run.
by using a beforeFilterWindow hook, you guarantee that your reconciliation logic runs once per block once event' starts following the chain head -- and that your reconciliation logic runs before events from that block can be processed -- so you know that you won't be stepped over by another thread. and even if you have to double/triple your "finalization" window to compensate for unprocessed events due to how the coroutine library runs the event handlers, that's OK, because you can still serve pending token images that you know will get finalized after some time, without needing an event to trigger it.
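To make that concrete, here's a rough, self-contained sketch of the reconciliation side of the NFT example. None of this is the real purescript-web3 API or this PR's exact hook signature -- BlockNumber, PendingIcon, onMint, and reconcilePending are hypothetical stand-ins, and the hook is assumed to hand you the new window's starting block:

module PendingIconsSketch where

import Prelude

import Data.Array (partition)
import Effect (Effect)
import Effect.Console (log)
import Effect.Ref (Ref)
import Effect.Ref as Ref

type BlockNumber = Int

type PendingIcon = { tokenId :: Int, mintedAt :: BlockNumber }

finalityWindow :: Int
finalityWindow = 5

-- the mint handler (running with trailBy: 0) renders the icon right away and
-- just records it as pending here
onMint :: Ref (Array PendingIcon) -> PendingIcon -> Effect Unit
onMint pendingRef icon = Ref.modify_ (_ <> [ icon ]) pendingRef

-- what you'd call from a beforeFilterWindow-style hook: every time the
-- producer is about to yield a new window, finalize anything old enough and
-- keep the rest pending
reconcilePending :: Ref (Array PendingIcon) -> BlockNumber -> Effect Unit
reconcilePending pendingRef newWindowStart = do
  pending <- Ref.read pendingRef
  let { yes: finalized, no: stillPending } =
        partition (\p -> newWindowStart - p.mintedAt >= finalityWindow) pending
  -- here you'd re-check `finalized` against the now-final chain state, commit
  -- it to your DB, and strip the "pending" flag; we just log the token ids
  log ("finalizing tokens " <> show (map _.tokenId finalized))
  Ref.write stillPending pendingRef

In the actual pipeline, onMint pendingRef would be (part of) your mint handler running at the chain head, and reconcilePending pendingRef is exactly the kind of thing you'd hang off the beforeFilterWindow hook.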
I think it might be better to back up and write an issue that describes the missing feature that we want in a succinct way. For one thing, if you're thinking about an indexer and the ability to checkpoint events, then these before and after hooks can already easily be done imo without adding library code:
-- run a before/after action (given the event's Change) around an existing handler
wrapHandler
  :: forall e a
   . (Change -> Web3 Unit)
  -> (Change -> Web3 Unit)
  -> (e -> ReaderT Change Web3 a)
  -> (e -> ReaderT Change Web3 a)
wrapHandler beforeAction afterAction h = \e -> do
  ReaderT beforeAction
  result <- h e
  ReaderT afterAction
  pure result

pollEvents :: Web3 (Either (MultiFilterStreamState _) ChangeReceipt)
pollEvents =
  event'
    { foo: Filter ...
    , bar: Filter ...
    }
    -- NOTE: there is a way to fold over the record so you don't have to put this on every one
    { foo: wrapHandler checkIn checkOut fooHandler
    , bar: wrapHandler checkIn checkOut barHandler
    }
  where
    checkIn = ... :: Change -> Web3 Unit
    checkOut = ... :: Change -> Web3 Unit
I'm just not sure it's worth adding library code unless this is coming up time and again.
I do see the value in being able to run some kind of before or after hook for eth_getLogs to checkpoint that an entire request was made, since you can't currently do that without modifying the library. But there it maybe makes more sense to me to use the Filter type itself as an input to the hook.
unless I'm mistaken, onFilterTermination is also not really needed here, because when you use event' that value is not even always returned, but when it is you can just run your handler there.
So, running checkpoints can be implemented like that by wrapping the handler - but you run into the "sparse events" issue. As you said you can get around that by modifying the filter type to let you run code before/after a getLogs call -- but you run into a different problem.
If you have complex interactions with different types of events, you need to now synchronize your different event handlers/wrappers/hooks placed into the Filter. Consider a TCR interaction for example -- you have to watch for token transfers, tcr events, etc. Sure, you can probably accomplish this with a set of semaphores or whatever triggered by each event, but a much more straightforward way to do this is by running logic before any events from a given block can be processed. You can't really put something like that in an individual filter -- it makes the most sense to do that as a hook to the filter producer (which is exactly what beforeFilterWindow does). Not to mention, if you have a spotty web3 provider that drops event results if they exceed a certain count in one request (you know who i'm talking about) -- that solution breaks down completely... And if the getLogs hooks are on a per-filter basis, then you still have to synchronize all those hooks
Ideally, you'd actually want to run reconciliation logic after ALL events have been processed for a given window -- but that's hard to do with the coroutine library without significant modifications. It's not a big deal though, because you can guarantee that a filter window won't change until all events from a previous window have been processed (due to how pullFrom works in conjunction with stagger). so worst case your beforeFilterWindow hook just runs a block or two ahead of any events seen. Now, before-/afterEvent will let you know exactly what your lag is (if any -- it's really only noticeable if you're running block times <= 3 seconds (the filter producer's chain head polling interval), which almost nobody is) without having to add synchronization logic to all of your event handler wrappers. In other words, beforeFilterWindow for window N is the same thing as afterFilterWindow for window (N - 1 - eventProcessingLag), where eventProcessingLag is quantifiable thanks to before- and/or afterEvent -- and is actually 0 in 99% of cases.
Now, the only way to get such a hook to filterProducer is through an argument to event' (or have a version of event' where you supply your own filterProducer, but at that point why are you even using this library?). You can change the record that event' takes and break existing code, requiring someone to pepper in beforeFilterWindow: Nothing everywhere they used event'. Or you can use Option like this PR does, which lets existing code work without any changes. Moreover, it's future proof -- any new hooks don't incur any breaking API changes either, as they're all contained in that args record.
To the point of running stuff after a getLogs call, that's what "afterFilterWindow" effectively did -- I just removed it because I realized it doesn't really get you anything useful that you don't already get with beforeFilterWindow.
In a way, this is something that comes up time and again -- we've always worked around this issue by just trying our best to avoid running into it -- usually by incurring some design change in our indexing architecture. but saying a problem doesn't need to be solved because it can be worked around doesn't actually solve it :P. This PR's secondary goal was to solve it without breaking existing stuff (hence the use of Option and shoving everything into event' instead of changing Filter's type, etc.).
I agree that if you want the granularity of hooks around a getLogs call for a specific filter then it belongs in the Filter type -- but I don't see what capability that gains you that you can't get with before-/afterFilterWindow + wrapping your existing event handlers. If this API is ever extended to allow you to do modifications to the requests/results within the hooks then I can see the value -- but for the moment this is designed to be strictly "notifications".
re onFilterTermination: sure. the idea here is you can have a function that makes all the hooks you care about as part of your reorg reconciliation logic (beforeFilterWindow, beforeEvent, and potentially onFilterTermination as well) and just gives you back an arguments record to pass to event'.
so, without onFilterTermination, if your logic did care about a filter terminating the event' -- you'd have to forward it that information downstream of where you instantiate it, which turns the whole thing into a very leaky abstraction.
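As a stand-in sketch of that shape (the record and hook argument types here are made up, not the PR's actual definitions): one constructor owns the finalizer's state and hands back all the hooks, which you'd then merge into the args record you pass to event', so nothing downstream has to know about filter termination at all.

module ReorgHooksSketch where

import Prelude

import Effect (Effect)
import Effect.Console (log)
import Effect.Ref as Ref

type BlockNumber = Int

-- the record shape and hook argument types are assumptions, not the PR's
-- actual definitions
type ReorgHooks =
  { beforeFilterWindow :: BlockNumber -> Effect Unit
  , beforeEvent :: BlockNumber -> Effect Unit
  , onFilterTermination :: Effect Unit
  }

mkReorgHooks :: Effect ReorgHooks
mkReorgHooks = do
  -- whatever state your finalizer needs, owned here and nowhere else
  pendingRef <- Ref.new ([] :: Array BlockNumber)
  let
    reconcile :: BlockNumber -> Effect Unit
    reconcile windowStart = do
      pending <- Ref.read pendingRef
      log ("reconciling " <> show pending <> " before window " <> show windowStart)
  pure
    { beforeFilterWindow: reconcile
    , beforeEvent: \eventBlock -> Ref.modify_ (_ <> [ eventBlock ]) pendingRef
    , onFilterTermination: log "filter terminated -- flush the pending queue"
    }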
Since that was a braindump of sorts, here's a set of interrelated user stories, followed by how this PR solves them:
Story 1: I have some contract which emits events, which I need to index from the moment it was deployed (block 1) to now (block 1,000,000). My contract was interacted with very sporadically over its lifetime (let's say at blocks 30, 50, 1200, 456000, 789000, and 789010). I'd like to avoid reprocessing all 1 million blocks if something goes wrong -- so I want some "checkpoint"-like functionality. I also want to indicate progress to the user that even though no new events are showing up, the chain indexing process is still ongoing. In this scenario, if I store a checkpoint and update my progress every time my event handler gets fired, I would only do so at the aforementioned blocks. This means that between blocks 1200 and 456000, the user has no way of knowing that any progress is being made -- the progress bar would sporadically jump from 0.001% to 45.6% after some long period of apparent inactivity. Moreover, let's say somewhere between 1200 and 456000, the connection to my web3 provider gets lost and my indexing process has to restart. If we store checkpoints just when events get fired, then any error between 1200 and 456000 means my indexer would have to restart at 1200 -- very inefficient.
If I had some ability to be notified that all event processors have finished running for a given block interval, I could: a) create a checkpoint at that block window, and avoid restarting my indexing process in an inefficient manner that does up to ~445k blocks of logs queries that will return no logs; b) notify the user that the indexing is in fact still ongoing -- there really are just no events all that time.
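As a sketch of the hook Story 1 wants (hypothetical names; BlockNumber is a stand-in Int and a Ref stands in for real on-disk checkpoint storage), every new filter window advances the checkpoint and the progress indicator whether or not any events fired in it, and on restart you read the checkpoint back and use it as the starting block you hand to event':

module CheckpointSketch where

import Prelude

import Data.Int (toNumber)
import Effect (Effect)
import Effect.Console (log)
import Effect.Ref (Ref)
import Effect.Ref as Ref

type BlockNumber = Int

-- call from beforeFilterWindow: `targetBlock` is the chain head measured when
-- indexing started, `windowStart` is the window about to be queried
checkpointAndReport :: Ref BlockNumber -> BlockNumber -> BlockNumber -> Effect Unit
checkpointAndReport checkpointRef targetBlock windowStart = do
  -- naively checkpoint the previous window boundary; to be fully safe you'd
  -- checkpoint N_finished_fw from the lag discussion further down (or just
  -- back the checkpoint off by a few blocks)
  Ref.write (windowStart - 1) checkpointRef
  let pct = 100.0 * toNumber windowStart / toNumber targetBlock
  log ("indexed through block " <> show (windowStart - 1) <> " (~" <> show pct <> "%)")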
Story 2: I want to be able to show the result of processing certain contract events to the user as soon as they happen, as opposed to after some arbitrary finality interval (i.e., trailBy). These event processors are computationally expensive, and so I would like to avoid re-running them when possible. I am aware that running against the chain head makes me susceptible to inconsistencies due to chain re-organizations. I'm OK with doing this, as I have a method (which I call the "finalizer") to reconcile such inconsistencies without reprocessing all events in a finality window. In my architecture, my event processors simply emit their outputs into some "pending" queue, and my finalizer can determine which outputs in the pending queue are valid and which need to be recomputed. For a given block number -- as long as the finalizer runs after all events in that block number have been processed, I can guarantee that the finalizer can correctly reconcile any inconsistencies in the pending queue.
Just like in story 1, I need some way to know when all event processors for a given block interval have finished running. My finalizer doesn't care about how many or what kind of event processors there are in the pipeline -- just that it runs after all event processors for block N have finished.
Moreover, recall that my event processors are computationally expensive. The naive approach of simply running two event' pipelines, one with trailBy: finalityInterval and one with trailBy: 0, would incur wasteful recomputation, which also has the potential to lead to race conditions. Running an event' { trailBy: finalityInterval } and a separate thread which polls for new blocks and tries to run the finalizer opens up another difficult synchronization problem -- I have no insight into the progress of the event' -- so I'd have to pollute my event processing logic to try and synthesize that information whenever an event comes in (and I can't think of a scheme that would allow that to be computed without unaccountable edge cases). I could try to make some sort of clever loop that ends up reproducing the chain head trailing logic, running event' against one block and then my finalizer, but the whole point of the hooks is to provide a facility that lets me avoid doing that.
This is very difficult to do currently -- there is simply no way for me to execute any logic between filter windows. Even if I had the ability to execute some logic before or after an individual filter's getLogs query, I would have to include a contrived synchronization mechanism to ensure that my finalizer runs once between every filter window (afaik, I have no guarantee as to what order the getLogs queries run -- just that the event handlers run in logIndex order after they all finish).
Solution: It is actually possible to satisfy the following two requirements, which enable the use cases in the above stories:
1. Run arbitrary logic before any event processors for a given filter window N have started running.
2. Know, by the time window N is about to be processed, the most recent window N' < N for which all event processors have already finished.
Req. 1 is trivially satisfied by beforeFilterWindow being called prior to yieldT fs' in filterProducer -- your logic is run before the getLogs parameters are yielded to the routine that actually queries the logs, so it's impossible for the event processors for events at that filter window to have ever started running.
Req. 2 is a little trickier to satisfy. Because of how the coroutine library runs the filter producer and event consumers, there's no way to directly say "all event processors for a given block number have finished" (i.e., afterFilterWindow). However, from the implementation of pullFrom, you can see that when a producer reaches an Emit point (i.e., via yield), it waits until the consumer reaches an Await point (and vice versa). This means that the filter producer and the event runner coroutines run in a sort of lockstep. Because Ethereum blocks do not get produced instantaneously, filterProducer will eventually reach a point where it delays for 3 or more seconds before trying to yield new filter windows. If the event runner is still getting logs or running handlers from a previous iteration, this gives it some time to reach an Await point. What this means is that once event' reaches "chain head trailing mode", execution flow very quickly boils down to a lockstep similar to the following:
filterProducer block N
filterProducer block N + 1
filterProducer block N + ...
filterProducer block N + lag
eventRunner block N
filterProducer block N + lag + 1
eventRunner block N + 1
filterProducer block N + lag + 2
eventRunner block N + 2
filterProducer block N + lag + 3
eventRunner block N + 3
filterProducer block N + lag + 4
eventRunner block N + 4
filterProducer block N + lag + 5
eventRunner block N + 5
filterProducer block N + lag + 6
To figure out what lag is, first remember that the filter polling interval for chain head is 3 seconds. This means that the only possible way lag can ever grow is if your chain's block time is <= 3 seconds, and/or your eventRunners take more than blockTime to process. In the vast majority of use cases, neither of those apply, and it means that lag will be 0 or 1. Nonetheless, lag is straightforward to quantify:
-- Step 1: initialize the shared state when event' starts
N_finished_fw .~ event'_start_block - 1
N_fw .~ Just event'_start_block
N_lag .~ N_fw - N_prev_fw

-- Step 2: in beforeFilterWindow, when a new window (window_block_number) is about to be yielded
N_finished_fw .~ case N_fw of
  Nothing -> N_finished_fw + N_lag -- N_fw was cleared, so N_lag was updated by eventRunner. Last fully processed filter window is N_finished_fw + lag.
  Just _  -> N_finished_fw         -- N_fw wasn't cleared by eventRunner, so eventRunner hasn't run yet. Keep N_finished_fw as is so N_lag can grow by 1.
N_fw .~ Just window_block_number   -- Store our latest filter window so eventRunners can use it to recompute lag.
N_lag .~ N_fw - N_finished_fw      -- Our lag is this filter window minus our last fully complete filter window.

-- Step 3: in beforeEvent/afterEvent, for an event mined at block N_ev
if isJust N_fw then do -- if N_fw is Nothing, then an event runner has already computed lag for this filter window
  N_lag .~ N_fw - N_ev -- Recompute lag from the last new filter window.
  N_fw .~ Nothing      -- Set N_fw to Nothing so other event runners don't recompute until the next filter window comes in.
With the lag quantified, beforeFilterWindow (N + lag) becomes equivalent to afterFilterWindow (N - 1) in the chain head trailing case. Note we use a Maybe to prevent event runners from recomputing the lag until a new filter window is established, in case a set of event handlers takes more than one block time to finish. N_finished_fw is used to compensate for the initial unknown lag (when there are many filterProducer yields in a row that had no eventRunners triggered). If no eventRunners have triggered step 3 yet, N_finished_fw remains unchanged (as we don't know for sure if events from that block have been acknowledged yet) and the lag keeps growing by 1. You can then insert a call to afterFilterWindow N_finished_fw in step 2, after setting N_finished_fw and before setting N_fw, to trigger your afterFilterWindow logic.
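To make that less hand-wavy, here's a self-contained sketch of steps 1-3 using mutable Refs. BlockNumber is a stand-in Int, the hook argument types are assumptions, and mkLagState/onFilterWindow/onEvent are hypothetical names -- only the bookkeeping is the point:

module LagTrackingSketch where

import Prelude

import Data.Maybe (Maybe(..))
import Effect (Effect)
import Effect.Ref (Ref)
import Effect.Ref as Ref

type BlockNumber = Int

type LagState =
  { nFinishedFw :: Ref BlockNumber -- last window known to be fully processed
  , nFw :: Ref (Maybe BlockNumber) -- latest yielded window; cleared once an event from it is seen
  , nLag :: Ref BlockNumber        -- current producer/consumer lag
  }

-- Step 1: initialize when event' starts.
mkLagState :: BlockNumber -> Effect LagState
mkLagState startBlock = do
  nFinishedFw <- Ref.new (startBlock - 1)
  nFw <- Ref.new (Just startBlock)
  nLag <- Ref.new 0 -- overwritten before it is ever read
  pure { nFinishedFw: nFinishedFw, nFw: nFw, nLag: nLag }

-- Step 2: call from beforeFilterWindow with the window about to be queried.
onFilterWindow :: LagState -> BlockNumber -> Effect Unit
onFilterWindow st windowStart = do
  mFw <- Ref.read st.nFw
  lag <- Ref.read st.nLag
  finished <- Ref.read st.nFinishedFw
  let finished' = case mFw of
        Nothing -> finished + lag -- an event runner cleared nFw, so the lag is fresh: everything up to finished + lag is done
        Just _ -> finished        -- no events processed since the last window; keep it as-is so the lag can grow
  Ref.write finished' st.nFinishedFw
  -- an "afterFilterWindow finished'" callback would fire right here
  Ref.write (Just windowStart) st.nFw
  Ref.write (windowStart - finished') st.nLag

-- Step 3: call from beforeEvent (or afterEvent) with the event's block number.
onEvent :: LagState -> BlockNumber -> Effect Unit
onEvent st eventBlock = do
  mFw <- Ref.read st.nFw
  case mFw of
    Nothing -> pure unit -- another event from this window already recomputed the lag
    Just fw -> do
      Ref.write (fw - eventBlock) st.nLag
      Ref.write Nothing st.nFw -- don't recompute again until the next window is yielded

Hooking it up would mean calling onFilterWindow from beforeFilterWindow and onEvent from beforeEvent (or afterEvent), with the afterFilterWindow-equivalent callback firing where the comment indicates.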
Conclusion:
Adding the hook mechanism in this PR allows the above solution to be implemented without breaking existing code or changing the record that event' takes, due to the use of Option. It also enables the finalizer described in the user stories to be implemented in a fashion that keeps the event processors mostly unaware of its existence. While we could implement filter-specific hooks as you propose, that adds a layer of complication to the "entire filter window" scenarios outlined above. Naturally, such a hook shouldn't belong in an individual filter (separation of concerns).
This expands the functionality of event' by allowing you to add hooks that run at certain parts of the event' lifecycle. Currently three are implemented -- notifying you when a new filter window is about to be queried, after a filter window has finished querying, and when a filter has terminated the event' loop via a TerminateEvent.

Naturally, this is very handy when writing chain indexers, as you can have a checkpointing system that doesn't just checkpoint at the last event seen. If on-chain interaction with your target contracts is sparse, checkpointing at the last event could mean you checkpoint very far from the chain head and have to wait a long time to catch back up when restarting from a checkpoint. Previously, the alternative was to somehow hook your Provider and intercept eth_getLogs calls -- which is pretty lame and unreliable. Now, you can get indexing progress straight from the horse's mouth.

Using Option, we can make all of these hooks "optional" keys in the event' arguments record (which was originally used only for filter window parameters) -- i.e., you don't have to supply a record containing hookYouDontCareAbout: Nothing -- so no need to have a defaultEvent'Args { fieldsYouCareAboutSetting } either. In other words, this PR does not break the existing API, and adding more hooks in the future won't either.

Tests are included to ensure that using Option won't break any type inference at existing event' use sites, as well as tests to ensure that the hooks run in the expected order.

In principle, this could be expanded to the regular (single-filter) event function as well, since it's a single-filter MultiFilter under the hood, but that would actually require breaking API changes and doesn't seem worth it anyway (i.e., if you're just polling a single event, you probably don't care about checkpointing, and someone who really cares about both can just construct a single-event multifilter).