stellar / go

Stellar's public monorepo of go code
https://stellar.org/developers
Apache License 2.0

ingest/pipeline: Create functional producer for BufferedStorageBackend #5412

Open sreuland opened 1 month ago

sreuland commented 1 month ago

What problem does your feature solve?

BufferedStorageBackend provides ledger close meta (LCM) one ledger at a time through GetLedger, but it offers no more efficient way to act as a streaming producer of LCMs.

What would you like to see?

Follow the design proposal on Functional Processors Lib. Provide a 'Producer' function for BufferedStorageBackend. The function will be used as the 'producer' operator in a pipeline, emitting tx-meta LCM through a callback fn. It acts as a closure that encapsulates a private instance of BufferedStorageBackend, avoiding any unintended side effects.

// The returned channel either delivers one error or is closed when publishing finishes for a bounded range.
// If the range is unbounded, the channel is never closed.
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
                                            bufferedConfig ledgerbackend.BufferedStorageBackendConfig,
                                            dataStoreConfig datastore.DataStoreConfig,
                                            ctx context.Context,
                                            callback func(xdr.LedgerCloseMeta) error) chan error

The method will return immediately, creating an async worker routine in the background to continue processing.
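A minimal sketch of the shape this could take, to make the "return immediately, keep working in the background" behaviour concrete. It is not the proposed implementation: `LedgerCloseMeta`, `ledgerSource`, `fakeSource`, `publish` and `collect` are hypothetical stand-ins so the example runs without the monorepo packages, and the backend is passed in as a parameter here purely to keep the sketch short.

```go
package main

import (
	"context"
	"fmt"
)

// LedgerCloseMeta is a hypothetical stand-in for xdr.LedgerCloseMeta.
type LedgerCloseMeta struct{ Sequence uint32 }

// ledgerSource is a hypothetical stand-in for the single backend method
// the producer needs.
type ledgerSource interface {
	GetLedger(ctx context.Context, seq uint32) (LedgerCloseMeta, error)
}

// fakeSource fabricates ledgers so the sketch runs without a datastore.
type fakeSource struct{}

func (fakeSource) GetLedger(ctx context.Context, seq uint32) (LedgerCloseMeta, error) {
	if err := ctx.Err(); err != nil {
		return LedgerCloseMeta{}, err
	}
	return LedgerCloseMeta{Sequence: seq}, nil
}

// publish returns immediately. A goroutine walks the bounded range
// [from, to], hands each ledger to callback, and reports at most one
// error on the returned channel before closing it.
func publish(ctx context.Context, src ledgerSource, from, to uint32,
	callback func(LedgerCloseMeta) error) chan error {
	errCh := make(chan error, 1) // buffered: the worker never blocks on send
	go func() {
		defer close(errCh)
		// uint64 counter avoids wrap-around when to is the max uint32.
		for seq := uint64(from); seq <= uint64(to); seq++ {
			lcm, err := src.GetLedger(ctx, uint32(seq))
			if err == nil {
				err = callback(lcm)
			}
			if err != nil {
				errCh <- err
				return
			}
		}
	}()
	return errCh
}

// collect drives publish over a range and gathers the emitted sequences.
func collect(from, to uint32) ([]uint32, error) {
	var seqs []uint32
	errCh := publish(context.Background(), fakeSource{}, from, to,
		func(lcm LedgerCloseMeta) error {
			seqs = append(seqs, lcm.Sequence)
			return nil
		})
	err := <-errCh // nil once the channel is closed without an error
	return seqs, err
}

func main() {
	seqs, err := collect(2, 4)
	fmt.Println(seqs, err) // prints "[2 3 4] <nil>"
}
```

Receiving from the channel is what synchronizes the caller with the worker, so `seqs` can be read safely once `<-errCh` returns.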

Visualization of where the producer function fits in the larger CDP design for data transformation pipeline:

[Image: dual_func_general]

Relates to:

What alternatives are there?

New streaming ingestion app use cases would have to implement the same logic locally.

tamirms commented 1 month ago

I think there are some subtleties to this interface change that we should consider:

Given these potential issues, I think for the MVP we should avoid changing the LedgerBackend interface. In the future, as we see more uses of the ingestion library we can come up with some helper functions which will reduce boilerplate.

sreuland commented 3 weeks ago
  • If captive-core terminates unexpectedly this interface does not allow us to communicate that error to the caller

Publishing can return a channel to propagate completion status to the caller: if an error occurs, it is sent on the channel and the channel is then closed; if publishing finishes for the requested range without errors, the channel is simply closed.
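From the caller's side, that contract could be consumed roughly as below. This is a sketch under assumptions: `startWorker`, `wait` and `errBackend` are hypothetical names, and the worker only simulates a backend that fails at a chosen ledger.

```go
package main

import (
	"errors"
	"fmt"
)

// errBackend is a hypothetical error standing in for an unexpected
// backend termination (for example captive-core exiting).
var errBackend = errors.New("backend terminated unexpectedly")

// startWorker simulates publishing ledgers 1..n in the background.
// If failAt is within 1..n, the worker sends one error and stops.
// The channel is closed in both cases.
func startWorker(n, failAt int) chan error {
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		for i := 1; i <= n; i++ {
			if i == failAt {
				errCh <- fmt.Errorf("ledger %d: %w", i, errBackend)
				return
			}
		}
	}()
	return errCh
}

// wait blocks until publishing ends. A closed channel with no value
// means the range completed; a received value is the failure.
func wait(errCh <-chan error) error {
	err, ok := <-errCh
	if !ok {
		return nil
	}
	return err
}

func main() {
	fmt.Println(wait(startWorker(3, 0))) // prints "<nil>"
	fmt.Println(wait(startWorker(3, 2))) // prints "ledger 2: backend terminated unexpectedly"
}
```

Wrapping with `%w` lets a caller test for the underlying cause with `errors.Is` instead of matching on the message.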

  • What happens if you call Publish() and while that go routine is running you call GetLedger() concurrently?
  • What happens if you call Publish() multiple times before the go routine finishes running?
  • What happens if you call Publish() and then call PrepareRange() before the go routine finishes running?

Yes. To avoid these re-entrancy problems with the LedgerBackend instance needed to drive publishing, I think we can skip adding the notion of publishing to LedgerBackend. Instead, we can go with functional closures that encapsulate a private instance of LedgerBackend for publishing concerns. Internally, the closures can iterate on GetLedger(), which avoids side effects and edge cases related to the underlying backend. The net change for the new proposal would be:

// This will create a private instance of CaptiveStellarCore using NewCaptive().
func PublishFromCaptiveCore(ledgerRange ledgerbackend.Range,
                                            captiveCoreConfig ledgerbackend.CaptiveCoreConfig,
                                            ctx context.Context,
                                            callback func(xdr.LedgerCloseMeta) error) chan error

// This will create a private instance of BufferedStorageBackend using NewBufferedStorageBackend().
func PublishFromBufferedStorageBackend(ledgerRange ledgerbackend.Range,
                                            bufferedConfig ledgerbackend.BufferedStorageBackendConfig,
                                            dataStoreConfig datastore.DataStoreConfig,
                                            ctx context.Context,
                                            callback func(xdr.LedgerCloseMeta) error) chan error
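To illustrate why a private backend instance sidesteps the re-entrancy questions, here is a rough sketch in which every call builds its own backend that never escapes the function. `privateBackend`, `newPrivateBackend`, `publishFromPrivateBackend` and `runTwo` are hypothetical names used only for this example; the real constructors and configs are not modelled.

```go
package main

import (
	"context"
	"fmt"
)

// LedgerCloseMeta is a hypothetical stand-in for xdr.LedgerCloseMeta.
type LedgerCloseMeta struct{ Sequence uint32 }

// privateBackend is a hypothetical stateful backend. Like a real one,
// it only serves ledgers inside the range it was prepared for.
type privateBackend struct {
	from, to uint32
	prepared bool
}

func newPrivateBackend() *privateBackend { return &privateBackend{} }

func (b *privateBackend) PrepareRange(from, to uint32) {
	b.from, b.to, b.prepared = from, to, true
}

func (b *privateBackend) GetLedger(ctx context.Context, seq uint32) (LedgerCloseMeta, error) {
	if err := ctx.Err(); err != nil {
		return LedgerCloseMeta{}, err
	}
	if !b.prepared || seq < b.from || seq > b.to {
		return LedgerCloseMeta{}, fmt.Errorf("ledger %d outside prepared range", seq)
	}
	return LedgerCloseMeta{Sequence: seq}, nil
}

// publishFromPrivateBackend owns its backend: no caller can invoke
// PrepareRange or GetLedger on it while the worker is running.
func publishFromPrivateBackend(ctx context.Context, from, to uint32,
	callback func(LedgerCloseMeta) error) chan error {
	backend := newPrivateBackend() // captured only by the goroutine below
	backend.PrepareRange(from, to)
	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		for seq := uint64(from); seq <= uint64(to); seq++ {
			lcm, err := backend.GetLedger(ctx, uint32(seq))
			if err == nil {
				err = callback(lcm)
			}
			if err != nil {
				errCh <- err
				return
			}
		}
	}()
	return errCh
}

// runTwo starts two publishers concurrently over different ranges and
// waits for both before reading the per-publisher counters.
func runTwo() (int, int, error) {
	ctx := context.Background()
	a, b := 0, 0
	chA := publishFromPrivateBackend(ctx, 1, 3, func(LedgerCloseMeta) error { a++; return nil })
	chB := publishFromPrivateBackend(ctx, 10, 14, func(LedgerCloseMeta) error { b++; return nil })
	errA, errB := <-chA, <-chB
	if errA != nil {
		return a, b, errA
	}
	return a, b, errB
}

func main() {
	fmt.Println(runTwo()) // prints "3 5 <nil>"
}
```

Calling the function several times simply yields several independent workers, each with its own prepared range, so the "what if Publish() is called twice" case has a well-defined answer.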

Given these potential issues, I think for the MVP we should avoid changing the LedgerBackend interface. In the future, as we see more uses of the ingestion library we can come up with some helper functions which will reduce boilerplate.

I think that if we can provide this SDK mechanism up front for automating the streaming of ledger tx-meta, it will be worthwhile for demonstrating the DX during the MVP timeframe, since it lowers the resistance for app developers to adopt the CDP approach of transforming network data into derived models in a pipeline. Apps avoid investing in boilerplate (ledgerbackend, GetLedger iteration, etc.) and get a Stellar tx-meta 'source of origin' operator (publisher) to use in their pipeline out of the box.