vitessio / vitess

Vitess is a database clustering system for horizontal scaling of MySQL.
http://vitess.io
Apache License 2.0

VStreamer: For larger compressed transaction payloads, stream the internal contents #17239

Open mattlord opened 6 days ago

mattlord commented 6 days ago

Description

For larger compressed transaction payloads (> ZstdInMemoryDecompressorMaxSize), we were already streaming the internal events as we decompressed the payload, but in the vstreamer we were still reading the entire contents into memory before sending them to the consumer (the vplayer).

In this PR, we stream the internal contents all the way from the binlog consumer to the vstream consumer so that we do not need to hold the entire contents, which can be tens or even hundreds of GiB, in memory all at once. As you can see in the test/demonstration below, we allocate and use drastically less memory when processing the payloads: in this case, approximately 14-18 times less.
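As a rough shell analogy for the buffering-vs-streaming difference (using gzip and a tiny payload purely for illustration; the actual change is in the Go vstreamer code and uses zstd):

```shell
# Build a small compressed "payload" of events.
printf 'event1\nevent2\nevent3\n' | gzip > /tmp/payload.gz

# Buffering (old vstreamer behavior, conceptually): materialize the entire
# decompressed contents before handing anything to the consumer.
gzip -dc /tmp/payload.gz > /tmp/whole_payload
echo "buffered $(wc -c < /tmp/whole_payload) bytes before consuming anything"

# Streaming (this PR, conceptually): the consumer reads each event as the
# decompressor produces it, so only a small amount of data is live at once.
gzip -dc /tmp/payload.gz | while read -r event; do
    echo "consumed: ${event}"
done

rm -f /tmp/payload.gz /tmp/whole_payload
```

The pipe version never holds the full decompressed payload; memory use is bounded by the pipe buffer rather than the payload size, which is the same principle the PR applies to the vstreamer.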

Here's a manual test/demonstration on macOS (note that we end up with over 40 million rows in the customer table):

make build 

cd examples/local
alias vtctldclient='command vtctldclient --server=localhost:15999'

# Setup function to wait for the copy phase to complete
function wait_for_workflow_running() {
    local keyspace=customer
    local workflow=commerce2customer
    local wait_secs=900
    local result=""

    echo "Waiting for the ${workflow} workflow in the ${keyspace} keyspace to finish the copy phase..."

    for _ in $(seq 1 ${wait_secs}); do
        result=$(vtctldclient Workflow --keyspace="${keyspace}" show --workflow="${workflow}" 2>/dev/null | grep "Copy phase completed")
        if [[ ${result} != "" ]]; then
            break
        fi
        sleep 1
    done

    if [[ ${result} == "" ]]; then
        echo "Timed out after ${wait_secs} seconds waiting for the ${workflow} workflow in the ${keyspace} keyspace to reach the running state"
    else
        echo "The ${workflow} workflow in the ${keyspace} keyspace is now running. $(sed -rn 's/.*"(Copy phase.*)".*/\1/p' <<< "${result}")."
    fi
}

./101_initial_cluster.sh; ./201_customer_tablets.sh

# Enable binlog transaction compression
for uid in $(vtctldclient GetTablets | awk '{print $1}' | cut -d- -f2 | bc); do
  command mysql -u root --socket "${VTDATAROOT}/vt_0000000${uid}/mysql.sock" -e "stop replica; set global binlog_transaction_compression=ON; start replica;"
done

# Load data in the customer table
commerce_primary_uid=$(vtctldclient GetTablets --keyspace commerce --tablet-type primary --shard "0" | awk '{print $1}' | cut -d- -f2 | bc)
table_file="${VTDATAROOT}/vt_0000000${commerce_primary_uid}/data/vt_commerce/customer.ibd"

# Generate 5MiB of initial data
size=$((5*1024*1024))
while [[ $(stat -f "%z" "${table_file}") -lt ${size} ]]; do
    command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "insert into customer (customer_id, email) values (${RANDOM}*${RANDOM}, '${RANDOM}_person@planetscale.com')" 2> /dev/null
done

say "Initial data load completed"

# Grow that to at least 2GiB
size=$((2*1024*1024*1024))
i=1
while [[ $(stat -f "%z" "${table_file}") -lt ${size} ]]; do
    command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "insert into customer (email) select concat(${i}, email) from customer limit 5000000"
    let i=i+1
done

say "Full data load completed"

# Move the customer table from commerce to customer
vtctldclient MoveTables --workflow commerce2customer --target-keyspace customer create --source-keyspace commerce --tables "customer,corder" --tablet-types="primary"
wait_for_workflow_running

say "Workflow is running"

rm -f /tmp/vttablet-sample

sample $(cat ${VTDATAROOT}/vt_0000000${commerce_primary_uid}/vttablet.pid) 600 -file /tmp/vttablet-sample &

command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "update customer set email = concat('update1_', email) limit 10000000"

say "Bulk update complete"

# Wait for the transaction payload event to be processed
while [[ $(curl -s "http://localhost:15${commerce_primary_uid}/debug/vars" | jq -r '.CompressedTransactionPayloadsViaStream') -ne 1 ]]; do
    sleep 1
done
# Wait for the workflow to catch up
sleep 5
while [[ $(vtctldclient MoveTables --target-keyspace=customer --workflow=commerce2customer show --compact --include-logs=false | jq -r '.workflows[0].max_v_replication_transaction_lag') -gt 1 ]]; do
    sleep 1
done
say "Bulk update has been successfully replicated"

kill -INT %1
grep "Physical footprint" /tmp/vttablet-sample
rm /tmp/vttablet-sample

say "Update test complete"

sample $(cat ${VTDATAROOT}/vt_0000000${commerce_primary_uid}/vttablet.pid) 1200 -file /tmp/vttablet-sample &

command mysql -u root --socket "${VTDATAROOT}/vt_0000000${commerce_primary_uid}/mysql.sock" vt_commerce -e "update customer set email = concat('update2_', email) limit 20000000"

say "Bulk update two complete"

# Wait for the transaction payload event to be processed
while [[ $(curl -s "http://localhost:15${commerce_primary_uid}/debug/vars" | jq -r '.CompressedTransactionPayloadsViaStream') -ne 2 ]]; do
    sleep 1
done
# Wait for the workflow to catch up
sleep 5
while [[ $(vtctldclient MoveTables --target-keyspace=customer --workflow=commerce2customer show --compact --include-logs=false | jq -r '.workflows[0].max_v_replication_transaction_lag') -gt 1 ]]; do
    sleep 1
done
say "Bulk update has been successfully replicated"

kill -INT %1
grep "Physical footprint" /tmp/vttablet-sample
rm /tmp/vttablet-sample

say "Update test two complete"

# Do a vdiff to be sure everything was replicated correctly
vtctldclient vdiff --target-keyspace customer --workflow commerce2customer create --tablet-types=primary --wait

Results on the PR branch:

update1:
Physical footprint:         310.5M
Physical footprint (peak):  310.5M

update2:
Physical footprint:         586.5M
Physical footprint (peak):  747.3M

Results on the main branch:

update1:
Physical footprint:         5.1G
Physical footprint (peak):  5.7G

update2:
Physical footprint:         10.3G
Physical footprint (peak):  10.9G


vitess-bot[bot] commented 6 days ago

Review Checklist

Hello reviewers! :wave: Please follow this checklist when reviewing this Pull Request.


codecov[bot] commented 6 days ago

Codecov Report

Attention: Patch coverage is 16.66667% with 20 lines in your changes missing coverage. Please review.

Project coverage is 67.39%. Comparing base (216fd70) to head (0bc59bc).

Files with missing lines Patch % Lines
go/vt/vttablet/tabletserver/vstreamer/vstreamer.go 9.09% 20 Missing :warning:
Additional details and impacted files

```diff
@@           Coverage Diff           @@
##             main   #17239   +/-   ##
=======================================
  Coverage   67.39%   67.39%
=======================================
  Files        1570     1570
  Lines      252917   252937   +20
=======================================
+ Hits       170446   170460   +14
- Misses      82471    82477    +6
```
