influxdata / influxdb

Scalable datastore for metrics, events, and real-time analytics
https://influxdata.com
Apache License 2.0

Extend replication-stream APIs to set up a durable queue #22168

Closed danxmoran closed 3 years ago

danxmoran commented 3 years ago

Extend the implementation from #22167 to include this logic:

When the server processes a replication-stream creation request, it should:

  1. Initialize a durable queue with the requested size
  2. Start running a scanner on the queue to write points to the requested remote instance
  3. Save the stream's metadata to sqlite

When the server processes a deletion request, it should:

  1. Close the stream's durable queue
  2. Delete the stream's metadata from sqlite
  3. Delete any enqueued, un-flushed data for the queue on disk

When the server processes an update request, it should:

  1. Check if the max queue size has changed. If so, update the durable queue with the new size
  2. Check if the connection details for the target bucket have changed. If so, update the queue's remote-writer with the new info.
  3. Update the stream's metadata in sqlite

Note: Steps above related to the scanner/request writer have been split into a separate issue due to complexity. See #22733 for this new issue.
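As a rough, hypothetical sketch of how the create and update paths described above might fit together (the `service` type, `newQueue` constructor, and `replications` table are invented names for illustration, not the actual APIs; the real wiring builds on #22167, and the scanner is deferred to #22733):

```go
package replications

import (
	"database/sql"
	"fmt"
	"path/filepath"
)

// Queue is a stand-in for the durable queue type discussed later in this thread.
type Queue interface {
	Open() error
	Close() error
	SetMaxSize(size int64) error
}

// service is a hypothetical coordinator: one durable queue per stream,
// with stream metadata persisted in sqlite.
type service struct {
	enginePath string
	db         *sql.DB          // sqlite store for stream metadata
	queues     map[string]Queue // keyed by replication stream ID
	newQueue   func(dir string, maxSize int64) (Queue, error)
}

// CreateStream follows the creation steps above (the scanner that drains
// the queue to the remote is out of scope here).
func (s *service) CreateStream(id string, maxQueueSize int64) error {
	dir := filepath.Join(s.enginePath, "replicationq", id)
	q, err := s.newQueue(dir, maxQueueSize) // 1. durable queue with the requested size
	if err != nil {
		return err
	}
	if err := q.Open(); err != nil {
		return err
	}
	// 3. persist the stream's metadata to sqlite
	if _, err := s.db.Exec(
		`INSERT INTO replications (id, max_queue_size) VALUES (?, ?)`, id, maxQueueSize,
	); err != nil {
		q.Close()
		return err
	}
	s.queues[id] = q
	return nil
}

// UpdateStream applies a max-queue-size change and refreshes the metadata;
// remote-writer updates are likewise out of scope here.
func (s *service) UpdateStream(id string, maxQueueSize int64) error {
	q, ok := s.queues[id]
	if !ok {
		return fmt.Errorf("replication %q not found", id)
	}
	if err := q.SetMaxSize(maxQueueSize); err != nil { // 1. resize the durable queue
		return err
	}
	// 3. update the stream's metadata
	_, err := s.db.Exec(`UPDATE replications SET max_queue_size = ? WHERE id = ?`, maxQueueSize, id)
	return err
}
```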

mcfarlm3 commented 3 years ago

I’m working on the queue management portion of this issue right now and I’m not sure about appropriate parameters to use when the durable queue is initialized. I’m using this method to initialize a new durable queue anytime a new replication stream is created: func NewQueue(dir string, maxSize int64, maxSegmentSize int64, queueTotalSize *SharedCount, depth int, verifyBlockFn func([]byte) error) (*Queue, error) dir - I’m not sure about this one - what location/name makes the most sense? maxSize - This would be specified by the replication create request. I’m good on this one. maxSegmentSize - I think this should be the constant DefaultSegmentSize defined here queueTotalSize - Use a new SharedCount struct for this depth - I think this should be the constant MaxWritesPending defined here verifyBlockFn - I know the purpose of this is to verify that a block within a segment contains valid data. So is the idea that this function should read a segment, then compare it to what we expect to be there? What would be considered valid data in this case? I’ve been looking at some of the hinted handoff code on plutonium, to try to get an idea of how the durable queue was used there, but I’m still not sure what makes the most sense for some of these in the context of replications.

lesam commented 3 years ago

dir should be inside the engine-path, I think. As a name, maybe something like $HOME/.influxdbv2/engine/replicationq/<some identifier per queue> for the default install?

maxSegmentSize, queueTotalSize, depth - I'm OK setting these to fixed values, but we may eventually need tuning knobs for them; I believe all of these are configurable in Enterprise.

verifyBlockFn - this is attempting to find out whether a block of data in the queue is valid (meaning we can deserialize it into something that makes sense). The idea is to detect disk corruption. In a perfect environment it could be a no-op, but if there are cheap validity checks we can run on whatever we plan to store in the block, that would be a good idea.
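Putting those answers together, a call might look roughly like the sketch below. The import path, the MkdirAll step, and the exact homes of the DefaultSegmentSize / MaxWritesPending constants are assumptions; only the NewQueue signature and constant names quoted earlier in the thread are taken from the discussion.

```go
package replications

import (
	"errors"
	"os"
	"path/filepath"

	"github.com/influxdata/influxdb/v2/pkg/durablequeue" // assumed location of the ported queue
)

// openReplicationQueue wires NewQueue with the defaults discussed above:
// fixed segment size and depth, a fresh SharedCount per queue, and a very
// cheap block check (a real check could try to parse the stored points).
func openReplicationQueue(enginePath, replicationID string, maxSize int64) (*durablequeue.Queue, error) {
	dir := filepath.Join(enginePath, "replicationq", replicationID)

	// Assumption: the queue directory must exist before the queue can use it.
	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, err
	}

	verify := func(b []byte) error {
		if len(b) == 0 {
			return errors.New("empty block")
		}
		return nil
	}

	return durablequeue.NewQueue(
		dir,
		maxSize,                         // from the replication create request
		durablequeue.DefaultSegmentSize, // fixed for now; may become a tuning knob later
		&durablequeue.SharedCount{},     // new shared counter per queue
		durablequeue.MaxWritesPending,   // fixed depth for now
		verify,
	)
}
```

Whether the returned queue still needs an explicit Open() call (or similar) before use is a detail the real implementation would have to handle.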

lesam commented 3 years ago

How many queues do we intend to be setting up? One per replication destination? One per bucket?

mcfarlm3 commented 3 years ago

So I think it should be one durable queue per replication stream (each with a unique ID), or at least that's my interpretation. For reference, the code for the replication create command is here. When a new replication stream is created, the user needs to specify a name, remote connection ID, local bucket ID, and remote bucket ID. So one queue for each of these unique replications?
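For illustration, the fields from the create request might map onto a per-stream record like this (field names are approximations of the request options mentioned above, not the exact API types):

```go
// Hypothetical shape of one replication stream, roughly mirroring the
// fields on the create request; one durable queue would be created per
// record, keyed by the stream's own ID rather than by bucket or remote.
type replicationStream struct {
	ID             string // unique per stream; could also name the queue's directory
	Name           string
	RemoteID       string // remote connection the queue will be drained to
	LocalBucketID  string
	RemoteBucketID string
	MaxQueueSize   int64
}
```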

dgnorton commented 3 years ago

I think dir should be configurable, with a default like @lesam described. Some users will be replicating a lot of data and may need the option to locate it separately.

dgnorton commented 3 years ago

So I think it should be one durable queue per replication stream (with unique id)

That sounds right to me. It's possible that streams could have different destinations, and one of those destinations could be down while the other is still up. A separate queue for each stream ensures that a down destination won't interfere with the other streams. It also makes cleanup easier when a stream is removed. I'm sure all of that could be handled with a single queue, but it would be easier with multiple.
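Continuing the hypothetical service sketch from earlier in the thread (with "os" added to its imports), a deletion path under that per-stream design might look like this; tearing down one stream's queue cannot affect the others:

```go
// DeleteStream tears down exactly one stream: its queue is closed, its
// metadata row is removed, and only its own on-disk directory is deleted,
// so other streams' queues are untouched even if their remotes are down.
func (s *service) DeleteStream(id string) error {
	q, ok := s.queues[id]
	if !ok {
		return fmt.Errorf("replication %q not found", id)
	}
	if err := q.Close(); err != nil { // 1. close the stream's durable queue
		return err
	}
	// 2. drop the stream's metadata from sqlite
	if _, err := s.db.Exec(`DELETE FROM replications WHERE id = ?`, id); err != nil {
		return err
	}
	delete(s.queues, id)
	// 3. remove any enqueued, un-flushed data left on disk for this queue only
	return os.RemoveAll(filepath.Join(s.enginePath, "replicationq", id))
}
```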

mcfarlm3 commented 3 years ago

I think dir should be configurable, with a default like @lesam described. Some users will be replicating a lot of data and may need the option to locate it separately.

Okay, that makes sense. I'm not entirely sure what is meant by making dir configurable, like what that looks like in the actual code. Could you point me toward a similar example and/or file?

lesam commented 3 years ago

@dgnorton @mcfarlm3 - If we allowed configuration of WAL and TSM write locations separately I'd probably agree that replication should be configurable as well. But as is, there's only one knob to configure where 'heavy disk usage stuff' lives at the moment (the 'engine' directory location is configurable), and it seems like replication streams should fit into that pattern.

mcfarlm3 commented 3 years ago

@dgnorton @mcfarlm3 - If we allowed configuration of WAL and TSM write locations separately I'd probably agree that replication should be configurable as well. But as is, there's only one knob to configure where 'heavy disk usage stuff' lives at the moment (the 'engine' directory location is configurable), and it seems like replication streams should fit into that pattern.

I'll go ahead with default path only then (non-configurable for now), unless you have strong objections @dgnorton?

dgnorton commented 3 years ago

@mcfarlm3 no objections