MaterializeInc / materialize


parallel-workload: MySQL source stuck in starting status #28108

Open def- opened 2 months ago

def- commented 2 months ago

What version of Materialize are you using?

463981199d1edea80820914543fadf73df727903

What is the issue?

I have a local example of parallel-workload getting stuck (bin/mzcompose --find parallel-workload down && bin/mzcompose --find parallel-workload run default): a JOIN hangs indefinitely, even though each of the individual objects is selectable on its own:

materialize=> SELECT * FROM "db-pw-1720518117-0"."s-0".mytable10 JOIN "db-pw-1720518117-0"."s-0"."v-16" ON "db-pw-1720518117-0"."s-0".mytable10.key0 = "db-pw-1720518117-0"."s-0"."v-16"."key3-kafka_table8" LIMIT 1;
^CCancel request sent
ERROR:  canceling statement due to user request
materialize=> explain timestamp for SELECT * FROM "db-pw-1720518117-0"."s-0".mytable10 JOIN "db-pw-1720518117-0"."s-0"."v-16" ON "db-pw-1720518117-0"."s-0".mytable10.key0 = "db-pw-1720518117-0"."s-0"."v-16"."key3-kafka_table8" LIMIT 1;
                                 Timestamp
---------------------------------------------------------------------------
                 query timestamp: 1720519547000 (2024-07-09 10:05:47.000) +
 largest not in advance of upper: 1720518453999 (2024-07-09 09:47:33.999) +
                           upper:[1720518454000 (2024-07-09 09:47:34.000)]+
                           since:[1720519547000 (2024-07-09 10:05:47.000)]+
         can respond immediately: false                                   +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1720519550076 (2024-07-09 10:05:50.076) +
                                                                          +
 source db-pw-1720518117-0.s-0.v-16 (u115, storage):                      +
                   read frontier:[1720519547000 (2024-07-09 10:05:47.000)]+
                  write frontier:[1720519548441 (2024-07-09 10:05:48.441)]+
                                                                          +
 source db-pw-1720518117-0.s-0.mytable10 (u193, storage):                 +
                   read frontier:[1720518453000 (2024-07-09 09:47:33.000)]+
                  write frontier:[1720518454000 (2024-07-09 09:47:34.000)]+

(1 row)
materialize=> explain SELECT * FROM "db-pw-1720518117-0"."s-0".mytable10 JOIN "db-pw-1720518117-0"."s-0"."v-16" ON "db-pw-1720518117-0"."s-0".mytable10.key0 = "db-pw-1720518117-0"."s-0"."v-16"."key3-kafka_table8" LIMIT 1;
                     Optimized Plan
--------------------------------------------------------
 Explained Query:                                      +
   Finish limit=1 output=[#0..=#34]                    +
     Project (#0..=#31, #0, #33, #34)                  +
       Join on=(#0 = #32) type=differential            +
         ArrangeBy keys=[[#0]]                         +
           ReadStorage db-pw-1720518117-0.s-0.mytable10+
         ArrangeBy keys=[[#7]]                         +
           ReadStorage db-pw-1720518117-0.s-0.v-16     +
                                                       +
 Source db-pw-1720518117-0.s-0.v-16                    +
 Source db-pw-1720518117-0.s-0.mytable10               +
                                                       +
 Target cluster: mz_system                             +

(1 row)
materialize=> SELECT * FROM "db-pw-1720518117-0"."s-0".mytable10;
 key0 | key1 | key2 | key3 | key4 | key5 | key6 | key7 | key8 | key9 | value0 | value1 | value2 | value3 | value4 | value5 | value6 | value7 | value8 | value9 | value10 | value11 | value12 | value13 | value14
------+------+------+------+------+------+------+------+------+------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------
(0 rows)
materialize=> explain SELECT * FROM "db-pw-1720518117-0"."s-0".mytable10;
                 Optimized Plan
------------------------------------------------
 Explained Query:                              +
   ReadStorage db-pw-1720518117-0.s-0.mytable10+
                                               +
 Source db-pw-1720518117-0.s-0.mytable10       +
                                               +
 Target cluster: mz_system                     +

(1 row)
materialize=> explain timestamp for SELECT * FROM "db-pw-1720518117-0"."s-0".mytable10;
                                 Timestamp
---------------------------------------------------------------------------
                 query timestamp: 1720518453999 (2024-07-09 09:47:33.999) +
 largest not in advance of upper: 1720518453999 (2024-07-09 09:47:33.999) +
                           upper:[1720518454000 (2024-07-09 09:47:34.000)]+
                           since:[1720518453000 (2024-07-09 09:47:33.000)]+
         can respond immediately: true                                    +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1720519383421 (2024-07-09 10:03:03.421) +
                                                                          +
 source db-pw-1720518117-0.s-0.mytable10 (u193, storage):                 +
                   read frontier:[1720518453000 (2024-07-09 09:47:33.000)]+
                  write frontier:[1720518454000 (2024-07-09 09:47:34.000)]+

(1 row)
materialize=> select * from "db-pw-1720518117-0"."s-0"."v-16";
 c-5-boolean-t-2 | key2-kafka_table8 |             c-1-uuid-t-2             | c-3-date-t-2 | key1-kafka_table8 |                                c-2-int[]-t-2                                 | key4-kafka_table8 | key3-kafka_table8 | key0-kafka_table8 | key5-kafka_table8
-----------------+-------------------+--------------------------------------+--------------+-------------------+------------------------------------------------------------------------------+-------------------+-------------------+-------------------+-------------------
[...]
(95 rows)
materialize=> explain timestamp for select * from "db-pw-1720518117-0"."s-0"."v-16";
                                 Timestamp
---------------------------------------------------------------------------
                 query timestamp: 1720519466462 (2024-07-09 10:04:26.462) +
 largest not in advance of upper: 1720519466462 (2024-07-09 10:04:26.462) +
                           upper:[1720519466463 (2024-07-09 10:04:26.463)]+
                           since:[1720519464516 (2024-07-09 10:04:24.516)]+
         can respond immediately: true                                    +
                        timeline: Some(EpochMilliseconds)                 +
               session wall time: 1720519467734 (2024-07-09 10:04:27.734) +
                                                                          +
 source db-pw-1720518117-0.s-0.v-16 (u115, storage):                      +
                   read frontier:[1720519465000 (2024-07-09 10:04:25.000)]+
                  write frontier:[1720519466463 (2024-07-09 10:04:26.463)]+

(1 row)
materialize=> explain select * from "db-pw-1720518117-0"."s-0"."v-16";
              Optimized Plan
-------------------------------------------
 Explained Query:                         +
   ReadStorage db-pw-1720518117-0.s-0.v-16+
                                          +
 Source db-pw-1720518117-0.s-0.v-16       +
                                          +
 Target cluster: mz_system                +

(1 row)

All queries: parallel-workload-queries.log

The definitions of mytable10 and v-16:

[mysql] CREATE TABLE mytable10 (key0 int, key1 int, key2 float4, key3 int, key4 bigint, key5 smallint, key6 bigint, key7 smallint, key8 int, key9 float8, value0 float4, value1 smallint, value2 float4, value3 bigint, value4 timestamp, value5 time, value6 date, value7 time, value8 text, value9 float8, value10 time, value11 bigint, value12 timestamp, value13 text, value14 date , PRIMARY KEY (key0, key1, key2, key3, key4, key5, key6, key7, key8, key9));
[worker_8] CREATE SOURCE "db-pw-1720518117-0"."s-0".mysql_source10
                    IN CLUSTER "cluster-1"
                    FROM MYSQL CONNECTION mysql10
                    FOR TABLES (mysql.mytable10 AS mytable10)
[worker_2] CREATE SOURCE "db-pw-1720518117-0"."s-0".kafka_table8
                    IN CLUSTER "cluster-0"
                    FROM KAFKA CONNECTION materialize.public.kafka_conn (TOPIC 'data-ingest-8')
                    FORMAT AVRO
                    USING CONFLUENT SCHEMA REGISTRY CONNECTION materialize.public.csr_conn
                    ENVELOPE UPSERT
[worker_8] CREATE MATERIALIZED VIEW "db-pw-1720518117-0"."s-0"."v-16" WITH (REFRESH ON COMMIT, ASSERT NOT NULL "c-3-date-t-2", ASSERT NOT NULL "key0-kafka_table8", ASSERT NOT NULL "key5-kafka_table8", ASSERT NOT NULL "key4-kafka_table8", ASSERT NOT NULL "key2-kafka_table8", ASSERT NOT NULL "key1-kafka_table8") AS SELECT "db-pw-1720518117-0"."s-0"."t-2"."c-5-boolean" AS "c-5-boolean-t-2", "db-pw-1720518117-0"."s-0".kafka_table8.key2 AS "key2-kafka_table8", "db-pw-1720518117-0"."s-0"."t-2"."c-1-uuid" AS "c-1-uuid-t-2", "db-pw-1720518117-0"."s-0"."t-2"."c-3-date" AS "c-3-date-t-2", "db-pw-1720518117-0"."s-0".kafka_table8.key1 AS "key1-kafka_table8", "db-pw-1720518117-0"."s-0"."t-2"."c-2-int[]" AS "c-2-int[]-t-2", "db-pw-1720518117-0"."s-0".kafka_table8.key4 AS "key4-kafka_table8", "db-pw-1720518117-0"."s-0".kafka_table8.key3 AS "key3-kafka_table8", "db-pw-1720518117-0"."s-0".kafka_table8.key0 AS "key0-kafka_table8", "db-pw-1720518117-0"."s-0".kafka_table8.key5 AS "key5-kafka_table8" FROM "db-pw-1720518117-0"."s-0"."t-2" JOIN "db-pw-1720518117-0"."s-0".kafka_table8 ON TRUE;

services.log: services.log.zip

The frontiers of the two sources are far apart: the query timestamp is chosen from v-16's since (10:05:47), which is well past mytable10's upper (09:47:34), so the JOIN can never respond, and mytable10 never gets new frontiers. The MySQL subsources also seem to be stuck in starting:
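The snapshot below is presumably from mz_internal.mz_source_statuses, which exposes exactly these columns; a query along these lines should reproduce it:

SELECT id, name, type, last_status_change_at, status, error, details
FROM mz_internal.mz_source_statuses
ORDER BY id;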

  id  |          name           |   type    |   last_status_change_at    |  status  | error | details
------+-------------------------+-----------+----------------------------+----------+-------+---------
 u35  | mysql_source0           | mysql     | 2024-07-09 09:43:41.456+00 | running  |       |
 u36  | mytable0                | subsource | 2024-07-09 09:43:41.455+00 | starting |       |
 u40  | mysql_source1           | mysql     | 2024-07-09 09:43:46.907+00 | running  |       |
 u41  | mytable1                | subsource | 2024-07-09 09:43:46.907+00 | starting |       |
 u74  | mysql_source2           | mysql     | 2024-07-09 09:44:29.983+00 | running  |       |
 u75  | mytable2                | subsource | 2024-07-09 09:44:29.983+00 | starting |       |
 u113 | mysql_source3           | mysql     | 2024-07-09 09:45:27.139+00 | running  |       |
 u114 | mytable3                | subsource | 2024-07-09 09:45:27.139+00 | starting |       |
 u134 | mysql_source4           | mysql     | 2024-07-09 09:45:58.464+00 | running  |       |
 u135 | mytable4                | subsource | 2024-07-09 09:45:58.462+00 | starting |       |
 u149 | mysql_source6           | mysql     | 2024-07-09 09:46:19.659+00 | running  |       |
 u150 | mytable6                | subsource | 2024-07-09 09:46:19.658+00 | starting |       |
 u157 | mysql_source7           | mysql     | 2024-07-09 09:46:30.022+00 | running  |       |
 u158 | mytable7                | subsource | 2024-07-09 09:46:30.022+00 | starting |       |
 u162 | mysql_source5           | mysql     | 2024-07-09 09:46:34.633+00 | running  |       |
 u163 | mytable5                | subsource | 2024-07-09 09:46:34.632+00 | starting |       |
 u173 | mysql_source8           | mysql     | 2024-07-09 09:46:49.241+00 | running  |       |
 u174 | mytable8                | subsource | 2024-07-09 09:46:49.219+00 | starting |       |
 u185 | mysql_source9           | mysql     | 2024-07-09 09:47:04.626+00 | running  |       |
 u186 | mytable9                | subsource | 2024-07-09 09:47:04.626+00 | starting |       |
 u192 | mysql_source10          | mysql     | 2024-07-09 09:47:15.321+00 | running  |       |
 u193 | mytable10               | subsource | 2024-07-09 09:47:15.321+00 | starting |       |
 u203 | mysql_source11          | mysql     | 2024-07-09 09:47:30.536+00 | running  |       |
 u204 | mytable11               | subsource | 2024-07-09 09:47:30.535+00 | starting |       |
 u209 | mysql_source12          | mysql     | 2024-07-09 09:47:37.988+00 | running  |       |
 u210 | mytable12               | subsource | 2024-07-09 09:47:37.988+00 | starting |       |
 u34  | mysql_source0_progress  | progress  |                            | running  |       |
 u39  | mysql_source1_progress  | progress  |                            | running  |       |
 u73  | mysql_source2_progress  | progress  |                            | running  |       |
 u112 | mysql_source3_progress  | progress  |                            | running  |       |
 u133 | mysql_source4_progress  | progress  |                            | running  |       |
 u148 | mysql_source6_progress  | progress  |                            | running  |       |
 u156 | mysql_source7_progress  | progress  |                            | running  |       |
 u161 | mysql_source5_progress  | progress  |                            | running  |       |
 u172 | mysql_source8_progress  | progress  |                            | running  |       |
 u184 | mysql_source9_progress  | progress  |                            | running  |       |
 u191 | mysql_source10_progress | progress  |                            | running  |       |
 u202 | mysql_source11_progress | progress  |                            | running  |       |
 u208 | mysql_source12_progress | progress  |                            | running  |       |
(39 rows)
materialize=> select * from mz_internal.mz_source_status_history where source_id = 'u193';
        occurred_at         | source_id |  status  | error | details
----------------------------+-----------+----------+-------+---------
 2024-07-09 09:47:15.321+00 | u193      | starting |       |
(1 row)
materialize=> select * from mz_internal.mz_source_status_history where source_id = 'u192';
        occurred_at         | source_id |  status  | error | details
----------------------------+-----------+----------+-------+---------
 2024-07-09 09:47:15.321+00 | u192      | running  |       |
 2024-07-09 09:47:15.321+00 | u192      | starting |       |
(2 rows)

@rjobanp Any idea what is happening here? I'll keep it running for a bit in case you have an idea of what to check or do.

(I found this while looking into https://github.com/MaterializeInc/materialize/issues/23582, but we already had stuck queries before even adding the MySQL source, so this is just an additional issue.)

rjobanp commented 2 months ago

@def- unfortunately I don't see many details in services.log besides the fact that the source started. Since u193 remains in the starting phase, it was likely still snapshotting the upstream mytable10 and never finished. I would be able to get more insight if you could run this with MySQL debug logs (like we do here: https://github.com/MaterializeInc/materialize/blob/8cc6cf10478834d478da996fd0650f2abd991b20/test/mysql-cdc/mzcompose.py#L45), but without that it's difficult to know why it never entered the running phase.
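For a run that is already up, trace logging for the MySQL source can likely also be raised at runtime via the log_filter system variable (a sketch, assuming a session connected as mz_system, and that mz_storage::source::mysql is the relevant module path):

ALTER SYSTEM SET log_filter = 'mz_storage::source::mysql=trace,info';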

def- commented 2 months ago

Here is a new run where mytable5 hung: services.log.zip

rjobanp commented 2 months ago

Okay, it looks like the table had no rows in it and the snapshot produced 0 records:

parallel-workload-materialized-1     | cluster-u3-replica-u3-gen-0: 2024-07-09T15:54:09.672615Z TRACE mz_storage::source::mysql::snapshot: timely-0 starting transaction with consistent snapshot at: {([00000000-0000-0000-0000-000000000000, fc2abc16-3e0a-11ef-ad59-0242ac180006], Absent), ([fc2abc16-3e0a-11ef-ad59-0242ac180007, fc2abc16-3e0a-11ef-ad59-0242ac180007], 18), ([fc2abc16-3e0a-11ef-ad59-0242ac180008, ffffffff-ffff-ffff-ffff-ffffffffffff], Absent)} id=u107
parallel-workload-materialized-1     | cluster-u3-replica-u3-gen-0: 2024-07-09T15:54:09.672923Z TRACE mz_storage::source::mysql::snapshot: timely-0 started transaction id=u107
parallel-workload-materialized-1     | cluster-u3-replica-u3-gen-0: 2024-07-09T15:54:09.675315Z TRACE mz_storage::source::mysql::snapshot: timely-0 reading snapshot from table '`mysql`.`mytable5`':
parallel-workload-materialized-1     | MySqlTableDesc { schema_name: "mysql", name: "mytable5", columns: [MySqlColumnDesc { name: "key0", column_type: Some(ColumnType { scalar_type: Int16, nullable: false }), meta: None }, MySqlColumnDesc { name: "key1", column_type: Some(ColumnType { scalar_type: Float32, nullable: false }), meta: None }, MySqlColumnDesc { name: "key2", column_type: Some(ColumnType { scalar_type: Int64, nullable: false }), meta: None }, MySqlColumnDesc { name: "key3", column_type: Some(ColumnType { scalar_type: Float64, nullable: false }), meta: None }, MySqlColumnDesc { name: "key4", column_type: Some(ColumnType { scalar_type: Int32, nullable: false }), meta: None }, MySqlColumnDesc { name: "key5", column_type: Some(ColumnType { scalar_type: Int16, nullable: false }), meta: None }, MySqlColumnDesc { name: "key6", column_type: Some(ColumnType { scalar_type: Int32, nullable: false }), meta: None }, MySqlColumnDesc { name: "value0", column_type: Some(ColumnType { scalar_type: Int64, nullable: true }), meta: None }, MySqlColumnDesc { name: "value1", column_type: Some(ColumnType { scalar_type: Float32, nullable: true }), meta: None }, MySqlColumnDesc { name: "value2", column_type: Some(ColumnType { scalar_type: Timestamp { precision: Some(TimestampPrecision(0)) }, nullable: true }), meta: None }, MySqlColumnDesc { name: "value3", column_type: Some(ColumnType { scalar_type: Timestamp { precision: Some(TimestampPrecision(0)) }, nullable: true }), meta: None }, MySqlColumnDesc { name: "value4", column_type: Some(ColumnType { scalar_type: Float32, nullable: true }), meta: None }, MySqlColumnDesc { name: "value5", column_type: Some(ColumnType { scalar_type: Time, nullable: true }), meta: None }, MySqlColumnDesc { name: "value6", column_type: Some(ColumnType { scalar_type: String, nullable: true }), meta: None }, MySqlColumnDesc { name: "value7", column_type: Some(ColumnType { scalar_type: Timestamp { precision: Some(TimestampPrecision(0)) }, nullable: true }), meta: None }, MySqlColumnDesc { name: "value8", column_type: Some(ColumnType { scalar_type: Int64, nullable: true }), meta: None }, MySqlColumnDesc { name: "value9", column_type: Some(ColumnType { scalar_type: String, nullable: true }), meta: None }, MySqlColumnDesc { name: "value10", column_type: Some(ColumnType { scalar_type: Int32, nullable: true }), meta: None }, MySqlColumnDesc { name: "value11", column_type: Some(ColumnType { scalar_type: Float64, nullable: true }), meta: None }, MySqlColumnDesc { name: "value12", column_type: Some(ColumnType { scalar_type: Time, nullable: true }), meta: None }, MySqlColumnDesc { name: "value13", column_type: Some(ColumnType { scalar_type: Float32, nullable: true }), meta: None }, MySqlColumnDesc { name: "value14", column_type: Some(ColumnType { scalar_type: Timestamp { precision: Some(TimestampPrecision(0)) }, nullable: true }), meta: None }], keys: {} } id=u107
parallel-workload-materialized-1     | cluster-u3-replica-u3-gen-0: 2024-07-09T15:54:09.675590Z TRACE mz_storage::source::mysql::snapshot: timely-0 snapshotted 0 records from table '`mysql`.`mytable5`' id=u107

I wonder if this is always the case: does a subsource remain in the starting phase until at least one row has been produced for the relevant table?

def- commented 2 months ago

This currently makes it impossible to use MySQL sources in parallel-workload, since they always seem to end up stuck, so I disabled them.

rjobanp commented 2 months ago

Do you know if the tables are supposed to have any data in them?

def- commented 2 months ago

Initially they are empty in these tests, but then pretty quickly filled.

rjobanp commented 2 months ago

Okay, I'm not sure the starting status is actually an issue with the MySQL source. In the code, no source type is put into the running status until at least one row has been processed: https://github.com/MaterializeInc/materialize/blob/d8f578ed9e3cdaf40e32c13fd511feba2e546575/src/storage/src/source/source_reader_pipeline.rs#L316-L318
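One way to check this theory in a live run (a sketch; u193 stands in for whichever subsource is stuck, as in the first run above):

SELECT occurred_at, status, error
FROM mz_internal.mz_source_status_history
WHERE source_id = 'u193'
ORDER BY occurred_at;

If the theory holds, a 'running' row should appear only after the first row is inserted into the upstream table.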

So I think this issue is somehow related to the parallel-workload test. For some reason the MySQL tables aren't getting data written to them, so the subsources never receive rows and never move out of 'starting'. I would expect the same for Postgres sources, but maybe there is some race condition in the test that is triggered differently on MySQL vs. Postgres (since it's flaky)?

def- commented 1 month ago

For what it's worth, I don't care much about whether the status is running or starting. The important part to me is that all queries using the source hang:

a JOIN hangs indefinitely, but each of the individual objects is selectable

The timestamps of the MySQL source never advance.
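One way to observe this (a sketch; mz_internal.mz_frontiers exposes per-object frontiers, and u193 stands in for the stuck subsource's ID):

SELECT object_id, read_frontier, write_frontier
FROM mz_internal.mz_frontiers
WHERE object_id = 'u193';

If write_frontier does not move between repeated runs of this query, any SELECT at a timestamp beyond it will hang.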

I don't have a simpler reproducer, but parallel-workload runs into it all the time with bin/mzcompose --find parallel-workload down && bin/mzcompose --find parallel-workload run default, after re-enabling the MySQL source actions with this diff:

diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index fb37f977c2..19376dcc11 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -2051,9 +2051,8 @@ ddl_action_list = ActionList(
         (DropKafkaSinkAction, 4),
         (CreateKafkaSourceAction, 4),
         (DropKafkaSourceAction, 4),
-        # TODO: Reenable when #28108 is fixed
-        # (CreateMySqlSourceAction, 4),
-        # (DropMySqlSourceAction, 4),
+        (CreateMySqlSourceAction, 4),
+        (DropMySqlSourceAction, 4),
         (CreatePostgresSourceAction, 4),
         (DropPostgresSourceAction, 4),
         (GrantPrivilegesAction, 4),