apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: BigQueryIO is very slow when using the Storage API and dynamic destinations to write data to over a thousand tables with high data skew #32508

Open ns-shua opened 2 weeks ago

ns-shua commented 2 weeks ago

What happened?

I'm using BigQueryIO with the Storage API, as suggested, in at-least-once mode (both the pipeline and the IO). I need to write data to over a thousand tables in different projects, and the data is highly skewed: the top 10 tables can take 80% of the traffic. The pipeline becomes very slow and CPU utilization is almost always below 30%. I think data skew is the problem, but our data is logically partitioned that way and I have no control over it. I tried writing the same volume of data to a single table (all the tables have the same schema), and it performed very well even with 1/4 of the machines. The documentation claims that DynamicDestinations should perform as well as a single destination. Is there a known performance issue, or are there any suggestions?
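To make the skew claim above concrete (the top 10 tables taking about 80% of the traffic), the share of rows going to the busiest tables can be computed from per-table row counts. The following is only a minimal sketch: the class name `SkewReport`, the method `topShare`, and the sample counts are hypothetical and do not come from the pipeline in this issue.

```java
import java.util.Comparator;
import java.util.Map;

final class SkewReport {
    /** Returns the fraction of all rows that go to the n busiest tables. */
    static double topShare(Map<String, Long> rowsPerTable, int n) {
        long total = rowsPerTable.values().stream().mapToLong(Long::longValue).sum();
        if (total == 0) {
            return 0.0; // no traffic at all, so no skew to report
        }
        long top = rowsPerTable.values().stream()
            .sorted(Comparator.reverseOrder()) // largest counts first
            .limit(n)
            .mapToLong(Long::longValue)
            .sum();
        return (double) top / total;
    }

    public static void main(String[] args) {
        // Made-up counts for illustration only.
        Map<String, Long> counts =
            Map.of("tenant_a", 700L, "tenant_b", 100L, "tenant_c", 150L, "tenant_d", 50L);
        System.out.println("top-2 share: " + topShare(counts, 2));
    }
}
```

A number like this, taken from real per-destination counts, is useful to attach to a support ticket because it shows how much of the load a handful of destinations carry.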

Here is the code I use to write to different tables:

BigQueryIO.<KV<TopicMetadata, T>>write()
        .withFormatFunction(...)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        // Here I tried both a SerializableFunction and a DynamicDestinations class.
        .to(new SerializableFunction<..>() {...});

This code performs much worse than

BigQueryIO.<KV<TopicMetadata, T>>write()
        .withFormatFunction(...)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .to("project_all.example_dataset.alldata");

with the same amount of data.

When writing to different tables, CPU usage is constantly below 30%, while when writing to a single table it is constantly near 100%.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

liferoad commented 2 weeks ago

Have you tried to profile the pipeline to figure out some potential issues? cc @ahmedabu98

ns-shua commented 1 week ago

@liferoad There are some upstream transforms I could improve, but they have nothing to do with the BigQuery write. The only difference in the code is whether it writes to one table or to many tables.

liferoad commented 1 week ago

Added the dev list thread here: https://lists.apache.org/thread/gz5zhnworvcjog0o4g96lsqbw5tz6y03 @ns-shua Have you opened a customer support ticket for Dataflow? It would be helpful to check your Dataflow jobs.

ahmedabu98 commented 1 week ago

Can you try enabling multiplexing [1]? You can do so by setting --useStorageApiConnectionPool=true [2]

[1] https://cloud.google.com/bigquery/docs/write-api-best-practices#connection_pool_management
[2] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/gcp/bigquery/BigQueryOptions.html#setUseStorageApiConnectionPool-java.lang.Boolean-

ns-shua commented 1 week ago

@ahmedabu98 I believe that if I don't use the connection pool, writing to one table won't work, so yes, I've set it to true. @liferoad I asked on the mailing list and also created a support ticket, but so far I have received no useful help or tips. They mentioned they found a hot key? I'm not sure. Can you explain how auto-sharding behaves when the data volume is highly skewed across the tables? Does it create more workers for hot tables?

liferoad commented 1 week ago

What is your support ticket number? Is this streaming or batch?

ns-shua commented 1 week ago

@liferoad Case 53209037. I'm confused by the memory dump: I see a lot of StorageApiWriteUnshardedRecords, but I have withAutoSharding() in my code.

ns-shua commented 1 week ago

It is streaming, in at-least-once mode.

liferoad commented 1 week ago

Can you share the latest entire code if possible? From the ticket, it seems the job with withAutoSharding() does not scale down.

ns-shua commented 1 week ago

@liferoad The full code is a bit complicated, but I can give you a simplified version of the BigQuery write part:

PCollection<KV<TopicMetadata, Row>> data = ....; // the TopicMetadata key carries the table destination information
data.apply(
    BigQueryIO.<KV<TopicMetadata, Row>>write()
        .withFormatFunction(elem -> BigQueryUtils.toTableRow(row(elem.getValue())))
        .withMethod(BigQueryIO.Write.Method.STORAGE_API_AT_LEAST_ONCE)
        .withoutValidation()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .optimizedWrites()
        .withClustering()
        .withAutoSharding()
        .withFailedInsertRetryPolicy(retryTransientErrors())
        // All the tables have an identical schema, so I chose the to(tableFunction) method;
        // see the TenantProjectDestinations class below.
        .to(tenantProjectDestinations));

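import com.google.api.services.bigquery.model.TableReference;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.ValueInSingleWindow;

public class TenantProjectDestinations<T> implements
    SerializableFunction<ValueInSingleWindow<KV<TopicMetadata, T>>, TableDestination> {

    // Static cache of destinations, shared by every thread in the worker JVM.
    private static final Map<TopicMetadata, TableDestination> destinationCache = Maps.newHashMap();

    // The table name is the same for every tenant; only project and dataset differ.
    private final ValueProvider<String> tableId;

    public static <TableRowType> TenantProjectDestinations<TableRowType> of(
            ValueProvider<String> tableId) {
        return new TenantProjectDestinations<>(tableId);
    }

    private TenantProjectDestinations(ValueProvider<String> tableId) {
        this.tableId = tableId;
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<KV<TopicMetadata, T>> input) {
        assert input != null;
        TopicMetadata k = Objects.requireNonNull(input.getValue()).getKey();
        // Build the destination on first use of this key, then reuse the cached instance.
        return destinationCache.computeIfAbsent(k, topicMetadata -> new TableDestination(
            new TableReference()
                .setProjectId(topicMetadata.getProjectId())
                .setDatasetId(topicMetadata.getDatasetId())
                .setTableId(tableId.get()),
            "TenantTable"));
    }
}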
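A side note on the destination cache in the class above: it is a static `HashMap`, and `HashMap` is not safe for concurrent modification, while a worker JVM typically runs several processing threads. Nothing in this thread confirms that the cache is related to the slowdown; the following is only a minimal sketch of a thread-safe alternative built on `ConcurrentHashMap.computeIfAbsent`. The class name `DestinationCache` and the string stand-in for `TableDestination` are hypothetical, chosen so the example runs without Beam on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Thread-safe memoizing cache: the value for each key is built at most once. */
final class DestinationCache<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Function<K, V> factory;

    DestinationCache(Function<K, V> factory) {
        this.factory = factory;
    }

    /** Returns the cached value for the key, creating it atomically on first use. */
    V get(K key) {
        return cache.computeIfAbsent(key, factory);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger builds = new AtomicInteger();
        // Hypothetical stand-in: a table spec string instead of a TableDestination.
        DestinationCache<String, String> cache = new DestinationCache<>(tenant -> {
            builds.incrementAndGet();
            return tenant + ".example_dataset.events";
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    cache.get("tenant_" + (j % 10)); // 10 distinct keys, hit from 8 threads
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println(cache.size() + " entries, " + builds.get() + " builds");
    }
}
```

In the real function the same idea would amount to declaring the static map as a `ConcurrentHashMap`; the rest of `apply` could stay as it is.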

ns-shua commented 1 week ago

@liferoad One thing I'm not sure about is how the table schema is obtained. From the internal implementation, it looks like the table schema is cached per TableDestination, so the schema is fetched once for each table? If I use withSchema(...), would it improve performance? Thanks! Also, I'm confused when you say auto-sharding doesn't scale down. It looks to me like writing to BQ is the bottleneck: it could write at the speed of the upstream transform, but most workers have CPU usage below 50%.

ns-shua commented 1 week ago

One thing I noticed that doesn't look right to me is that memory usage keeps going up. I enabled the profiler and saw a lot of memory used by TwoLevelMessageConverterCache.

ns-shua commented 1 week ago

@liferoad And in the diagnostic errors table I see a lot of:

WARNING 2024-09-22T03:32:15.969Z Operation ongoing in bundle for at least 10m00s without completing
Processing times in each step(millis)
Step name: WriteToCommonAndTenantProject/StorageApiLoads/Convert/Convert to message
Time spent in this step: IntSummaryStatistics{count=2412, sum=107, min=0, average=0.044362, max=21}
Step name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/StorageApiWriteInconsistent/Write Records
Time spent in this step: IntSummaryStatistics{count=1934, sum=161, min=0, average=0.083247, max=85}
Step name: WriteToCommonAndTenantProjectFailedInsert/PrepareWrite/ParDo(Anonymous)
Time spent in this step: IntSummaryStatistics{count=1934, sum=0, min=0, average=0.000000, max=0}
Step name: ExtractFeature
Time spent in this step: IntSummaryStatistics{count=7, sum=5, min=0, average=0.714286, max=2}
Step name: WriteToCommonAndTenantProject/StorageApiLoads/StorageApiWriteInconsistent/Write Records
Time spent in this step: IntSummaryStatistics{count=2412, sum=14, min=0, average=0.005804, max=8}
Step name: WriteToCommonAndTenantProject/StorageApiLoads/rewindowIntoGlobal/Window.Assign
Time spent in this step: IntSummaryStatistics{count=2412, sum=0, min=0, average=0.000000, max=0}
Step name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/Convert/Convert to message
Time spent in this step: IntSummaryStatistics{count=1934, sum=7, min=0, average=0.003619, max=4}
Step name: EnrichTenantMetadata
Time spent in this step: IntSummaryStatistics{count=1206, sum=0, min=0, average=0.000000, max=0}
Step name: Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable
Time spent in this step: IntSummaryStatistics{count=7, sum=0, min=0, average=0.000000, max=0}
Step name: Reshuffle.ViaRandomKey/Values/Values/Map
Time spent in this step: IntSummaryStatistics{count=7, sum=0, min=0, average=0.000000, max=0}
Step name: DropFields.Inner/Select.Fields/ParDo(Select)
Time spent in this step: IntSummaryStatistics{count=1206, sum=0, min=0, average=0.000000, max=0}
Step name: GEFMessageDecode
Time spent in this step: IntSummaryStatistics{count=7, sum=12, min=0, average=1.714286, max=11}
Step name: WriteToCommonAndTenantProject/PrepareWrite/ParDo(Anonymous)
Time spent in this step: IntSummaryStatistics{count=2412, sum=0, min=0, average=0.000000, max=0}
Step name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/rewindowIntoGlobal/Window.Assign
Time spent in this step: IntSummaryStatistics{count=1934, sum=0, min=0, average=0.000000, max=0}
Step name: Reshuffle.ViaRandomKey/Reshuffle/GroupBy…
  {
    "insertId": "7981515283196192541:164283:0:12850531",
    "jsonPayload": {
      "worker": "txn-log-v4-static-tenant--09211208-8ico-harness-7411",
      "message": "Operation ongoing in bundle for at least 10m00s without completing\nProcessing times in each step(millis)\nStep name: WriteToCommonAndTenantProject/StorageApiLoads/Convert/Convert to message\nTime spent in this step: IntSummaryStatistics{count=2412, sum=107, min=0, average=0.044362, max=21}\nStep name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/StorageApiWriteInconsistent/Write Records\nTime spent in this step: IntSummaryStatistics{count=1934, sum=161, min=0, average=0.083247, max=85}\nStep name: WriteToCommonAndTenantProjectFailedInsert/PrepareWrite/ParDo(Anonymous)\nTime spent in this step: IntSummaryStatistics{count=1934, sum=0, min=0, average=0.000000, max=0}\nStep name: ExtractFeature\nTime spent in this step: IntSummaryStatistics{count=7, sum=5, min=0, average=0.714286, max=2}\nStep name: WriteToCommonAndTenantProject/StorageApiLoads/StorageApiWriteInconsistent/Write Records\nTime spent in this step: IntSummaryStatistics{count=2412, sum=14, min=0, average=0.005804, max=8}\nStep name: WriteToCommonAndTenantProject/StorageApiLoads/rewindowIntoGlobal/Window.Assign\nTime spent in this step: IntSummaryStatistics{count=2412, sum=0, min=0, average=0.000000, max=0}\nStep name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/Convert/Convert to message\nTime spent in this step: IntSummaryStatistics{count=1934, sum=7, min=0, average=0.003619, max=4}\nStep name: EnrichTenantMetadata\nTime spent in this step: IntSummaryStatistics{count=1206, sum=0, min=0, average=0.000000, max=0}\nStep name: Reshuffle.ViaRandomKey/Reshuffle/ExpandIterable\nTime spent in this step: IntSummaryStatistics{count=7, sum=0, min=0, average=0.000000, max=0}\nStep name: Reshuffle.ViaRandomKey/Values/Values/Map\nTime spent in this step: IntSummaryStatistics{count=7, sum=0, min=0, average=0.000000, max=0}\nStep name: DropFields.Inner/Select.Fields/ParDo(Select)\nTime spent in this step: IntSummaryStatistics{count=1206, sum=0, min=0, average=0.000000, max=0}\nStep name: GEFMessageDecode\nTime spent in this step: IntSummaryStatistics{count=7, sum=12, min=0, average=1.714286, max=11}\nStep name: WriteToCommonAndTenantProject/PrepareWrite/ParDo(Anonymous)\nTime spent in this step: IntSummaryStatistics{count=2412, sum=0, min=0, average=0.000000, max=0}\nStep name: WriteToCommonAndTenantProjectFailedInsert/StorageApiLoads/rewindowIntoGlobal/Window.Assign\nTime spent in this step: IntSummaryStatistics{count=1934, sum=0, min=0, average=0.000000, max=0}\nStep name: Reshuffle.ViaRandomKey/Reshuffle/GroupByKey/ReadStream\nTime spent in this step: IntSummaryStatistics{count=14, sum=0, min=0, average=0.000000, max=0}\n  at java.base@11.0.20/java.lang.Thread.sleep(Native Method)\n  at app//org.apache.beam.sdk.util.Sleeper$$Lambda$764/0x0000000800c70c40.sleep(Unknown Source)\n  at app//org.apache.beam.sdk.util.BackOffUtils.next(BackOffUtils.java:48)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.RetryManager.await(RetryManager.java:317)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll(StorageApiWriteUnshardedRecords.java:1026)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushIfNecessary(StorageApiWriteUnshardedRecords.java:994)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn.process(StorageApiWriteUnshardedRecords.java:1144)\n  at app//org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)\n",
      "thread": "36",
      "job": "2024-09-21_12_07_17-18246689280237981546",
      "logger": "org.apache.beam.runners.dataflow.worker.DataflowExecutionContext$DataflowExecutionStateTracker"
    },
    "resource": {
      "type": "dataflow_step",
      "labels": {
        "region": "australia-southeast1",
        "step_id": "",
        "job_id": "2024-09-21_12_07_17-18246689280237981546",
        "job_name": "txn-log-v4-static-tenant-filter-dynamicdestination",
        "project_id": "project-data-au"
      }
    },
    "timestamp": "2024-09-22T03:32:15.969Z",
    "severity": "WARNING",
    "labels": {
      "dataflow.googleapis.com/log_type": "supportability",
      "dataflow.googleapis.com/region": "australia-southeast1",
      "dataflow.googleapis.com/job_id": "2024-09-21_12_07_17-18246689280237981546",
      "dataflow.googleapis.com/job_name": "txn-log-v4-static-tenant-filter-dynamicdestination",
      "compute.googleapis.com/resource_type": "instance",
      "compute.googleapis.com/resource_name": "txn-log-v4-static-tenant--09211208-8ico-harness-7411",
      "compute.googleapis.com/resource_id": "7981515283196192541"
    },
    "logName": "projects/project-data-au/logs/dataflow.googleapis.com%2Fworker",
    "receiveTimestamp": "2024-09-22T03:32:25.025214266Z",
    "errorGroups": [
      {
        "id": "CLTglOLFqKTiag"
      }
    ]
  }
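One way to read the log entry above is to add up the per-step `sum=` values and compare the total with the "at least 10m00s" the bundle has been open. The sketch below does that arithmetic. It assumes the `sum=` fields are milliseconds of processing recorded for the stuck work item, as the "(millis)" header suggests; the class and method names are hypothetical.

```java
final class BundleTimeCheck {
    /** Adds up the per-step "sum" values (milliseconds) taken from the log entry. */
    static long totalMillis(long... stepSums) {
        long total = 0;
        for (long s : stepSums) {
            total += s;
        }
        return total;
    }

    /** Fraction of the elapsed window that the recorded step time represents. */
    static double shareOfWindow(long stepMillis, long windowMillis) {
        return (double) stepMillis / windowMillis;
    }

    public static void main(String[] args) {
        // The 15 "sum=" values from the JSON message above, in order of appearance.
        long total = totalMillis(107, 161, 0, 5, 14, 0, 7, 0, 0, 0, 0, 12, 0, 0, 0);
        long window = 10 * 60 * 1000L; // "at least 10m00s" expressed in milliseconds
        System.out.println("recorded step time: " + total + " ms");
        System.out.println("share of the 10-minute window: " + shareOfWindow(total, window));
    }
}
```

The total comes to 306 ms, roughly 0.05% of ten minutes. Together with the stack trace, which shows the thread in `Thread.sleep` under `BackOffUtils.next` and `RetryManager.await`, called from `StorageApiWriteUnshardedRecords$WriteRecordsDoFn.flushAll`, this is consistent with the thread waiting in retry backoff during a flush rather than doing CPU work. That reading would also fit the low CPU utilization reported earlier, although the log alone does not show why the writes are being retried.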

liferoad commented 1 week ago

@ns-shua Our engineers provided more comments through your support ticket. Let us move the discussion to the support ticket. I have also shared this issue with our team. Thanks.

ns-shua commented 1 week ago

@liferoad Thanks!