apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Unable to Restart Google Spanner Change Streams Consumer due to tableExists(table_name) bug #32509

Open NimzyMaina opened 1 week ago

NimzyMaina commented 1 week ago

What happened?

The method used to check whether a table exists in Spanner can, in some scenarios, always return false: the query hard-codes empty strings for table_catalog and table_schema, and there is no way to specify them. In my case, these fields are always populated in the information_schema.tables view.
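
To make the failure mode concrete, here is a minimal Java sketch (standard library only) that evaluates the same WHERE clause as tableExists() against in-memory rows standing in for information_schema.tables. The TableRow record and the populated values my_db and public are hypothetical, chosen only to show that a row whose catalog or schema is non-empty can never match, so queryResultSet.next() returns false even though the table exists.

```java
import java.util.List;

final class TableExistsPredicate {
  /** Hypothetical stand-in for one row of information_schema.tables (illustration only). */
  record TableRow(String tableCatalog, String tableSchema, String tableName) {}

  private TableExistsPredicate() {}

  /** Mirrors the WHERE clause in tableExists(): catalog and schema must both be ''. */
  static boolean originalPredicate(TableRow row, String metadataTableName) {
    return row.tableCatalog().isEmpty()
        && row.tableSchema().isEmpty()
        && row.tableName().equals(metadataTableName);
  }

  /** Equivalent of queryResultSet.next(): true if at least one row matches. */
  static boolean tableExists(List<TableRow> rows, String metadataTableName) {
    return rows.stream().anyMatch(row -> originalPredicate(row, metadataTableName));
  }

  public static void main(String[] args) {
    // Catalog and schema are empty: the check finds the table.
    List<TableRow> emptyFields = List.of(new TableRow("", "", "java_metadata_2"));
    // Catalog and schema are populated (hypothetical values): the check misses it.
    List<TableRow> populatedFields =
        List.of(new TableRow("my_db", "public", "java_metadata_2"));

    System.out.println(tableExists(emptyFields, "java_metadata_2"));     // true
    System.out.println(tableExists(populatedFields, "java_metadata_2")); // false
  }
}
```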

The first time I run the application, it runs fine and creates the metadata table. The problem arises when I restart the application and specify the same metadata table: it tries to recreate the table, which already exists, resulting in a SpannerException.

Caused by: com.google.cloud.spanner.SpannerException: FAILED_PRECONDITION: Operation with name "projects/<project>/instances/<instance>/databases/<db>/operations/..." failed with status = GrpcStatusCode{transportCode=FAILED_PRECONDITION} and message = Duplicate name in schema: java_metadata_2.

Link to tableExists code

  /**
   * Checks whether the metadata table already exists in the database.
   *
   * @return true if the table exists, false if the table does not exist.
   */
  public boolean tableExists() {
    final String checkTableExistsStmt =
        "SELECT t.table_name FROM information_schema.tables AS t "
            + "WHERE t.table_catalog = '' AND "
            + "t.table_schema = '' AND "
            + "t.table_name = '"
            + metadataTableName
            + "'";
    try (ResultSet queryResultSet =
        databaseClient
            .singleUseReadOnlyTransaction()
            .executeQuery(Statement.of(checkTableExistsStmt))) {
      return queryResultSet.next();
    }
  }

Link to where it is used

  @ProcessElement
  public void processElement(OutputReceiver<PartitionMetadata> receiver) {
    PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
    if (!partitionMetadataDao.tableExists()) { // <-- returns false on restart, so table creation is attempted again and fails
      daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable();
      createFakeParentPartition();
    }
    final PartitionMetadata initialPartition =
        Optional.ofNullable(partitionMetadataDao.getPartition(InitialPartition.PARTITION_TOKEN))
            .map(mapperFactory.partitionMetadataMapper()::from)
            .orElseThrow(
                () -> new IllegalStateException("Initial partition not found in metadata table."));
    receiver.output(initialPartition);
  }

This results in a scenario where a change stream consumer cannot recover from a restart.
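
The restart sequence can be modelled in a few lines. The following is a minimal, stdlib-only Java sketch; RestartSimulation and its in-memory table set are hypothetical stand-ins for tableExists(), createPartitionMetadataTable() and the start of processElement(), not Beam or Spanner code. It shows that a check which always reports "absent" works on the first run and then fails with a duplicate-name error on the second.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model of the check-then-create flow (illustration only). */
final class RestartSimulation {
  private final Set<String> tables = new HashSet<>();
  private final boolean existenceCheckIsBroken;

  RestartSimulation(boolean existenceCheckIsBroken) {
    this.existenceCheckIsBroken = existenceCheckIsBroken;
  }

  /** Models tableExists(); when broken it always reports false, as described in the issue. */
  boolean tableExists(String name) {
    return !existenceCheckIsBroken && tables.contains(name);
  }

  /** Models createPartitionMetadataTable(); creating an existing name fails. */
  void createTable(String name) {
    if (!tables.add(name)) {
      throw new IllegalStateException("Duplicate name in schema: " + name);
    }
  }

  /** Models the start of processElement(): create the table only if it appears absent. */
  void start(String name) {
    if (!tableExists(name)) {
      createTable(name);
    }
  }

  public static void main(String[] args) {
    RestartSimulation broken = new RestartSimulation(true);
    broken.start("java_metadata_2"); // first run: table is created
    try {
      broken.start("java_metadata_2"); // restart: check says "absent", creation fails
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // Duplicate name in schema: java_metadata_2
    }

    RestartSimulation fixed = new RestartSimulation(false);
    fixed.start("java_metadata_2");
    fixed.start("java_metadata_2"); // restart: table is found and reused
    System.out.println("restart ok");
  }
}
```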

Suggested solution

  /**
   * Checks whether the metadata table already exists in the database.
   *
   * @return true if the table exists, false if the table does not exist.
   */
  public boolean tableExists() {
    final String checkTableExistsStmt =
        "SELECT t.table_name FROM information_schema.tables AS t "
            + "WHERE t.table_name = '" + metadataTableName + "'";
    try (ResultSet queryResultSet =
        databaseClient
            .singleUseReadOnlyTransaction()
            .executeQuery(Statement.of(checkTableExistsStmt))) {
      return queryResultSet.next();
    }
  }

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

liferoad commented 1 week ago

cc @nielm

nielm commented 6 days ago

cc: @thiagotnunes

nielm commented 6 days ago

The fix may have to be more complicated than the suggested solution, as it is possible to have multiple schemas in a Spanner database.
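
To illustrate the multiple-schema concern, here is a stdlib-only Java sketch that compares the suggested name-only filter with a schema-qualified filter over in-memory rows. The schema name other_schema and the rows are hypothetical. The point is only that filtering on table_name alone can match a same-named table in a different schema, so such a check could report true even when the table is missing from the schema the connector actually uses.

```java
import java.util.List;

/** Hypothetical illustration of name-only vs. schema-qualified matching. */
final class SchemaAmbiguity {
  record TableRow(String tableSchema, String tableName) {}

  private SchemaAmbiguity() {}

  /** Suggested solution: match on table_name only. */
  static long nameOnlyMatches(List<TableRow> rows, String tableName) {
    return rows.stream().filter(r -> r.tableName().equals(tableName)).count();
  }

  /** Schema-qualified alternative: match on (table_schema, table_name). */
  static long qualifiedMatches(List<TableRow> rows, String schema, String tableName) {
    return rows.stream()
        .filter(r -> r.tableSchema().equals(schema) && r.tableName().equals(tableName))
        .count();
  }

  public static void main(String[] args) {
    List<TableRow> rows =
        List.of(
            new TableRow("", "java_metadata_2"),              // default schema
            new TableRow("other_schema", "java_metadata_2")); // hypothetical named schema

    System.out.println(nameOnlyMatches(rows, "java_metadata_2"));                  // 2
    System.out.println(qualifiedMatches(rows, "", "java_metadata_2"));             // 1
    System.out.println(qualifiedMatches(rows, "other_schema", "java_metadata_2")); // 1
  }
}
```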

I have handed this over to the team responsible for maintaining SpannerIO Change streams.

NimzyMaina commented 6 days ago

Hi @nielm,

Thanks for your quick response. I understand that may be the case. If the team can come up with a quick solution that accounts for that, that would be great.

However, if that will take some time, could we please have this added as an interim solution? As it stands, the SDK is not usable. This change would allow people to use the SDK while we wait for a permanent solution.

If this is still not acceptable, could we instead have something like the following, which would not throw an exception on restart?

CREATE TABLE IF NOT EXISTS metadata_table ...
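
The difference between a plain CREATE TABLE and CREATE TABLE IF NOT EXISTS can be sketched with a hypothetical in-memory schema (stdlib-only Java, not Spanner code): the idempotent form turns a second creation into a no-op instead of a duplicate-name error.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory model of plain vs. idempotent table creation. */
final class IdempotentCreate {
  private final Set<String> tables = new HashSet<>();

  /** Plain CREATE TABLE: fails on a duplicate name. */
  void create(String name) {
    if (!tables.add(name)) {
      throw new IllegalStateException("Duplicate name in schema: " + name);
    }
  }

  /** CREATE TABLE IF NOT EXISTS: returns true only if the table was actually created. */
  boolean createIfNotExists(String name) {
    return tables.add(name);
  }

  public static void main(String[] args) {
    IdempotentCreate schema = new IdempotentCreate();
    System.out.println(schema.createIfNotExists("java_metadata_2")); // true: first run
    System.out.println(schema.createIfNotExists("java_metadata_2")); // false: restart, no error
  }
}
```

One caveat when reading the processElement() snippet above: createFakeParentPartition() sits inside the same if block as the table creation, so making only the DDL idempotent may not be sufficient on restart unless that step is also safe to repeat. The issue does not say whether it is.
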

nielm commented 4 days ago

@shuranzhang

dedocibula commented 6 hours ago

Are you using the PostgreSQL dialect by any chance? If so, are you seeing table_schema set to public?
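
If the dialect does turn out to be the cause, one possible direction is to choose the expected schema from the database dialect instead of hard-coding ''. The stdlib-only Java sketch below assumes, without confirmation from this thread, that the default schema appears as '' under GoogleSQL and as public under the PostgreSQL dialect; the Dialect enum, TableRow record and sample rows are local, hypothetical stand-ins, and table_catalog is simply ignored.

```java
import java.util.List;

/** Hypothetical dialect-aware existence check (assumed schema values, illustration only). */
final class DialectAwareCheck {
  enum Dialect { GOOGLE_STANDARD_SQL, POSTGRESQL }

  record TableRow(String tableCatalog, String tableSchema, String tableName) {}

  private DialectAwareCheck() {}

  /** ASSUMPTION: default schema is '' for GoogleSQL and 'public' for PostgreSQL. */
  static String defaultSchema(Dialect dialect) {
    return dialect == Dialect.POSTGRESQL ? "public" : "";
  }

  /** Matches on schema and name only; table_catalog is not checked in this sketch. */
  static boolean tableExists(List<TableRow> rows, Dialect dialect, String tableName) {
    String schema = defaultSchema(dialect);
    return rows.stream()
        .anyMatch(r -> r.tableSchema().equals(schema) && r.tableName().equals(tableName));
  }

  public static void main(String[] args) {
    List<TableRow> pgRows = List.of(new TableRow("my_db", "public", "java_metadata_2"));
    System.out.println(tableExists(pgRows, Dialect.POSTGRESQL, "java_metadata_2"));          // true
    System.out.println(tableExists(pgRows, Dialect.GOOGLE_STANDARD_SQL, "java_metadata_2")); // false
  }
}
```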