yandex-cloud / python-sdk

Yandex.Cloud Python SDK

Yandexcloud 0.289.0 breaks Airflow dataproc tests for yandex provider #103

Closed: potiuk closed this issue 4 months ago

potiuk commented 4 months ago

The new yandexcloud release seems to break the dataproc tests for Airflow's yandex provider, and it looks like an internal breaking change.

Example failing tests: https://github.com/apache/airflow/actions/runs/9311110663/job/25629906993

__________ TestYandexCloudDataprocHook.test_create_mapreduce_job_hook __________

self = <tests.providers.yandex.hooks.test_dataproc.TestYandexCloudDataprocHook object at 0x7fabf9471760>
mock_create_operation = <MagicMock name='create_operation_and_get_result' id='140371307643232'>

    @mock.patch("yandexcloud.SDK.create_operation_and_get_result")
    def test_create_mapreduce_job_hook(self, mock_create_operation):
        self._init_hook()

>       self.hook.client.create_mapreduce_job(
            archive_uris=None,
            args=[
                "-mapper",
                "mapper.py",
                "-reducer",
                "reducer.py",
                "-numReduceTasks",
                "1",
                "-input",
                "s3a://some-in-bucket/jobs/sources/data/cities500.txt.bz2",
                "-output",
                "s3a://some-out-bucket/dataproc/job/results",
            ],
            cluster_id="my_cluster_id",
            file_uris=[
                "s3a://some-in-bucket/jobs/sources/mapreduce-001/mapper.py",
                "s3a://some-in-bucket/jobs/sources/mapreduce-001/reducer.py",
            ],
            jar_file_uris=None,
            main_class="org.apache.hadoop.streaming.HadoopStreaming",
            main_jar_file_uri=None,
            name="Mapreduce job",
            properties={
                "yarn.app.mapreduce.am.resource.mb": "2048",
                "yarn.app.mapreduce.am.command-opts": "-Xmx2048m",
                "mapreduce.job.maps": "6",
            },
        )

tests/providers/yandex/hooks/test_dataproc.py:109: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <yandexcloud._wrappers.dataproc.Dataproc object at 0x7faad0082490>
main_class = 'org.apache.hadoop.streaming.HadoopStreaming'
main_jar_file_uri = None, jar_file_uris = None, archive_uris = None
file_uris = ['s3a://some-in-bucket/jobs/sources/mapreduce-001/mapper.py', 's3a://some-in-bucket/jobs/sources/mapreduce-001/reducer.py']
args = ['-mapper', 'mapper.py', '-reducer', 'reducer.py', '-numReduceTasks', '1', ...]
properties = {'mapreduce.job.maps': '6', 'yarn.app.mapreduce.am.command-opts': '-Xmx2048m', 'yarn.app.mapreduce.am.resource.mb': '2048'}
cluster_id = 'my_cluster_id', name = 'Mapreduce job'

    def create_mapreduce_job(
        self,
        main_class=None,
        main_jar_file_uri=None,
        jar_file_uris=None,
        archive_uris=None,
        file_uris=None,
        args=None,
        properties=None,
        cluster_id=None,
        name="Mapreduce job",
    ):
        """
        Run a Mapreduce job in a Yandex.Cloud Data Proc cluster.

        :param main_jar_file_uri: URI of jar file with job.
                                  Can be placed in HDFS or S3. Can be specified instead of main_class.
        :type main_jar_file_uri: str
        :param main_class: Name of the main class of the job. Can be specified instead of main_jar_file_uri.
        :type main_class: str
        :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
        :type file_uris: List[str]
        :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
        :type archive_uris: List[str]
        :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
        :type jar_file_uris: List[str]
        :param properties: Properties for the job.
        :type properties: Dict[str, str]
        :param args: Arguments to be passed to the job.
        :type args: List[str]
        :param cluster_id: ID of the cluster to run job in.
                           Will try to take the ID from the Dataproc Hook object if not specified.
        :type cluster_id: str
        :param name: Name of the job. Used for labeling.
        :type name: str
        """
        cluster_id = cluster_id or self.cluster_id
        if not cluster_id:
            raise RuntimeError("Cluster id must be specified.")
        self.log.info("Running Mapreduce job. Cluster ID: %s", cluster_id)

        request = job_service_pb.CreateJobRequest(
            cluster_id=cluster_id,
            name=name,
            mapreduce_job=job_pb.MapreduceJob(
                main_class=main_class,
                main_jar_file_uri=main_jar_file_uri,
                jar_file_uris=jar_file_uris,
                archive_uris=archive_uris,
                file_uris=file_uris,
                args=args,
                properties=properties,
            ),
        )
>       return self.sdk.create_operation_and_get_result(
            request,
            service=job_service_grpc_pb.JobServiceStub,
            method_name="Create",
            response_type=job_pb.Job,
            meta_type=job_service_pb.CreateJobMetadata,
        )
E       AttributeError: 'NoneType' object has no attribute 'create_operation_and_get_result'

/usr/local/lib/python3.8/site-packages/yandexcloud/_wrappers/dataproc/__init__.py:593: AttributeError
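
The root cause is visible in the last frame: the patched yandexcloud.SDK.create_operation_and_get_result is never reached, because the Dataproc wrapper's own sdk reference is None at call time. The snippet below is a minimal, hypothetical reduction of that failure mode (FakeSDK and DataprocWrapper are illustrative names, not the SDK's actual internals):

class FakeSDK:
    def create_operation_and_get_result(self, request, **kwargs):
        return "operation"


class DataprocWrapper:
    def __init__(self, sdk=None):
        # If construction stops wiring this back-reference, every job
        # helper below breaks at call time.
        self.sdk = sdk

    def create_mapreduce_job(self, **job_args):
        # With self.sdk left as None, the attribute lookup itself fails.
        return self.sdk.create_operation_and_get_result(job_args)


print(DataprocWrapper(sdk=FakeSDK()).create_mapreduce_job(cluster_id="c1"))
try:
    DataprocWrapper().create_mapreduce_job(cluster_id="c1")
except AttributeError as err:
    print(err)  # 'NoneType' object has no attribute 'create_operation_and_get_result'

Note that mock.patch("yandexcloud.SDK.create_operation_and_get_result") patches the class attribute, so it cannot rescue a call made through a None instance reference; the fix has to restore the wiring inside the SDK.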
PeppaTheC commented 4 months ago

Hi @potiuk, the issue is fixed in v0.291.0, please check.
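
Before rerunning the Airflow tests, a quick standard-library check (no assumptions about the SDK's own API) confirms the environment actually picked up the fixed release:

from importlib.metadata import version

print(version("yandexcloud"))  # expect 0.291.0 or newer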

potiuk commented 4 months ago

PR to check it: https://github.com/apache/airflow/pull/39974