apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

DataprocCreateClusterOperator doesn't read softwareConfig properties as it should #38127

Closed Tchopane closed 3 months ago

Tchopane commented 7 months ago

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==10.14.0

Apache Airflow version

2.6.3

Operating System

PRETTY_NAME="Ubuntu 22.04.3 LTS" NAME="Ubuntu" VERSION_ID="22.04" VERSION="22.04.3 LTS (Jammy Jellyfish)" VERSION_CODENAME=jammy ID=ubuntu ID_LIKE=debian HOME_URL="https://www.ubuntu.com/" SUPPORT_URL="https://help.ubuntu.com/" BUG_REPORT_URL="https://bugs.launchpad.net/ubuntu/" PRIVACY_POLICY_URL="https://www.ubuntu.com/legal/terms-and-policies/privacy-policy" UBUNTU_CODENAME=jammy

Deployment

Google Cloud Composer

Deployment details

No response

What happened

DataprocCreateClusterOperator works well as long as you do not specify software_config.properties in cluster_config when creating a Dataproc cluster. The problem is that cluster_config is a template_field, so template rendering coerces numeric-looking string values into numbers:

"properties": {
            "capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": "0.5",
            "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
            ...
            "core:fs.gs.metadata.cache.enable": "false",
}

becomes

"properties": {
            "capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": 0.5,
            "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
            ...
            "core:fs.gs.metadata.cache.enable": "false",
}
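
This coercion matches what Jinja's native rendering does. The issue does not say how the DAG is configured, but when a DAG sets render_template_as_native_obj=True, Airflow renders template fields with Jinja's NativeEnvironment, which literal-evaluates rendered strings. A minimal sketch reproducing the coercion outside Airflow (assuming only that jinja2 is installed):

from jinja2.nativetypes import NativeEnvironment

env = NativeEnvironment()
# A numeric-looking string renders to a native Python float, not a str.
rendered = env.from_string("0.5").render()
print(type(rendered), rendered)  # <class 'float'> 0.5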

But here is what I get when the DataprocCreateClusterOperator gets executed:

Traceback (most recent call last):
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 744, in execute
    operation = self._create_cluster(hook)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/dataproc.py", line 655, in _create_cluster
    return hook.create_cluster(
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/common/hooks/base_google.py", line 482, in inner_wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/dataproc.py", line 327, in create_cluster
    result = client.create_cluster(
             ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/google/cloud/dataproc_v1/services/cluster_controller/client.py", line 606, in create_cluster
    request = clusters.CreateClusterRequest(request)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 581, in __init__
    pb_value = marshal.to_proto(pb_type, value)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/marshal.py", line 228, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
    return self._wrapper(value)._pb
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 581, in __init__
    pb_value = marshal.to_proto(pb_type, value)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/marshal.py", line 228, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
    return self._wrapper(value)._pb
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 581, in __init__
    pb_value = marshal.to_proto(pb_type, value)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/marshal.py", line 228, in to_proto
    pb_value = self.get_rule(proto_type=proto_type).to_proto(value)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/marshal/rules/message.py", line 41, in to_proto
    return self._wrapper(value)._pb
           ^^^^^^^^^^^^^^^^^^^^
  File "/opt/python3.11/lib/python3.11/site-packages/proto/message.py", line 615, in __init__
    super().__setattr__("_pb", self._meta.pb(**params))
                               ^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen _collections_abc>", line 949, in update
TypeError: bad argument type for built-in operation

This happens because properties is meant to be a MutableMapping[str, str] according to the documentation.
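
For reference, a small standalone check (an illustration, assuming google-cloud-dataproc is installed) showing that SoftwareConfig accepts string property values but rejects a float the same way as the traceback above:

from google.cloud.dataproc_v1 import SoftwareConfig

# String values are accepted: properties is a map<string, string> in the proto.
SoftwareConfig(properties={"spark:spark.ui.port": "0"})

# A float value fails at construction time, mirroring the TypeError above.
try:
    SoftwareConfig(
        properties={"capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": 0.5}
    )
except TypeError as exc:
    print(f"TypeError: {exc}")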

I dirty-patched it with my own custom operator using

template_fields = tuple(field for field in DataprocCreateClusterOperator.template_fields if field != "cluster_config")
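
In context, a minimal sketch of that workaround (the class name is just illustrative):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator


class UntemplatedDataprocCreateClusterOperator(DataprocCreateClusterOperator):
    # Drop cluster_config from the templated fields so Jinja never renders it
    # and the string property values reach the Dataproc API unchanged.
    template_fields = tuple(
        field
        for field in DataprocCreateClusterOperator.template_fields
        if field != "cluster_config"
    )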

What you think should happen instead

Strings in properties should remain strings and not be converted into numeric values.

How to reproduce

Create a DataprocCreateClusterOperator and use a config such as

{
    "gce_cluster_config": {
        "zone_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-datafusion/zones/europe-west1-d",
        "subnetwork_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-shared-backbone/regions/europe-west1/subnetworks/shared-default-subnet",
        "internal_ip_only": True,
        "service_account_scopes": [
            "https://www.googleapis.com/auth/cloud-platform",
            "https://www.googleapis.com/auth/cloud.useraccounts.readonly",
            "https://www.googleapis.com/auth/devstorage.read_write",
            "https://www.googleapis.com/auth/logging.write",
        ],
        "metadata": {
            "VmDnsSetting": "ZonalPreferred",
        },
        "shielded_instance_config": {
            "enable_secure_boot": False,
            "enable_vtpm": False,
            "enable_integrity_monitoring": False,
        }
    },
    "master_config": {
        "num_instances": 1,
        "image_uri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-0-deb10-20240306-155100-rc01",
        "machine_type_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-datafusion/zones/europe-west1-d/machineTypes/e2-custom-2-8192",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 100
        },
        "preemptibility": "NON_PREEMPTIBLE",
        "min_cpu_platform": "AUTOMATIC",
    },
    "worker_config": {
        "num_instances": 2,
        "image_uri": "https://www.googleapis.com/compute/v1/projects/cloud-dataproc/global/images/dataproc-2-0-deb10-20240306-155100-rc01",
        "machine_type_uri": "https://www.googleapis.com/compute/v1/projects/dedge-stg-datafusion/zones/europe-west1-d/machineTypes/e2-custom-2-8192",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 200
        },
        "preemptibility": "NON_PREEMPTIBLE",
        "min_cpu_platform": "AUTOMATIC"
    },
    "software_config": {
        "image_version": "2.0.95-debian10",
        "properties": {
            "capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": "0.5",
            "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
            "core:fs.gs.block.size": "134217728",
            "core:fs.gs.metadata.cache.enable": "false",
            "core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2",
            "dataproc:dataproc.allow.zero.workers": "true",
            "dataproc:dataproc.conscrypt.provider.enable": "false",
            "dataproc:dataproc.logging.stackdriver.enable": "false",
            "dataproc:dataproc.monitoring.stackdriver.enable": "false",
            "distcp:mapreduce.map.java.opts": "-Xmx576m",
            "distcp:mapreduce.map.memory.mb": "768",
            "distcp:mapreduce.reduce.java.opts": "-Xmx576m",
            "distcp:mapreduce.reduce.memory.mb": "768",
            "hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m",
            "hdfs:dfs.datanode.address": "0.0.0.0:9866",
            "hdfs:dfs.datanode.http.address": "0.0.0.0:9864",
            "hdfs:dfs.datanode.https.address": "0.0.0.0:9865",
            "hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867",
            "hdfs:dfs.namenode.handler.count": "20",
            "hdfs:dfs.namenode.http-address": "0.0.0.0:9870",
            "hdfs:dfs.namenode.https-address": "0.0.0.0:9871",
            "hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868",
            "hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869",
            "hdfs:dfs.namenode.service.handler.count": "10",
            "hive:hive.fetch.task.conversion": "none",
            "mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "2048",
            "mapred:mapreduce.job.maps": "9",
            "mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95",
            "mapred:mapreduce.job.reduces": "3",
            "mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService",
            "mapred:mapreduce.map.cpu.vcores": "1",
            "mapred:mapreduce.map.java.opts": "-Xmx2621m",
            "mapred:mapreduce.map.memory.mb": "3277",
            "mapred:mapreduce.reduce.cpu.vcores": "1",
            "mapred:mapreduce.reduce.java.opts": "-Xmx2621m",
            "mapred:mapreduce.reduce.memory.mb": "3277",
            "mapred:mapreduce.task.io.sort.mb": "256",
            "mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2621m",
            "mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1",
            "mapred:yarn.app.mapreduce.am.resource.mb": "3277",
            "spark-env:SPARK_DAEMON_MEMORY": "2048m",
            "spark:spark.default.parallelism": "32",
            "spark:spark.driver.maxResultSize": "1024m",
            "spark:spark.driver.memory": "2048m",
            "spark:spark.executor.cores": "1",
            "spark:spark.executor.instances": "2",
            "spark:spark.executor.memory": "2893m",
            "spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1",
            "spark:spark.metrics.namespace": "spark",
            "spark:spark.scheduler.mode": "FAIR",
            "spark:spark.sql.adaptive.coalescePartitions.initialPartitionNum": "128",
            "spark:spark.sql.cbo.enabled": "true",
            "spark:spark.ui.port": "0",
            "spark:spark.yarn.am.memory": "640m",
            "yarn-env:YARN_NODEMANAGER_HEAPSIZE": "819",
            "yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "2048",
            "yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "2048",
            "yarn:yarn.nodemanager.address": "0.0.0.0:8026",
            "yarn:yarn.nodemanager.delete.debug-delay-sec": "86400",
            "yarn:yarn.nodemanager.pmem-check-enabled": "false",
            "yarn:yarn.nodemanager.resource.cpu-vcores": "2",
            "yarn:yarn.nodemanager.resource.memory-mb": "6554",
            "yarn:yarn.nodemanager.vmem-check-enabled": "false",
            "yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true",
            "yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400",
            "yarn:yarn.scheduler.maximum-allocation-mb": "6554",
            "yarn:yarn.scheduler.minimum-allocation-mb": "1"
        }
    }
}
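
For completeness, a minimal sketch of how such a config might be wired into the operator (project, region, and cluster name are placeholders, and the dict above is assumed to be bound to a CLUSTER_CONFIG variable):

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateClusterOperator

create_cluster = DataprocCreateClusterOperator(
    task_id="create_dataproc_cluster",
    project_id="my-project",        # placeholder
    region="europe-west1",          # placeholder
    cluster_name="my-cluster",      # placeholder
    cluster_config=CLUSTER_CONFIG,  # the dict shown above
)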

Anything else

No response

Are you willing to submit PR?

Code of Conduct

boring-cyborg[bot] commented 7 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

kevgeo commented 7 months ago

Hi @Tchopane, I tried to reproduce your issue, both in Airflow on Cloud Composer and locally from a dev environment, with the following config, into which I copied your software_config values.

CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 32},
    },
    "secondary_worker_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {
            "boot_disk_type": "pd-standard",
            "boot_disk_size_gb": 32,
        },
        "is_preemptible": True,
        "preemptibility": "PREEMPTIBLE",
    },
    "software_config": {
        "image_version": "2.0.95-debian10",
        "properties": {
            "capacity-scheduler:yarn.scheduler.capacity.maximum-am-resource-percent": "0.5",
            "capacity-scheduler:yarn.scheduler.capacity.root.default.ordering-policy": "fair",
            "core:fs.gs.block.size": "134217728",
            "core:fs.gs.metadata.cache.enable": "false",
            "core:hadoop.ssl.enabled.protocols": "TLSv1,TLSv1.1,TLSv1.2",
            "dataproc:dataproc.allow.zero.workers": "true",
            "dataproc:dataproc.conscrypt.provider.enable": "false",
            "dataproc:dataproc.logging.stackdriver.enable": "false",
            "dataproc:dataproc.monitoring.stackdriver.enable": "false",
            "distcp:mapreduce.map.java.opts": "-Xmx576m",
            "distcp:mapreduce.map.memory.mb": "768",
            "distcp:mapreduce.reduce.java.opts": "-Xmx576m",
            "distcp:mapreduce.reduce.memory.mb": "768",
            "hadoop-env:HADOOP_DATANODE_OPTS": "-Xmx512m",
            "hdfs:dfs.datanode.address": "0.0.0.0:9866",
            "hdfs:dfs.datanode.http.address": "0.0.0.0:9864",
            "hdfs:dfs.datanode.https.address": "0.0.0.0:9865",
            "hdfs:dfs.datanode.ipc.address": "0.0.0.0:9867",
            "hdfs:dfs.namenode.handler.count": "20",
            "hdfs:dfs.namenode.http-address": "0.0.0.0:9870",
            "hdfs:dfs.namenode.https-address": "0.0.0.0:9871",
            "hdfs:dfs.namenode.secondary.http-address": "0.0.0.0:9868",
            "hdfs:dfs.namenode.secondary.https-address": "0.0.0.0:9869",
            "hdfs:dfs.namenode.service.handler.count": "10",
            "hive:hive.fetch.task.conversion": "none",
            "mapred-env:HADOOP_JOB_HISTORYSERVER_HEAPSIZE": "2048",
            "mapred:mapreduce.job.maps": "9",
            "mapred:mapreduce.job.reduce.slowstart.completedmaps": "0.95",
            "mapred:mapreduce.job.reduces": "3",
            "mapred:mapreduce.jobhistory.recovery.store.class": "org.apache.hadoop.mapreduce.v2.hs.HistoryServerLeveldbStateStoreService",
            "mapred:mapreduce.map.cpu.vcores": "1",
            "mapred:mapreduce.map.java.opts": "-Xmx2621m",
            "mapred:mapreduce.map.memory.mb": "3277",
            "mapred:mapreduce.reduce.cpu.vcores": "1",
            "mapred:mapreduce.reduce.java.opts": "-Xmx2621m",
            "mapred:mapreduce.reduce.memory.mb": "3277",
            "mapred:mapreduce.task.io.sort.mb": "256",
            "mapred:yarn.app.mapreduce.am.command-opts": "-Xmx2621m",
            "mapred:yarn.app.mapreduce.am.resource.cpu-vcores": "1",
            "mapred:yarn.app.mapreduce.am.resource.mb": "3277",
            "spark-env:SPARK_DAEMON_MEMORY": "2048m",
            "spark:spark.default.parallelism": "32",
            "spark:spark.driver.maxResultSize": "1024m",
            "spark:spark.driver.memory": "2048m",
            "spark:spark.executor.cores": "1",
            "spark:spark.executor.instances": "2",
            "spark:spark.executor.memory": "2893m",
            "spark:spark.executorEnv.OPENBLAS_NUM_THREADS": "1",
            "spark:spark.metrics.namespace": "spark",
            "spark:spark.scheduler.mode": "FAIR",
            "spark:spark.sql.adaptive.coalescePartitions.initialPartitionNum": "128",
            "spark:spark.sql.cbo.enabled": "true",
            "spark:spark.ui.port": "0",
            "spark:spark.yarn.am.memory": "640m",
            "yarn-env:YARN_NODEMANAGER_HEAPSIZE": "819",
            "yarn-env:YARN_RESOURCEMANAGER_HEAPSIZE": "2048",
            "yarn-env:YARN_TIMELINESERVER_HEAPSIZE": "2048",
            "yarn:yarn.nodemanager.address": "0.0.0.0:8026",
            "yarn:yarn.nodemanager.delete.debug-delay-sec": "86400",
            "yarn:yarn.nodemanager.pmem-check-enabled": "false",
            "yarn:yarn.nodemanager.resource.cpu-vcores": "2",
            "yarn:yarn.nodemanager.resource.memory-mb": "6554",
            "yarn:yarn.nodemanager.vmem-check-enabled": "false",
            "yarn:yarn.resourcemanager.decommissioning-nodes-watcher.decommission-if-no-shuffle-data": "true",
            "yarn:yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs": "86400",
            "yarn:yarn.scheduler.maximum-allocation-mb": "6554",
            "yarn:yarn.scheduler.minimum-allocation-mb": "1"
        }
    }
}

I am not getting any error when creating the Dataproc cluster. Also, looking at the underlying code logic of the operator, I do not see a reason why the string values would be converted to numeric types.

Please let me know if I have correctly understood the issue.

github-actions[bot] commented 5 months ago

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot] commented 3 months ago

This issue has been closed because it has not received a response from the issue author.