googleapis / java-bigtable-hbase

Java libraries and HBase client extensions for accessing Google Cloud Bigtable
https://cloud.google.com/bigtable/
Apache License 2.0

SourceRowCount example seems not working with the higher versions of bean packages? #2182

Open liufuyang opened 5 years ago

liufuyang commented 5 years ago

Basically I am trying to follow the guide from

And I wanted to run the SourceRowCount job to count the number of rows in my Bigtable table.

Since I used the pom.xml from the second URL above, some of the packages are newer than the ones in the first URL.

Then I encountered this issue: https://github.com/googleapis/cloud-bigtable-client/issues/2150

Then I used the solution mentioned in #2150 and changed bigtable-hbase-beam to 1.5.0; the local error com.google.bigtable.repackaged.io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. went away.

However, I still get a Permission denied error in the Dataflow job log while scanning Bigtable. I believe I have set up the service accounts correctly for my project.

Then I changed <beam.version>2.13.0</beam.version> to <beam.version>2.11.0</beam.version>, and my job stopped reporting the Permission denied issue on Dataflow. However, the job still failed after a long hour with the message: Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h.
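
For reference, the relevant parts of my pom.xml after these two changes would look roughly like this (a trimmed sketch only, not the full file; the complete pom.xml is linked below):

<properties>
  <!-- downgraded from 2.13.0 while debugging the Permission denied issue -->
  <beam.version>2.11.0</beam.version>
</properties>

<dependencies>
  <!-- pinned to 1.5.0 as suggested in #2150 -->
  <dependency>
    <groupId>com.google.cloud.bigtable</groupId>
    <artifactId>bigtable-hbase-beam</artifactId>
    <version>1.5.0</version>
  </dependency>
  <!-- other dependencies unchanged -->
</dependencies>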

So my question now is: how can I do a Bigtable row count properly? Should I just use the exact configuration of the SourceRowCount example?

liufuyang commented 5 years ago

This is the project pom file (basically modified from the one generated by following the https://cloud.google.com/dataflow/docs/quickstarts/quickstart-java-maven guide): https://pastebin.com/UmmihvYX

This is the command I used to run the job:

mvn clean compile

export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/key.json

mvn -Pdataflow-runner exec:java \
    -Dexec.mainClass=org.apache.beam.examples.SourceRowCount \
    -Dexec.args="--project=promotions-targeting \
      --region=europe-west1 \
      --stagingLocation=gs://promo-targeting-temporary/fuyangl/staging/ \
      --tempLocation=gs://promo-targeting-temporary/fuyangl/temp/ \
      --output=gs://promo-targeting-temporary/fuyangl/bt-count-output \
      --bigtableProjectId=draper \
      --bigtableInstanceId=draper \
      --bigtableTableId=cohorts \
      --numWorkers=3 \
      --workerMachineType=n1-standard-8 \
      --serviceAccount=fuyangl@promotions-targeting.iam.gserviceaccount.com \
      --runner=DataflowRunner"

And this is the job java code:

package org.apache.beam.examples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;

import com.google.cloud.bigtable.beam.CloudBigtableIO;
import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;

public class SourceRowCount {

  public interface CloudBigtableOptions extends PipelineOptions {
    @Description("The Google Cloud project ID for the Cloud Bigtable instance.")
    String getProjectId();

    void setProjectId(String projectId);

    @Description("The Google Cloud project ID for the Cloud Bigtable instance.")
    String getBigtableProjectId();

    void setBigtableProjectId(String bigtableProjectId);

    @Description("The Google Cloud Bigtable instance ID.")
    String getBigtableInstanceId();

    void setBigtableInstanceId(String bigtableInstanceId);

    @Description("The Cloud Bigtable table ID in the instance.")
    String getBigtableTableId();

    void setBigtableTableId(String bigtableTableId);

    /** Set this required option to specify where to write the output. */
    @Description("Path of the file to write to")
    @Validation.Required
    String getOutput();

    void setOutput(String value);
  }

  // Converts a Long to a String so that it can be written to a file.
  static DoFn<Long, String> stringifier = new DoFn<Long, String>() {
    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(DoFn<Long, String>.ProcessContext context) throws Exception {
      context.output(context.element().toString());
    }
  };

  public static void main(String[] args) {
    CloudBigtableOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(CloudBigtableOptions.class);
    String PROJECT_ID = options.getBigtableProjectId();
    String INSTANCE_ID = options.getBigtableInstanceId();
    String TABLE_ID = options.getBigtableTableId();

    // [START bigtable_dataflow_connector_scan_config]
    Scan scan = new Scan();
    scan.setCacheBlocks(false);
    scan.setFilter(new FirstKeyOnlyFilter());

    // CloudBigtableScanConfiguration contains the project, instance and table to connect to.
    // You can supply an optional Scan() to filter the rows that will be read.
    CloudBigtableScanConfiguration config =
        new CloudBigtableScanConfiguration.Builder()
            .withProjectId(PROJECT_ID)
            .withInstanceId(INSTANCE_ID)
            .withTableId(TABLE_ID)
            .withScan(scan)
            .build();

    Pipeline p = Pipeline.create(options);

    p.apply(Read.from(CloudBigtableIO.read(config)))
        .apply(Count.<Result>globally())
        .apply(ParDo.of(stringifier))
        .apply(TextIO.write().to(options.getOutput()));
    // [END bigtable_dataflow_connector_scan_config]

    p.run().waitUntilFinish();

    // Once this is done, you can get the result file via "gsutil cp <resultLocation>-00000-of-00001"
  }
}
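
For completeness, once the job succeeds, the plan is to fetch the result roughly like this (a sketch; the shard suffix assumes TextIO produced a single output file):

# Copy the row-count result locally, using the --output path from the command above.
gsutil cp gs://promo-targeting-temporary/fuyangl/bt-count-output-00000-of-00001 .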
liufuyang commented 5 years ago

@sduskis I created the issue here again.

This is as far as I can get for now, but still no success in getting the count of Bigtable rows: [screenshot]

liufuyang commented 5 years ago

Okay, the error Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. is probably because I set up a wrong project ID in --bigtableProjectId=draper. I am testing it again with the correct one to see what happens.

At the same time, I checked out the code from https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/tree/master/java/dataflow-connector-examples, used the code directly, and started the job with this command:

export GOOGLE_APPLICATION_CREDENTIALS=$(pwd)/key.json

mvn package exec:exec \
    -DSourceRowCount \
    -DprojectID=promotions-targeting \
    -Dbigtable.projectID=xpn-draper-1 \
    -Dbigtable.instanceID=draper \
    -Dgs=gs://promo-targeting-temporary/fuyangl

And I got this warning again, as in #2150:

[main] WARN org.apache.beam.runners.dataflow.internal.CustomSources - Size estimation of the source failed: com.google.cloud.bigtable.beam.CloudBigtableIO$Source@710d7aff
com.google.bigtable.repackaged.io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.

(And it hangs there for a long time before it can submit a dataflow job...)

rahulKQL commented 5 years ago

@liufuyang Thanks for creating a fresh issue.

I was able to recreate it; for me, it took ~20 minutes to get the row count of 10K rows. Adding the complete stack trace for the warning mentioned in the earlier comment.

[main] WARN org.apache.beam.runners.dataflow.internal.CustomSources - Size estimation of the source failed: com.google.cloud.bigtable.beam.CloudBigtableIO$Source@989da1
com.google.bigtable.repackaged.io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
        at com.google.bigtable.repackaged.io.grpc.Status.asRuntimeException(Status.java:521)
        at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.async.AbstractRetryingOperation.getBlockingResult(AbstractRetryingOperation.java:446)
        at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.sampleRowKeys(BigtableDataGrpcClient.java:330)
        at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.BigtableDataClientWrapper.sampleRowKeys(BigtableDataClientWrapper.java:148)
        at com.google.cloud.bigtable.batch.common.CloudBigtableServiceImpl.getSampleRowKeys(CloudBigtableServiceImpl.java:31)
        at com.google.cloud.bigtable.beam.CloudBigtableIO$AbstractSource.getSampleRowKeys(CloudBigtableIO.java:284)
        at com.google.cloud.bigtable.beam.CloudBigtableIO$Source.getSampleRowKeys(CloudBigtableIO.java:410)
        at com.google.cloud.bigtable.beam.CloudBigtableIO$Source.getEstimatedSizeBytes(CloudBigtableIO.java:472)
        at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:77)
        at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:51)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:38)
        at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:35)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:473)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:665)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:657)
        at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:317)
        at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:251)
        at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:412)
        at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:171)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:735)
        at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
        at com.google.cloud.bigtable.dataflow.example.SourceRowCount.main(SourceRowCount.java:111)

I am trying to debug the cause of the UNAUTHENTICATED error and will add more info here.

liufuyang commented 5 years ago

Even with the correct project setup, my code above still gives the Workflow failed. Causes: The Dataflow job appears to be stuck because no worker activity has been seen in the last 1h. log and cannot get the row count of the Bigtable table.

However, the official code at https://github.com/GoogleCloudPlatform/cloud-bigtable-examples/tree/master/java/dataflow-connector-examples does work, but it hangs for a long time before submitting the job and gives this warning, as mentioned in #2150:

[main] WARN org.apache.beam.runners.dataflow.internal.CustomSources - Size estimation of the source failed: com.google.cloud.bigtable.beam.CloudBigtableIO$Source@710d7aff
com.google.bigtable.repackaged.io.grpc.StatusRuntimeException: UNAUTHENTICATED: Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project.
    at com.google.bigtable.repackaged.io.grpc.Status.asRuntimeException(Status.java:521)
    at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.async.AbstractRetryingOperation.getBlockingResult(AbstractRetryingOperation.java:446)
    at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.sampleRowKeys(BigtableDataGrpcClient.java:330)
    at com.google.bigtable.repackaged.com.google.cloud.bigtable.grpc.BigtableDataClientWrapper.sampleRowKeys(BigtableDataClientWrapper.java:148)
    at com.google.cloud.bigtable.batch.common.CloudBigtableServiceImpl.getSampleRowKeys(CloudBigtableServiceImpl.java:31)
    at com.google.cloud.bigtable.beam.CloudBigtableIO$AbstractSource.getSampleRowKeys(CloudBigtableIO.java:284)
    at com.google.cloud.bigtable.beam.CloudBigtableIO$Source.getSampleRowKeys(CloudBigtableIO.java:410)
    at com.google.cloud.bigtable.beam.CloudBigtableIO$Source.getEstimatedSizeBytes(CloudBigtableIO.java:472)
    at org.apache.beam.runners.dataflow.internal.CustomSources.serializeToCloudSource(CustomSources.java:78)
    at org.apache.beam.runners.dataflow.ReadTranslator.translateReadHelper(ReadTranslator.java:53)
    at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:40)
    at org.apache.beam.runners.dataflow.ReadTranslator.translate(ReadTranslator.java:37)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.visitPrimitiveTransform(DataflowPipelineTranslator.java:453)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator$Translator.translate(DataflowPipelineTranslator.java:392)
    at org.apache.beam.runners.dataflow.DataflowPipelineTranslator.translate(DataflowPipelineTranslator.java:170)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:680)
    at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:174)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)

I am not exactly sure what the problem is with my own code. I copied most of it from the example code; however, I am using PipelineOptions rather than DataflowPipelineOptions. Could that cause the no worker activity has been seen in the last 1h issue? 🤔
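
If it matters, the change I am considering would look roughly like this (an untested sketch; DataflowPipelineOptions comes from the beam-runners-google-cloud-dataflow-java module, and the rest of the pipeline would stay as above):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;

// Same options as above, but extending DataflowPipelineOptions so the
// Dataflow-specific settings (project, region, service account, workers)
// are handled the same way as in the official example.
public interface CloudBigtableOptions extends DataflowPipelineOptions {
  @Description("The Google Cloud project ID for the Cloud Bigtable instance.")
  String getBigtableProjectId();

  void setBigtableProjectId(String bigtableProjectId);

  @Description("The Google Cloud Bigtable instance ID.")
  String getBigtableInstanceId();

  void setBigtableInstanceId(String bigtableInstanceId);

  @Description("The Cloud Bigtable table ID in the instance.")
  String getBigtableTableId();

  void setBigtableTableId(String bigtableTableId);

  @Description("Path of the file to write to")
  @Validation.Required
  String getOutput();

  void setOutput(String value);
}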

liufuyang commented 5 years ago

@rahulKQL Thanks for the input. Also, if you have time, it would be nice if you could test updating the Beam version to <beam.version>2.13.0</beam.version>; then you might also encounter the Permission denied log when the workers read Bigtable.

rahulKQL commented 5 years ago

@liufuyang I tried this example with <beam.version>2.13.0</beam.version>. However, I did not receive any error or warning related to permissions (for background, all of my tests were performed with resources in a single project).

> I am trying to run a workflow job on project A and connecting to a Bigtable instance on project B. I tried to set up the service account and also tried to use the --serviceAccount option as mentioned here, but I never get through the permission denied error...

I saw in that comment that your resources are in two different projects. Could you please check whether you have proper authorization in both projects?
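
For example, something along these lines (a sketch only; the exact role needed is my assumption, and the project and service-account names are taken from your commands above) should give the controller service account read access to the table in the other project:

# Grant the Dataflow service account Bigtable read access in the project
# that hosts the table (use roles/bigtable.user if the job also writes).
gcloud projects add-iam-policy-binding xpn-draper-1 \
    --member="serviceAccount:fuyangl@promotions-targeting.iam.gserviceaccount.com" \
    --role="roles/bigtable.reader"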

liufuyang commented 5 years ago

@rahulKQL Did you run a workflow that reads from or writes into a Bigtable table? Yes, my authorization should be okay, as I have tested with the example code and it works. I can use the example code, upgrade the Beam version, and see what happens.

rahulKQL commented 5 years ago

The test was reading from Bigtable, but I believe writing into Bigtable would also be fine.

liufuyang commented 5 years ago

@rahulKQL Yes, I tried again with the official code and Beam version 2.13.0 and there were no issues. But if I use the code and pom.xml I provided above, I get this issue, even with the same service account setup. I guess I should probably just use the example code then.

sduskis commented 5 years ago

@liufuyang, can you provide the text of your pom.xml, please?

liufuyang commented 5 years ago

It's here https://pastebin.com/UmmihvYX

johnlhamilton commented 5 years ago

I believe I'm hitting the same issue trying to run a Dataflow batch job that has a CloudBigtableIO source. I am using Beam 2.11.0 and bigtable-hbase-beam 1.10.0. When deploying the job, it logs warnings like this and repeatedly retries:

10:59:51  19/07/08 15:59:50 WARN AbstractRetryingOperation: Retrying failed call. Failure #1, got: Status{code=UNAUTHENTICATED, description=Request had invalid authentication credentials. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project., cause=null} on channel 1.

After 5 minutes it will time out with this warning:

WARNING: Size estimation of the source failed: com.google.cloud.bigtable.beam.CloudBigtableIO$Source@18ca3c62

There will be one timeout per Bigtable source in my application. After that, it will successfully deploy the pipeline and run without issue, though I'm left wondering whether it successfully split the table reads.

We've been running similar Dataflow batch jobs using the same mechanism (with the same Bigtable and Beam versions) for a while now, and this just started happening last week.

It is also worth noting that this only happens when I use a service account. If I use my application default credentials, the deployment proceeds without errors. The service account I'm trying to use has the Owner role. The Dataflow job and Bigtable instance are both in the same GCP project.

igorbernstein2 commented 5 years ago

I'm sorry for the trouble you are having, and thank you for the time you've spent debugging the issue. The issue is caused by an inconsistent configuration of the bigtable-batch.googleapis.com endpoint and gRPC's JWT support. I just added a workaround, #2187, that should fix the issue until we get it sorted out on the server side.