That seems OK. The only concern I have is that if people are not careful they may end up writing processors that race against each other, and the mutex won't save you.
Maybe the best approach would be to have a pattern (maybe using sprocs?) that ensures it's not possible to have two processors that read or write data that other processors rely on.
I agree that the current `DatabaseProcessor` is too large and needs to be refactored.
It is possible to move code out of the monolithic `DatabaseProcessor` into separate processors without requiring thread-safe `db.Session` instances. If you define the ingestion pipeline as a linked list of processors, then each processor executes sequentially and there is no concurrent access to the `db.Session`.
```go
ledgerPipeline.SetRoot(
	pipeline.LedgerNode(&processors.RootProcessor{}).
		Pipe(
			pipeline.LedgerNode(&horizonProcessors.ContextFilter{horizonProcessors.IngestUpdateDatabase}).
				Pipe(
					accountForSignerLedgerNode(historyQ),
				).Pipe(
					orderBookDBLedgerNode(historyQ),
				).Pipe(
					trustLinesDBLedgerNode(historyQ),
				),
		),
)
```
The only issue I have with implementing thread safe db.Sessions is that it might be a band-aid solution which doesn't address the underlying issue: I suspect the concurrency baked into the ingestion framework is unnecessary.
The concurrency aspects complicate the implementation of the ingestion framework. Also, unless you're familiar with the internals of the ingestion framework, it's not obvious to the end user that sibling processors are executed in parallel.
The concurrency model is supposed to bring performance benefits. However, in https://github.com/stellar/go/pull/1967 I rewrote the history archive ingestion code without using the pipeline orchestration and the new version performed the same if not slightly better than the pipeline version. The new version does not use any concurrency.
> The only issue I have with implementing thread safe db.Sessions is that it might be a band-aid solution which doesn't address the underlying issue: I suspect the concurrency baked into the ingestion framework is unnecessary.
Unfortunately, concurrency is necessary if you want to build more complicated solutions that include filtering of data and/or a more complicated structure of the processing pipeline.
Let me explain using an example. Let's say that in the `dump-ledger-state` app you implemented without the pipeline (#1967) you want to dump data of specific accounts only (ex. clients of some company), OR you want to dump accounts only (without trust lines, data and offers), OR only trust lines to a given asset. Obviously, you can add a bunch of `if` statements in the code, but the problem is that this makes the code less readable and testing more difficult (like the old `ingest` package in Horizon). The cool thing about processors in the pipeline is that they are reusable, isolated (they do one thing) and very easy to test: you just need to mock the reader to send the data you want to test, whereas in your code you need to add a bunch of extra code to isolate the code path processing a given data type.
So why do we need concurrency? Let's say you have a processor that filters accounts only (writes accounts to the writer and ignores all the other types). In pubnet, that's actually a lot of data! If you ran processors sequentially, you'd need to store all the accounts in memory so the next processor could read them when started. The memory requirements could be really high (there are more than 4M accounts in pubnet currently). With concurrency, the next processor can read data as soon as the previous one writes it, so you only need a small chunk of memory for a queue of data objects. You can also limit the queue size to control memory usage (we actually use buffered channels, so when a channel is full, writers wait for the next processors to read data).
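To make the hand-off concrete, here is a minimal sketch of the idea (hypothetical types, not the actual `exp/ingest` API): a filtering stage streams matching entries into a buffered channel while the next stage consumes them concurrently, so memory is bounded by the channel capacity rather than by the size of the data set.

```go
package main

import (
	"fmt"
	"sync"
)

// Entry is a stand-in for a ledger entry; the real pipeline passes XDR types.
type Entry struct {
	Type string
	ID   int
}

func main() {
	// A small buffer bounds memory usage: when it is full, the filtering
	// stage blocks until the next stage catches up.
	accounts := make(chan Entry, 100)

	var wg sync.WaitGroup
	wg.Add(2)

	// Stage 1: filter accounts out of a mixed stream of entries.
	go func() {
		defer wg.Done()
		defer close(accounts)
		for i := 0; i < 1000; i++ {
			e := Entry{Type: "account", ID: i}
			if i%2 == 0 {
				e.Type = "trustline" // ignored by this stage
			}
			if e.Type == "account" {
				accounts <- e // blocks when the buffer is full
			}
		}
	}()

	// Stage 2: consume accounts as soon as they are written, without
	// ever holding the full account set in memory.
	go func() {
		defer wg.Done()
		count := 0
		for range accounts {
			count++
		}
		fmt.Println("processed", count, "accounts")
	}()

	wg.Wait()
}
```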
I think it's worth noting that the pipeline is not required to build an app using the `exp/ingest` package; if a developer is building a simple app they are free to use readers directly and process the data in a single function. We can build helper functions/structs in `exp/ingest` to help developers building smaller projects if this is harder than it should be right now. I'm fine with promoting simple ingestion in the docs/examples, as it will probably be easier to understand for an average developer. However, for more complicated apps like Horizon I think the pipeline is necessary (especially if we want to implement plugins, filtering, etc. in the future).
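For illustration, a simple app along those lines could just loop over a reader in one function. This is only a sketch with a stand-in reader type; the real `exp/ingest` reader API may differ:

```go
package main

import (
	"fmt"
	"io"
)

// Entry is a stand-in for a ledger entry type.
type Entry struct {
	Type string
	ID   int
}

// sliceReader is a trivial reader backed by a slice, playing the role
// of an exp/ingest state reader in this sketch.
type sliceReader struct {
	entries []Entry
	pos     int
}

// Read returns io.EOF when there are no more entries.
func (r *sliceReader) Read() (Entry, error) {
	if r.pos >= len(r.entries) {
		return Entry{}, io.EOF
	}
	e := r.entries[r.pos]
	r.pos++
	return e, nil
}

// dumpAccounts processes the stream directly in a single function,
// with no pipeline involved.
func dumpAccounts(r *sliceReader) error {
	for {
		entry, err := r.Read()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		if entry.Type == "account" {
			fmt.Println("account:", entry.ID)
		}
	}
}

func main() {
	r := &sliceReader{entries: []Entry{
		{Type: "account", ID: 1},
		{Type: "trustline", ID: 2},
		{Type: "account", ID: 3},
	}}
	if err := dumpAccounts(r); err != nil {
		fmt.Println("error:", err)
	}
}
```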
> Also, unless you're familiar with the internals of the ingestion framework, it's not obvious to the end user that sibling processors are executed in parallel.
We need to update the docs to make it clear. The new package is still experimental and docs are not perfect.
> Unfortunately, concurrency is necessary if you want to build more complicated solutions that include filtering of data and/or a more complicated structure of the processing pipeline.
Actually, I don't think this is the case. In the 2nd commit of https://github.com/stellar/go/pull/1967 I replaced the state pipeline of the Horizon experimental ingestion system with an alternative solution. Presumably the state pipeline for Horizon ingestion is sufficiently complex not to be considered a toy use case. The alternative code accomplishes the same goal as the original with equivalent if not slightly better performance, and I would argue it's more modular and easier to test.
The Horizon ledger pipeline can also be rewritten in a similar fashion, but it requires more refactoring. I will eventually push a third commit on the branch to demonstrate how it would look.
I think the perfect examples to illustrate what I mean are `OfferStore.Add` and `TrustlinesStore.Add` in your PR. `OfferStore` requires the offers Q and the orderbook graph; `TrustlinesStore` requires the trust lines Q and the asset stats Q. What if, for whatever reason, a Horizon user doesn't want path finding or asset stats in their Horizon deployment? To allow this in your PR you'd need to add `if` conditions to check what the code should actually insert: trust lines, asset stats, or both / offers to DB, memory, or both. It's easy to forget an `if` in some places, ex. you may have it when adding an offer to a batch but no `if` when you actually commit/apply changes. But most importantly it complicates the code. The biggest issue to me is that there is no separation of concerns: the `Add` methods are doing two different things.
You may say: fine, you can split each store above into two stores. But what if a Horizon user doesn't care about all of the offers/trust lines in the network? What if they only want trust lines of their clients, or offers in their markets? In your PR, you have to add a filtering `if` condition before calling the methods that do something with offers / trust lines. But what if they want their markets in memory but the rest of the offers in a DB? Then you actually need filtering inside all the methods. You'll actually build a pipeline-like structure, but without a pipeline.
Again, you can add all of this, but then the testing matrix needs to include: all possible filters ⨯ all the methods processing data (with the others turned off to isolate a given code path). It becomes more and more complicated.
In the pipeline, each processor is a separate entity doing exactly one thing. If you remove one processor, say asset stats, the rest of the pipeline still works fine (no `if`s). A processor doesn't make any assumptions about the previous processors; it doesn't care whether the previous processor filtered data or not. When testing it, we only test scenarios connected to the code in this processor, not caring about what previous processors could do.
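A sketch of what this isolation looks like (the interface is hypothetical, not the actual `exp/ingest` one): each processor does one thing, the pipeline is just the list you compose, and dropping a processor is a one-element change with no `if`s anywhere else.

```go
package example

// Change is a stand-in for a ledger entry change.
type Change struct {
	Type string
}

// changeProcessor is a hypothetical single-purpose pipeline stage.
type changeProcessor interface {
	ProcessChange(c Change) error
}

// assetStatsProcessor only updates asset stats. It knows nothing about
// offers or accounts and makes no assumptions about earlier stages.
type assetStatsProcessor struct {
	// the asset stats Q would live here in a real implementation
}

func (p *assetStatsProcessor) ProcessChange(c Change) error {
	if c.Type != "trustline" {
		return nil // not our concern
	}
	// ... update asset stats ...
	return nil
}

// runPipeline applies every processor to every change. Removing the
// asset stats stage means deleting one element from procs, and each
// processor can be unit tested in isolation by feeding it canned
// Change values.
func runPipeline(changes []Change, procs []changeProcessor) error {
	for _, c := range changes {
		for _, p := range procs {
			if err := p.ProcessChange(c); err != nil {
				return err
			}
		}
	}
	return nil
}
```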
Again, I agree with you that for simple use cases (and by simple I mean apps that, e.g., save data to a single store like a file and don't do any filtering or other crazy stuff) it's probably easier to use readers directly. But if you want your app to do many different things, like filtering data, storing data in many different stores, or maybe sending notifications based on changes in different accounts or different actions, a pipeline is much easier to manage.
@bartekn I've pushed a commit to https://github.com/stellar/go/pull/1967. Now there is a separate store for asset stats and the orderbook graph.
> What if, for whatever reason, a Horizon user doesn't want path finding or asset stats in their Horizon deployment?
You would only need to comment out lines of code from https://github.com/stellar/go/pull/1967/files#diff-3b66fe1a6ca34a1d7747791e3054a199R171-R177
Don't you think the way this code is structured is more encapsulated and easier to follow than https://github.com/stellar/go/blob/release-horizon-v0.24.0/services/horizon/internal/expingest/processors/database_processor.go#L24-L180 ?
> But what if a Horizon user doesn't care about all of the offers/trust lines in the network? What if they only want trust lines of their clients, or offers in their markets? ... You'll actually build a pipeline-like structure, but without a pipeline.
If there really is a set of features which requires a pipeline structure, I think it's fine to have "a pipeline-like structure but without a pipeline", because then we can implement the parts of the pipeline model that make sense for the feature set and discard the rest.
But I don't know if it's worth speculating so much on these types of use cases when we have a concrete list of remaining features to complete for the ingestion rewrite. I'd much rather take the approach of writing the simplest code that implements our ingestion goals. My fear is that we end up building a framework which is optimized for speculative use cases rather than the common case.
I have not seen any demo or Horizon ingestion pipelines that actually require the complexity of the framework. I attempted to validate this claim by implementing the ledger dump job and the Horizon state ingestion workflow without pipelines, and I found the resulting code easier to work with.
> You would only need to comment out lines of code from https://github.com/stellar/go/pull/1967/files#diff-3b66fe1a6ca34a1d7747791e3054a199R171-R177
In reality this would be an `if` statement. In the following paragraph of my previous comment (when I said you can split this into two stores) I asked:
> You may say: fine, you can split each store above into two stores. But what if a Horizon user doesn't care about all of the offers/trust lines in the network? What if they only want trust lines of their clients, or offers in their markets?
Filtering is hard in this design. Again, you can add filtering to the new methods (as I mentioned in the comment) and so on, but it becomes more and more complicated.
> Don't you think the way this code is structured is more encapsulated and easier to follow than https://github.com/stellar/go/blob/release-horizon-v0.24.0/services/horizon/internal/expingest/processors/database_processor.go#L24-L180 ?
The current code in `database_processor.go` is something we need to fix, but it is blocked on this issue :). Then we can split the code in `database_processor.go` into many processors.
> But I don't know if it's worth speculating so much on these types of use cases when we have a concrete list of remaining features to complete for the ingestion rewrite.
The new ingestion project doesn't end after the rewrite of Horizon. Adding Horizon plugins and data filtering is on the list of things we want to work on after the rewrite. The features I mentioned aren't speculation; we had them in mind when designing the new ingestion pipeline.
> In reality this would be an `if` statement.
You were asking how you would disable the orderbook and asset stat parts of ingestion. I said that you would need to edit this list:
```go
ingestionStores: []ingestionStore{
	processors.NewAccountStore(historyQ),
	processors.NewAccountDataStore(historyQ),
	processors.NewAccountSignerStore(historyQ),
	processors.NewOfferStore(historyQ),
	processors.NewOrderbookStore(config.OrderBookGraph),
	processors.NewTrustLineStore(historyQ),
	processors.NewAssetStatsStore(historyQ),
},
```
to remove `processors.NewAssetStatsStore(historyQ)` and `processors.NewOrderbookStore(config.OrderBookGraph)`.
If you wanted to do the same using the current ingestion pipeline code you would edit
```go
ledgerPipeline.SetRoot(
	pipeline.LedgerNode(&processors.RootProcessor{}).
		Pipe(
			pipeline.LedgerNode(&horizonProcessors.ContextFilter{horizonProcessors.IngestUpdateDatabase}).
				Pipe(
					accountForSignerStateNode(historyQ),
					orderBookStateNode(historyQ),
					assetStatsStateNode(historyQ),
					trustLinesStateNode(historyQ),
				),
		),
)
```
to remove `orderBookStateNode(historyQ)` and `assetStatsStateNode(historyQ)`. Am I missing something here?
> The new ingestion project doesn't end after the rewrite of Horizon. Adding Horizon plugins and data filtering is on the list of things we want to work on after the rewrite.
Can you share more information on plugins and data filtering? Is there a spec for those projects?
We have the original design doc of the pipeline system and a requirements doc for the new system; I will try to find both docs.
But basically, a processor doing a single thing was meant to be part of a plugin. For example, a good candidate is asset stats. If a Horizon user/admin needs it, they should be able to add this processor to a pipeline (possibly by editing a pipeline structure file, like YAML). The full plugin consists of the processor plus an HTTP handler (`http.Handle`?) that is added to the main server of Horizon so users can access the data ingested by the processor in the pipeline. The handler part is currently blocked because we aren't sure what the Horizon v2 API will look like, but to give an example, in the current API this would be an action exposing data in the `asset_stats` table. Plugins were discussed several times in OKR meetings (I think some plugin-related tasks were part of Q3 OKRs).
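To illustrate the shape being described (purely hypothetical; no such interface exists in the codebase today), a plugin would bundle a pipeline processor with the HTTP handler that exposes what it ingested:

```go
package example

import "net/http"

// pluginProcessor is a stand-in for a pipeline processor type.
type pluginProcessor interface {
	ProcessChange(change interface{}) error
}

// Plugin is a hypothetical shape for a Horizon plugin: the pipeline
// processor that ingests the data plus the HTTP handler that serves it.
type Plugin interface {
	// Processor is added to the ingestion pipeline (possibly via a
	// pipeline structure file, like YAML).
	Processor() pluginProcessor
	// Handler is mounted on Horizon's main HTTP server, e.g. an action
	// exposing data in the asset_stats table.
	Handler() (pattern string, handler http.Handler)
}
```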
I have a lot less context, so it's very possible I'm way off base here, and apologies if I'm repeating an idea already discussed. Let me know if I am.
It looks like having concurrency baked into the `Pipeline` type is forcing us to use concurrency in places we don't want it, e.g. it forced us to bundle all the database code into a single node. Can we pull out the concurrency piece and make it a component that can be inserted into the pipeline at the points where we want concurrency? I.e. could we move the concurrent behavior out of the `Pipeline` type and into a new node type that wraps other nodes and runs its child nodes concurrently?
Does the pipeline need to converge on a set of nodes that process serially after a series of nodes that process in parallel? If so, we could create a synchronizing node that, when injected into a pipeline, returns processing to serial around a single mutex.
I think if we can move the branching and convergence into nodes, we can compose the pipeline with or without concurrency as needed.
For this specific issue it might be sufficient to leave the concurrency in `Pipeline` and create the synchronizing node `MutexNode` that converges on a mutex, instead of building the mutex into a `db.Session`.
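A sketch of what such a `MutexNode` could look like (hypothetical types; the real pipeline node API differs): it wraps another node and serializes calls to it behind a shared mutex, so sibling nodes can still run in parallel while the DB-touching stages converge on one lock.

```go
package example

import "sync"

// node is a stand-in for a pipeline processor node.
type node interface {
	Process(item interface{}) error
}

// MutexNode wraps another node and serializes calls to it. Only the
// wrapped work is forced back to one-at-a-time, e.g. everything that
// touches a shared db.Session.
type MutexNode struct {
	Mu    *sync.Mutex // shared by every node that must not run concurrently
	Inner node
}

func (m *MutexNode) Process(item interface{}) error {
	m.Mu.Lock()
	defer m.Mu.Unlock()
	return m.Inner.Process(item)
}
```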
That steps us towards the concurrency being a composable component of the pipeline rather than baked into other components.
An older ideas doc that mentions plugins: https://docs.google.com/document/d/1i8GzFEzhCUt3toj_eSDvBEPPmlUUA0LkEwhEjl9jZGE/edit?usp=sharing
Requirements doc: https://docs.google.com/document/d/1EYHSPS_O0NW4-jk7T3qDDbXhjivhfXH4xEIjvZPzt84/edit
Original design doc: https://github.com/stellar/go/blob/master/services/horizon/internal/docs/plans/new_horizon_ingest.md
Let's plan to discuss this in a call on Friday. While I'm very interested in the concurrency question, and the bigger picture of the overall architecture, I think our primary focus should be to get unblocked so that we can implement the remaining ingestion processors. From that perspective, I think the question is whether to
1) Keep going with a monolithic DB processor
2) Synchronise `db.Session` and break out the processors
3) Move to a different approach that removes or refactors concurrency
If 3) is too big/needs more investigation, perhaps there's a way to choose between 1) and 2), and still leave open the possibility for arch changes in the future without too much extra work?
Feel free to continue discussion here and we'll pull everything together on Friday.
Let's partition the requirements for revamped ingestion into two categories.
Category 1 consists of all the features we've implemented thus far and plan to implement until experimental ingestion replaces the legacy ingestion system in Horizon.
Category 2 consists of plugins, which we'll define as "A general framework for community-supplied endpoints". We plan to implement plugins in Q3 2020.
Now we're trying to evaluate between 2 implementation proposals. We have the status quo: the framework of pipelines, processors, sessions, etc. And, we have the alternative I've outlined here: https://github.com/stellar/go/pull/1967
My argument is that the alternative proposal implements the category 1 feature set in a significantly simpler manner. The reduction in complexity makes it easier for current and future Horizon developers.
@bartekn's argument is: it may be true that the alternative proposal is a simpler implementation of the category 1 feature set, but plugins are the ultimate goal and that is more important. The pipeline framework is critical to our desired implementation of plugins; therefore, it is best to stick with the pipeline framework.
I think it's safe to assert that there are a lot of open questions on the nature of plugins. Can we confidently say that we know exactly how a plugin should be implemented? We need to spend some time tinkering and prototyping real use cases to get some sort of concrete definition of what a plugin is.
Because plugins are currently too abstract I think it's really hard to evaluate whether any proposal is best suited for plugins. But for the sake of argument, let's say that the pipeline framework is better suited for plugins than https://github.com/stellar/go/pull/1967.
If we stick with the pipeline framework, we're making a decision to live with more complex code for almost a year (until Q3 2020), at which point all the features of the framework will start to come in handy and the bet will finally pay off.
I don't think we can have so much certainty about a distant project. It may be possible that the pipeline framework can be molded to support plugins, but I would not bet that the best possible plugin architecture fundamentally requires a pipeline framework.
Instead of taking a top-down approach where we design a system which addresses our current needs but prioritizes what we think we'll need in the distant future, I prefer a bottom-up, incremental approach.
Let's design a system that is practical and addresses our current requirements (e.g. the category 1 feature set). If and when we start working on plugins, let's take some time to understand the problem. Let's work through various prototypes to inform our desired implementation.
Once we have reduced the uncertainty we can start progressing with a plugin system implementation. This will probably require some refactoring, but I suspect it will be easier to arrive at the ideal plugin system from a simple starting point.
We had a great call and discussed this at length. Actions:
1) In order to maintain our schedule, we will go ahead and implement synchronisation under the existing pipeline approach. We'll use the mutex approach suggested in this thread to make this less invasive.
2) We can continue with Tamir's proof-of-concept at medium priority, to give us something concrete to compare to.
3) We'll find a place to start sketching out the plugin architecture in more detail. This should be in either an existing design doc or a new one. This will help us evaluate possible approaches and pick the best one.
Done in #2024
Currently all code that adds data to a database in the `expingest/processors` package in Horizon is included in a single processor: `DatabaseProcessor`. This was done because a database transaction cannot be shared by many goroutines: https://github.com/stellar/go/issues/1836. I think we should actually have separate processors for all ledger entries / historical data, for many reasons, among them fewer `if` conditions in the code.
To solve this we need to update `db.Session` to send one request/response at any given time. To do it in a non-breaking way we can add a `Synchronized` field to `support/db.Session`. When it's set to `true`: `sqlx.Rows` are not available (return error or panic).
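A minimal sketch of what the proposed `Synchronized` field could look like (the names follow the issue text; the guard logic is illustrative only, not the actual `support/db` implementation):

```go
package example

import (
	"database/sql"
	"errors"
	"sync"
)

// Session sketches support/db.Session with the proposed flag.
type Session struct {
	DB *sql.DB

	// Synchronized, when true, allows only one request/response at any
	// given time, so the session can be shared by pipeline processors.
	Synchronized bool
	mu           sync.Mutex
}

func (s *Session) Exec(query string, args ...interface{}) (sql.Result, error) {
	if s.Synchronized {
		s.mu.Lock()
		defer s.mu.Unlock()
	}
	return s.DB.Exec(query, args...)
}

// Methods that hand back row iterators (sqlx.Rows) would hold the lock
// for an unbounded time, so in synchronized mode they are disallowed,
// per the proposal.
func (s *Session) Query(query string, args ...interface{}) (*sql.Rows, error) {
	if s.Synchronized {
		return nil, errors.New("row iterators are not available in synchronized mode")
	}
	return s.DB.Query(query, args...)
}
```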