MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com
Other
5.7k stars 465 forks source link

storage/sources: support demuxing one table into multiple persist shards #24843

Closed sploiselle closed 4 weeks ago

sploiselle commented 6 months ago

Feature request

Materialize currently:

This means that if a user does want to start ingesting new columns from a table, their only option is to drop the subsource and re-add it. This introduces downtime, which a user can avoid only by doing a blue-green deployment on the entire source.

However, this issue can be largely resolved by supporting 1:N mappings between upstream database tables and persist shards. Users could then:

CREATE SOURCE pg ... FOR TABLES (t);
-- Add an upstream column
ALTER SOURCE pg ADD TABLE t AS t_v2;
-- Migrate all views using t to use t_v2 instead.
ALTER SOURCE pg DROP TABLE t_v1; 

Conceptually

For each table, we can produce not a Vec<Cast>, but a Vec<Vec<Cast>>. Then, when producing the data from the ingestion, we produce a row for each element in the outer collection.

This is somewhat wasteful because the prefix of the casts are likely to be the same. If we find that this impedes performance, we can introduce a more exotic structure here that produces data for the common prefix, and then the distinct suffixes.

PG implementation

Mechanically, this requires something akin to this.

diff --git a/src/storage-types/src/sources/postgres.rs b/src/storage-types/src/sources/postgres.rs
index bc824652cb..1073e28894 100644
--- a/src/storage-types/src/sources/postgres.rs
+++ b/src/storage-types/src/sources/postgres.rs
@@ -37,7 +37,7 @@ pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
     pub connection: C::Pg,
     /// The cast expressions to convert the incoming string encoded rows to
     /// their target types, keyed by their position in the source.
-    pub table_casts: BTreeMap<usize, Vec<MirScalarExpr>>,
+    pub table_casts: BTreeMap<usize, Vec<Vec<MirScalarExpr>>>,
     pub publication: String,
     pub publication_details: PostgresSourcePublicationDetails,
 }
diff --git a/src/storage/src/source/postgres.rs b/src/storage/src/source/postgres.rs
index 36d97e028b..a070f9acba 100644
--- a/src/storage/src/source/postgres.rs
+++ b/src/storage/src/source/postgres.rs
@@ -147,15 +147,22 @@ impl SourceRender for PostgresSourceConnection {

         // Collect the tables that we will be ingesting.
         let mut table_info = BTreeMap::new();
-        for (i, desc) in self.publication_details.tables.iter().enumerate() {
-            // Index zero maps to the main source
-            let output_index = i + 1;
+
+        // Index zero maps to the main source; this has to be incremented for
+        // each table cast, not each table.
+        let mut output_index = 1;
+        for desc in self.publication_details.tables.iter() {
             // The publication might contain more tables than the user has selected to ingest (via
             // a restricted FOR TABLES <..>). The tables that are to be ingested will be present in
             // the table_casts map and so we can filter the publication tables based on whether or
             // not we have casts for it.
-            if let Some(casts) = self.table_casts.get(&output_index) {
-                table_info.insert(desc.oid, (output_index, desc.clone(), casts.clone()));
+            if let Some(tables) = self.table_casts.get(&output_index) {
+                let mut table_details = vec![];
+                for casts in tables {
+                    table_details.push((output_index, desc.clone(), casts.clone()));
+                    output_index += 1;
+                }
+                table_info.insert(desc.oid, table_details);
             }
         }

diff --git a/src/storage/src/source/postgres/replication.rs b/src/storage/src/source/postgres/replication.rs
index 49a9959812..21338e8323 100644
--- a/src/storage/src/source/postgres/replication.rs
+++ b/src/storage/src/source/postgres/replication.rs
@@ -142,7 +142,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
     config: RawSourceCreationConfig,
     connection: PostgresSourceConnection,
     subsource_resume_uppers: BTreeMap<GlobalId, Antichain<MzOffset>>,
-    table_info: BTreeMap<u32, (usize, PostgresTableDesc, Vec<MirScalarExpr>)>,
+    table_info: BTreeMap<u32, Vec<(usize, PostgresTableDesc, Vec<MirScalarExpr>)>>,
     rewind_stream: &Stream<G, RewindRequest>,
     committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
     metrics: PgSourceMetrics,
@@ -345,19 +345,20 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
     // We now process the slot updates and apply the cast expressions
     let mut final_row = Row::default();
     let mut datum_vec = DatumVec::new();
-    let replication_updates = raw_collection.map(move |(oid, event)| {
-        let (output_index, _, casts) = &table_info[&oid];
-        let event = event.and_then(|row| {
-            let mut datums = datum_vec.borrow();
-            for col in row.iter() {
-                let datum = col.as_deref().map(super::decode_utf8_text).transpose()?;
-                datums.push(datum.unwrap_or(Datum::Null));
-            }
-            super::cast_row(casts, &datums, &mut final_row)?;
-            Ok(final_row.clone())
-        });
+    let replication_updates = raw_collection.flat_map(move |(oid, event)| {
+        table_info[&oid].map(|(output_index, _, casts)| {
+            let event = event.and_then(|row| {
+                let mut datums = datum_vec.borrow();
+                for col in row.iter() {
+                    let datum = col.as_deref().map(super::decode_utf8_text).transpose()?;
+                    datums.push(datum.unwrap_or(Datum::Null));
+                }
+                super::cast_row(casts, &datums, &mut final_row)?;
+                Ok(final_row.clone())
+            });

-        (*output_index, event.err_into())
+            (*output_index, event.err_into())
+        })
     });

     let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));
diff --git a/src/storage/src/source/postgres/snapshot.rs b/src/storage/src/source/postgres/snapshot.rs
index 6388a267da..b4231ece38 100644
--- a/src/storage/src/source/postgres/snapshot.rs
+++ b/src/storage/src/source/postgres/snapshot.rs
@@ -177,7 +177,7 @@ pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
     config: RawSourceCreationConfig,
     connection: PostgresSourceConnection,
     subsource_resume_uppers: BTreeMap<GlobalId, Antichain<MzOffset>>,
-    table_info: BTreeMap<u32, (usize, PostgresTableDesc, Vec<MirScalarExpr>)>,
+    table_info: BTreeMap<u32, Vec<(usize, PostgresTableDesc, Vec<MirScalarExpr>)>>,
     metrics: PgSnapshotMetrics,
 ) -> (
     Collection<G, (usize, Result<Row, SourceReaderError>), Diff>,
sploiselle commented 6 months ago

@rjobanp I tinkered with this a little bit and, unfortunately, not quite as easy I had thought. Wanted to ping you lest it feel like I suggest you go on an interminable yak shave.

In the PG source, we've essentially hard-coded that the output_index of messages lines up with their position in PostgresSourcePublicationDetails's tables field. This means there's no way to demux tables input into multiple shards. Assume this is similar for the MySQL source.

My current hunch is that this is most easily fixed by adding a kind of "key" to SourceExport for each type of source which explains which upstream object this GlobalId corresponds to.

We can then maybe remove the output_index field on SourceExport and rely on enumerating over source_exports to determine the output position.

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
enum SouceExportConfig {
    /// PG subsources use the table OID
    Pg(u32),
    /// Kafka only exports a single collection,
    Kafka,
    /// Kafka exports are correlated to the upstream table's name
    MySql(UnresolvedItemName),
}

#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
pub struct SourceExport<S = ()> {
    /// The collection metadata needed to write the exported data
    pub storage_metadata: S,
    pub config: SouceExportConfig,
}

Just a quick sketch––we might want to do this by making things a trait or something. Not suggesting this is what the final code will or should look like, just that this is a general direction that would get us out of the restriction we currently have.

rjobanp commented 6 months ago

Awesome thanks for the context!

In the PG source, we've essentially hard-coded that the output_index of messages lines up with their position in PostgresSourcePublicationDetails's tables field. This means there's no way to demux tables input into multiple shards. Assume this is similar for the MySQL source.

Yes right now I implemented this for MySQL very similar to Postgres: We assume that the order of tables within the MySqlSourceConnection tables field is the same as the output_index ordering for each sub-source.

This was definitely one of the most brittle things I noticed when working through the MySQL source so I like the idea of us removing this restriction and using the source-specific 'key' to map GlobalIds to their corresponding output (and therefore supporting 1:N table mappings).

It still seems that we would need to store the output_index on the SourceExports struct, right? Since we store a BTreeMap<GlobalId, SourceExport<G>> on both the RawSourceCreationConfig and IngestionDescription I'm not sure that we can rely on the iteration order to determine position

sploiselle commented 6 months ago

BTreeMap offers stable and ordered iteration (arguably the feature of a b-tree vs. hash map). Or maybe it's important to note that we have to rebuild the entire dataflow if the exports change. Either way. I have this working locally using enumerate while iterating over the b-tree map to determine output indexes.

The complex change, though, is swapping out output_index in SourceExport for something that lets us correlate source exports to an upstream table. Will see what everyone thinks later this week probably.

rjobanp commented 6 months ago

BTreeMap offers stable and ordered iteration

I thought this was ordered by-key though, right? Unless you're saying that GlobalIds are ordered too such that we can assume a new entry in the map would always sort to the end of the keys?

The complex change, though, is swapping out output_index in SourceExport

Yep I briefly looked up references to SourceExport and it looks fairly non-trivial to refactor all the uses :)

sploiselle commented 6 months ago

Yep it is ordered by key. The output index, though, just needs to be consistent between the ingestion dataflow and where we correlate output indexes to persist shards. That gets re-built every time we restart the dataflow, so supports things like ALTER SOURCE...ADD SUBSOURCE changing the source exports by just re-figuring it out. i.e. the output index is just a transient mechanism to support multi-output ingestions, which we have to orchestrate in two separate spots.

rjobanp commented 6 months ago

Ahh that makes sense, thanks for explaining!

sploiselle commented 3 months ago

@petrosagg suggests that this feature should not violate the contract that SHOW CREATE SOURCE shows you how to recreate an environment in exactly its current state. The only way to solve this is to allow creating subsources with projections from the input table (maybe they must be prefixes, maybe the allow reordering). https://github.com/MaterializeInc/materialize/pull/26871#pullrequestreview-2045572908 Assigning to him to ensure the scope of that work is to the spec he envisions.

rjobanp commented 4 weeks ago

Closing this issue in favor of tracking in https://github.com/MaterializeInc/materialize/issues/28430