googleforgames / open-match

Flexible, extensible, and scalable video game matchmaking.
http://open-match.dev
Apache License 2.0
3.16k stars 333 forks source link

Proposal: Replace redis with custom store implementation #1117

Closed Laremere closed 4 years ago

Laremere commented 4 years ago

Proposal overview

This is a proposal to remove the entirety of redis from Open Match (OM), and instead write a custom in memory database. (Redis is also an in memory database, but this would be custom.)

The primary goal of doing this replacement is to support greater scale numbers, but there are a large number of other advantages as well.

The current redis implementation is outlined in the alternatives considered section.

A working prototype of this proposal is here: https://github.com/Laremere/open-match/tree/store

Implementation Details

A new protobuf service and component are added to OM, named "store". Redis is removed, and all the places which talk to redis instead talk to the store (though not always in the same way). Query also has an entirely different internal composition, though it's interface to the MMF remains unchanged.

image

As a rough view of what the protobufs for store look like, here's what the prototype currently uses (read on for explanation):

message FirehoseRequest {
}

message FirehoseResponse {
  // 0 means no watermark
  uint64 watermark = 1;

  oneof update {
    openmatch.Ticket new_ticket = 3;
    string relisted_id = 4;
    string pending_id = 5;
    string assigned_id = 6;
    string deleted_id = 7;
  }
}

message CreateTicketRequest {
    openmatch.Ticket ticket = 1;
}

message CreateTicketResponse {}

message GetTicketRequest {
    string id = 1;
}

message GetTicketResponse {
    openmatch.Ticket ticket = 1;
    openmatch.Assignment assignment = 2;
}

message AssignTicketsRequest {
    repeated string ids = 1;
    openmatch.Assignment assignment = 2;
}

message AssignTicketsResponse {}

message ReleaseTicketsRequest {
  repeated string ids = 1;
}

message ReleaseTicketsResponse {}

message MarkPendingRequest{
    repeated string ids = 1;
}

message MarkPendingResponse {
    // Improvement: Return ids already assigned or removed, to fail matches.
}

message GetCurrentWatermarkRequest{}

message GetCurrentWatermarkResponse{
    uint64 watermark = 1;
}

message DeleteTicketRequest {
    string id = 1;
}

message DeleteTicketResponse {}

service Store {
  rpc Firehose(FirehoseRequest) returns (stream FirehoseResponse);
  rpc CreateTicket(CreateTicketRequest) returns (CreateTicketResponse);
  rpc GetTicket(GetTicketRequest) returns (GetTicketResponse);
  rpc AssignTickets(AssignTicketsRequest) returns (AssignTicketsResponse);
  rpc ReleaseTickets(ReleaseTicketsRequest) returns (ReleaseTicketsResponse);
  rpc MarkPending(MarkPendingRequest) returns (MarkPendingResponse);
  rpc GetCurrentWatermark(GetCurrentWatermarkRequest) returns (GetCurrentWatermarkResponse);
  rpc DeleteTicket(DeleteTicketRequest) returns (DeleteTicketResponse);
}

Everything except for "Firehose" and "GetCurrentWatermark" is the a fairly CRUD operation. They mirror create/edit/delete operations currently done to redis. MarkPending and ReleaseTickets specifically implements the ignoreList behavior, where tickets which have been returned by fetch matches are not returned in queries for a set amount of time.

"Firehose" here is a stream of all updates which are occuring to the OM data. For a typical ticket, it will be created, marked as pending, given an assignment, and then deleted.

The Firehose starts with a full update of the current state of the system. This is a create for every ticket, followed by a mark as pending or assignment for the relevant tickets.

After which, every modification is shared on the firehose. The store only introduces valid modifications (eg, no deleting an non-existent ticket) onto the stream.

Each update after the initial updates to bring the stream to the current state will also include a monotonically increasing watermark. As reading from the stream has the potential for delay, a client can ensure that the current operation will contain up to date information by using get watermark (which retrieves the watermark currently used in the store), followed by waiting until the watermark from the Firehose is greater than that of the watermark retrieved from GetWatermark.

Store internals

The internals of store are rather simple: There is a hashmap from ticket id to the tickets value and state. Also present is the current watermark. Guarding this map and watermark is a ReadWrite mutex. Any reads simply read lock and lookup into the map. Any edits will write lock, validate the edit (eg, not deleting a non-existent id), and then both make the modification to the map as well as send the update to all listening Firehose streams.

Query Internals

The query internals are also not complicated: When query starts up, it also starts a Firehose stream to the store. It has a loop where it gets updates from the store, and waits for a query from the MMF. When one or more mmf queries are waiting to run, query:

  1. perform a GetWatermark to ensure that its Firehose is up to date
  2. freeze the updates to the internal map
  3. in parallel perform in memory filters on all of the ticket data
  4. return to getting updates from the Firehose.

The go routines for each query request, after getting pointers to all of the relevant tickets, serialize and return the requests independent of any centralized synchronization.

Persistence

The current prototype has no persistence. However adding persistence is rather easy (though may affect perf numbers.)

The Firehose is inspired by append only databases, and provides the mechanism for saving and restoring state. As modification requests create entries on the Firehose, those are appended to a file then the file is persisted to disk.

If the store crashes, it simply opens the file and performs the Firehose updates to return to the current state.

This file will eventually grow long, and would require processing a large number of irrelevant updates (Tickets that have come and gone) to return to the current state. As such, on a heuristic (eg, capturing the current state is 1/20 of the size update file so far) a new Firehose is created to save the current state. This can be done in the background without pausing. Once the new file is up to date, atomically switch from the old file to the new file.

Scaling characteristics

Unlike redis, which currently has all queries go against the master, the store's load only scales due to the number of ticket and the number of instances of query. If running the Firehose is too expensive on the store to all the query instances, relay instances can also be utilized. As such this should scale out to any number of queries per Fetch Matches iteration that is required.

Alternatives considered

Current redis implementation

Redis is the current database layer for OM.

Tickets

Tickets are stored in individual keys on redis. The key is the ticket id, and the value is the proto serialization of the ticket. When a ticket is assigned, it will read the ticket, update the assignment field, and the write the ticket back to redis.

Indexing

Indexing is currently done through use of sorted sets. For a given index, the ticket id is inserted as the value, with associated number depending on the type of index. Eg, for double fields, the index value is the value of the field.

Doing a query for a ticket pool is achieved not through a standard DB index or scan, but by getting all of the ticket ids which match each of the index fields, and doing a set union. These keys are used to do ticket lookups for the full value.

This is not an optimal algorithm, as it doesn't really index data as much as do a full table scan without unmarshaling data. (See the "Typical SQL indexing" section for comparison to the typical indexing that redis isn't capable of.)

As seen in this example, querying from skill 30 to 60, with latency less than 150ms, a large number of tickets id which won't be included in the response are returned:

image

This can be a large problem for OM. Pools will probably only use a few filters, but those filters are likely to each be orthogonal.

SQL database

Typical SQL databases spend a lot of effort providing support for larger on disk storage, which isn't useful for OM. Additionally the indexing doesn't support OM very well:

Typical SQL indexing:

A typical database index would look like this:

In this example, tickets have three parameters: game mode (either controlPoint or payload), region (either EU, or NA), and then skill.

A binary tree index on this data would look like this:

image

(Where the starred nodes represent a ticket)

A query which specified a mode, region, and skill range can efficiently transverse this tree. Eg, for mode=CP, region=EU, and 4 <= skill <= 9, only the blue region of data is accessed.

This methodology isn't particularly useful for OM either, as updating any indexes is typically an expensive operation, combined with the fact that queries in OM doesn't always conform to this nice tree. Eg, tickets could be in multiple regions, searching for multiple modes. More indexes can make the queries more efficient, but the amount of ticket creations and deletions happening is to make maintaining the indexes very expensive.

Other noSQL databases

From what I could find, other NoSQL projects had similar problems as redis: they are fast but not particularly in the way OM needs.

Methods for scaling redis

Yufan is investigating this alternative, and will present it seperately.

Easy things to do if adopted

Beyond performance, there are a large number of modifications to OM which suddenly become easier to perform if using the store.

Limit total number of tickets

One of the biggest dangers of running OM in production is having so many tickets that things start to slow down, which causes a further tickets piling up, which causes further slow down, etc.

With store, it's rather easy to compare a configured limit on number of tickets against the current size of the map, rejecting requests if it would go over. In this case, OM could return an error indicating as much, and clients could retry later. (Probably presenting to the play "matchmaking", just taking longer because it hasn't actually been added to OM yet.)

Limit tickets in query

In a similar vain, a query might want to limit the number of tickets returned. With redis, we can limit the total number of tickets returned, however it doesn't save much processing, as any given "index" lookup has to return all of the ids. (As you don't know if out of the million ids returned, only 5 at the end will actually be in another set from another filter.)

Query pending and assigned tickets

We currently remove assigned tickets from indexes for efficiency. There are some cases where one might want to look up pending and assigned tickets (eg, delete all tickets), which is rather easy to support when query merely stores those tickets in its map.

Delete assigned tickets after period

A ticket which has been assigned for more than a minute either wasn't properly cleaned up, or likely is too late for the client to get into the game anyways. (backfill or the match would be canceled) We could add a process to clean up tickets in redis, but it would require setting new indexes and a background process, or using no indexes and setting the redis timeout.

Freeze pool during synchronizer window

Pausing updates to the Firehose during a synchronization window would allow all MMFs to work on the exact same pool of tickets. I think this is desirable as it would simplify the conceptual model for working with OM. It helps with debugging MMF logic, and diagnosing quality problems.

Write firehose to archive, rewind test instance to archive snapshot

Related: If the firehose persistence files are saved in an archive, they can be used to set any instance of OM to a specific point in time. This would allow OM users to diagnose quality problems or bugs by identifying the specific time they want to diagnose. Then instruct OM to load the file from that point (would require tooling). At this point, the match functions, queries, etc, can all be run in the same state as it was in production.

This might also be useful for testing rollouts of new code, and changes to code to detect changes in quality.

Atomic - assign only if present and not already assigned

This is somewhat a flaw of the current implementation: Tickets can be assigned multiple times, and may revive a ticket which is deleted in very specific race conditions. It would be better to have a clear error in this case.

Delete only if safe to delete

Currently deleting a ticket is very forceful. Even if the ticket is currently being marked as pending by the synchronizer, or is pending, the ticket will be deleted. It would be much better to be able to indicate on the delete whether it is a forceful delete or not. Eg, a client clicks cancel matchmaking, but they're currently being assigned a server, either the cancel should fail or the match should be canceled. Otherwise it broadens the window for clients to never show up to potentially competitive games.

Some discussion on slack was about this.

FE listen to assignments

Given the Firehose, a simplified version which just has the IDs of created and assigned tickets would make waiting for tickets much more efficient. If a client is waiting for assignment on the FE, the FE instance would be notified that that ticket ID has been assigned, and it can now ask for that assignment without causing undue strain on the main database.

Idempotent ticket creation

Currently a failed created followed by a successful create might result in two tickets for the same player/group to exist in OM, if the fail occurs after the ticket has been added to the database. It would be rather cheap to add an idempotent key to the creation. Adding this to redis would involve adding another type of key, or allowing clients to specify their own ticket ids.

sawagh commented 4 years ago

I have listed some questions and some other thoughts for disussion in the TSC.

Questions

Firehose details

Trying to understand the content that will be streamed. Lets say the following happens:

What FirehoseResponse() will FH2 get?

Deletes

The store only introduces valid modifications (eg, no deleting an non-existent ticket) onto the stream.

Can you explain Delete? What happens after delete on the store? Does the ticket get removed from the map? Does the Firehose relay this to all the open firehose streams so they respectively act on it?

Watermark and current state

Whats the intent of GetWatermark(). Say in a scenario, a Query server calls GetWatermark() and get a value and then waits for Firehose responses to match or exceed that, wouldn't the store watermark have moved ahead?

Other Notes

Here are some thoughts / questions on specific sections of the proposal:

Query Internals

Query stores an internal map (similar to store) and with the Firehose mechanism keeps it up to date with the store right? When receiving a query with multiple filters, do you iterate over the entire map to shortlist subsets that match the filters?

Logically, it looks like there are two windows - one where the hose keeps updating the map - another where a set of one or more queries freeze and read. Any constraints in place to make sure a continuous stream of reads dont hog all the time and the map goes stale from store?

Similar question for store - would you freeze the map from writes when sending firehose updates (specially the new request that requests where the intent is to build the entire state from scratch)?

Persistence

The logic you have laid out works. However, I do think implementing it right with the error handling / failure paths and making this performant and validating it may need significant work.

Scaling 

In the basic design, I guess the core store is limited to 1 instance? Is that correct? Firehose streams seem like the mechanism to essentially replicate entire state to other scalable entities (isn't Query a copy of the same map always trying to get lazily in sync with it?).

For the case you mention around scaling firehose calls on the master, you mention a relay mechanism. Can you elaborate? Do you have a replicated store instances (identical) in mind? Or do you have instances with sharded key space / request forwarding in mind?

Again, my gut is, scaling this (either to consistently updated replicas - or sharded key-space) would be a significantly complex.

Consistency

As we are considering spreading reads across Redis replicas (other proposal), we are exploring some concerns around staleness impacting correctness. Given that Query servers have a (potentially) lazy copy of the map, are we not prone to the same concerns? Basically writes going to a master copy and reads happening from lazy replicas?

High Availability 

What happens if the store instance is completely hosed? Would we build multiple instances (slaves) that use a Firehose to get updates from master and can themselves be master if it goes down? And need some sort of leader election mechanism for the cluster? (Similar to Redis Sentinel?) 

Performance

Single ReadWrite lock guarding the map and the watermark? With multiple frontend / backends making edits to the ticket state (and the synchronizer too), and scaling all query requests to the store, do you foresee contention on the single lock being a concern at scale (when we have multiple firehose running to query / persistence / slaves?)

If the stream terminates / or if a new query server comes up by autoscaling, how efficient is it to pass the entire stream of all the tickets to it? Also, if writing needs exclusive locks. do you prevent updates to the master map till the entire state is replicated? does this have perf implications?

Laremere commented 4 years ago

What FirehoseResponse() will FH2 get?

TicketA -- Created TicketB -- Created --> Pending TicketC -- Created--> Assigned TicketD -- Created

Basically, bring up to current state.

Can you explain Delete? What happens after delete on the store? Does the ticket get removed from the map? Does the Firehose relay this to all the open firehose streams so they respectively act on it?

Yes, delete removes it from the map and instructs through the firehose to the query instances to also delete it.

Whats the intent of GetWatermark(). Say in a scenario, a Query server calls GetWatermark() and get a value and then waits for Firehose responses to match or exceed that, wouldn't the store watermark have moved ahead?

Watermarks are the consistency mechanism. When a query comes in, it will always perform a getquery after the connection started, and only process the query once updates are past the getquery's watermark. image

Query operates in a loop:

  1. Process updates and wait for a query request.
  2. Join all currently waiting query requests into one.
  3. Get watermark, and while waiting for the watermark process updates.
  4. Process updates until the update's watermark meets or exceeds the watermark from get watermark.
  5. Let the query requests read and filter tickets from the map. (Waiting for them to finish using the map BUT NOT return all the data to the client.)
  6. Loop back to one.

If any requests come in after #2, they must wait until the cycle gets back around to #1 or #2 again. However if multiple requests come in, they all join the same cycle.

(There also aren't any consistency concerns which appear with redis from info being in multiple places. Eg, in this if a ticket goes from pending to deleted, query will either see it as pending or as deleted, it won't have any potential intermediate states where it has to de-index and remove from ignore list which also need to be read correctly)

Query stores an internal map (similar to store) and with the Firehose mechanism keeps it up to date with the store right?

correct

When receiving a query with multiple filters, do you iterate over the entire map to shortlist subsets that match the filters?

Yes. All waiting queries loop at the same time, so it alternates between updating and running queries. In practice this is very fast, I think I measure 4ns for 1 million tickets at one point.

Logically, it looks like there are two windows - one where the hose keeps updating the map - another where a set of one or more queries freeze and read. Any constraints in place to make sure a continuous stream of reads dont hog all the time and the map goes stale from store?

See details for getWatermark

Similar question for store - would you freeze the map from writes when sending firehose updates (specially the new request that requests where the intent is to build the entire state from scratch)?

New firehose request does require a short read lock (as does any get) on the map. Updates do not need to be sent before the lock is released: It's basically a linked list, where new items are added, and each firehose individually reads the linked list and send it. A channel which is closed on each linked list node prevents firehoses from reading too far.

The logic you have laid out works. However, I do think implementing it right with the error handling / failure paths and making this performant and validating it may need significant work.

I agree.

In the basic design, I guess the core store is limited to 1 instance? Is that correct? Firehose streams seem like the mechanism to essentially replicate entire state to other scalable entities (isn't Query a copy of the same map always trying to get lazily in sync with it?).

Yes, without building a sharding mechanism it is limited to one store.

For the case you mention around scaling firehose calls on the master, you mention a relay mechanism. Can you elaborate? Do you have a replicated store instances (identical) in mind? Or do you have instances with sharded key space / request forwarding in mind?

I don't think it's actually going to be necessary at all, but this is the idea: image

Again, my gut is, scaling this (either to consistently updated replicas - or sharded key-space) would be a significantly complex.

Consistency is easy and already there. Sharding within just the store (eg, 10 maps with individual locks around them) seems rather easy. If you're at the scale you need to shard, affording a large VM which can handle the large store shouldn't be a concern.

As we are considering spreading reads across Redis replicas (other proposal), we are exploring some concerns around staleness impacting correctness. Given that Query servers have a (potentially) lazy copy of the map, are we not prone to the same concerns? Basically writes going to a master copy and reads happening from lazy replicas?

See get watermark details

What happens if the store instance is completely hosed? Would we build multiple instances (slaves) that use a Firehose to get updates from master and can themselves be master if it goes down? And need some sort of leader election mechanism for the cluster? (Similar to Redis Sentinel?) 

As long as persistence is solve, this is solved. There is a recovery period when reading the file. So no to failover, but yes to recovery.

Single ReadWrite lock guarding the map and the watermark? With multiple frontend / backends making edits to the ticket state (and the synchronizer too), and scaling all query requests to the store, do you foresee contention on the single lock being a concern at scale (when we have multiple firehose running to query / persistence / slaves?)

Firehoses don't affect lock contention except when first starting. Lock contention is a minor concern, but as mentioned above, sharding within a store instance seems like it'd be rather easy.

If the stream terminates / or if a new query server comes up by autoscaling, how efficient is it to pass the entire stream of all the tickets to it? Also, if writing needs exclusive locks. do you prevent updates to the master map till the entire state is replicated? does this have perf implications?

There is a short lock period, but it does not last until the entire state is replicated, only until the current list of tickets and their state updates is read from the map.

Laremere commented 4 years ago

A couple other notes:

The firehose doesn't need to actually need to have ticket objects. It would be more efficient to use send bytes. The only thing store needs is the ID, which could be read without deserializing and reserializing tickets and assignments.

One more thing: More filter types are super easy

It's just a matter of proto changes, and then a change in the filter library. If people need a custom type of filter, they can easily fork OM and customize.

Laremere commented 4 years ago

Going to replace this proposal with one that is only modifying the query, leaving redis as the state store (though used differently)