MaterializeInc / materialize

The Cloud Operational Data Store: use SQL to transform, deliver, and act on fast-changing data.
https://materialize.com

The materialized views consume huge memory #8423

Closed: VisionaryAries closed this issue 1 year ago

VisionaryAries commented 2 years ago

What version of Materialize are you using?

materialized v0.9.5 (048de05e3)

How did you install Materialize?

Linux release tarball

What was the issue?

materialized views consume huge memory

The steps to reproduce are below.

The server configuration is as follows:

CPU cores: 12
MEMORY: 128G
Disk: SAS 8T

After starting Materialize, free memory is 122G.

[root@p-db-materialize-001 materialized]# free -g                                                                  
              total        used        free      shared  buff/cache   available                                    
Mem:            125           2         122           0           0         122                                    
Swap:            97           0          97  

Then I create three Kafka sources; the source record counts are as follows:

BOM_SUBSTITUTE_COMPONENTS: 33945939 records
BOM.BOM_COMPONENTS_B: 42982704 records
BOM.BOM_STRUCTURES_B: 545505 records

The Kafka sources:

CREATE SOURCE BOM.BOM_COMPONENTS_B
FROM KAFKA BROKER 't-edi-kafka-001:9093' TOPIC 'BOM.BOM_COMPONENTS_B'
KEY FORMAT AVRO  USING  SCHEMA '{
    "type":"record",
    "name":"Key",
    "fields":[
        {
            "name":"KAFKA_MSG_KEY",
            "type":"string"
        }
    ]
  }' 
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://t-edi-kafka-001:8082'
ENVELOPE UPSERT;

CREATE SOURCE BOM.BOM_STRUCTURES_B
FROM KAFKA BROKER 't-edi-kafka-001:9093' TOPIC 'BOM.BOM_STRUCTURES_B'
KEY FORMAT AVRO  USING  SCHEMA '{
    "type":"record",
    "name":"Key",
    "fields":[
        {
            "name":"KAFKA_MSG_KEY",
            "type":"string"
        }
    ]
  }' 
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://t-edi-kafka-001:8082'
ENVELOPE UPSERT;
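
The report includes only two of the three CREATE SOURCE statements. The third, for BOM.BOM_SUBSTITUTE_COMPONENTS, presumably followed the same pattern; a sketch assuming the same broker, key schema, and schema registry:

CREATE SOURCE BOM.BOM_SUBSTITUTE_COMPONENTS
FROM KAFKA BROKER 't-edi-kafka-001:9093' TOPIC 'BOM.BOM_SUBSTITUTE_COMPONENTS'
KEY FORMAT AVRO USING SCHEMA '{
    "type":"record",
    "name":"Key",
    "fields":[
        {
            "name":"KAFKA_MSG_KEY",
            "type":"string"
        }
    ]
  }'
VALUE FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://t-edi-kafka-001:8082'
ENVELOPE UPSERT;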

After that, I create a materialized view.

CREATE MATERIALIZED VIEW APS.XXAPS_GP_BOM_ITEM_SUB_V AS
SELECT BS."ORGANIZATION_ID" AS ORG_ID
      ,BS."BILL_SEQUENCE_ID" AS BOM_SEQ_ID
      ,BSC."COMPONENT_SEQUENCE_ID" AS COMPONENT_SEQ_ID
      ,BCB."COMPONENT_ITEM_ID" AS COMPONENT_ITEM_ID
      ,BSC."SUBSTITUTE_COMPONENT_ID" AS SUB_ITEM_ID
      ,BCB."COMPONENT_QUANTITY" AS SUB_ITEM_QTY
      ,1 AS SUB_ITEM_QTY_D
      ,COALESCE(BSC."ACD_TYPE", 1) AS ACD_TYPE
  FROM BOM.BOM_SUBSTITUTE_COMPONENTS BSC
 INNER JOIN BOM.BOM_COMPONENTS_B BCB
    ON BSC."COMPONENT_SEQUENCE_ID" = BCB."COMPONENT_SEQUENCE_ID"
 INNER JOIN BOM.BOM_STRUCTURES_B BS
    ON BS."BILL_SEQUENCE_ID" = BCB."BILL_SEQUENCE_ID"
 WHERE COALESCE(BSC."ACD_TYPE", 1) <> 3;

Afterwards, free memory is down to 74G; the materialized view APS.XXAPS_GP_BOM_ITEM_SUB_V used 47G.

[root@p-db-materialize-001 materialized]# free -g
              total        used        free      shared  buff/cache   available
Mem:            125          51          74           0           0          73
Swap:            97           0          97

So, is this memory consumption normal? Or am I doing something wrong?

elindsey commented 2 years ago

How many unique keys do you have in the Kafka topics? As noted here, the UPSERT envelope has higher memory usage than an envelope with explicit retractions (such as Debezium); the memory growth depends on the number of unique keys.

The view you're creating joins three Kafka topics without aggregation and with a single filter, so it is potentially retaining a large dataset in memory. To understand whether 74G is an expected amount of memory for this dataset:

  • How big in gigabytes are these Kafka topics?
  • What's the average size of the fields you are listing in your SELECT statement?
  • How many records will your ACD_TYPE clause filter out?
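
One way to answer the unique-key question from inside Materialize, as a sketch: an UPSERT source retains exactly one row per key, so its row count approximates the number of distinct keys in the topic. (This assumes the source is materialized or indexed so it can be queried directly.)

SELECT COUNT(*) FROM BOM.BOM_COMPONENTS_B;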

VisionaryAries commented 2 years ago

@elindsey Thanks for your reply, the details are as follows:

  1. These topics' unique key counts and sizes:

     • BOM_SUBSTITUTE_COMPONENTS: 33945939 unique keys / 1.24 GB
     • BOM.BOM_COMPONENTS_B: 42982704 unique keys / 3.62 GB
     • BOM.BOM_STRUCTURES_B: 545505 unique keys / 35.11 MB

     apstest=> select * from mz_records_per_dataflow_global order by records desc;
       id  |                           name                            |  records  
     ------+-----------------------------------------------------------+-----------
      1279 | Dataflow: apstest.aps.xxaps_gp_bom_item_sub_v_primary_idx | 153254393

  2. What's the average size of the fields you are listing in your SELECT statement? 32 bytes

    The records are similar to the following.

    apstest=> select * from aps.xxaps_gp_bom_item_sub_v limit 1;
     org_id | bom_seq_id | component_seq_id | component_item_id | sub_item_id | sub_item_qty | sub_item_qty_d | acd_type 
    --------+------------+------------------+-------------------+-------------+--------------+----------------+----------
       1475 |   45823779 |         45823782 |            591175 |      632362 |            1 |              1 |        1
    (1 row)
  3. How many records will your ACD_TYPE clause filter out? 33371739 records
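
A rough back-of-envelope from the figures above (not computed explicitly in the thread): the dataflow retains 153,254,393 rows across its arrangements, and 47 GB / 153,254,393 rows comes to roughly 300 bytes per retained row. That is well above the ~32-byte payload, which is plausible once per-row join-arrangement and index overhead is included.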

wangandi commented 2 years ago

Hi, @VisionaryAries! We have two types of join implementations in Materialize (for more information, see this blog post: https://materialize.com/joins-in-materialize/). Based on the information you have provided, if you run EXPLAIN VIEW APS.XXAPS_GP_BOM_ITEM_SUB_V, you should currently see a join with the Differential implementation. You can trigger the DeltaQuery implementation instead and see if memory goes down.

After creating your sources, create the following indexes and views:

CREATE VIEW BOM.BOM_SUBSTITUTE_COMPONENTS_FILTERED AS 
  SELECT *
  FROM BOM.BOM_SUBSTITUTE_COMPONENTS BSC
  WHERE COALESCE(BSC."ACD_TYPE", 1) <> 3;
CREATE INDEX ON BOM.BOM_SUBSTITUTE_COMPONENTS_FILTERED("COMPONENT_SEQUENCE_ID");
CREATE INDEX ON BOM.BOM_COMPONENTS_B("COMPONENT_SEQUENCE_ID");
CREATE INDEX ON BOM.BOM_COMPONENTS_B("BILL_SEQUENCE_ID");
CREATE INDEX ON BOM.BOM_STRUCTURES_B("BILL_SEQUENCE_ID");

Then replace BOM.BOM_SUBSTITUTE_COMPONENTS with BOM.BOM_SUBSTITUTE_COMPONENTS_FILTERED in the definition of the view APS.XXAPS_GP_BOM_ITEM_SUB_V. You can then run EXPLAIN VIEW APS.XXAPS_GP_BOM_ITEM_SUB_V to check that Materialize is indeed using the DeltaQuery implementation.
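
(In this version, the EXPLAIN output annotates each join with its implementation, so you are looking for a line like | implementation = DeltaQuery ... rather than implementation = Differential; the exact formatting may vary across versions.)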

If memory is still high, calculate the amount of memory different parts of the query take up.
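
For example, the built-in logging relations in this version report record counts per operator, so ordering by records points at the arrangements holding the most state. A sketch, using the same family of logging views as the mz_records_per_dataflow_global query earlier in the thread:

SELECT * FROM mz_records_per_dataflow_operator ORDER BY records DESC LIMIT 10;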

benesch commented 1 year ago

Closing as stale.