
nested bulk_create failing when using transaction: :batch #1383


yasoob commented 2 months ago

Describe the bug

A nested bulk_create fails when a parent bulk_create is called with transaction: :batch.

Explanation:

I have two resources that are joined using a through resource:

  1. Subscriber
  2. List
  3. SubscriberList (through resource)

I use bulk_create to create a batch of Subscribers, and the after_batch hook of the create action's change then creates the SubscriberList records that relate each new Subscriber to an existing List. These through records are created with a nested bulk_create inside the after_batch hook.

The nested call fails when the original bulk_create on Subscriber is run with transaction: :batch, but passes when it is run with transaction: false. The failure I get is this:

```
* Invalid value provided for subscriber_id: does not exist.
```

This is what the after_batch hook looks like:

```elixir
@impl true
def after_batch(results, _opts, context) do
  Stream.map(results, fn {changeset, subscriber} ->
    %{
      subscriber_id: subscriber.id,
      list_id: changeset.arguments.list_id,
      status: changeset.arguments.list_subscription_status
    }
  end)
  |> Enum.map(&dbg/1)
  |> Ash.bulk_create!(
    Daakia.Newsletters.SubscriberList,
    :create,
    return_records?: false,
    upsert?: true,
    batch_size: 1000,
    upsert_fields: [:status, :meta],
    transaction: false,
    return_errors?: true,
    max_concurrency: 10,
    tenant: context.tenant,
    domain: Daakia.Newsletters
  )

  Stream.map(results, fn {_changeset, result} ->
    {:ok, result}
  end)
end
```

I added a dbg call to the after_batch hook (visible in the code above) to make sure the subscribers are actually created before the nested bulk_create runs, and it does print well-formed maps:

```elixir
%{
  status: :confirmed,
  list_id: "01914426-b560-7f9b-8102-c56e969fc41e",
  subscriber_id: "01914426-c052-717b-b160-ebb3c2935e84"
}
```

So even though subscriber_id is populated, presumably because the subscriber record was successfully created in the database (within the batch transaction), the create action of SubscriberList complains that subscriber_id does not exist.
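One variant that might be worth testing (this is a guess on my part, not something I have confirmed): running the nested bulk_create without max_concurrency, in case the concurrent insert tasks check out database connections that cannot see rows inserted by the still-open batch transaction. In the sketch below, subscriber_list_rows stands for the list of maps built in the hook above:

```elixir
# Same nested call as in the after_batch hook, with max_concurrency removed
# so the inserts run in the calling process. A diagnostic variant only,
# not a confirmed fix.
subscriber_list_rows
|> Ash.bulk_create!(
  Daakia.Newsletters.SubscriberList,
  :create,
  return_records?: false,
  upsert?: true,
  batch_size: 1000,
  upsert_fields: [:status, :meta],
  transaction: false,
  return_errors?: true,
  tenant: context.tenant,
  domain: Daakia.Newsletters
)
```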

To Reproduce

Here is a Subscriber resource:

```elixir
defmodule Daakia.Newsletters.Subscriber do
  alias Daakia.Newsletters.Subscriber.Types

  use Ash.Resource,
    data_layer: AshPostgres.DataLayer,
    authorizers: [Ash.Policy.Authorizer],
    domain: Daakia.Newsletters

  postgres do
    table "subscribers"
    repo Daakia.Repo

    references do
      reference :organization, on_delete: :delete
    end
  end

  actions do
    defaults [:create, :update, :destroy]

    create :new_subscriber do
      primary? true
      upsert? true
      upsert_identity :unique_email
      accept [:email, :name, :attributes, :status]

      argument :list_id, :uuid, allow_nil?: false
      argument :list_subscription_status, Types.ListSubscriptionStatus, allow_nil?: false

      change Daakia.Newsletters.Changes.CreateSubscriberlistEntry
    end

    read :read do
      primary? true

      pagination do
        offset? true
        countable :by_default
        default_limit 10
      end
    end
  end

  policies do
    policy action_type(:read) do
      authorize_if always()
    end

    policy action_type(:create) do
      authorize_if always()
    end

    policy action_type(:update) do
      authorize_if relates_to_actor_via(:user)
    end

    policy action_type(:destroy) do
      authorize_if relates_to_actor_via(:user)
    end
  end

  multitenancy do
    strategy :attribute
    attribute :organization_id
  end

  attributes do
    uuid_v7_primary_key :id

    attribute :email, :ci_string do
      allow_nil? false
    end

    attribute :name, :string do
      allow_nil? true
    end

    attribute :attributes, :map do
      allow_nil? false
      default %{}
    end

    attribute :status, Types.OverallSubscriptionStatus do
      allow_nil? false
      default :enabled
    end

    create_timestamp :created_at
    update_timestamp :updated_at
  end

  relationships do
    many_to_many :lists, Daakia.Newsletters.List do
      through Daakia.Newsletters.SubscriberList
      source_attribute_on_join_resource :subscriber_id
      destination_attribute_on_join_resource :list_id
      join_relationship :list_subscribers
    end

    belongs_to :organization, Daakia.Accounts.Organization,
      domain: Daakia.Accounts,
      allow_nil?: false
  end

  identities do
    identity :unique_email, [:email]
  end
end
```
Here is a List resource:

```elixir
defmodule Daakia.Newsletters.List do
  use Ash.Resource,
    otp_app: :daakia,
    data_layer: AshPostgres.DataLayer,
    authorizers: [Ash.Policy.Authorizer],
    domain: Daakia.Newsletters

  postgres do
    table "lists"
    repo Daakia.Repo

    references do
      reference :organization, on_delete: :delete
    end
  end

  actions do
    defaults [:read, :update, :destroy]

    create :create_new_list do
      primary? true
      accept [:name, :description, :type, :optin]

      argument :subscribers, {:array, :map}, allow_nil?: true

      change relate_actor(:owner)

      change manage_relationship(:subscribers,
               type: :append_and_remove,
               on_no_match: :create
             )
    end
  end

  policies do
    policy action_type(:read) do
      authorize_if always()
    end

    policy action_type(:create) do
      authorize_if actor_present()
    end

    policy action_type(:update) do
      authorize_if relates_to_actor_via(:owner)
    end

    policy action_type(:destroy) do
      authorize_if relates_to_actor_via(:owner)
    end
  end

  multitenancy do
    strategy :attribute
    attribute :organization_id
  end

  attributes do
    uuid_v7_primary_key :id

    attribute :name, :string do
      allow_nil? false
      public? true
    end

    attribute :description, :string do
      allow_nil? true
    end

    attribute :type, Daakia.Newsletters.List.Types.Type do
      allow_nil? false
    end

    attribute :optin, Daakia.Newsletters.List.Types.OptinType do
      allow_nil? false
    end

    create_timestamp :created_at
    update_timestamp :updated_at
  end

  relationships do
    belongs_to :owner, Daakia.Accounts.User do
      domain Daakia.Accounts
      allow_nil? false
      public? true
    end

    many_to_many :subscribers, Daakia.Newsletters.Subscriber do
      through Daakia.Newsletters.SubscriberList
      source_attribute_on_join_resource :list_id
      destination_attribute_on_join_resource :subscriber_id
    end

    belongs_to :organization, Daakia.Accounts.Organization do
      domain Daakia.Accounts
      allow_nil? false
    end
  end

  identities do
    identity :unique_name, [:name]
  end
end
```
Here is the associated SubscriberList resource:

```elixir
defmodule Daakia.Newsletters.SubscriberList do
  alias Daakia.Newsletters.Subscriber.Types

  use Ash.Resource,
    data_layer: AshPostgres.DataLayer,
    domain: Daakia.Newsletters

  postgres do
    table "subscriber_lists"
    repo Daakia.Repo

    references do
      reference :organization, on_delete: :delete
      reference :list, on_delete: :delete
      reference :subscriber, on_delete: :delete
    end
  end

  code_interface do
    define :create, action: :create
  end

  actions do
    defaults [:read, :destroy, update: :*]

    create :create do
      primary? true
      accept [:status, :subscriber_id, :list_id]
      upsert? true
    end

    update :unsubscribe_from_list do
      accept []

      validate attribute_does_not_equal(:status, :unsubscribed) do
        message "User is already unsubscribed"
      end

      change set_attribute(:status, :unsubscribed)
    end
  end

  multitenancy do
    strategy :attribute
    attribute :organization_id
  end

  attributes do
    attribute :status, Types.ListSubscriptionStatus do
      allow_nil? false
    end

    attribute :meta, :map do
      allow_nil? false
      default %{}
    end

    create_timestamp :created_at
    update_timestamp :updated_at
  end

  relationships do
    belongs_to :subscriber, Daakia.Newsletters.Subscriber,
      primary_key?: true,
      allow_nil?: false

    belongs_to :list, Daakia.Newsletters.List,
      primary_key?: true,
      allow_nil?: false

    belongs_to :organization, Daakia.Accounts.Organization,
      domain: Daakia.Accounts,
      allow_nil?: false
  end
end
```
Here is the CreateSubscriberlistEntry change:

```elixir
defmodule Daakia.Newsletters.Changes.CreateSubscriberlistEntry do
  use Ash.Resource.Change

  @doc """
  This will create the associated through record in the subscribers_list table
  """
  @impl true
  def change(changeset, _, _) do
    status = Ash.Changeset.get_argument(changeset, :list_subscription_status)

    Ash.Changeset.after_action(changeset, fn changeset, subscriber ->
      Daakia.Newsletters.SubscriberList.create!(
        %{
          subscriber_id: subscriber.id,
          list_id: changeset.arguments.list_id,
          status: status
        },
        upsert?: true,
        upsert_fields: [:status],
        tenant: changeset.tenant
      )

      {:ok, subscriber}
    end)
  end

  @doc """
  We do not run an after_action hook for batches but rather the after_batch hook
  """
  @impl true
  def batch_change(changesets, _opts, _context) do
    changesets
  end

  @doc """
  This will create the associated through records for the created subscribers in batch
  """
  @impl true
  def after_batch(results, _opts, context) do
    Stream.map(results, fn {changeset, subscriber} ->
      %{
        subscriber_id: subscriber.id,
        list_id: changeset.arguments.list_id,
        status: changeset.arguments.list_subscription_status
      }
    end)
    |> Enum.map(&dbg/1)
    |> Ash.bulk_create!(
      Daakia.Newsletters.SubscriberList,
      :create,
      return_records?: false,
      upsert?: true,
      batch_size: 1000,
      upsert_fields: [:status, :meta],
      transaction: false,
      return_errors?: true,
      max_concurrency: 10,
      tenant: context.tenant,
      domain: Daakia.Newsletters
    )

    Stream.map(results, fn {_changeset, result} ->
      {:ok, result}
    end)
  end
end
```

This code is exercised like this:

```elixir
random_subscribers =
  Task.async_stream(
    0..100,
    fn _i ->
      %{
        email: Faker.Internet.email(),
        name: Faker.Person.name(),
        status: :enabled,
        list_subscription_status: :confirmed,
        list_id: list.id
      }
    end,
    ordered: false
  )
  |> Stream.chunk_every(1000)
  |> Stream.map(fn list ->
    Enum.reduce(list, [], fn {:ok, result}, acc -> [result | acc] end)
    |> Ash.bulk_create!(
      Subscriber,
      :new_subscriber,
      return_records?: true,
      upsert?: true,
      batch_size: 1000,
      upsert_identity: :unique_email,
      upsert_fields: [:name, :attributes, :status],
      transaction: false,
      return_errors?: true,
      max_concurrency: 10,
      tenant: organization.id,
      domain: Newsletters
    )
  end)
  |> Enum.to_list()
```
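Note that, as pasted, the outer call uses transaction: false, which is the configuration that passes. To hit the failure described at the top of the issue, the same call is presumably run with only the transaction option changed, roughly like this (chunk_of_subscriber_maps stands for the reduced chunk from the pipeline above):

```elixir
# Failing configuration: identical to the call above except for the
# transaction option (see the description at the top of the issue).
chunk_of_subscriber_maps
|> Ash.bulk_create!(
  Subscriber,
  :new_subscriber,
  return_records?: true,
  upsert?: true,
  batch_size: 1000,
  upsert_identity: :unique_email,
  upsert_fields: [:name, :attributes, :status],
  transaction: :batch,
  return_errors?: true,
  max_concurrency: 10,
  tenant: organization.id,
  domain: Newsletters
)
```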

Expected behavior

I expected the nested bulk_create to succeed with transaction: :batch.

Runtime

Additional context

N/A

zachdaniel commented 2 months ago

I will look into this this week. However, if you could provide a more trimmed-down reproduction, that would be great, ideally as a new project I can clone and run locally. Even just copying resources can take a lot of time because I have to work out all the policies, other things at play, data layer specifics, etc.

yasoob commented 2 months ago

I totally understand. The policies and such can be safely removed; they are not doing anything useful here. I will try to put together a more trimmed-down example later.