googleapis / java-bigquerystorage

Apache License 2.0

JsonStreamWriter Support ISO TIMESTAMP String #1764

Open ismailsimsek opened 2 years ago

ismailsimsek commented 2 years ago

What the problem is:

Currently, uploading ISO formatted Timestamp values fails with `JSONObject does not have a int64 field at root.c_timestamptz.`

Currently it's only accepting int values.

Is it possible to add support?

What you want to happen: being able to upload ISO formatted Timestamp values to a TIMESTAMP field.

2022-08-28 12:56:50,656 INFO  [io.deb.ser.ConnectorLifecycle] (pool-11-thread-1) Connector completed: success = 'false', message = 'Stopping connector after error in the application's handler method: JSONObject does not have a int64 field at root.c_timestamptz.', error = '{}': java.lang.IllegalArgumentException: JSONObject does not have a int64 field at root.c_timestamptz.
    at com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.fillField(JsonToProtoMessage.java:351)
    at com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessageImpl(JsonToProtoMessage.java:176)
    at com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessage(JsonToProtoMessage.java:115)
    at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:147)
    at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:106)
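A possible workaround in the meantime, a minimal sketch: judging by the error, the affected versions expect a BigQuery TIMESTAMP value as an int64 of microseconds since the Unix epoch, so the ISO string can be converted before being put into the JSONObject. The class and method names below (`TimestampWorkaround`, `toEpochMicros`) are illustrative, not part of the library.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class TimestampWorkaround {
  /** Converts an ISO-8601 instant string to microseconds since the Unix epoch. */
  static long toEpochMicros(String isoTimestamp) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(isoTimestamp));
  }

  public static void main(String[] args) {
    // instead of record.put("c_timestamptz", "2019-11-14T00:55:31.820Z"):
    long micros = toEpochMicros("2019-11-14T00:55:31.820Z");
    System.out.println(micros); // prints 1573692931820000
  }
}
```

This only sidesteps the regression; the request in this issue, accepting the ISO string directly, would still need a fix in `JsonToProtoMessage`.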
ismailsimsek commented 2 years ago

It might be that the behavior changed between version 2.20.0 and 2.20.1.

With the following dependency it works:

            <dependency>
                <groupId>com.google.cloud</groupId>
                <artifactId>google-cloud-bigquerystorage</artifactId>
                <version>2.20.0</version>
            </dependency>
Neenu1995 commented 2 years ago

Hi @ismailsimsek, when you say the behavior might have changed between version 2.20.0 and 2.20.1, does that mean 2.20.1 does not support the feature, but 2.20.0 does?

Can you also provide a code snippet to reproduce the issue?

ismailsimsek commented 2 years ago

@Neenu1995 Correct, it was working before. After upgrading the dependency to 2.20.1 it started giving the error.

ismailsimsek commented 2 years ago

Here is a code snippet:

package io.debezium.server.bigquery;

import java.io.IOException;

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Descriptors;
import org.json.JSONArray;
import org.json.JSONObject;

public class TestStreamLoading {

  public static void main(String[] args) throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
    // [START ]
    // Table schema definition
    BigQuery bigquery = BigQueryOptions.newBuilder()
        .setCredentials(GoogleCredentials.getApplicationDefault())
        .setLocation("EU")
        .build()
        .getService();
    Field[] fields =
        new Field[] {
            Field.of("c_id", LegacySQLTypeName.INTEGER),
            Field.of("c_ts", LegacySQLTypeName.TIMESTAMP)
        };
    // Table schema definition
    Schema schema = Schema.of(fields);

    BigQueryWriteSettings bigQueryWriteSettings = BigQueryWriteSettings
        .newBuilder()
        .setCredentialsProvider(FixedCredentialsProvider.create(GoogleCredentials.getApplicationDefault()))
        .build();
    BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create(bigQueryWriteSettings);
    TableName tn = TableName.of(bigquery.getOptions().getProjectId(), "stage", "test_json_loading");

    JsonStreamWriter streamWriter = JsonStreamWriter
        .newBuilder(tn.toString(), BqToBqStorageSchemaConverter.convertTableSchema(schema), bigQueryWriteClient)
        .build();

    JSONArray jsonArr = new JSONArray();
    JSONObject record = new JSONObject();
    record.put("c_id", 1);
    record.put("c_ts", "2019-11-14T00:55:31.820Z");
    jsonArr.put(record);

    streamWriter.append(jsonArr); // on 2.20.1 this throws IllegalArgumentException during JSON-to-proto conversion
    System.out.println("DONE");
    // [END ]
  }
}

/** Converts structure from BigQuery client to BigQueryStorage client */
class BqToBqStorageSchemaConverter {
  private static final ImmutableMap<Field.Mode, TableFieldSchema.Mode> BQTableSchemaModeMap =
      ImmutableMap.of(
          Field.Mode.NULLABLE, TableFieldSchema.Mode.NULLABLE,
          Field.Mode.REPEATED, TableFieldSchema.Mode.REPEATED,
          Field.Mode.REQUIRED, TableFieldSchema.Mode.REQUIRED);

  private static final ImmutableMap<StandardSQLTypeName, TableFieldSchema.Type> BQTableSchemaTypeMap =
      new ImmutableMap.Builder<StandardSQLTypeName, TableFieldSchema.Type>()
          .put(StandardSQLTypeName.BOOL, TableFieldSchema.Type.BOOL)
          .put(StandardSQLTypeName.BYTES, TableFieldSchema.Type.BYTES)
          .put(StandardSQLTypeName.DATE, TableFieldSchema.Type.DATE)
          .put(StandardSQLTypeName.DATETIME, TableFieldSchema.Type.DATETIME)
          .put(StandardSQLTypeName.FLOAT64, TableFieldSchema.Type.DOUBLE)
          .put(StandardSQLTypeName.GEOGRAPHY, TableFieldSchema.Type.GEOGRAPHY)
          .put(StandardSQLTypeName.INT64, TableFieldSchema.Type.INT64)
          .put(StandardSQLTypeName.NUMERIC, TableFieldSchema.Type.NUMERIC)
          .put(StandardSQLTypeName.STRING, TableFieldSchema.Type.STRING)
          .put(StandardSQLTypeName.STRUCT, TableFieldSchema.Type.STRUCT)
          .put(StandardSQLTypeName.TIME, TableFieldSchema.Type.TIME)
          .put(StandardSQLTypeName.TIMESTAMP, TableFieldSchema.Type.TIMESTAMP)
          .build();

  /**
   * Converts from BigQuery client Table Schema to bigquery storage API Table Schema.
   *
   * @param schema the BigQuery client Table Schema
   * @return the bigquery storage API Table Schema
   */
  public static TableSchema convertTableSchema(Schema schema) {
    TableSchema.Builder result = TableSchema.newBuilder();
    for (int i = 0; i < schema.getFields().size(); i++) {
      result.addFields(i, convertFieldSchema(schema.getFields().get(i)));
    }
    return result.build();
  }

  /**
   * Converts from bigquery v2 Field Schema to bigquery storage API Field Schema.
   *
   * @param field the BigQuery client Field Schema
   * @return the bigquery storage API Field Schema
   */
  public static TableFieldSchema convertFieldSchema(Field field) {
    TableFieldSchema.Builder result = TableFieldSchema.newBuilder();
    if (field.getMode() == null) {
      field = field.toBuilder().setMode(Field.Mode.NULLABLE).build();
    }
    result.setMode(BQTableSchemaModeMap.get(field.getMode()));
    result.setName(field.getName());
    result.setType(BQTableSchemaTypeMap.get(field.getType().getStandardType()));
    if (field.getDescription() != null) {
      result.setDescription(field.getDescription());
    }
    if (field.getSubFields() != null) {
      for (int i = 0; i < field.getSubFields().size(); i++) {
        result.addFields(i, convertFieldSchema(field.getSubFields().get(i)));
      }
    }
    return result.build();
  }
}
ismailsimsek commented 2 years ago

Libraries with which it works without error:

[INFO] +- com.google.cloud:google-cloud-bigquery:jar:2.14.6:compile
[INFO] |  +- com.google.apis:google-api-services-bigquery:jar:v2-rev20220806-2.0.0:compile
[INFO] |  +- com.google.cloud:google-cloud-bigquerystorage:jar:2.20.0:compile
[INFO] |  +- com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta1:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1beta1:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1beta2:jar:0.134.1:compile
[INFO] |  +- com.google.api.grpc:grpc-google-cloud-bigquerystorage-v1:jar:2.10.1:compile
[INFO] |  +- com.google.api.grpc:proto-google-cloud-bigquerystorage-v1:jar:2.10.1:compile
dark0dave commented 1 year ago

Hmm, this is interesting. I wonder what broke this, as we do have tests for this.

dark0dave commented 1 year ago

https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java#L965

dark0dave commented 1 year ago

I'll have a look when I have some more time.