This repo contains a reference implementation for an end-to-end data tokenization solution. This solution does the following:
This solution comprises the following pipelines. To view the job graphs of these pipelines, see Dataflow DAG.
You can use this pipeline for Avro, CSV, JSONL, ORC, Parquet, and TSV files stored or ingested in Cloud Storage or an Amazon S3 bucket. This pipeline uses the State and Timer APIs to batch and process the files optimally. The results of the inspection and de-identification processes are written to a BigQuery table.
The re-identification pipeline is used to read data from a BigQuery table and publish the re-identified data to a secure Pub/Sub topic.
The following resources describe key concepts related to the DLP API and Dataflow. The DLP API is part of Sensitive Data Protection.
This tutorial uses billable components of Google Cloud, including the following:
Use the pricing calculator to generate a cost estimate based on your projected usage.
Make sure that billing is enabled for your Google Cloud project.
Use the link below to open Cloud Shell.
Run the following commands to set up the data tokenization solution in your Google Cloud project:
gcloud config set project <project_id>
sh setup-data-tokeninzation-solution-v2.sh
The setup-data-tokenization-solution-v2.sh script performs the following tasks:
Creates a bucket ({project-id}-demo-data) in the us-central1 region and uploads a sample dataset with mock PII data.
Creates a BigQuery dataset (demo_dataset) in the US multi-region to store the tokenized data.
Creates a KMS-wrapped key (KEK) by creating an automatic TEK (token encryption key).
Creates inspection, de-identification, and re-identification templates with the KEK and crypto-based transformations.
Creates a service account (with a custom role) for running the DLP API pipeline.
Enables Pub/Sub and creates a Pub/Sub topic that will be notified whenever a new file is added in the Cloud Storage bucket.
Emits a set_env.sh script, which you can use to set temporary environment variables while triggering the DLP API pipelines.
Run set_env.sh:
source set_env.sh
./gradlew build
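The pipeline commands in this README read their settings from environment variables such as REGION and PROJECT_ID. As a minimal sketch, and assuming set_env.sh is what defines those variables in your session, a small helper can confirm that they are set before you launch a job. The function name require_env is hypothetical and is not part of the repository.

```shell
# Hypothetical helper (not part of the repository): fails if any of the
# named environment variables is unset or empty.
require_env() {
  missing=0
  for name in "$@"; do
    # Indirect lookup of the variable whose name is stored in $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "Missing environment variable: $name" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example call after `source set_env.sh`:
#   require_env REGION PROJECT_ID DATA_STORAGE_BUCKET BQ_DATASET_NAME \
#     INSPECT_TEMPLATE_NAME DEID_TEMPLATE_NAME SERVICE_ACCOUNT_EMAIL
```

The helper reports every missing variable rather than stopping at the first one, so a single run shows everything that still needs to be set.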
You can run the inspection pipeline to do one or both of the following:
To trigger a streaming inspection Dataflow pipeline that processes all the CSV files in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"
To trigger a batch inspection Dataflow pipeline that processes only the CSV files that are currently present in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"
Any files that are added to the bucket after the pipeline was triggered are not inspected.
To trigger a streaming inspection Dataflow pipeline that processes new CSV files that are added to the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC} \
--processExistingFiles=false"
Any files that were added to the bucket before the pipeline was triggered are not inspected.
You can find the inspection results in the BigQuery dataset specified in the dataset parameter.
If --processExistingFiles is set to false and --gcsNotificationTopic is not provided, then the pipeline fails with an error similar to the following:
Exception in thread "main" java.lang.IllegalArgumentException: Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided
at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.runInspectAndDeidPipeline(DLPTextToBigQueryStreamingV2.java:113)
at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.run(DLPTextToBigQueryStreamingV2.java:93)
at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.main(DLPTextToBigQueryStreamingV2.java:84)
For information about the default values of the pipeline parameters, see Pipeline parameters on this page.
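The rule behind the error above can be checked in the shell before a job is submitted. The following is a minimal sketch, not code from the repository: the function name check_file_trigger_options is hypothetical, and it only mirrors the condition stated in the error message.

```shell
# Hypothetical pre-flight check (not part of the repository).
# $1 = value intended for --processExistingFiles ("true" or "false")
# $2 = value intended for --gcsNotificationTopic (empty if not provided)
check_file_trigger_options() {
  if [ "$1" = "false" ] && [ -z "$2" ]; then
    echo "Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided" >&2
    return 1
  fi
  return 0
}

# Example call:
#   check_file_trigger_options false "projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"
```

With existing files skipped and no notification topic, the pipeline would have no source of files to process, which is why that combination is rejected.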
You can run the de-identification pipeline to do one or both of the following:
To trigger a streaming de-identification Dataflow pipeline that processes all the CSV files in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=2 --maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"
To trigger a batch de-identification Dataflow pipeline that processes only the CSV files that are currently present in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"
Any files that are added to the bucket after the pipeline was triggered are not de-identified.
To trigger a streaming de-identification Dataflow pipeline that processes new CSV files that are added to the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME} \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC} \
--processExistingFiles=false"
Any files that were added to the bucket before the pipeline was triggered are not de-identified.
You can find the de-identified results in the BigQuery dataset specified in the dataset parameter, in tables with the same names as the respective input files.
If --processExistingFiles is set to false and --gcsNotificationTopic is not provided, then the pipeline fails with an error similar to the following:
Exception in thread "main" java.lang.IllegalArgumentException: Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided
at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.runInspectAndDeidPipeline(DLPTextToBigQueryStreamingV2.java:113)
at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.run(DLPTextToBigQueryStreamingV2.java:93)
at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.main(DLPTextToBigQueryStreamingV2.java:84)
For information about the default values of the pipeline parameters, see Pipeline parameters on this page.
You can run some quick validations for a BigQuery table to check the tokenized data. The following steps use a sample dataset to validate de-identified results:
In Cloud Shell, display the header row of the CSV file that you used to create the schema:
gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | head -1
In the query editor of SQL workspace, run the following query to compare the schema with the header row of the CSV file:
SELECT table_name, column_name
FROM `<dataset_id>.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name="CCRecords_1564602825"
There are no spaces in the column names because the pipeline ensures that the column and table names only contain valid characters according to the BigQuery naming standard.
Validate that the number of rows in the input file and the output table are equal:
Input records:
echo $(($(gcloud storage cat gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv | wc -l) - 1))
Output records:
SELECT COUNT(*) as number_of_rows
FROM `<dataset_id>.CCRecords_1564602825` LIMIT 1
Validate that the bucketing transformation is successfully applied to the JobTitle column:
SELECT JobTitle, COUNT(*) AS number_of_records_found
FROM `<dataset_id>.CCRecords_1564602825`
GROUP BY JobTitle
In the output, the JobTitle values should be grouped into three generalized buckets: Executive, Engineer, and Manager.
Validate that values in the Age column are grouped into six different buckets from 60 to 20:
SELECT Age, COUNT(*) AS number_of_records_found
FROM `<dataset_id>.CCRecords_1564602825`
GROUP BY Age
ORDER BY Age DESC
Validate the masking transformation for the SSN:
SELECT SSN
FROM `<dataset_id>.CCRecords_*`
WHERE REGEXP_CONTAINS(SSN, "@*")
In the output, the first five digits for all SSN entries should be masked.
Validate that the cryptographic transformation used deterministic encryption for the card_holders_name, card_number, and card_pin entries:
SELECT Additional_Details
FROM `<dataset_id>.CCRecords_*`
WHERE REGEXP_CONTAINS(Additional_Details, r'(IBAN_CODE+)')
OR REGEXP_CONTAINS(Additional_Details, r'(EMAIL_ADDRESS+)')
OR REGEXP_CONTAINS(Additional_Details, r'(PHONE_NUMBER+)')
OR REGEXP_CONTAINS(Additional_Details, r'(ONLINE_USER_ID+)')
In the output, sensitive values should be replaced with placeholder values such as [IBAN_CODE], [EMAIL_ADDRESS], [PHONE_NUMBER], and [ONLINE_USER_ID].
Query the de-identified copies of the dataset for the ID 76901:
SELECT * FROM `<dataset_id>.CCRecords_1564602825` WHERE ID='76901'
In Cloud Shell, compare the output from the previous step with the original dataset in the CSV file for the ID 76901:
gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | awk -F "," '$1 == 76901'
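When comparing the CSV header row with the BigQuery schema in the validation steps above, it can help to preview what the header names look like once characters that are not valid in column names are replaced. The following is a rough sketch under a stated assumption: it replaces every run of characters other than letters, digits, and underscores with a single underscore. The exact rule the pipeline applies is not described in this README, so treat the output as an approximation. The function name preview_columns is hypothetical.

```shell
# Hypothetical helper (not part of the repository).
# Reads a CSV file on stdin and prints one approximate column name per line.
# ASSUMPTION: invalid characters are replaced with "_"; the pipeline's real
# sanitization rule may differ. Quoted headers containing commas are not handled.
preview_columns() {
  head -n 1 \
    | tr -d '\r' \
    | tr ',' '\n' \
    | sed -E 's/[^A-Za-z0-9_]+/_/g'
}

# Example call:
#   gcloud storage cat gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv | preview_columns
```

If the names printed here differ from the INFORMATION_SCHEMA query output, the difference shows where the pipeline's actual naming rule departs from this assumption.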
Export a SQL query to read and re-identify data from BigQuery. The sample provided below selects 10 records that match the query.
export QUERY="select ID,Card_Number,Card_Holders_Name from \`${PROJECT_ID}.${BQ_DATASET_NAME}.CCRecords_1564602828\` where safe_cast(Credit_Limit as int64)>100000 and safe_cast (Age as int64)>50 group by ID,Card_Number,Card_Holders_Name limit 10"
Upload the query to a file in Cloud Storage:
export REID_QUERY_BUCKET=<name>
cat << EOF | gsutil cp - gs://${REID_QUERY_BUCKET}/reid_query.sql
${QUERY}
EOF
Create a Pub/Sub topic. For more information, see create a topic.
Run the pipeline by passing the required parameters:
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--tableRef=${PROJECT_ID}:${BQ_DATASET_NAME}.<table> \
--dataset=${BQ_DATASET_NAME} \
--topic=projects/${PROJECT_ID}/topics/<topic_name> \
--autoscalingAlgorithm=THROUGHPUT_BASED \
--workerMachineType=n1-highmem-4 \
--deidentifyTemplateName=${REID_TEMPLATE_NAME} \
--DLPMethod=REID \
--keyRange=1024 \
--queryPath=gs://${REID_QUERY_BUCKET}/reid_query.sql \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"
This command triggers a batch re-identification Dataflow pipeline that processes all the records from the query stored in reid_query.sql. The re-identified results can be found in the BigQuery dataset (dataset parameter) with the name of the input table as the suffix.
Pipeline Option | Description | Used in Operations |
---|---|---|
region | Specifies a regional endpoint for deploying your Dataflow jobs. | All |
project | The project ID for your Google Cloud project. | All |
streaming | true if streaming pipeline. | INSPECT/DEID |
enableStreamingEngine | Specifies whether Dataflow Streaming Engine is enabled or disabled. | INSPECT/DEID |
tempLocation | Cloud Storage path for temporary files. Must be a valid Cloud Storage URL. | All |
numWorkers | (Optional) The initial number of Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts up when your job begins. | All |
maxNumWorkers | (Optional) The maximum number of Compute Engine instances to be made available to your pipeline during execution. This value can be higher than the initial number of workers (specified by numWorkers) to allow your job to scale up, automatically or otherwise. | All |
runner | DataflowRunner | All |
inspectTemplateName | DLP API inspect template name. | INSPECT/DEID |
filePattern | The file pattern that will be used to scan the files to be processed by the job. | INSPECT/DEID |
deidentifyTemplateName | DLP de-identify template name. | All |
DLPMethod | Type of DLP operation to perform. Valid values are INSPECT, DEID, or REID. | All |
processExistingFiles | Files existing in the Cloud Storage bucket before the pipeline is started will not be processed when the value is set to false. The default value is true. | INSPECT/DEID |
gcsNotificationTopic | Pub/Sub topic for notifications of new files added to the Cloud Storage bucket in filePattern. Format: projects/<PROJECT_ID>/topics/<GCS_TOPIC_NAME>. | INSPECT/DEID |
batchSize | (Optional) Batch size for the DLP API. The default is 500,000. | All |
dataset | BigQuery dataset to write the inspection or de-identification results to or to read from in case of re-identification. | All |
recordDelimiter | (Optional) Record delimiter. | INSPECT/DEID |
columnDelimiter | Column delimiter. Only required in case of a custom delimiter. | INSPECT/DEID |
tableRef | BigQuery table to export from in the form <project>:<dataset>.<table>. | REID |
queryPath | Query file for re-identification. | REID |
headers | DLP table headers. Required for the JSONL file type. | INSPECT/DEID |
numShardsPerDLPRequestBatching | (Optional) Number of shards for DLP request batches. Can be used to control the parallelism of DLP requests. The default value is 100. | All |
numberOfWorkerHarnessThreads | (Optional) The number of threads per each worker harness process. | All |
dlpApiRetryCount | (Optional) Number of retries in case of transient errors in DLP API. The default value is 10. | All |
initialBackoff | (Optional) Initial backoff (in seconds) for retries with exponential backoff. The default is 5s. | All |
outputBucket | GCS path for storing the deidentified files | DEID |
DLPParent | (Optional) The resource location for DLP templates. Format: projects/<project-id>/locations/<region>. By default, the location will be global. | All |
For more details, see Dataflow Pipeline Options.
For sample commands for processing CSV files, see Run the samples.
The pipeline supports the TSV file format, which uses tabs as column delimiters. The pipeline options are similar to those for CSV files. The file name extension should be .tsv.
./gradlew run ... -Pargs="... --filePattern=gs://<bucket_name>/small_file.tsv"
The pipeline supports JSONL file format where each line is a valid JSON object and newline characters separate JSON objects. For a sample file, see the test resources. To run the pipeline for JSONL files, the list of comma-separated headers also needs to be passed in the pipeline options.
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/CCRecords_sample.jsonl --headers=<comma_separated_list_of_headers>"
The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.
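The comma-separated value for --headers can be derived from the file itself instead of being typed by hand. The following is a rough sketch, not part of the repository. It assumes that every line is a flat JSON object (no nested objects), that keys and string values contain no escaped double quotes, and that the keys of the first line are representative of the whole file. The function name jsonl_headers is hypothetical.

```shell
# Hypothetical helper (not part of the repository).
# Prints the keys of the first JSON object in a JSONL file as a
# comma-separated list, in order of appearance.
# ASSUMPTION: flat objects only, and no escaped quotes in keys or values.
jsonl_headers() {
  head -n 1 "$1" \
    | grep -o '"[^"]*"[[:space:]]*:' \
    | sed -E 's/^"//; s/"[[:space:]]*:$//' \
    | paste -sd, -
}

# Example call with a local copy of the file:
#   jsonl_headers CCRecords_sample.jsonl
```

The pattern matches only quoted strings that are directly followed by a colon, which is how keys are told apart from string values. For files that do not meet the assumptions above, a real JSON parser is the safer choice.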
The pipeline handles Avro files similarly to how it handles CSV files. No additional changes are required to run the pipeline except updating the --filePattern parameter. For example:
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.avro"
The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.
The pipeline supports CSV files with a custom delimiter. The delimiter has to be passed in the pipeline option as --columnDelimiter.
./gradlew run ... -Pargs="... --columnDelimiter=|"
The inspection and de-identification pipelines support the Parquet file format. Data is read from a Cloud Storage bucket, and the results of the DLP Dataflow pipeline are written to BigQuery tables. For sample data in Parquet file format, refer to mock-data.
No additional changes are required to run the pipeline except updating the --filePattern parameter. For example:
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.parquet"
The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.
The inspection pipeline will read ORC files from an input Cloud Storage bucket and the results will be written in BigQuery tables.
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc"
The de-identification pipeline will read the data from an input Cloud Storage bucket. The de-identified results can be written in a BigQuery dataset as tables or an output Cloud Storage bucket as ORC files.
./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc"
./gradlew run ... -Pargs="...
--filePattern=gs://${PROJECT_ID}-demo-data/*.orc \
--outputBucket=<output_storage_bucket> ..."
In the above command, replace <output_storage_bucket> with the URI of the Cloud Storage bucket where you want to store the de-identified ORC files. The de-identification pipeline allows input files with varying schemas to be processed in the same pipeline.
Currently, this solution can process only primitive data types available in ORC format when the results are stored in an output Cloud Storage bucket.
For sample data in ORC file format, refer to mock-data.
To use Amazon S3 as a source of input files, use AWS credentials as instructed below.
Export the AWS access key, secret key, and credentials provider to your environment variables.
export AWS_ACCESS_KEY="<access_key>"
export AWS_SECRET_KEY="<secret_key>"
export AWS_CRED="{\"@type\":\"AWSStaticCredentialsProvider\",\"awsAccessKeyId\":\"${AWS_ACCESS_KEY}\",\"awsSecretKey\":\"${AWS_SECRET_KEY}\"}"
Use Gradle to build and run the job to perform DLP operations on a CSV file stored in Amazon S3. The results will be written to BigQuery.
Update the filePattern and awsRegion parameters with appropriate values in the following command.
./gradlew build
# INSPECT is the default DLP method; for de-identification, add --DLPMethod=DEID
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs="
--region=${REGION}
--project=${PROJECT_ID}
--streaming --enableStreamingEngine
--tempLocation=gs://${PROJECT_ID}-demo-data/temp
--numWorkers=1 --maxNumWorkers=2
--runner=DataflowRunner
--filePattern=s3://<bucket>/file.csv
--dataset=${BQ_DATASET_NAME}
--inspectTemplateName=${INSPECT_TEMPLATE_NAME}
--deidentifyTemplateName=${DEID_TEMPLATE_NAME}
--awsRegion=<aws_region>
--awsCredentialsProvider=${AWS_CRED}
--serviceAccount=$SERVICE_ACCOUNT_EMAIL"
--awsRegion: The region where the AWS resources reside.
--awsCredentialsProvider: The AWS credentials provider.
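The AWS_CRED value shown earlier is a JSON string with hand-escaped quotes, which is easy to mistype. As a minimal sketch, the same string can be produced with printf. The function name aws_static_credentials_json is hypothetical and not part of the repository, and the sketch assumes that the keys contain no double quotes or backslashes.

```shell
# Hypothetical helper (not part of the repository).
# $1 = AWS access key ID, $2 = AWS secret access key
# Prints the credentials provider JSON in the same shape as AWS_CRED above.
# ASSUMPTION: neither key contains a double quote or a backslash.
aws_static_credentials_json() {
  printf '{"@type":"AWSStaticCredentialsProvider","awsAccessKeyId":"%s","awsSecretKey":"%s"}' "$1" "$2"
}

# Example call:
#   export AWS_CRED="$(aws_static_credentials_json "${AWS_ACCESS_KEY}" "${AWS_SECRET_KEY}")"
```

Keeping the JSON template in single quotes means the only values the shell expands are the two keys passed as arguments.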
The pipeline offers GCS as a sink for the DEID workflow. Currently, it supports the deidentification of CSV files and outputs the files in CSV format.
The following command de-identifies existing CSV files. Update the outputBucket parameter with the correct Cloud Storage path for storing the de-identified files.
./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--outputBucket=gs://${DATA_STORAGE_BUCKET}/output \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"
The DLP templates used in this tutorial are specifically tailored for inspecting, de-identifying, and re-identifying sample data containing simulated personally identifiable information (PII). It is important to note that when working with your data, you should create custom DLP templates that align with the characteristics of the data being processed. For information about creating your templates, see the following:
Cloud Shell comes with a pre-configured development environment that includes many of the most popular tools and libraries, such as the Google Cloud CLI (gcloud CLI), Gradle, Java, Git, Vim, and more. This means that you don't have to spend time setting up your own environment on your local machine. If you want to run this solution from your own machine, follow these steps:
gcloud config set project <project_id>
gcloud auth application-default login
gcloud auth application-default set-quota-project <project_id>
sh setup-data-tokeninzation-solution-v2.sh
source set_env.sh
This section provides a minimal list of tools and libraries, along with their recommended versions, that can be used to run this pipeline outside Cloud Shell.
./gradlew build
The following are issues you might encounter while running the pipeline, and the ways you can avoid or recover from them.
Duplicate rows: When writing data to a BigQuery table, the pipeline might write duplicate rows.
The project uses the BigQuery Streaming Inserts API, which, by default, enables a best-effort deduplication mechanism. Because the deduplication is best effort, some duplicates can still appear. For a possible solution, see High number of duplicates in Dataflow pipeline streaming inserts to BigQuery.
Errors in the transformation process. You can view the detailed errors in worker logs for the PTransform.
INVALID_ARGUMENT: Too many findings to de-identify. Retry with a smaller request.
The DLP API allows a maximum of 3,000 findings per request. Run the pipeline again with a smaller batch size.
RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Number of requests' and limit 'Number of requests per minute' of service 'dlp.googleapis.com'
This can happen if the Dataflow pipeline is being run with a small batch size. Re-run the pipeline with a larger batch size.
If increasing the batch size is not possible or if the issue persists even after reaching the maximum batch size, consider trying one of the following options:
The pipeline retries DLP API calls with exponential backoff. To tune the retry behavior, adjust the value of the dlpApiRetryCount parameter. For more information, see the entries for dlpApiRetryCount and initialBackoff in Pipeline parameters.
The pipeline includes a parameter called numShardsPerDLPRequestBatching. Decreasing this value below the default (100) will result in a lower number of concurrent requests sent to DLP.
Verify if any other pipelines or clients are generating DLP API requests.
Consider submitting a request to increase the quota limit for Sensitive Data Protection.
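To get a feel for how dlpApiRetryCount and initialBackoff interact, the following sketch prints a retry schedule for given values. It is an illustration only, not the pipeline's code: it assumes that the delay doubles after every retry and that there is no jitter or upper bound, none of which is confirmed by this README. The function name backoff_schedule is hypothetical.

```shell
# Hypothetical illustration (not part of the repository).
# $1 = initial backoff in seconds, $2 = number of retries
# ASSUMPTION: the delay doubles after each retry, with no jitter and no cap.
backoff_schedule() {
  delay="$1"
  retries="$2"
  total=0
  attempt=1
  while [ "$attempt" -le "$retries" ]; do
    echo "retry ${attempt}: wait ${delay}s"
    total=$((total + delay))
    delay=$((delay * 2))
    attempt=$((attempt + 1))
  done
  echo "total wait: ${total}s"
}

# Example call with the documented defaults (initialBackoff=5, dlpApiRetryCount=10):
#   backoff_schedule 5 10
```

Under the doubling assumption, the documented defaults add up to 5115 seconds of waiting in the worst case, so raising dlpApiRetryCount increases the maximum time a request can be held up far faster than it increases the number of attempts.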