whitfin / cachex

A powerful caching library for Elixir with support for transactions, fallbacks and expirations
https://hexdocs.pm/cachex/
MIT License

Fetch can allow multiple executions of `fallback` function #347

Closed: camilleryr closed this issue 5 months ago

camilleryr commented 5 months ago

First off - thanks, as always, for maintaining this library!

I had a question / bug(?) report regarding fetch/4

I rely heavily on fetch/4 when using Cachex, leaning on the fact that the fallback function will only be called once per key per TTL, and I recently found out that the reason for a very infrequent test failure was that the fallback function was in fact being called multiple times, due to what I assume is a race condition.
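
For reference, the usage pattern in question looks roughly like this (a minimal sketch assuming the standard Cachex 3.x API; the cache name, key, and simulated lookup are placeholders rather than code from our application):

  # Placeholder sketch of the fetch/4 pattern in question; :my_cache and the
  # key are stand-ins, and the "expensive" work is simulated with a sleep.
  {:ok, _pid} = Cachex.start_link(:my_cache)

  # Many processes may call this concurrently for the same key; the
  # expectation is that the fallback runs at most once per key until the
  # cached entry expires.
  {_status, _value} =
    Cachex.fetch(:my_cache, "user:42", fn key ->
      Process.sleep(100)
      {:commit, "value for #{key}"}
    end)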

The general flow for fetch is as follows:

stateDiagram-v2
    state cache_hit_state <<fork>>
    state fetch_state <<fork>>
    state task_notification <<join>>
    state response_state <<join>>

    [*] --> Cachex.fetch/4
    Cachex.fetch/4 --> check_cache
    check_cache --> cache_hit_state
    cache_hit_state --> cache_hit
    cache_hit_state --> cache_miss

    cache_miss --> dispatch_to_courier
    dispatch_to_courier --> fetch_state
    fetch_state --> task_executing
    fetch_state --> no_task_executing

    state Courier {
    task_executing --> queue_caller_await_notification
    no_task_executing --> execute_task
    no_task_executing --> queue_caller_await_notification
    queue_caller_await_notification --> task_notification
    update_cache --> task_notification
    task_notification --> notification
    notification --> reply_to_callers
    }

    state external_task {
        execute_task --> fallback
        fallback --> update_cache
    }

    reply_to_callers --> response_state
    cache_hit --> response_state

    response_state --> return_value

I believe the issue I am seeing is related to the fact that the initial cache check and the subsequent message to the Courier are not an "atomic" operation, and can interleave in a way in which we would execute the fallback function a second time.
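
Concretely, the caller side is roughly the two-step sequence sketched below (a simplified illustration only; the FetchSketch module, the courier argument and the message shape are made up here and are not the actual Cachex.Actions.Fetch code):

  # Simplified sketch of the non-atomic check-then-dispatch described above.
  defmodule FetchSketch do
    def fetch(cache, courier, key, fallback) do
      case Cachex.get(cache, key) do
        {:ok, nil} ->
          # Race window: another caller's task can write this key into the
          # cache between the check above and the Courier receiving this call.
          GenServer.call(courier, {:dispatch, key, fallback})

        {:ok, _value} = hit ->
          hit
      end
    end
  end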

Imagine a situation where the Courier has already dispatched a task to generate the value for key_1, and a new client (Client_3) now calls fetch/4 for the value of key_1.

Here is an attempt to diagram that same workflow:

sequenceDiagram
    box cache
    participant Cache
    end
    box Clients
    participant Client_1
    participant Client_2
    participant Client_3
    end
    box Courier
    participant Courier
    participant Task
    end

    Client_1->>Cache: check key_1
    Cache->>Client_1: no value
    Client_1->>Courier: fetch key_1
    Courier->>Task: dispatch
    Courier->>Courier: record Client_1->key_1
    Client_2->>Cache: check key_1
    Cache->>Client_2: no value
    Client_2->>Courier: fetch key_1
    Courier->>Courier: record Client_2->key_1
    Task->>Task: execute_fallback
    Client_3->>Cache: check key_1
    Cache->>Client_3: no value
    Task->>Task: update_cache
    Client_3->>Courier: fetch key_1
    Task->>Courier: notification of result
    Courier->>Client_1: result
    Courier->>Client_2: result
    Client_3->>Courier: fetch key_1
    Courier->>Task: dispatch

I was wondering if you agree that this behavior is undesirable, and if so, what your thoughts are on a potential resolution. My first thought was that we could check the cache again from the Courier before executing the task to generate the value - this would add some additional overhead, but only for the single message that would actually execute the task.

whitfin commented 5 months ago

Hi @camilleryr!

Unless I'm mistaken, this is not the case and should not be possible. The Courier is backed by a GenServer, which means at most one call is running at a time (because the others are guaranteed to block on the response from the first). The flow is this:

  1. fetch/4 is called by Process A
  2. Courier receives and triggers a fetch worker
  3. fetch/4 is called by Process B
  4. Courier receives and attaches Process B to the worker from Step 2
  5. Worker resolves and notifies Courier
  6. Courier notifies both Process A and Process B of the result
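
As a rough sketch of that dedup flow, it is something like the toy GenServer below (purely for illustration; the module name and message shapes are made up and this is not the real Courier implementation):

  defmodule ToyCourier do
    # Simplified illustration of steps 1-6 above; not the real Cachex Courier.
    use GenServer

    def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, %{})

    def init(tasks), do: {:ok, tasks}

    # Dispatches are handled one at a time because this is a GenServer.
    def handle_call({:dispatch, key, fallback}, caller, tasks) do
      case Map.get(tasks, key) do
        # A worker is already running for this key: just attach the caller.
        {pid, listeners} ->
          {:noreply, Map.put(tasks, key, {pid, [caller | listeners]})}

        # No worker yet: spawn one and remember the caller.
        nil ->
          parent = self()
          pid = spawn_link(fn -> send(parent, {:done, key, fallback.()}) end)
          {:noreply, Map.put(tasks, key, {pid, [caller]})}
      end
    end

    # When the worker finishes, reply to every caller waiting on the key.
    def handle_info({:done, key, result}, tasks) do
      {{_pid, listeners}, tasks} = Map.pop(tasks, key)
      Enum.each(listeners, &GenServer.reply(&1, result))
      {:noreply, tasks}
    end
  end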

As the Courier is a GenServer, there should be no ability to race and cause inconsistency with the state here. The only case I can imagine this happening is if a fetch/4 call happens at the same time the Courier is resolving existing wait points, which means that it'd re-queue and re-fetch afterwards. The timing on this is so specific that I would be surprised if this was what you were seeing (but even then I'd probably count this behaviour as "correct").

If this is the case, there really isn't anything I can do about it beyond adding an option to disallow fetch if the key already exists; but this feels kinda awkward. If it's necessary, then we can - but I feel that it might be the conditions of your test specifically rather than something that people will hit in "real" workflows.

Can you provide me a code example that can reproduce this? My instinct is that this isn't a bug, but it'd be easier to look at with a smaller reproduction case. Maybe you can share your test (or a slim version of it) that I can use to take a look?

camilleryr commented 5 months ago

> Can you provide me a code example that can reproduce this? My instinct is that this isn't a bug, but it'd be easier to look at with a smaller reproduction case. Maybe you can share your test (or a slim version of it) that I can use to take a look?

I was able to write a test that shows this behavior:

  test "fetching will only call fallback once per key" do
    cache = Helper.create_cache()
    agent = start_supervised!({Agent, fn -> %{} end})
    number_of_tests = 100

    for test_index <- 1..number_of_tests do
      test_key = "test_key_#{test_index}"

      # Hammer the same key from many concurrent processes; the fallback
      # bumps a per-key counter in the agent so we can count how many times
      # it actually ran.
      1..(System.schedulers_online() * 2)
      |> Task.async_stream(fn _ ->
        Cachex.fetch(cache, test_key, fn ->
          Agent.update(agent, fn state ->
            Map.update(state, test_key, 1, &(&1 + 1))
          end)
        end)
      end)
      |> Stream.run()
    end

    call_counts =
      agent
      |> Agent.get(& &1)
      |> Enum.frequencies_by(fn {_key, count} -> count end)

    assert %{1 => number_of_tests} == call_counts
  end

  1) test fetching will only call fallback once per key (Cachex.Actions.FetchTest)
     test/cachex/actions/fetch_test.exs:4
     Assertion with == failed
     code:  assert %{1 => number_of_tests} == call_counts
     left:  %{1 => 100}
     right: %{1 => 80, 2 => 20}
     stacktrace:
       test/cachex/actions/fetch_test.exs:28: (test)

The number of tests that end up with 2 fallback calls fluctuates from run to run, which does suggest (to me) that this is a nondeterministic timing bug.

> The timing on this is so specific that I would be surprised if this was what you were seeing

I agree that it is a really specific timing condition that causes this - but, at least according to this test, it is common enough to be forced fairly easily. That being said, this is a pretty contrived example, so the chance of it happening in the wild is hard to quantify - and the effect of it happening is probably relatively minimal.

> but even then I'd probably count this behaviour as "correct"

It was not the behavior I expected, but if guaranteeing at most one successful execution of the fallback function per key per TTL is not in the purview of Cachex, I totally get it!

> If this is the case, there really isn't anything I can do about it

I think an additional check of the cache from the Courier can close this gap at the cost of one extra read - this patch makes the test pass (forgive the gross nested case statement; it was the smallest change, and it's not what I would PR, but it does exemplify the change that could resolve this):

  def handle_call({:dispatch, key, task, stack}, caller, {cache, tasks} = state) do
    case Map.get(tasks, key) do
      {pid, listeners} ->
        {:noreply, {cache, Map.put(tasks, key, {pid, [caller | listeners]})}}

      nil ->
        # Re-check the cache before spawning a worker: a value may have been
        # written for this key between the caller's initial cache check and
        # this dispatch arriving at the Courier.
        case Get.execute(cache, key, []) do
          {:ok, nil} ->
            parent = self()

            worker =
              spawn_link(fn ->
                result =
                  try do
                    task.()
                  rescue
                    e ->
                      {
                        :error,
                        %ExecutionError{
                          message: Exception.message(e),
                          stack: stack_compat() ++ stack
                        }
                      }
                  end

                formatted = Actions.format_fetch_value(result)
                normalized = Actions.normalize_commit(formatted)

                with {:commit, val, options} <- normalized do
                  Put.execute(cache, key, val, [
                    const(:notify_false) | options
                  ])
                end

                send(parent, {:notify, key, formatted})
              end)

            {:noreply, {cache, Map.put(tasks, key, {worker, [caller]})}}

          # The key already has a value, so reply with it directly instead of
          # running the fallback again.
          {:ok, _value} = res ->
            {:reply, res, state}
        end
    end
  end

whitfin commented 5 months ago

Awesome, thank you for the reproduction - I'll check it out tonight. Just to check, does this still happen in the latest main?

I agree - something like what you suggest is also what I had in mind. My first thought was that it felt maybe a bit awkward, but I guess there's no real harm: the overhead is tiny, and it does solve this case with a minimal change.

If we do change this, I assume it's okay to be bundled into the next release (which is tentatively a 4.0, so it could be a little while away)? It doesn't feel like something which needs an urgent patch, but maybe you disagree?

camilleryr commented 5 months ago

> does this still happen in the latest main?

Yes - this was working off of main

> If we do change this, I assume it's okay to be bundled into the next release (which is tentatively a 4.0, so it could be a little while away)? It doesn't feel like something which needs an urgent patch, but maybe you disagree?

4.0 sounds good to me! This has really only caused an issue in that flaky test, so getting it fixed will mostly just make CI a little less of a hassle. Also, the codebase this test lives in is already using main as a dep because of the async warmer patches that were merged recently - so we can always just use the latest so that the test doesn't fail.

If you do want to move forward with this fix, let me know and I'll be happy to get the PR up for review with a cleaned-up version of this test and this patch.

whitfin commented 5 months ago

Hi @camilleryr!

I verified the issue, and yep, your fix seems as good as anything I could come up with - so if you want to PR it, go right ahead! I'm also happy for you to add your test case to the bottom of courier_test.exs, if you want to do that too.

I'm fairly confident that people won't be hitting this in a "realistic" workflow, so I'm not going to push anything out immediately. Even if it were hit, I feel like you'd end up with a wasted fetch call but still the same result; having fetch change other state as it does in the test seems unrealistic (which is fine, just making a note!).

That being said, your test case is pretty much 100% reproducible on my machine, so I do definitely consider this a "bug", for the record - and thank you for the effort put in to help fix it! :)