peburrows / diplomat

Elixir library for interacting with Google's Cloud Datastore

Transactions not working correctly? #25

Closed jvoegele closed 7 years ago

jvoegele commented 7 years ago

The documentation for Datastore states the following about transactions:

When two or more transactions simultaneously attempt to modify entities in one or more common entity groups, only the first transaction to commit its changes can succeed; all the others will fail on commit. Because of this design, using entity groups limits the number of concurrent writes you can do on any entity in the groups. When a transaction starts, Cloud Datastore uses optimistic concurrency control by checking the last update time for the entity groups used in the transaction. Upon committing a transaction for the entity groups, Cloud Datastore again checks the last update time for the entity groups used in the transaction. If it has changed since our initial check, an error is returned.

Essentially, two or more transactions may not concurrently modify any given entity (or any set of entities in the same entity group). If more than one transaction attempts to make concurrent modifications, only the first to commit will succeed and any others will receive an error. I have verified this behavior using a Ruby script and the google-cloud-datastore Ruby gem.
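To make the documented rule concrete, here is a rough, self-contained sketch of optimistic concurrency control in plain Elixir. It is a toy model only: the module `OptimisticSketch` and the group name `"Company/transactional"` are hypothetical, it is not the Diplomat or Cloud Datastore API, and it uses an integer update counter in place of an entity group's "last update time". As in the quoted docs, the counter is checked once when the transaction begins and again on commit, so the first transaction to commit wins.

```elixir
defmodule OptimisticSketch do
  # Hypothetical toy model of the documented behavior, not Diplomat's API.
  # The store is an Agent holding %{group => {update_count, data}}; the update
  # counter stands in for an entity group's "last update time".

  def start_link(groups), do: Agent.start_link(fn -> groups end)

  # On begin, note the current update counter of each group the transaction will use.
  def begin(store, groups) do
    snapshot = Agent.get(store, fn data -> Map.new(groups, &{&1, elem(data[&1], 0)}) end)
    %{snapshot: snapshot, writes: %{}}
  end

  # Buffer a write; nothing reaches the store until commit/2.
  def put(tx, group, value), do: %{tx | writes: Map.put(tx.writes, group, value)}

  # On commit, check the counters again atomically; if any changed, return an
  # error instead of writing.
  def commit(store, tx) do
    Agent.get_and_update(store, fn data ->
      if Enum.all?(tx.snapshot, fn {group, count} -> elem(data[group], 0) == count end) do
        data =
          Enum.reduce(tx.writes, data, fn {group, value}, acc ->
            {count, _old} = Map.get(acc, group, {0, nil})
            Map.put(acc, group, {count + 1, value})
          end)

        {:ok, data}
      else
        {{:error, :concurrent_modification}, data}
      end
    end)
  end
end

{:ok, store} = OptimisticSketch.start_link(%{"Company/transactional" => {0, "v0"}})
slow = OptimisticSketch.begin(store, ["Company/transactional"])
fast = OptimisticSketch.begin(store, ["Company/transactional"])
IO.inspect(OptimisticSketch.commit(store, OptimisticSketch.put(fast, "Company/transactional", "fast")))
# :ok
IO.inspect(OptimisticSketch.commit(store, OptimisticSketch.put(slow, "Company/transactional", "slow")))
# {:error, :concurrent_modification}
```

Using `Agent.get_and_update/2` keeps the check and the write in one atomic step, which is what makes "first committer wins" hold even when many processes commit at once.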

When using Elixir and Diplomat, however, concurrent modifications by multiple transactions do not seem to exhibit this behavior. Instead, all transactions succeed on commit, and data written by one transaction is overwritten by data from the next. Only the first transaction should commit successfully; subsequent transactions should fail with an error.

This seemingly incorrect behavior can be verified by having multiple processes updating the same entity within a transaction. Each update in each process would:

  1. Start a new transaction with Diplomat.Transaction.begin/0.
  2. Read the current value of the entity from Datastore using a simple key lookup.
  3. Sleep for a configurable period of time. (See below for explanation.)
  4. Update some attribute of the entity.
  5. Save the updated entity with Diplomat.Transaction.update/2 and commit the change with Diplomat.Transaction.commit/1.

The sleep in step 3 guarantees an update collision when multiple processes run with sufficiently different sleep times: the slower process starts a transaction, and the faster process concurrently starts and commits its own transaction before the slower process can finish.

If transactions exhibited the documented behavior, this scheme should produce an error when the slower process tries to commit after the faster process has already committed changes to the same entity. However, when testing this scenario, I found that all transactions always succeed and that the slower process simply overwrites the data written by the faster process. I haven't yet discovered why this happens with Elixir and Diplomat but not with Ruby. I suspect it may be because Diplomat still uses the v1beta3 protobuf schema, whereas the Ruby gem uses v1.

For reference, below is an Elixir script that implements the scenario outlined above.

alias Diplomat.{Key, Entity, Transaction}

defmodule DatastoreWriter do
  require Logger

  def start_link(uuid, delay) do
    state = %{
      uuid: uuid,
      delay: delay,
      key: Key.new("Rule", uuid, Key.new("Service", "transactional", Key.new("Company", "transactional"))),
      rule: %{"id" => uuid, "version" => 0}
    }
    Agent.start_link(fn -> state end)
  end

  def insert_initial(agent) do
    state = get_state(agent)
    result = Transaction.begin fn(tx) ->
      Transaction.insert(tx, Entity.new(state.rule, state.key))
    end
    # Logger.info("insert_initial -> #{inspect result}")
    # Return the result so the binding is used (avoids an unused-variable warning).
    result
  end

  def write_versions(agent, versions) when is_list(versions) do
    Task.async fn ->
      for version <- versions, do: write_version(agent, version)
    end
  end

  def write_version(agent, version) do
    state = get_state(agent)
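    tx = Transaction.begin()
    # Read the current entity. Note: lookup_key/1 uses Diplomat.Key.get/1,
    # which (apparently) does not perform this read inside `tx`.
    new_rule =
      state.key
      |> lookup_key()
      |> Entity.properties()
      |> Map.update!("version", fn(_) -> version end)
    # Hold the transaction open so that a faster writer can commit in the meantime.
    Process.sleep(state.delay)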
    result = try do
      tx
      |> Transaction.update(Entity.new(new_rule, state.key))
      |> Transaction.commit()
    rescue
      e ->
        Logger.error("!!!!! Transaction error: #{inspect e}")
        Transaction.rollback(tx)
        {:error, e}
    end
    # Logger.info("update_version(#{version}) -> #{inspect result}")
    result
  end

  defp get_state(agent) do
    Agent.get(agent, &(&1))
  end

  def lookup(agent) do
    lookup_key(get_state(agent).key)
  end

  def lookup_key(key) do
    [entity] = Key.get(key)
    entity
  end
end

uuid = UUID.uuid1()
{:ok, slow_writer} = DatastoreWriter.start_link(uuid, 500)
{:ok, fast_writer} = DatastoreWriter.start_link(uuid, 100)
DatastoreWriter.insert_initial(fast_writer)

slow_writer_task = DatastoreWriter.write_versions(slow_writer, Enum.map(1..10, &("slow.#{&1}")))
fast_writer_task = DatastoreWriter.write_versions(fast_writer, Enum.map(1..10, &("fast.#{&1}")))

slow_writer_results = Task.await(slow_writer_task, 60_000)
fast_writer_results = Task.await(fast_writer_task, 60_000)

IO.puts("***** fast_writer_results:")
IO.inspect(fast_writer_results)
IO.puts("***** slow_writer_results:")
IO.inspect(slow_writer_results)

IO.puts("Final value written:")
IO.inspect(DatastoreWriter.lookup(fast_writer))

jvoegele commented 7 years ago

One difference that I noticed about Diplomat as compared to the Ruby Datastore library (and the other Datastore libraries for other languages, too) is that Diplomat does not have a get or lookup kind of function on its Transaction module. Ruby, for example, has a Transaction.find method that takes a key and reads the entity from datastore within the scope of the transaction. Diplomat does not have a corresponding Transaction.find function, but rather just has the Diplomat.Key.get/1 function that (apparently) is not transaction-aware. Could this be the problem?
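One way to see why a transaction-aware read could matter is a toy model that assumes commit-time conflict checks cover only entities read through the transaction. Under that assumption, a read made outside the transaction leaves nothing to validate, so the slower writer's commit succeeds and silently overwrites the faster one. The sketch below is hypothetical throughout (module `ReadTrackingSketch`, functions `plain_read/2`, `tx_read/3`, `race/1`); it is not Diplomat's API and only illustrates the hypothesis raised in this comment.

```elixir
defmodule ReadTrackingSketch do
  # Hypothetical toy model (not Diplomat's API). The store is an Agent holding
  # %{key => {version, value}}; a transaction records the versions it has read
  # (:seen) plus its pending writes.

  def new_tx, do: %{seen: %{}, writes: %{}}

  # Plain read, analogous to a lookup made outside the transaction: nothing is recorded.
  def plain_read(store, key) do
    {_version, value} = Agent.get(store, &Map.fetch!(&1, key))
    value
  end

  # Transaction-aware read: remembers the version observed so commit can validate it.
  def tx_read(store, tx, key) do
    {version, value} = Agent.get(store, &Map.fetch!(&1, key))
    {value, %{tx | seen: Map.put(tx.seen, key, version)}}
  end

  def write(tx, key, value), do: %{tx | writes: Map.put(tx.writes, key, value)}

  # Atomic commit: only keys recorded in :seen are checked for concurrent changes.
  def commit(store, tx) do
    Agent.get_and_update(store, fn data ->
      unchanged =
        Enum.all?(tx.seen, fn {key, seen} -> elem(Map.fetch!(data, key), 0) == seen end)

      if unchanged do
        data =
          Enum.reduce(tx.writes, data, fn {key, value}, acc ->
            Map.update(acc, key, {1, value}, fn {version, _} -> {version + 1, value} end)
          end)

        {:ok, data}
      else
        {{:error, :conflict}, data}
      end
    end)
  end

  # Slow and fast writers both read; the fast one commits first, then the slow one.
  # Returns {slow_commit_result, final_value}.
  def race(read_mode) do
    {:ok, store} = Agent.start_link(fn -> %{"rule" => {1, "v0"}} end)
    slow = read(read_mode, store, new_tx(), "rule")
    fast = read(read_mode, store, new_tx(), "rule")
    :ok = commit(store, write(fast, "rule", "fast"))
    result = commit(store, write(slow, "rule", "slow"))
    {result, plain_read(store, "rule")}
  end

  defp read(:plain, store, tx, key) do
    _value = plain_read(store, key)
    tx
  end

  defp read(:tx, store, tx, key) do
    {_value, tx} = tx_read(store, tx, key)
    tx
  end
end

IO.inspect(ReadTrackingSketch.race(:plain))
# {:ok, "slow"}  (the slow writer silently overwrites)
IO.inspect(ReadTrackingSketch.race(:tx))
# {{:error, :conflict}, "fast"}
```

In this model the only difference between the two runs is whether the read is recorded by the transaction; the writes and commit order are identical.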

peburrows commented 7 years ago

Very interesting. Let me dig into this a bit and get back to you with what I find.

I had started work to update to the v1.proto definitions a while ago, but was blocked at the time by some PRs I was waiting to get merged into exprotobuf (the lib Diplomat uses for its protobuf support). At this point, I believe exprotobuf should have full support for what we need, and we should be able to move to the newest proto definition as a first step.

Also, your point about Diplomat.Key.get/1 not being transaction aware is a good one. I'll track this down as part of my investigation here as well.

cjab commented 7 years ago

This PR adds a transaction-aware lookup to the Transaction module. It seems to solve the problem documented above.

https://github.com/peburrows/diplomat/pull/26

peburrows commented 7 years ago

@jvoegele, I've just published v0.8.1 to hex which includes @cjab's changes. All the tests are passing, but I haven't had a chance to verify that it fixes the bug you outlined above. I should have a chance to verify the fix some time this weekend if you don't get to it before I do.

jvoegele commented 7 years ago

Thanks @peburrows! FYI, @cjab and I have tested these changes using the script that I included in the issue description. With these changes in place, we did indeed encounter transaction errors when multiple transactions attempted to modify the same entity concurrently. This is the expected behavior for transactions based on the Google docs, so we feel confident that this solves the problem that we were having.

Thanks again!

peburrows commented 7 years ago

@jvoegele — Awesome, glad to hear it. Based on your feedback, I'm going to close this issue. Please let me know if I need to reopen it for any reason.

And thanks for the excellent writeup of the issue! Your explanation and test script, along with @cjab's patch, made my job super easy on this one! 😄