private-attribution / ipa

A raw implementation of Interoperable Private Attribution
MIT License

OPRF stall #867

Open benjaminsavage opened 10 months ago

benjaminsavage commented 10 months ago

Starting Helpers:

btsavage@btsavage-mbp raw-ipa % cargo run --release --no-default-features --features "cli web-app real-world-infra compact-gate stall-detection" --bin helper -- --identity 3 --network test_data/network.toml --port 3002 --disable-https

Running Script:

btsavage@btsavage-mbp raw-ipa % SIZE=1000000; cargo run --release --bin report_collector --features="clap cli test-fixture" -- gen-ipa-inputs -n $SIZE --max-breakdown-key 256 --report-filter all --max-trigger-value 7 --seed 123 > /tmp/events-${SIZE}.txt                                                 

Helper 1

2023-11-29T00:36:37.268350Z  WARN stall_detector{role=H1}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=10657796 state=
{
"gate[11422]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H3. Waiting to receive records ["[2163..3071]"].
"gate[11436]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11444]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit1/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11452]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit10/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11461]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit2/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11469]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit3/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11477]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit4/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11485]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit5/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11493]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit6/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11501]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit7/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11509]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit8/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
"gate[11517]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit9/xor2", from=H3. Waiting to receive records ["[18432..19455]"].
}

Helper 2

2023-11-29T00:36:37.270875Z  WARN stall_detector{role=H2}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=10658252 state=
{
"gate[11423]=binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case", from=H1. Waiting to receive records ["[2277..3071]"].
"gate[11781]=prime_field_validator/move_value_to_correct_breakdown/depth7/bit0", from=H1. Waiting to receive records ["[18432..19455]"].
}

Helper 3

2023-11-29T00:36:37.270916Z  WARN stall_detector{role=H3}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=10635268 state=
{
"gate[11422]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H2. Waiting to receive records ["[2048..3071]"].
}

akoshelev commented 10 months ago

I can confirm that - it is reproducible on my laptop too

H1

2023-11-29T18:26:06.129952Z  WARN stall_detector{role=H1}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=5455876 state=
{
"gate[11422]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H3. Waiting to receive records ["[1024..2047]"].
}

H2

2023-11-29T18:26:36.138578Z  WARN stall_detector{role=H2}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=5467140 state=
{
"gate[11422]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H1. Waiting to receive records ["[1139..2047]"].
"gate[11435]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11443]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit1/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11451]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit10/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11460]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit2/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11468]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit3/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11476]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit4/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11484]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit5/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11492]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit6/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11500]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit7/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11508]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit8/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
"gate[11516]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit9/xor1", from=H1. Waiting to receive records ["[9216..10239]"].
}

H3

2023-11-29T18:26:06.134721Z  WARN stall_detector{role=H3}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=5476584 state=
{
"gate[11423]=binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case", from=H2. Waiting to receive records ["[1139..2047]"].
"gate[11436]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11444]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit1/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11452]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit10/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11461]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit2/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11469]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit3/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11477]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit4/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11485]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit5/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11493]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit6/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11501]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit7/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11509]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit8/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
"gate[11517]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit9/xor2", from=H2. Waiting to receive records ["[9216..10239]"].
}

benjaminsavage commented 10 months ago

Any clues as to the root cause @akoshelev ?

akoshelev commented 10 months ago

I haven't started looking into it. Planning to investigate once I am done with multi-threading

akoshelev commented 10 months ago

I am still seeing some weird chunk sizes being sent between helpers.

grep "binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case" /tmp/h2.log | grep "Sending next"
2023-12-05T19:30:34.186122Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:30:51.054940Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:31:06.968061Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:31:23.263925Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:31:39.858097Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:31:57.134318Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:32:13.442274Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:32:29.663746Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:32:44.689919Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:33:00.765225Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:33:17.144936Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:33:34.463893Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:33:51.497959Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:34:07.532171Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:34:24.251729Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:34:41.347484Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:34:56.487639Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T19:35:11.845600Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 519 bytes
2023-12-05T20:24:39.455490Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:24:55.148838Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:25:10.177275Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:25:27.272723Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:25:43.597640Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:26:01.167813Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:26:16.395990Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:26:31.531605Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:26:47.727183Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:27:05.346826Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:27:21.005066Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes
2023-12-05T20:27:38.361139Z  INFO send{to=H3 gate=gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case}: ipa_core::helpers::buffers::ordering_sender: Sending next 1024 bytes

It seems to happen once per step, but the size of the short chunk varies (519 bytes above, 300 bytes below).
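The short chunks can also be picked out of a log mechanically instead of by eye. The following is a minimal sketch that assumes only the "Sending next N bytes" line format shown in the excerpts; `chunk_size` and `partial_chunks` are hypothetical helper names, not part of ipa.

```rust
/// Extracts the byte count from a log line that ends in
/// "Sending next <N> bytes". Returns `None` for any other line.
fn chunk_size(line: &str) -> Option<usize> {
    // Everything after the last "Sending next " should be "<N> bytes".
    let (_, tail) = line.rsplit_once("Sending next ")?;
    tail.trim_end().strip_suffix(" bytes")?.parse().ok()
}

/// Returns every reported chunk size that differs from `full`,
/// the expected size of a completely filled buffer (an assumption
/// supplied by the caller, e.g. 1024 or 4096).
fn partial_chunks<'a>(lines: impl IntoIterator<Item = &'a str>, full: usize) -> Vec<usize> {
    lines
        .into_iter()
        .filter_map(chunk_size)
        .filter(|&n| n != full)
        .collect()
}
```

Feeding it the lines for one gate with `full` set to that gate's buffer size leaves only the odd sizes, such as 519 or 300.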

2023-12-05T19:54:52.267215Z  INFO send{to=H1 gate=gate[12086]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2}: ipa_core::helpers::buffers::ordering_sender: Sending next 4096 bytes
2023-12-05T19:54:53.041703Z  INFO send{to=H1 gate=gate[12086]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2}: ipa_core::helpers::buffers::ordering_sender: Sending next 300 bytes
2023-12-05T20:24:39.494548Z  INFO send{to=H1 gate=gate[12086]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2}: ipa_core::helpers::buffers::ordering_sender: Sending next 4096 bytes
2023-12-05T20:24:40.552093Z  INFO send{to=H1 gate=gate[12086]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2}: ipa_core::helpers::buffers::ordering_sender: Sending next 4096 bytes

Another observation: the short chunk is sent right before a long pause in execution for this step (19:54 -> 20:24).

akoshelev commented 10 months ago

OrderingSender generates the "Sending next" message when the stream is ready:

        if let Poll::Ready(v) = b.take(cx) {
            self.waiting.wake(self.next.load(Acquire));
            tracing::info!("Sending next {} bytes", v.len());
            Poll::Ready(Some(v))
        } else if b.closed {
            ...
        }

b.take() returns Poll::Ready iff there are bytes written and either written + spare >= buffer length (the buffer is within spare bytes of full) or the stream is closed.

        if self.written > 0 && (self.written + self.spare.get() >= self.buf.len() || self.closed) {
            let v = self.buf[..self.written].to_vec();
            self.written = 0;

            Self::wake(&mut self.write_ready);
            Poll::Ready(v)
        } else {
            Self::save_waker(&mut self.stream_ready, cx);
            Poll::Pending
        }

Given that spare is constant (64 bytes) and each message is 1 or 4 bytes long (making the buffer capacity either 1024 or 4096), the only plausible explanation I have is that the stream is considered closed.
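The condition in `take` can be isolated as a pure function to check which chunk sizes it allows. This is a minimal sketch: `ready_to_send` is a hypothetical name, and the 64-byte spare and the 1024/4096 capacities come from the comment above rather than from the ipa source.

```rust
/// Mirrors the quoted condition: data is handed to the stream when
/// something has been written and either the buffer is within `spare`
/// bytes of `capacity` or the stream has been closed.
fn ready_to_send(written: usize, spare: usize, capacity: usize, closed: bool) -> bool {
    written > 0 && (written + spare >= capacity || closed)
}
```

Under these assumptions `ready_to_send(519, 64, 1024, false)` and `ready_to_send(300, 64, 4096, false)` are both false, and each becomes true only with `closed = true`, which is the reasoning behind suspecting a closed stream.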

akoshelev commented 10 months ago

Scratch the previous correspondence, these were two separate runs

akoshelev commented 10 months ago

Current theory: something is broken inside UnorderedReceiver.

Here is some evidence for that.

H2 is stalled

"gate[12073]=binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case", from=H1. Waiting to receive records ["[229..1023]"].

The protocol is moving as expected, submitting the first 1024 multiplications (sends complete, receives blocked waiting for data from H1).

2023-12-06T17:12:59.854353Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:send{i=1022 total=1796 to=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::send: new
2023-12-06T17:12:59.854360Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:send{i=1022 total=1796 to=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::send: close time.busy=3.30µs time.idle=3.52µs
2023-12-06T17:12:59.854364Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=1022 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: new
2023-12-06T17:12:59.854375Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:send{i=1023 total=1796 to=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::send: new
2023-12-06T17:12:59.854382Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:send{i=1023 total=1796 to=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::send: close time.busy=3.81µs time.idle=3.52µs
2023-12-06T17:12:59.854386Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=1023 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: new

Later (pay attention to timestamps), a batch of records arrived from H1

2023-12-06T17:12:59.917502Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=0 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Received next 1024 bytes
2023-12-06T17:12:59.917507Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=0 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=39.7µs time.idle=87.1ms
2023-12-06T17:12:59.917643Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=1 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=1.30µs time.idle=87.2ms

However, only the first 228 receive requests (i=0 through i=227) were resolved

2023-12-06T17:13:00.343228Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=223 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=762ns time.idle=508ms
2023-12-06T17:13:00.343555Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=224 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=848ns time.idle=508ms
2023-12-06T17:13:00.343882Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=225 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=706ns time.idle=508ms
2023-12-06T17:13:00.344221Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=226 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=750ns time.idle=508ms
2023-12-06T17:13:00.344570Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=227 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=799ns time.idle=509ms
<nothing after>

Given that we have 1024 receive requests in flight, the issue seems to be that UnorderedReceiver does not wake up the next in-flight receive request, so the whole pipeline stalls.

akoshelev commented 10 months ago

Not sure if it is relevant, but record 227 was the last one for some PRF

2023-12-06T17:13:00.344589Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=227 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case"}: ipa_core::helpers::gateway::receive: new
2023-12-06T17:13:00.344593Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}:receive{i=227 from=H1 gate="binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case"}: ipa_core::helpers::gateway::receive: close time.busy=276ns time.idle=3.19µs
2023-12-06T17:13:00.344596Z  INFO oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10}: ipa_core::protocol::ipa_prf::prf_sharding: close time.busy=5.99ms time.idle=6.08s

so it means that we couldn't cross the boundary between two user-specific attribution circuits

akoshelev commented 10 months ago

It does seem that the waker is missing.

At $t_0$, the request to receive record 114 is issued

2023-12-06T19:34:38.407207Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=4511667418102543736}:receive{i=114 from=H2 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: new

At $t_1 \gt t_0$, the request for record 113 is resolved, but it can't find a waker for record 114 to wake up.

2023-12-06T19:34:38.490185Z TRACE oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=14044988332725311749}:receive{i=113 from=H2 gate="binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: No waker for 114

this is the code that I used inside wake_next

        if let Some(w) = self.wakers[index].take() {
            w.wake();
        } else {
            tracing::trace!("No waker for {}", self.next)
        }

I don't expect this waker to be in the overflow wakers because of the record number. 114 is well below 1024 (capacity), so this waker should've landed in the wakers vector.
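The expectation about where the waker for record 114 lands can be written down as a small function. This is only a sketch of the rule as described in this comment, under the assumption that a waker goes into the fixed-size vector when its record is fewer than `capacity` records ahead of the next expected one and into overflow otherwise; `WakerSlot` and `waker_slot` are hypothetical names, and the real UnorderedReceiver may differ.

```rust
/// Where a saved waker is assumed to go (hypothetical model).
#[derive(Debug, PartialEq)]
enum WakerSlot {
    /// Stored in the fixed-size vector at this index.
    Vector(usize),
    /// Too far ahead of the read position; stored in the overflow list.
    Overflow,
    /// The record is behind the read position, so no waker is needed.
    AlreadyDelivered,
}

/// `next` is the next record the receiver expects, `capacity` the
/// length of the wakers vector (1024 in this thread).
fn waker_slot(record: usize, next: usize, capacity: usize) -> WakerSlot {
    match record.checked_sub(next) {
        None => WakerSlot::AlreadyDelivered,
        Some(ahead) if ahead < capacity => WakerSlot::Vector(record % capacity),
        Some(_) => WakerSlot::Overflow,
    }
}
```

With `next = 113` and `capacity = 1024`, record 114 maps to `Vector(114)`, which is why an empty slot at that index looked suspicious.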

martinthomson commented 10 months ago

That seems to violate everything I know about Rust and its ownership rules. We only modify self.wakers while holding a mutable reference to self and wake_next() is the only place that removes values.

akoshelev commented 10 months ago

yea I added more logs and it seems waker management is fine

H1 is stalled waiting for records 228...1023 from H3

binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case", from=H3. Waiting to receive records ["[228..1023]"].

it submitted receive requests for all records

2023-12-07T18:39:35.174132Z TRACE ThreadId(15) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=5291388962843137553}:receive{i=1 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::gateway::receive: new
2023-12-07T18:39:35.174135Z TRACE ThreadId(15) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=5291388962843137553}:receive{i=1 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Receiver::poll i=1
2023-12-07T18:39:35.174137Z TRACE ThreadId(15) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=5291388962843137553}:receive{i=1 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Save waker for i = 1: Waker { data: 0x7f25a8205700, vtable: 0x562aff2bc2f8 }
...
2023-12-07T18:39:35.243608Z TRACE ThreadId(27) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=2446537651109142584}:receive{i=1023 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Receiver::poll i=1023
2023-12-07T18:39:35.243610Z TRACE ThreadId(27) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=2446537651109142584}:receive{i=1023 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Save waker for i = 1023: Waker { data: 0x7f25a8205700, vtable: 0x562aff2bc2f8 }

it received the whole batch of 1024 records from H3 and started waking up the executor

2023-12-07T18:39:35.288687Z TRACE ThreadId(29) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=8042757424229509893}:receive{i=0 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Received next 1024 bytes
2023-12-07T18:39:35.288690Z TRACE ThreadId(29) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=8042757424229509893}:receive{i=0 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Waking up next in line: 1
2023-12-07T18:39:35.288699Z TRACE ThreadId(29) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=8042757424229509893}:receive{i=0 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Wake waker for i = 1: Waker { data: 0x7f25a8205700, vtable: 0x562aff2bc2f8 }

Completing the receive request for record 227 caused UnorderedReceiver to wake the waker for record 228. Note that the waker is the same for all records because the giant future is polled by a single task only

2023-12-07T18:39:35.684330Z TRACE ThreadId(26) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=14909736342162320554}:receive{i=227 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Receiver::poll i=227
2023-12-07T18:39:35.684332Z TRACE ThreadId(26) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=14909736342162320554}:receive{i=227 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Waking up next in line: 228
2023-12-07T18:39:35.684334Z TRACE ThreadId(26) oprf_ipa_query{sz=100000}:attribute_cap_aggregate:per_user_attribution{rows=10 prf=14909736342162320554}:receive{i=227 from=H3 gate="binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case"}: ipa_core::helpers::buffers::unordered_receiver: Wake waker for i = 228: Waker { data: 0x7f25a8205700, vtable: 0x562aff2bc2f8 }

There is no Receiver::poll i=228 after the last line.

It does seem that the waker was called correctly, but because it is not unique to this specific receive request, it could've caused a wake that got stuck before we had a chance to get to this specific receive request. I don't understand the whole mechanics of it yet though...
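The shared-waker effect can be reproduced with the standard library alone. The sketch below is not the UnorderedReceiver code; `CountingTask`, `save_wakers` and `wake_slot` are hypothetical names used to model many receive requests that each save a clone of the same task waker.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Wake, Waker};

/// Stand-in for the single task that polls the giant future.
/// It only counts how many times it has been woken.
struct CountingTask {
    wakes: AtomicUsize,
}

impl Wake for CountingTask {
    fn wake(self: Arc<Self>) {
        self.wakes.fetch_add(1, Ordering::SeqCst);
    }
}

/// Models `n` receive requests polled from the same task: each one
/// stores a clone of that task's waker, so all slots wake the same task.
fn save_wakers(task_waker: &Waker, n: usize) -> Vec<Option<Waker>> {
    (0..n).map(|_| Some(task_waker.clone())).collect()
}

/// Takes and wakes the waker stored for `index`, if there is one.
/// Returns whether a waker was found.
fn wake_slot(slots: &mut [Option<Waker>], index: usize) -> bool {
    match slots.get_mut(index).and_then(|slot| slot.take()) {
        Some(waker) => {
            waker.wake();
            true
        }
        None => false,
    }
}
```

Building the waker with `Waker::from(task.clone())` and calling `wake_slot(&mut slots, 228)` bumps the task's counter by one, but the wake carries no record index. Which receive request gets polled next is decided entirely by whatever drives the futures inside the task, consistent with the log showing a wake for 228 and no subsequent poll for it.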

akoshelev commented 10 months ago

I think I figured out what is going on. This is quite a convoluted issue and the tl;dr is: flattening the stream causes all sorts of unpredictable behavior when multiple layers of seq_join are used. Random decisions made by the task scheduler make this issue reveal itself only on some occasions.

so for concreteness, here is an example of helpers stuck on OPRF IPA

H1

binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H3. Waiting to receive records ["[0..1023]"].

H2

2023-12-12T15:52:24.454804Z  WARN ThreadId(15) stall_detector{role=H2}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=103369316 state=
{
"gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H1. Waiting to receive records ["[115..1023]"].
"gate[12085]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12093]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit1/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12101]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit10/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12110]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit2/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12118]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit3/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12126]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit4/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12134]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit5/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12142]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit6/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12150]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit7/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12158]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit8/xor1", from=H1. Waiting to receive records ["[0..1023]"].
"gate[12166]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit9/xor1", from=H1. Waiting to receive records ["[0..1023]"].
}

H3

2023-12-12T15:52:24.441428Z  WARN ThreadId(16) stall_detector{role=H3}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=103378760 state=
{
"gate[12073]=binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case", from=H2. Waiting to receive records ["[115..1023]"].
"gate[12086]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12094]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit1/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12102]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit10/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12111]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit2/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12119]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit3/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12127]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit4/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12135]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit5/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12143]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit6/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12151]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit7/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12159]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit8/xor2", from=H2. Waiting to receive records ["[0..1023]"].
"gate[12167]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit9/xor2", from=H2. Waiting to receive records ["[0..1023]"].
}

The new attribution circuit looks as follows.


                                      ┌─────────────────────────────────────────────┐
                                      │                                             │
                                      │                                             │
                                      │                                             │
              User circuit            │      sequential join (1024 active items)    │
                                      │                                             │
                                      │                                             │
                                      │                                             │
                                      └──────────────────────┬──────────────────────┘
                                                             │
                                                             │
                                                             │
                                                             │
                                                             │
                                                             │
                                                             ▼
                                      ┌────┬────┬────┬────┬────┬────┬────┬────┬─────┐
                                      │    │    │    │    │    │    │    │    │     │
              Capped values           │    │    │    │    │    │    │    │    │     │
                                      └────┴────┴────┴────┴────┴────┴────┴────┴─────┘

                                                          ┌────┐
                                            capped value  │    │
                                                          │    │
                                                          └────┘

                                                          ┌────┐
                                            capped value  │    │
                                                          │    │
                                                          └────┘

                                                          ┌────┐
                                            capped value  │    │
                                                          │    │
                                                          └────┘

                                                          ┌────┐
                                                          │    │
                                            capped value  │    │
                                                          └────┘

                                     ┌───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
                                     │                                                                                                                                   │
                                     │                                                                                                                                   │
            modulus conversion       │                                            sequential join (1024 active items)                                                    │
                                     │                                                                                                                                   │
                                     │                                                                                                                                   │
                                     └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

                                     ┌───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
                                     │                                                                                                                                   │
                                     │                                                                                                                                   │
             aggregation             │                                             sequential join (1024 active items)                                                   │
                                     │                                                                                                                                   │
                                     │                                                                                                                                   │
                                     └───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

The issue is that 1024 active user circuits can produce more than 1024 items for modulus conversion. In fact, our test circuit starts processing the largest user circuits (10 rows) first, so ~100 finished circuits can fill up the buffer for modulus conversion.
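
The arithmetic behind that estimate can be sketched as follows. This is only an illustration, not code from the repository: `circuits_to_fill` is a hypothetical helper, and the window of 1024 items and the per-circuit item counts are taken from the description in this comment.

```rust
/// Hypothetical helper: how many completed user circuits are enough to fill a
/// downstream window of `capacity` items when each circuit emits
/// `items_per_circuit` items.
fn circuits_to_fill(capacity: usize, items_per_circuit: usize) -> usize {
    assert!(items_per_circuit > 0, "a circuit must emit at least one item");
    // Ceiling division: a partially used circuit still counts as consumed.
    (capacity + items_per_circuit - 1) / items_per_circuit
}

fn main() {
    let capacity = 1024;
    for items in [1, 9, 10] {
        let n = circuits_to_fill(capacity, items);
        println!(
            "{items} item(s) per circuit: {n} circuits fill the window, {} of 1024 circuits are not needed",
            capacity - n
        );
    }
}
```

With one item per circuit the two windows line up exactly. As soon as a circuit emits several items, the downstream window fills long before the upstream window has drained.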

Here is what the failure case looks like.

On helper 1

  1. Tokio polls the resulting stream (aggregation), which polls the modulus conversion stream, which in turn polls the per-user attribution circuits. The first 1024 attribution circuits start executing.
  2. All helpers reach the step computed_capped_attributed_trigger_value_just_saturated_case. This means that all communication required for computed_capped_attributed_trigger_value_not_saturated_case has finished.
  3. The first 120 per-user attribution circuit futures resolve and produce 120 vectors with ~9 items each, totalling ~1024 capped values to be converted to the prime field. They generate a wake notification to the executor.
  4. The Tokio executor makes a random decision to poll the modulus conversion circuit rather than the per-user attribution stream. This moves 1024 capped values, and modulus conversion produces 1024 multiplications and 1024 receive requests.
  5. At this point it can't move forward, because the modulus conversion stream no longer polls the per-user attribution stream and will not let the remaining circuits complete. Receive requests remain outstanding. This is what we're seeing on one of the helpers:
"gate[12073]=binary_validator/row9/computed_capped_attributed_trigger_value_not_saturated_case", from=H2. Waiting to receive records ["[115..1023]"].
"gate[12086]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor2", from=H2. Waiting to receive records ["[0..1023]"].
...

On helper 2

At step 4, the Tokio executor makes a different decision and continues polling the sequential futures for the per-user attribution circuit. This drives all receive requests for step computed_capped_attributed_trigger_value_not_saturated_case to complete, and this helper moves to computed_capped_attributed_trigger_value_just_saturated_case. This is what is shown in the logs too:

Note the step is computed_capped_attributed_trigger_value_just_saturated_case and not computed_capped_attributed_trigger_value_not_saturated_case

"gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H1. Waiting to receive records ["[115..1023]"].
"gate[12085]=prime_field_validator/modulus_convert_breakdown_key_bits_and_trigger_values/convert_bit0/xor1", from=H1. Waiting to receive records ["[0..1023]"].

On helper 3, the executor decided to keep polling the per-user attribution circuit and got stuck at the computed_capped_attributed_trigger_value_just_saturated_case step, while sending all the required data to helper 2:

2023-12-12T15:50:24.483444Z  WARN ThreadId(10) stall_detector{role=H1}: ipa_core::helpers::gateway::stall_detection::gateway: Helper is stalled sn=103458052 state=
{
"gate[12072]=binary_validator/row9/computed_capped_attributed_trigger_value_just_saturated_case", from=H3. Waiting to receive records ["[0..1023]"].
}

Now the helpers are in a broken state. Two of them completed 114 per-user attribution circuits and started doing modulus conversion. The third one completed 0 circuits because one of the others got stuck at computed_capped_attributed_trigger_value_not_saturated_case and never sent it the data required to advance.

❯ grep "prf_sharding: close" h1.log | grep "per_user_attribution" | wc -l
       0

❯ grep "prf_sharding: close" h2.log | grep "per_user_attribution" | wc -l
     114

❯ grep "prf_sharding: close" h3.log | grep "per_user_attribution" | wc -l
     114

I believe the root cause of this issue is misaligned flow control between the two streams. The per-user attribution stage, with flow control enabled for 1024 user circuits, can produce more than 1024 items for the downstream consumer.

benjaminsavage commented 10 months ago

Can we work around this by changing the protocol to not have one future generate multiple child futures?

I had a similar type of thing going on with quicksort, but I took a different approach. I used flat_map to map to a flattened list of inputs, which I mapped to async futures. We can probably do something similar.
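
A rough sketch of the flattening idea, using only the standard library. This is not the quicksort code from the PR linked below: `UserRows` and `flatten_rows` are hypothetical names, and the real protocol would map each flattened item to an async future rather than collect into a `Vec`.

```rust
/// Hypothetical per-user input: each user contributes a variable number of rows.
struct UserRows {
    user_id: u32,
    rows: Vec<u8>,
}

/// Flatten all users' rows into one list of work items, each tagged with a
/// contiguous record index. A single windowed join over this list sees exactly
/// one record per work item, so one future never fans out into several.
fn flatten_rows(users: &[UserRows]) -> Vec<(usize, u32, u8)> {
    users
        .iter()
        .flat_map(|u| u.rows.iter().map(move |&value| (u.user_id, value)))
        .enumerate()
        .map(|(record_index, (user_id, value))| (record_index, user_id, value))
        .collect()
}

fn main() {
    let users = vec![
        UserRows { user_id: 1, rows: vec![10, 11] },
        UserRows { user_id: 2, rows: vec![] },
        UserRows { user_id: 3, rows: vec![20] },
    ];
    for (record_index, user_id, value) in flatten_rows(&users) {
        println!("record {record_index}: user {user_id}, value {value}");
    }
}
```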

https://github.com/private-attribution/ipa/pull/892

akoshelev commented 10 months ago

@benjaminsavage I haven't looked closely into the quicksort implementation, but it seems to use only one seq_join, which is fine for later flattening the stream. When we have multiple combinations of them inside one protocol, we will keep having these spurious failures.

Currently my thinking is that we could fix it by removing backpressure from the user circuits (drive them in parallel). It won't be free, but it will fix the issue I think.

Long term, I am less and less optimistic about concurrency control at the infra layer. There are just too many things that can go wrong and that are invisible to the protocols, which makes those issues hard to reproduce and debug. I think we should invest in vectorization applied at the circuit layer, where you as an author can precisely control how much data you want to put on the wire. You could also control CPU parallelism. That will cost us something in terms of code, but the benefits as I see them now outweigh the cons.

Maybe @martinthomson and @andyleiserson, who is working on vectorization, can share their opinions on whether vectorization could be the right fix for it.

andyleiserson commented 10 months ago

I don't think that vectorization will fix this, at least not directly. It will change the math about when things get stuck, e.g. if the vectorization width is 10 and we scale all the buffers 10x, then the problematic case Alex describes will stay pretty much the same, but with a bunch of numbers multiplied by 10 (1200 users complete to fill a buffer that holds 10240 items).

The way that vectorization may help, indirectly, is that it reduces the number of futures associated with a given volume of work. This may enable tuning the amount of pending work differently. i.e. vectorize 4x; reduce the number of outstanding futures by half, which means double (4 / 2 = 2) the amount of work in flight.
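
To make the tuning arithmetic concrete, here is a tiny sketch. `records_in_flight` is a hypothetical helper, and it assumes that every outstanding future carries exactly `vector_width` records.

```rust
/// Hypothetical helper: records in flight, assuming each outstanding future
/// carries exactly `vector_width` records.
fn records_in_flight(outstanding_futures: usize, vector_width: usize) -> usize {
    outstanding_futures * vector_width
}

fn main() {
    // Baseline: 1024 outstanding futures, one record each.
    let baseline = records_in_flight(1024, 1);
    // Vectorize 4x and halve the number of outstanding futures.
    let tuned = records_in_flight(1024 / 2, 4);
    println!("baseline = {baseline}, tuned = {tuned}, ratio = {}", tuned / baseline);
}
```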

I will think some more about strategies here.

andyleiserson commented 10 months ago

I pushed a branch that seems to help with this. However, I believe it only reduces the chance that things get scheduled in a way that deadlocks, not eliminates it, at least if you allow for pathological scheduling of futures.

Also, judging by at least three discussions of deadlocks in the google results for "rust buffered stream", we're not the only ones to have trouble like this.

There are a few changes I am contemplating that might be a more complete fix:

martinthomson commented 10 months ago

things get scheduled in a way that deadlocks, not eliminates it

You mean "reduces deadlocks, not eliminates them", I assume.

Modify things so that the seq_join after attribution can be eliminated

This might help, but all signs point to this not helping.

include the functionality in my BufferedStream

I agree with you that the underlying problem is such that you reduce, not eliminate, at this point.

consume an iterator over futures

I can pretty much guarantee that consuming an iterator of futures is not going to make this better, based on my reading of those postings. Maybe not at all.

Make the use of record IDs more regular

This falls under the "JuSt Be MoRe CaReFuL" strategy. Maybe we can do that here, but I'm skeptical.

Spawn portions of the protocol as independent tasks

This is starting to sound better and better as things develop. However, this whole async lifetime nonsense is a real drag. I don't want to move to Arc over &, but it is starting to get more and more appealing.


My understanding of the buffered stream problem is that it comes down to something relatively simple. Futures are only polled at the consumer, so that when you have an aggregation (like seq_join) you can create a situation where futures do not get polled when they need to.

Imagine a case where you have a collection of futures, where each future resolves from two (or more) underlying actions, each requiring independent driving. Let's say that we have modulus conversion, which has two multiply operations: xor1 and xor2. If you have 1k operations outstanding, you need to ensure that the xor1 operation is polled for the data to be sent or received. But it could be that the 1k is not woken because xor2 is not making progress. (More likely the roles of these two are inverted, but you get the idea.)

Most of the discussion talks about how to ensure that the delayed xor1 operations get polled. Spawning does seem to be the answer that most people reach for here. I don't know if we can guarantee that this will work for us, but I think that is what we should look to investigate.
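
As a loose analogy for what spawning buys, here is a sketch that uses an OS thread and an unbounded `std::sync::mpsc` channel in place of a spawned async task. It is not IPA code, and `run_upstream_independently` is a hypothetical name. The point is only that the upstream stage finishes all of its sends even though the consumer reads nothing until afterwards.

```rust
use std::sync::mpsc;
use std::thread;

/// Run a toy "upstream stage" on its own thread so that its progress does not
/// depend on the consumer polling it. Returns what the consumer eventually reads.
fn run_upstream_independently(n: u32) -> Vec<u32> {
    // Unbounded channel: the upstream stage is never blocked by a slow consumer.
    // This trades memory for liveness.
    let (tx, rx) = mpsc::channel();

    let upstream = thread::spawn(move || {
        for record in 0..n {
            // Stands in for "finish this record and flush its data".
            tx.send(record * 2).expect("receiver is still alive");
        }
        // `tx` is dropped here, which closes the channel.
    });

    // The consumer deliberately reads nothing until upstream has finished.
    upstream.join().expect("upstream thread panicked");
    rx.iter().collect()
}

fn main() {
    let results = run_upstream_independently(4);
    println!("consumer received {} records after upstream completed", results.len());
}
```

Whether the same guarantee holds for spawned tasks inside the helpers depends on details this sketch ignores, such as bounded buffers and the lifetime issues mentioned above.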

martinthomson commented 10 months ago

consume an iterator over futures

I can pretty much guarantee that consuming an iterator of futures is not going to make this better, [...]

OK, maybe I got that part wrong. Still, I think that we'd end up with more calls to .try_collect() than is ideal.

andyleiserson commented 10 months ago

I think the problem and solution can be stated more simply than I was thinking the other day:

The problem occurs when processing of a completed work unit gets split up. Going back to the case Alex analyzed: one helper inevitably must be the first to receive 1,024 records of MPC data for the last gate in the attribution stage. At this point the seq_join for the aggregation stage begins pulling futures from its input stream. Because each item output from the attribution stage corresponds to multiple aggregation stage records, only ~120 attribution output records need to be consumed for the aggregation stage to reach a full active work unit of 1,024 futures. Once it does so, the aggregation seq_join will no longer pull additional futures from its input stream, which means that the futures that would have flushed data for the last of the 1,024 attribution records to other helpers can be left pending. (While the futures that flush the outbound data for the last of the 1,024 attribution records could have been scheduled previously, there is no reason they have to be.)

Viewed in light of that description, the fix is easier to justify: don't give any futures to the aggregation stage, until a full set of futures in the attribution stage has been driven to completion. Which is roughly what the fix was doing, but it can be made more precise by counting records in and out of the buffer. Each time a multiple of 1,024 records in is crossed, advance the number of records eligible to be output by 1,024.
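
A minimal sketch of that counting rule, assuming a gate that sits between the two stages. `BatchGate` and its methods are hypothetical names and not the actual BufferedStream implementation. The `close` method is an added assumption so that a final partial batch can drain at the end of the input.

```rust
/// Hypothetical gate between the attribution and aggregation stages.
/// Records become eligible for output only in whole multiples of `batch`.
struct BatchGate {
    batch: usize,
    records_in: usize,
    records_out: usize,
    closed: bool,
}

impl BatchGate {
    fn new(batch: usize) -> Self {
        assert!(batch > 0, "batch size must be positive");
        Self { batch, records_in: 0, records_out: 0, closed: false }
    }

    /// Note that `n` more upstream records have been driven to completion.
    fn record_in(&mut self, n: usize) {
        self.records_in += n;
    }

    /// Assumption: once upstream is exhausted, release the final partial batch.
    fn close(&mut self) {
        self.closed = true;
    }

    /// Total number of records downstream may have taken so far.
    fn eligible(&self) -> usize {
        if self.closed {
            self.records_in
        } else {
            // Advance eligibility only when a multiple of `batch` is crossed.
            (self.records_in / self.batch) * self.batch
        }
    }

    /// Hand one record downstream if the counting rule allows it.
    fn try_take(&mut self) -> bool {
        if self.records_out < self.eligible() {
            self.records_out += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut gate = BatchGate::new(1024);
    gate.record_in(120);
    println!("after 120 records in: eligible = {}", gate.eligible());
    gate.record_in(904);
    println!("after 1024 records in: eligible = {}", gate.eligible());
    println!("downstream can take a record: {}", gate.try_take());
}
```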

I agree that changing the way seq_join accepts its input futures is irrelevant. On Saturday I was worried about a deadlock with the attribution and aggregation stages at arbitrary record IDs (e.g. aggregation at 9,000; attribution at 4,000 -- aggregation would need to run ahead to 40,000 to start pulling the data where attribution stopped). If that were the case, it might have been desirable to change things such that each aggregation stage future can independently couple with the attribution stage futures producing relevant data. But I don't think states like that are reachable, and I don't think that changing the coupling between the stages eliminates the fundamental problem of needing to tune the work unit carefully.

One other note -- maybe this is obvious, but it helped me in thinking about how the deadlocks happen -- the scheduler on helper A doesn't know when there is a future Y on helper B that is pending on data that will be sent by future X on helper A. This means that we're inherently reliant on future X getting scheduled for some other reason, in order to make progress. This would be a problem for our system even with a runtime like Javascript that always drives futures to completion.

akoshelev commented 10 months ago

I don't think that vectorization will fix this, at least not directly.

My thought was to give the protocol code more control over the width of execution. In the extreme case, our channels can be configured with capacity 1, so each send actually triggers a network communication, and vectorization techniques ensure we have enough data on the wire.

This could eliminate the delays and the need for seq_join.

Spawn portions of the protocol as independent tasks

This is starting to sound better and better as things develop. However, this whole async lifetime nonsense is a real drag. I don't want to move to Arc over &, but it is starting to get more and more appealing.

#873 should help with it. If futures spawned by seq_join are polled by the executor directly, OPRF IPA seems to be working fine. Not sure how sustainable it is to rely on that though.

Viewed in light of that description, the fix is easier to justify: don't give any futures to the aggregation stage, until a full set of futures in the attribution stage has been driven to completion. Which is roughly what the fix was doing, but it can be made more precise by counting records in and out of the buffer. Each time a multiple of 1,024 records in is crossed, advance the number of records eligible to be output by 1,024.

@andyleiserson I wonder if the fix needs to be put explicitly on the protocol side or we need another combinator in addition to seq_join. In the latter case, I worry a bit about complexity of this.

One other note -- maybe this is obvious, but it helped me in thinking about how the deadlocks happen -- the scheduler on helper A doesn't know when there is a future Y on helper B that is pending on data that will be sent by future X on helper A. This means that we're inherently reliant on future X getting scheduled for some other reason, in order to make progress. This would be a problem for our system even with a runtime like Javascript that always drives futures to completion.

This is interesting. I had the impression that runtimes like Javascript/Java always immediately schedule futures for execution. In what circumstances do you think such runtimes can lead to a similar deadlock?

andyleiserson commented 10 months ago

My thought was to give the protocol code more control over the width of execution. In the extreme case, our channels can be configured with capacity 1, so each send actually triggers a network communication, and vectorization techniques ensure we have enough data on the wire.

You're right... when I wrote that comment, I was thinking this was an issue of the helpers taking divergent execution paths, which I think could still happen with vectorization. But it's really a work unit problem, and if we can always set the work unit to 1 with vectorization, it shouldn't be an issue. (However, vectorization will introduce a different kind of work unit problem, since you need to collect a certain number of records to be packed into a vector.)

https://github.com/private-attribution/ipa/pull/873 should help with it. If futures spawned by seq_join are polled by the executor directly, OPRF IPA seems to be working fine. Not sure how sustainable it is to rely on that though.

I think it is as good a fix as what BufferedStream does (performance win aside). It has the same effect: completion of the attribution futures necessary to make forward progress isn't dependent on the aggregation stage polling them.

@andyleiserson I wonder if the fix needs to be put explicitly on the protocol side or we need another combinator in addition to seq_join. In the latter case, I worry a bit about complexity of this.

I think the BufferedStream functionality could be integrated with seq_join, if we thought that made things simpler for protocol authors.

This is interesting. I had the impression that runtimes like Javascript/Java always immediately schedule futures for execution. In what circumstances do you think such runtimes can lead to a similar deadlock?

Yeah, that's my understanding too. The point about having issues even with different runtime behavior doesn't make sense. Problems arise because we don't construct all our futures eagerly (which we intentionally stopped doing, because it was inefficient). No runtime will execute futures that don't exist.

The issue is really that it's hard to know when a particular future needs to be created in order to make forward progress. And even given a solution that is 100% correct in applying local knowledge to create necessary futures, there is still remote knowledge (other helper needs X piece of data) that isn't available.
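
For readers less familiar with Rust's model, the sketch below shows that a future does nothing until something polls it. It uses a hand-written future and a no-op waker so that it needs only the standard library. `CountingFuture`, `NoopWake` and `lazy_future_demo` are hypothetical names, not part of IPA.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// A future whose only "work" is to count how often it has been polled.
struct CountingFuture {
    polls: Arc<AtomicUsize>,
}

impl Future for CountingFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        self.polls.fetch_add(1, Ordering::SeqCst);
        Poll::Ready(())
    }
}

/// Returns the poll count observed before and after polling the future once.
fn lazy_future_demo() -> (usize, usize) {
    let polls = Arc::new(AtomicUsize::new(0));
    let mut fut = CountingFuture { polls: Arc::clone(&polls) };

    // Constructing the future performs no work at all.
    let before = polls.load(Ordering::SeqCst);

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    // `CountingFuture` is `Unpin`, so `Pin::new` is sufficient here.
    let _ = Pin::new(&mut fut).poll(&mut cx);

    let after = polls.load(Ordering::SeqCst);
    (before, after)
}

fn main() {
    let (before, after) = lazy_future_demo();
    println!("polls before = {before}, polls after = {after}");
}
```

A future that has not been constructed yet is one step further removed: there is nothing for any executor to poll.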