MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com

storage/sources/postgres: slot compacted past snapshot point #24396

Status: Open. def- opened this issue 8 months ago.

def- commented 8 months ago

What version of Materialize are you using?

43c65e6c8545

What is the issue?

Seen in https://buildkite.com/materialize/tests/builds/72946#018cfccc-477b-40da-8c1d-108336a3e402, but it is unrelated to my PR.

thread 'timely:work-3' panicked at /var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/storage/src/source/postgres/replication.rs:214:25:
slot compacted past snapshot point. snapshot consistent point=1470915840 resume_lsn=1470915896
stack backtrace:
   0: rust_begin_unwind
             at ./rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/std/src/panicking.rs:597:5
   1: core::panicking::panic_fmt
             at ./rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/panicking.rs:72:14
   2: mz_storage::source::postgres::replication::render::<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}::{closure#0}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/storage/src/source/postgres/replication.rs:214:25
   3: <core::pin::Pin<alloc::boxed::Box<dyn core::future::future::Future<Output = core::result::Result<(), mz_storage::source::postgres::TransientError>>>> as core::future::future::Future>::poll
             at ./rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/future/future.rs:125:9
   4: <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible::<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/timely-util/src/builder_async.rs:643:55
   5: <core::pin::Pin<alloc::boxed::Box<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}>> as core::future::future::Future>::poll
             at ./rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/future/future.rs:125:9
   6: <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build::<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}, 
<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/timely-util/src/builder_async.rs:561:32
   7: <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_reschedule::<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, 
mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}, <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}>::{closure#0}, 
<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}, 
<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}>::{closure#0}
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/dataflow/operators/generic/builder_rc.rs:172:26
   8: <timely::dataflow::operators::generic::builder_raw::OperatorCore<mz_storage_types::sources::MzOffset, <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_reschedule<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, 
mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}, <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, 
()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}>::{closure#0}, <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build<<mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, 
()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}, <mz_timely_util::builder_async::OperatorBuilder<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>>>::build_fallible<mz_storage::source::postgres::TransientError, mz_storage::source::postgres::replication::render<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, futures_util::stream::stream::Inspect<async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::source::source_reader_pipeline::source_render_operator<timely::dataflow::scopes::child::Child<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>, mz_storage_types::sources::MzOffset>, mz_storage_types::sources::postgres::PostgresSourceConnection, async_stream::async_stream::AsyncStream<timely::progress::frontier::Antichain<mz_storage_types::sources::MzOffset>, mz_storage::source::source_reader_pipeline::reclock_resume_upper<mz_storage_types::sources::MzOffset, mz_repr::timestamp::Timestamp>::{closure#0}>, mz_storage::render::sources::render_source<timely::dataflow::scopes::child::Child<timely::worker::Worker<timely_communication::allocator::generic::Generic>, ()>>::{closure#0}>::{closure#0}>>::{closure#1}>::{closure#0}::{closure#0}>::{closure#0}::{closure#0}>::{closure#0}> as timely::scheduling::Schedule>::schedule
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/dataflow/operators/generic/builder_raw.rs:204:9
   9: <timely::progress::subgraph::PerOperatorState<mz_storage_types::sources::MzOffset>>::schedule
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/progress/subgraph.rs:699:30
  10: <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset>>::activate_child
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/progress/subgraph.rs:346:26
  11: <timely::progress::subgraph::Subgraph<(), mz_storage_types::sources::MzOffset> as timely::scheduling::Schedule>::schedule
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/progress/subgraph.rs:312:17
  12: <timely::progress::subgraph::PerOperatorState<()>>::schedule
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/progress/subgraph.rs:699:30
  13: <timely::progress::subgraph::Subgraph<(), ()>>::activate_child
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/progress/subgraph.rs:346:26
  14: <timely::progress::subgraph::Subgraph<(), ()> as timely::scheduling::Schedule>::schedule
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/progress/subgraph.rs:312:17
  15: <timely::worker::Wrapper>::step::{closure#0}
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/worker.rs:754:57
  16: <core::option::Option<&mut alloc::boxed::Box<dyn timely::scheduling::Schedule>>>::map::<bool, <timely::worker::Wrapper>::step::{closure#0}>
             at ./rustc/79e9716c980570bfd1f666e3b16ac583f0168962/library/core/src/option.rs:1075:29
  17: <timely::worker::Wrapper>::step
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/worker.rs:754:48
  18: <timely::worker::Worker<timely_communication::allocator::generic::Generic>>::step_or_park
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/worker.rs:396:38
  19: <mz_storage::storage_state::Worker<timely_communication::allocator::generic::Generic>>::run_client
  20: <mz_storage::storage_state::Worker<timely_communication::allocator::generic::Generic>>::run
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/storage/src/storage_state.rs:468:21
  21: <mz_storage::server::Config as mz_cluster::types::AsRunnableWorker<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_and_run::<timely_communication::allocator::generic::Generic>
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/storage/src/server.rs:107:9
  22: <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse, std::thread::Thread>, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>, mz_storage::server::Config, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_timely::{closure#0}::{closure#2}
             at ./var/lib/buildkite-agent/builds/buildkite-builders-d43b1b5-i-0526fd24678a523d2-1/materialize/tests/src/cluster/src/server.rs:235:13
  23: timely::execute::execute_from::<timely_communication::allocator::generic::GenericBuilder, (), <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse, std::thread::Thread>, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>, mz_storage::server::Config, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_timely::{closure#0}::{closure#2}>::{closure#0}
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/timely/src/execute.rs:388:22
  24: timely_communication::initialize::initialize_from::<timely_communication::allocator::generic::GenericBuilder, (), timely::execute::execute_from<timely_communication::allocator::generic::GenericBuilder, (), <mz_cluster::server::ClusterClient<mz_service::client::Partitioned<mz_service::local::LocalClient<mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse, std::thread::Thread>, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>, mz_storage::server::Config, mz_storage_client::client::StorageCommand, mz_storage_client::client::StorageResponse>>::build_timely::{closure#0}::{closure#2}>::{closure#0}>::{closure#0}
             at ./cargo/git/checkouts/timely-dataflow-70b80d81d6cabd62/de20aa8/communication/src/initialize.rs:316:33
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

I think this came from https://github.com/MaterializeInc/materialize/pull/24240. @petrosagg, can you please check?

ci-regexp: slot compacted past snapshot point

petrosagg commented 8 months ago

I see a strange log line for the panicking source that I haven't seen before:

pg-cdc-materialized-1  | cluster-u1-replica-u1: 2024-01-12T08:39:20.267823Z  INFO mz_storage::healthcheck: Health transition for source u186: Some(Starting) -> Some(Stalled { error: "postgres: No such file or directory (os error 2)", hints: {}, namespaced_errors: {Postgres: "No such file or directory (os error 2)"} })

petrosagg commented 8 months ago

A few more findings. In the Postgres logs we see the following three CREATE_REPLICATION_SLOT statements being executed:

pg-cdc-postgres-1  | 2024-01-12 08:39:20.252 UTC [3040] LOG:  logical decoding found consistent point at 0/57AC6490
pg-cdc-postgres-1  | 2024-01-12 08:39:20.252 UTC [3040] DETAIL:  There are no running transactions.
pg-cdc-postgres-1  | 2024-01-12 08:39:20.252 UTC [3040] STATEMENT:  CREATE_REPLICATION_SLOT materialize_ad4e3ba33924431bb4668267a8cfaa8d LOGICAL "pgoutput" NOEXPORT_SNAPSHOT
pg-cdc-postgres-1  | 2024-01-12 08:39:20.257 UTC [3040] LOG:  logical decoding found consistent point at 0/57AC64C8
pg-cdc-postgres-1  | 2024-01-12 08:39:20.257 UTC [3040] DETAIL:  There are no running transactions.
pg-cdc-postgres-1  | 2024-01-12 08:39:20.257 UTC [3040] STATEMENT:  CREATE_REPLICATION_SLOT "mzsnapshot_7ece96c5f3fb4627824f82609089811d" TEMPORARY LOGICAL "pgoutput" USE_SNAPSHOT
pg-cdc-postgres-1  | 2024-01-12 08:39:20.337 UTC [3054] LOG:  logical decoding found consistent point at 0/57AC6500
pg-cdc-postgres-1  | 2024-01-12 08:39:20.337 UTC [3054] DETAIL:  There are no running transactions.
pg-cdc-postgres-1  | 2024-01-12 08:39:20.337 UTC [3054] STATEMENT:  CREATE_REPLICATION_SLOT materialize_ad4e3ba33924431bb4668267a8cfaa8d LOGICAL "pgoutput" NOEXPORT_SNAPSHOT

The first creates the normal slot, the second creates the slot used for snapshotting the tables, and the third attempts to create the normal slot again. This is expected because two dataflow operators are involved in pg sources: a snapshot operator and a replication operator. The snapshot operator first creates the normal slot and then the temporary one for snapshotting tables. The replication operator only attempts to create the normal slot.

The normal slot is always created first so that the LSN at which we snapshot the tables (the consistent point of the temporary slot) is greater than the consistent point of the normal slot.

From the logs we see the following consistent points for each attempt:

  1. Normal slot creation: 0/57AC6490
  2. Temporary slot creation (the table snapshot point): 0/57AC64C8
  3. Normal slot creation (again): 0/57AC6500

These look correct because 0x57AC6490 < 0x57AC64C8.
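To make the ordering check concrete: in these logs an LSN is printed as the two hex halves of a 64-bit WAL position, `high/low`. A minimal Rust sketch that parses that form and checks that the normal slot's consistent point precedes the snapshot point; `parse_lsn` and `slot_precedes_snapshot` are hypothetical helpers written for illustration, not functions from Materialize or rust-postgres.

```rust
/// Parse an LSN in its `X/Y` text form (high and low 32 bits in hex)
/// into the 64-bit WAL position it denotes. Returns `None` on malformed input.
/// Hypothetical helper, for illustration only.
fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// The normal slot is created first, so its consistent point must be
/// below the consistent point of the temporary snapshot slot.
fn slot_precedes_snapshot(slot_point: &str, snapshot_point: &str) -> bool {
    match (parse_lsn(slot_point), parse_lsn(snapshot_point)) {
        (Some(slot), Some(snapshot)) => slot < snapshot,
        // Treat unparseable input as a failed check.
        _ => false,
    }
}
```

With the values from the log, `slot_precedes_snapshot("0/57AC6490", "0/57AC64C8")` returns `true`.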

Now, if we look at the log of the panic (with LSNs converted to hex), it says the following:

slot compacted past snapshot point. snapshot consistent point=0/57AC6500 resume_lsn=0/57AC6538
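For reference, the conversion from the decimal values in the panic message to the form above can be sketched as follows, assuming the `X/Y` layout seen in these logs (high 32 bits, slash, low 32 bits, uppercase hex). `format_lsn` is a hypothetical helper:

```rust
/// Format a 64-bit WAL position as `X/Y`: the high and low 32 bits in
/// uppercase hex, without zero padding, separated by `/`.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}
```

`format_lsn(1470915840)` gives `0/57AC6500` and `format_lsn(1470915896)` gives `0/57AC6538`, so the resume LSN is 0x38 bytes past the snapshot point.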

What is strange here is that the snapshot consistent point is claimed to be the LSN we got in the third attempt to create the normal slot, whereas according to the code it should have been 0/57AC64C8, the LSN of the temporary slot. Furthermore, the resume LSN should be the LSN of the normal slot, but the value 0/57AC6538 doesn't appear anywhere in the logs.

I don't see anything related to #24240 so far, but something seems off about our assumptions about how postgres slots work.

petrosagg commented 8 months ago

Ok, so the postgres logs posted above are extra weird: the second attempt to create the normal slot succeeds, whereas when I tried the same sequence of actions on my local machine the second attempt got a "slot already exists" error. Indeed, if we scroll up in the postgres logs to see what happens with other sources in the test, we see exactly that:

pg-cdc-postgres-1  | 2024-01-12 08:36:08.112 UTC [148] LOG:  logical decoding found consistent point at 0/1527940
pg-cdc-postgres-1  | 2024-01-12 08:36:08.112 UTC [148] DETAIL:  There are no running transactions.
pg-cdc-postgres-1  | 2024-01-12 08:36:08.112 UTC [148] STATEMENT:  CREATE_REPLICATION_SLOT materialize_072a433738dc445ab1809fedae925eb1 LOGICAL "pgoutput" NOEXPORT_SNAPSHOT
pg-cdc-postgres-1  | 2024-01-12 08:36:08.113 UTC [148] LOG:  logical decoding found consistent point at 0/1527978
pg-cdc-postgres-1  | 2024-01-12 08:36:08.113 UTC [148] DETAIL:  There are no running transactions.
pg-cdc-postgres-1  | 2024-01-12 08:36:08.113 UTC [148] STATEMENT:  CREATE_REPLICATION_SLOT "mzsnapshot_0a3dc935b4a747dc845f3e775b3d82fd" TEMPORARY LOGICAL "pgoutput" USE_SNAPSHOT
pg-cdc-postgres-1  | 2024-01-12 08:36:08.117 UTC [152] ERROR:  replication slot "materialize_072a433738dc445ab1809fedae925eb1" already exists
pg-cdc-postgres-1  | 2024-01-12 08:36:08.117 UTC [152] STATEMENT:  CREATE_REPLICATION_SLOT materialize_072a433738dc445ab1809fedae925eb1 LOGICAL "pgoutput" NOEXPORT_SNAPSHOT

This indicates that postgres does not guarantee that in the presence of two concurrent CREATE_REPLICATION_SLOT queries one of them will get a "slot already exists" error. It looks like it's possible for BOTH to succeed and for the second one to overwrite the details of the first. I will try to reproduce that.

petrosagg commented 8 months ago

I have been running this hot-loop code against the postgres container spun up by mzcompose, both while that container is idle and while the full test case is running concurrently:

use tokio_postgres::config::{Config, ReplicationMode};
use tokio_postgres::NoTls;

// Assumes a tokio-postgres build with logical replication support
// (`ReplicationMode`), such as the fork Materialize uses.
#[tokio::main]
async fn main() {
    let mut config = Config::new();
    config
        .host("127.0.0.1")
        .port(32768)
        .user("postgres")
        .password("postgres")
        .replication_mode(ReplicationMode::Logical);

    let (client1, connection1) = config.connect(NoTls).await.unwrap();
    tokio::spawn(connection1);
    let (client2, connection2) = config.connect(NoTls).await.unwrap();
    tokio::spawn(connection2);

    // Start from a clean state by dropping every existing replication slot.
    client1.simple_query("SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots").await.unwrap();

    let name = "materialize_ad4e3ba33924431bb4668267a8cfaa8d";
    let create_q = format!("CREATE_REPLICATION_SLOT {name} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT;");
    let drop_q = format!("SELECT pg_drop_replication_slot('{name}')");
    loop {
        // Try to create the same slot concurrently from both clients.
        let req1 = client1.simple_query(&create_q);
        let req2 = client2.simple_query(&create_q);
        match futures::future::join(req1, req2).await {
            // The anomaly seen in the CI logs: both creates succeed.
            (Ok(_), Ok(_)) => panic!("Both clients succeeded"),
            (Err(err1), Err(err2)) => {
                println!("client1 error: {err1}");
                println!("client2 error: {err2}");
                panic!("Both clients failed");
            }
            _ => {
                // Exactly one client succeeded, as expected: drop the slot and try again.
                client1.simple_query(&drop_q).await.unwrap();
            }
        }
    }
}

This busy-loops trying to create the same slot concurrently, but I didn't hit the condition we see in the logs even once. I looked at the postgres source code, and the section that creates replication slots is well guarded by an exclusive lock: https://github.com/postgres/postgres/blob/master/src/backend/replication/slot.c#L263-L289
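The argument from the exclusive lock can be illustrated with a toy model: if the existence check and the insert happen under one lock, two concurrent creates of the same name should never both succeed. This is a hypothetical in-memory `SlotRegistry` written for illustration, not the logic in Postgres's slot.c:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Toy slot registry: the whole create path runs under one exclusive lock.
/// Hypothetical illustration only.
#[derive(Default)]
struct SlotRegistry {
    // slot name -> consistent point assigned at creation
    slots: Mutex<HashMap<String, u64>>,
}

impl SlotRegistry {
    fn create(&self, name: &str, consistent_point: u64) -> Result<u64, String> {
        // Check and insert while holding the lock, so they are atomic together.
        let mut slots = self.slots.lock().unwrap();
        if slots.contains_key(name) {
            return Err(format!("replication slot \"{name}\" already exists"));
        }
        slots.insert(name.to_string(), consistent_point);
        Ok(consistent_point)
    }
}

/// Race two creates of the same slot on separate threads and
/// return how many of them succeeded.
fn race_creates(name: &str) -> usize {
    let registry = Arc::new(SlotRegistry::default());
    let handles: Vec<_> = (0..2u64)
        .map(|i| {
            let registry = Arc::clone(&registry);
            let name = name.to_string();
            thread::spawn(move || registry.create(&name, 100 + i).is_ok())
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&ok| ok)
        .count()
}
```

However the threads interleave, `race_creates` returns 1, which is the same outcome the hot loop kept observing against the real server.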

We still have the unexplained postgres error that MZ logs ("No such file or directory (os error 2)") at 2024-01-12T08:39:20.267823Z. This timestamp falls between the second and the third CREATE_REPLICATION_SLOT queries, so it's unclear what it means. Creating a replication slot does involve writing files and directories on disk, so maybe that error is related to one of the slots failing to get created. There isn't any relevant log from the postgres container, though.

We additionally know that the first and second queries were performed by the snapshot operator because they share the same PID (3040), and that the third one comes from the replication operator (PID 3054). Even more confusing, none of the ensure_replication_slot calls could have produced the "No such file..." error, because in every case we have evidence that code after those calls ran. The snapshot operator made progress after creating the slots because it told the replication operator about rewinds (the panic comes from processing rewinds). The replication operator made progress after creating the slots because the panic happens after that point.

I'll call it a day; I can't think of another theory. I'm pretty convinced this is unrelated to any recent code changes and that we're just observing some rare race or bug.

petrosagg commented 8 months ago

The weird error might be related to the SSH tunnel used for this connection. The line right before the "No such file or directory" one is:

pg-cdc-materialized-1  | cluster-u1-replica-u1: 2024-01-12T08:39:20.267289Z  INFO mz_postgres_util::tunnel: connecting task_name=postgres_connect_replication address=postgres@postgres:5432/postgres

This still doesn't explain why postgres created the slot twice, but it might explain why there is no corresponding error in the postgres logs.

petrosagg commented 8 months ago

Ok, I found what's going on. We have a test that creates a postgres source only to verify that subsource resolution works correctly (i.e. it doesn't query the data at all), which means that the source is created and then almost immediately afterwards dropped. This is the test: https://github.com/MaterializeInc/materialize/blob/main/test/pg-cdc/subsource-resolution-duplicates.td#L52-L59

When the adapter drops a postgres source it also cleans up its replication slot, and it does so in a retry loop. This means that the slot cleanup task is racing with dataflow creation because clusterd does not yet know that the source has been dropped.

So what happened here is the following interleaving:

  1. Adapter created source u186
  2. Storage controller sent a CreateIngestion command to clusterd
  3. Adapter dropped source u186, which started the cleanup task
  4. clusterd rendered the ingestion dataflow for u186
  5. The snapshot operator created the normal slot and then the temporary snapshot slot
  6. The slot cleanup task in the adapter retried and succeeded in removing the normal slot that had just been created
  7. The replication operator created the normal slot again, getting a different LSN
  8. The slot invariant was violated, leading to a panic.
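The interleaving above can be replayed with a simplified, hypothetical model of the upstream database. It assumes every slot creation takes the current WAL position as its consistent point and that the WAL then advances by 0x38 bytes, the spacing between the consistent points in the logs above. It does not try to reproduce the exact LSNs printed by the panic, and the slot names are placeholders.

```rust
use std::collections::HashMap;

/// Hypothetical model of the upstream database: each slot creation gets the
/// current WAL position as its consistent point, then the WAL advances.
struct Upstream {
    wal_position: u64,
    slots: HashMap<String, u64>,
}

impl Upstream {
    fn new(start: u64) -> Self {
        Upstream { wal_position: start, slots: HashMap::new() }
    }

    /// Create the slot if missing; return its consistent point either way.
    fn ensure_slot(&mut self, name: &str) -> u64 {
        if let Some(&lsn) = self.slots.get(name) {
            return lsn;
        }
        let lsn = self.wal_position;
        self.slots.insert(name.to_string(), lsn);
        self.wal_position += 0x38; // WAL growth per creation, matching the log spacing
        lsn
    }

    fn drop_slot(&mut self, name: &str) {
        self.slots.remove(name);
    }
}

/// Replay steps 5-8 and return (snapshot point, resume LSN).
fn replay_race(cleanup_wins: bool) -> (u64, u64) {
    let mut pg = Upstream::new(0x57AC6490);
    // 5. Snapshot operator: normal slot first, then the temporary snapshot slot.
    pg.ensure_slot("materialize_slot");
    let snapshot_point = pg.ensure_slot("mzsnapshot_tmp");
    // 6. The adapter's cleanup task drops the normal slot.
    if cleanup_wins {
        pg.drop_slot("materialize_slot");
    }
    // 7. Replication operator ensures the normal slot and resumes from it.
    let resume_lsn = pg.ensure_slot("materialize_slot");
    // 8. The caller's invariant is resume_lsn <= snapshot_point.
    (snapshot_point, resume_lsn)
}
```

Without the cleanup step, the replication operator reuses the existing slot and resumes at 0/57AC6490, before the snapshot point at 0/57AC64C8. With it, the recreated slot starts at 0/57AC6500, past the snapshot point.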

I think what we can do is turn the panic into a definite error that permanently errors the source, since this can only happen as a result of this race or of the user manually dropping the replication slot in the upstream database.
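A minimal sketch of that proposal, assuming the check fires when the resume LSN is past the snapshot point (as the panic message suggests): return a definite error instead of panicking. `DefiniteError` and `check_resume_lsn` are hypothetical names, not Materialize's actual types.

```rust
use std::fmt;

/// Hypothetical definite error: permanently errors the source
/// instead of crashing the cluster process.
#[derive(Debug, Clone, PartialEq, Eq)]
enum DefiniteError {
    SlotCompactedPastSnapshot { snapshot_lsn: u64, resume_lsn: u64 },
}

impl fmt::Display for DefiniteError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DefiniteError::SlotCompactedPastSnapshot { snapshot_lsn, resume_lsn } => write!(
                f,
                "slot compacted past snapshot point. snapshot consistent point={snapshot_lsn} resume_lsn={resume_lsn}"
            ),
        }
    }
}

/// Check the slot invariant, returning an error instead of panicking.
fn check_resume_lsn(snapshot_lsn: u64, resume_lsn: u64) -> Result<(), DefiniteError> {
    if resume_lsn > snapshot_lsn {
        return Err(DefiniteError::SlotCompactedPastSnapshot { snapshot_lsn, resume_lsn });
    }
    Ok(())
}
```

With the values from this failure, `check_resume_lsn(1470915840, 1470915896)` yields an error whose message matches the original panic text, which the source could then report as a permanent error.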

Definitely not a release blocker though, phew! Removing the label.

def- commented 6 months ago

Seen again in https://buildkite.com/materialize/tests/builds/77769#018e14d9-42e1-4e76-8131-317fb952200a