Closed: jimmarino closed this issue 2 years ago
Thanks a lot for the proposal, that's great!
Some questions I have - partially beyond the scope of the DPF, just to sort things in the right way in my slowly-working head:
Hi @MoritzKeppler just saw your questions now.
The Data Plane Framework is just one implementation of a DataFlowController. There can be more (e.g. for streaming), right?
In the scope section, I initially wrote: "The Data Plane Framework is designed to only work with finite data transfers. Streaming (non-finite) data transfers should be handled by other DataFlowController implementations." However, your use case in the next point brings up some interesting scenarios.
a consumer wants to pull data multiple times on demand, e.g. from a REST API like the one defined by AAS. (is every call a separate transfer process?)
That's an interesting scenario. It seems a bit heavyweight and onerous to require a transfer process for these types of data exchange. Maybe that should be modeled as a non-finite process (i.e. a subscription)? The question would then be: should that type of data flow be handled by the data plane or a separate extension? My initial idea was only the Connector should initiate flows through the DPF, and those flows should be finite, but that could be changed. What I think we need to avoid is mediating non-finite data streams, specifically interposing the DPF between messaging endpoints (e.g. Kafka). Maybe the AAS transfer should be handled by its own extension rather than through the DPF? Are there specific ways you think the DPF could be relevant here?
a provider agreed to push data as soon as an event occurs (e.g. an error event in a machine; this can happen far in the future)?
This seems like a subscription, but one that happens intermittently. Again, maybe the DPF could be beneficial since it provides an n-way data flow model. We could open the DPF up to receive requests from infrastructure other than the Connector in this case.
who takes care of logging events (data sent, retrieved), including notifying a clearing house if needed?
I think this should be done by the Connector or other controlling system. The DPF is just a set of pipes.
who takes care of managing retrieved assets? In the data transfer diagrams we drew in the summer, we discussed adding retrieved assets to some index, and new assets with backpointers if an asset needs to be reshared.
This is probably a much broader discussion. IMO the DPF should not manage this. I also don't think the Connector should ever store retrieved assets or have long-term access to them, for security reasons. Let's discuss this, because there are a lot of scenarios to go over.
Are workers thought to run in separate processes? Or how will it scale horizontally?

Yep, just spin up as many workers as one needs and multiplex requests to them.
The DataPlaneFramework (or, more generically, a DataFlowController) will just run on one side: on the consumer side if it's a pull flow, or on the provider side if it's a push. Or is it thought to run on both sides?
It theoretically only needs to run on one side, I think :-)
Here's the related PR #496.
@MoritzKeppler @bscholtes1A
I made an update to the spec and pushed corresponding changes to the PR branch. I added the notion of ad hoc sinks and sources. This will allow us to send/receive messages and events directly through the DPF. For example, client code dequeues a message and wraps it in an `InputStreamDataSource` that is passed to the DPM and into the transfer pipeline. The data source is just a wrapper around the dequeued message. Similarly, client code can transfer from a source to an `OutputStreamDataSink`, which can be a wrapper around an in-memory object or a queue message.
Operations are async but return a CompletableFuture so clients can choose to wait on the results. This isn't fast, but it is important to the design since external sinks and sources that are slow should not impact the overall operation and progress of the DPF instance. If a use case requires lower latency, we should recommend a direct streaming solution such as a Kafka extension.
Let me know what you guys think
@jimmarino If it is possible to have the DPF running on both the consumer and provider sides, what is the process for deciding where it runs? Is it supposed to be a dedicated property in `DataAddress.TYPE`? I can imagine HTTP requests always being handled on the consumer side, whereas S3 requests are handled on the provider side. For example, here you mentioned that `HttpFunctionDataFlowController` would call the DPF on both sides; how would it decide which side to call?
Hi @MikhailGordienk, the DPF can run on the provider or client side. Its job is to retrieve data from a source and send it to a target.
This can work with "push" or "pull" data transfers:
In a push scenario (the provider sends data to the client), the DPF is on the provider side and retrieves the data from the provider's storage and sends it via the DPF to some endpoint the client has sent with the request. This endpoint could be an S3 bucket or HTTP endpoint. From the perspective of the provider's DPF, the infrastructure behind that endpoint isn't visible.
In the pull scenario (the client retrieves data from the provider), the DPF is on the client side and retrieves the data using the address sent by the provider. The data is sent by the DPF to a storage location specified by the client. Again, what is behind the provider address isn't visible to the client DPF.
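The two flows above can be illustrated with simplified placeholder types; the type names and address fields here are invented for illustration and are not the actual EDC classes:

```java
// Placeholder types for illustration only (not the actual EDC classes).
record DataAddress(String type, String endpoint) {}
record FlowRequest(DataAddress source, DataAddress destination) {}

class FlowExamples {
    // Push: the provider-side DPF reads from the provider's storage and sends
    // the data to the endpoint the client included in its request.
    static final FlowRequest PUSH = new FlowRequest(
            new DataAddress("AzureStorage", "providers-container"),
            new DataAddress("HttpData", "https://client.example.com/inbox"));

    // Pull: the client-side DPF reads from the address the provider sent and
    // writes the data to storage the client specified.
    static final FlowRequest PULL = new FlowRequest(
            new DataAddress("HttpData", "https://provider.example.com/artifacts/42"),
            new DataAddress("AmazonS3", "clients-bucket"));
}
```

In both cases the DPF only sees the counterparty's endpoint address, never the infrastructure behind it.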
Does that clarify how it works?
Hi @jimmarino and thanks for the improvements. Two main points:
- I still do not see in the code how the query parameters are conveyed to the `HttpDataSource`, which is in charge of pulling the data out from the HTTP endpoint. Reminder: in our use case, the `Asset` represents the API source, but you can (should) provide query parameters in order to target only a subset of the data that is accessible through this API.
Let's discuss. I didn't put this in yet. One possibility is to put this in an extensible property or use the data address.
- About the ad hoc source/sink, I do not get it :D Based on the code of your PR, could you detail what still needs to be implemented in order to support the scenario we discussed, i.e. the data provider reading data from a Kafka topic and then sending it to the consumer through HTTP?
This can be done in a couple of ways. For the first case, let's assume the DPF is set up as a server with an HTTP(S) interface. Client code dequeues the message and wraps it in an `InputStreamDataSource`:

```java
var dataFlowRequest = // ...
var messageBytes = // ...
var source = new InputStreamDataSource("test", new ByteArrayInputStream(messageBytes));
// DataPlaneManager is injected into the extension
dataPlaneManager.transfer(source, dataFlowRequest).whenComplete((result, error) -> { /* ... */ });
```
The other case is when the DPF is embedded in the KafkaConsumer (or the DPF server subscribes directly to Kafka). In that case, the code is virtually identical save for the HTTP hop.
You can also use an `OutputStreamDataSink` to implement the reverse scenario and send data directly to a destination. For example, retrieve data from a `DataSource` and pipe it directly to the output stream sink, which could be attached to a queue or some other system.
Does that make sense?
Overview
This issue will detail the architecture and design of the Data Plane Framework (DPF), which will provide a mechanism for implementing an extensible `DataFlowController` that supports a myriad of deployment topologies.

Scope
The Data Plane Framework is designed to work only with finite data transfers and small-payload, latent non-finite transfers such as events. High-volume or latency-sensitive streaming (non-finite) data transfers should be handled by other `DataFlowController` implementations that delegate to specialized third-party infrastructure such as Kafka.

Principles
- Minimal state. All state pertaining to a transfer process will be maintained by the Connector Control Plane as part of the `TransferProcess`. The only state maintained by the DPF is whether a transfer process has completed. This requires the Control Plane to issue retries in the event of failure.
- No transformations. The DPF is not an ETL tool. There will be no facilities for data transformations or processing. This is expected to be handled by the Control Plane as part of the provisioning state.
- Do not reinvent the wheel. The DPF will rely on existing data transfer technology such as S3, Azure Object Storage, FTP, etc. As a rule of thumb: the DPF should not contain any wire protocol implementations.
- Flexible Deployment. It must be possible to run the DPF embedded in the Connector or as a standalone, independently scalable service.
- Extensible. The DPF will be built using the EDC modularity and extensibility system.
Design
Data Plane Manager
The `DataPlaneManager` (DPM) will enqueue `DataFlowRequest`s, which may originate from an ingress such as an HTTP endpoint. The queue implementation will be a bounded, in-memory data structure that provides support for backpressure when the system is overloaded. However, since the queue is in-memory, the client control plane submitting a request will be responsible for re-submitting it in the event of failure.

The DPM implementation will have a configurable number of workers that dequeue requests for processing (data transfer). Processing completes asynchronously; when a result is returned, an entry indicating success or failure is recorded for the process using extensible storage. This allows the control plane to query the DPF for the status (COMPLETED or ERROR) of a process. No other state will be maintained by the DPF.

The `DataPlaneManager` will look similar to the following:
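As a minimal sketch of the queueing behavior described above (a bounded in-memory queue that rejects requests under backpressure), with names that are assumptions based on this discussion rather than the actual EDC API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch only: all names are assumptions, not the actual EDC API.
record DataFlowRequest(String processId, String sourceType, String destinationType) {}

class DataPlaneManager {
    // Bounded in-memory queue: when full, new requests are rejected and the
    // control plane is responsible for re-submitting them.
    private final BlockingQueue<DataFlowRequest> queue;

    DataPlaneManager(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false when the queue is full (backpressure signal to the caller).
    boolean initiateTransfer(DataFlowRequest request) {
        return queue.offer(request);
    }

    int queued() {
        return queue.size();
    }
}
```

Workers would dequeue from this structure and hand each request to the transfer pipeline; only the terminal COMPLETED/ERROR entry would be persisted.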
The DPM will also contain transfer methods that take a `DataSource` and a `DataSink`, respectively. These can be used to provide an ad hoc transfer source or destination. For example, user code may dequeue a message into memory and send it to a destination using an `InputStreamDataSource` that wraps the in-memory message. Similarly, user code may wish to transfer data from a source to a provided `OutputStreamDataSink`.

Pipeline Service
When a DPM worker dequeues a request, it will delegate to `PipelineService#transfer(request)`. The implementation of this service is straightforward: it connects a sink to its source, both of which are instantiated by extensible factories. This design allows for scalable, n-way data transfers, since all data is streamed directly from its origin to the terminus.
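The connect-source-to-sink flow can be sketched as follows; the factory and request shapes here are simplified assumptions based on this description, not the actual EDC interfaces:

```java
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// All names here are illustrative assumptions, not the actual EDC API.
record DataFlowRequest(String processId, String sourceType, String destinationType) {}

interface DataSource { InputStream openStream(); }
interface DataSink { CompletableFuture<Boolean> transfer(DataSource source); }

interface DataSourceFactory {
    boolean canHandle(DataFlowRequest request);
    DataSource createSource(DataFlowRequest request);
}

interface DataSinkFactory {
    boolean canHandle(DataFlowRequest request);
    DataSink createSink(DataFlowRequest request);
}

class PipelineService {
    private final List<DataSourceFactory> sourceFactories;
    private final List<DataSinkFactory> sinkFactories;

    PipelineService(List<DataSourceFactory> sources, List<DataSinkFactory> sinks) {
        this.sourceFactories = sources;
        this.sinkFactories = sinks;
    }

    // Resolve a source and a sink from the extensible factories, then let the
    // sink pull directly from the source (n-way: any source to any sink).
    CompletableFuture<Boolean> transfer(DataFlowRequest request) {
        var source = sourceFactories.stream()
                .filter(f -> f.canHandle(request)).findFirst()
                .orElseThrow().createSource(request);
        var sink = sinkFactories.stream()
                .filter(f -> f.canHandle(request)).findFirst()
                .orElseThrow().createSink(request);
        return sink.transfer(source);
    }
}
```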
Pull Streaming
The DPF design is based on a pull streaming model: a `DataSink` pulls data from a `DataSource`. This allows the sink to control the rate of transfer and potentially parallelize operations. Moreover, optimizations can be made, such as copy-in-place when the source and sink reside on the same infrastructure, for example object storage hosted by a single cloud provider.

The following is the provisional `DataSource` interface. Implementations may support random data access for parallel transfers of large files; for example, both AWS S3 and Azure Object Storage have facilities for ranged gets:
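A sketch of what such a pull-based interface might look like, assuming a part-based shape (names are illustrative and may differ from the final definition), together with a minimal in-memory implementation:

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.stream.Stream;

// Provisional sketch (shape is an assumption based on this discussion). A
// source exposes named parts; the sink opens and reads each part at its own
// rate, which enables ranged, parallel reads.
interface DataSource {
    Stream<Part> openPartStream();

    interface Part {
        String name();
        InputStream openStream();
    }
}

// Minimal in-memory implementation, e.g. for wrapping a dequeued message.
record InputStreamDataSource(String name, byte[] data) implements DataSource {
    public Stream<Part> openPartStream() {
        return Stream.of(new Part() {
            public String name() { return name; }
            public InputStream openStream() { return new ByteArrayInputStream(data); }
        });
    }
}
```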
The `DataSink` interface is tentatively defined as:
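A sketch of this shape (assumed, not final), with a simplified single-stream `DataSource` redeclared so the example stands on its own, plus a minimal in-memory sink like the `OutputStreamDataSink` mentioned earlier:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;

// Simplified single-stream DataSource, redeclared here for self-containment.
interface DataSource { InputStream openStream(); }

// Sketch only: the sink drives the transfer by pulling from the source, so
// implementations can control the rate and fan out parallel ranged reads.
interface DataSink {
    CompletableFuture<Boolean> transfer(DataSource source);
}

// Minimal in-memory sink, e.g. attached to a queue or another system.
class OutputStreamDataSink implements DataSink {
    private final ByteArrayOutputStream stream = new ByteArrayOutputStream();

    public CompletableFuture<Boolean> transfer(DataSource source) {
        try (var in = source.openStream()) {
            in.transferTo(stream); // pull everything from the source
            return CompletableFuture.completedFuture(true);
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    public byte[] bytes() { return stream.toByteArray(); }
}
```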
`DataSink` implementations will provide the ability to configure parallel transfers, e.g. controlling the number of threads used to perform ranged gets.

Extensions
HTTP
The HTTP Data Plane extension will provide support for sending data sourced from an HTTP endpoint and posting data to an HTTP endpoint. By nature of the DPF design, which supports n-way transfers, HTTP-sourced data can be sent to any `DataSink` type, and an HTTP endpoint can receive data from any `DataSource` type. The extension is designed to stream content to limit memory consumption under load.

Note that large data transfers should use the Azure Object Storage or S3 extensions, as those support more scalable parallelization.
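As a rough illustration of the endpoint contract this extension expects (artifact name on the sub-path, authentication token in a header), a source request might be built with the JDK HTTP client; the class and parameter names here are hypothetical:

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds a GET for an artifact served by a source endpoint.
class HttpSourceExample {
    static HttpRequest buildGet(String endpoint, String artifact, String token) {
        return HttpRequest.newBuilder(URI.create(endpoint + "/" + artifact))
                .header("Authorization", token) // auth token as an HTTP header
                .GET()
                .build();
    }
}
```

Sending such a request with `HttpResponse.BodyHandlers.ofInputStream()` streams the body rather than buffering it, which matches the extension's goal of limiting memory consumption.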
Requirements
- End users will need to provide a source endpoint that serves data via `HTTP GET`. The sub-path of the endpoint will include the name of the artifact. An authentication token will be provided as an HTTP header.
- End users will need to provide a target endpoint that receives data via `HTTP POST`. The sub-path of the endpoint will include the name of the artifact. An authentication token will be provided as an HTTP header.

Azure Object Storage
See #618.
S3 Compatible Object Storage
Coming soon.