
Airflow #50

Open zking2000 opened 2 weeks ago

zking2000 commented 2 weeks ago

Sure, I can create a more detailed Airflow DAG for a Java CI/CD pipeline. This DAG includes concrete steps covering the entire process from code checkout to production deployment.


from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.email import EmailOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email': ['dev-team@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'java_cicd_pipeline',
    default_args=default_args,
    description='Detailed CI/CD pipeline for Java project',
    schedule_interval=timedelta(days=1),
    catchup=False
)

# Define tasks
start = EmptyOperator(task_id='start', dag=dag)

fetch_code = BashOperator(
    task_id='fetch_code',
    bash_command='git clone https://github.com/example/java-project.git && cd java-project && git checkout develop',
    dag=dag
)

check_code_style = BashOperator(
    task_id='check_code_style',
    bash_command='cd java-project && ./gradlew checkstyle',
    dag=dag
)

run_unit_tests = BashOperator(
    task_id='run_unit_tests',
    bash_command='cd java-project && ./gradlew test',
    dag=dag
)

static_code_analysis = BashOperator(
    task_id='static_code_analysis',
    bash_command='cd java-project && ./gradlew sonarqube',
    dag=dag
)

build_jar = BashOperator(
    task_id='build_jar',
    bash_command='cd java-project && ./gradlew build -x test',
    dag=dag
)

build_docker_image = BashOperator(
    task_id='build_docker_image',
    bash_command='cd java-project && docker build -t java-app:${BUILD_NUMBER} .',
    dag=dag
)

run_integration_tests = BashOperator(
    task_id='run_integration_tests',
    bash_command='cd java-project && ./gradlew integrationTest',
    dag=dag
)

push_to_registry = BashOperator(
    task_id='push_to_registry',
    bash_command='docker push our-registry.example.com/java-app:${BUILD_NUMBER}',
    dag=dag
)

deploy_to_staging = BashOperator(
    task_id='deploy_to_staging',
    bash_command='kubectl set image deployment/java-app java-app=our-registry.example.com/java-app:${BUILD_NUMBER} --namespace=staging',
    dag=dag
)

run_smoke_tests = BashOperator(
    task_id='run_smoke_tests',
    bash_command='cd java-project && ./gradlew smokeTest -Denv=staging',
    dag=dag
)

approve_production = EmptyOperator(task_id='approve_production', dag=dag)

deploy_to_production = BashOperator(
    task_id='deploy_to_production',
    bash_command='kubectl set image deployment/java-app java-app=our-registry.example.com/java-app:${BUILD_NUMBER} --namespace=production',
    dag=dag
)

notify_team = EmailOperator(
    task_id='notify_team',
    to='dev-team@example.com',
    subject='CI/CD Pipeline Complete',
    html_content='The CI/CD pipeline for the Java project has completed successfully.',
    dag=dag
)

end = EmptyOperator(task_id='end', dag=dag)

# Set task dependencies
start >> fetch_code
fetch_code >> [check_code_style, run_unit_tests, static_code_analysis]
[check_code_style, run_unit_tests, static_code_analysis] >> build_jar
build_jar >> build_docker_image >> run_integration_tests
run_integration_tests >> push_to_registry >> deploy_to_staging
deploy_to_staging >> run_smoke_tests >> approve_production
approve_production >> deploy_to_production >> notify_team >> end

This detailed Airflow DAG models a complete CI/CD flow for a Java project. Step by step:

  1. Start: an EmptyOperator marks the start of the pipeline.

  2. Fetch code: clone the repository from Git and check out the develop branch.

  3. Code style check: run Checkstyle against the Java code.

  4. Unit tests: run the project's unit tests.

  5. Static code analysis: analyze the code with SonarQube.

  6. Build JAR: build the project's JAR with Gradle.

  7. Build Docker image: build a Docker image from the project code.

  8. Integration tests: run the integration tests.

  9. Push to registry: push the Docker image to the private image registry.

  10. Deploy to staging: use kubectl to roll the new version out to the Kubernetes staging namespace.

  11. Smoke tests: run smoke tests in the staging environment.

  12. Approve production deployment: a manual gate that requires human sign-off before the pipeline continues.

  13. Deploy to production: roll the new version out to the Kubernetes production namespace.

  14. Notify team: email the development team that the deployment is complete.

  15. End: marks the end of the pipeline.

The task dependencies ensure the pipeline runs in the correct order while still letting some tasks run in parallel for efficiency. The same wiring can also be expressed with Airflow's chain helper, as sketched below.
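
A minimal sketch of the same dependency graph using chain() (same tasks as the DAG above); a list between two single tasks fans out after the first and joins before the second, so this is equivalent to the >> expressions:

from airflow.models.baseoperator import chain

# Same graph as the >> expressions above.
chain(
    start,
    fetch_code,
    [check_code_style, run_unit_tests, static_code_analysis],
    build_jar,
    build_docker_image,
    run_integration_tests,
    push_to_registry,
    deploy_to_staging,
    run_smoke_tests,
    approve_production,
    deploy_to_production,
    notify_team,
    end,
)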

Notes:

  1. This DAG assumes Gradle as the build tool, Docker for containerization, and Kubernetes for deployment.
  2. Adjust the specific commands to match your actual project structure and toolchain.
  3. Security-sensitive values (repository URLs, registry addresses, etc.) should be managed with Airflow Variables or Secrets rather than hard-coded in the DAG; see the sketch after this list.
  4. The approve_production task is an EmptyOperator; in real use you would replace it with a manually triggered gate such as a sensor, also sketched below.
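
A minimal sketch of notes 3 and 4, meant to drop into the DAG above. It assumes a hypothetical Variable named registry_url holds the registry address, and that approval is signalled by setting another hypothetical Variable, prod_approved, to "true" in the Airflow UI:

from airflow.models import Variable
from airflow.sensors.python import PythonSensor

# Pull the registry address from an Airflow Variable instead of hard-coding it.
# 'registry_url' is a hypothetical Variable; create it under Admin -> Variables.
registry_url = Variable.get('registry_url', default_var='our-registry.example.com')

push_to_registry = BashOperator(
    task_id='push_to_registry',
    bash_command=f'docker push {registry_url}/java-app:${{BUILD_NUMBER}}',
    dag=dag,
)

# Manual approval gate: the sensor pokes until someone sets the hypothetical
# Variable 'prod_approved' to "true"; reschedule mode frees the worker slot
# between pokes.
approve_production = PythonSensor(
    task_id='approve_production',
    python_callable=lambda: Variable.get('prod_approved', default_var='false') == 'true',
    poke_interval=60,          # check once a minute
    timeout=60 * 60 * 24,      # give up after 24 hours
    mode='reschedule',
    dag=dag,
)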

Would you like me to explain or modify this detailed Java CI/CD Airflow DAG further?

zking2000 commented 2 weeks ago
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: airflow
data:
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__BROKER_URL: rediss://{REDIS_USER}:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/0?ssl_cert_reqs=CERT_REQUIRED&ssl_ca_certs=/path/to/ca_cert.pem
  AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
  AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: gs://{YOUR_GCS_BUCKET}/airflow/logs
  AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: google_cloud_default
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
  AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARNING
  AIRFLOW__API__AUTH_BACKEND: airflow.api.auth.backend.basic_auth
  AIRFLOW__LOGGING__REMOTE_HANDLER: gcp_cloud_logging.GCPCloudLoggingHandler
  AIRFLOW__LOGGING__GOOGLE_KEY_PATH: /path/to/your/service-account-key.json
zking2000 commented 2 weeks ago
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: airflow
data:
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__BROKER_URL: rediss://:${REDIS_AUTH}@${REDIS_HOST}:${REDIS_PORT}/0
  AIRFLOW__CELERY__BROKER_USE_SSL: "True"
  AIRFLOW__CELERY__SSL_ACTIVE: "True"
  AIRFLOW__CELERY__SSL_CERT_REQS: "CERT_REQUIRED"
  AIRFLOW__CELERY__SSL_CA_CERTS: "/path/to/ca_cert.pem"
  AIRFLOW__CORE__LOAD_EXAMPLES: "False"
  AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "False"
  AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
  AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARNING
  AIRFLOW__API__AUTH_BACKEND: airflow.api.auth.backend.basic_auth
  AIRFLOW__LOGGING__REMOTE_LOGGING: "False"
  AIRFLOW__LOGGING__BASE_LOG_FOLDER: "/opt/airflow/logs"
  AIRFLOW__SCHEDULER__CHILD_PROCESS_LOG_DIRECTORY: "/opt/airflow/logs/scheduler"
zking2000 commented 2 weeks ago
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: airflow
data:
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__BROKER_URL: rediss://:${REDIS_AUTH}@${REDIS_HOST}:${REDIS_PORT}/0
  AIRFLOW__CELERY__BROKER_USE_SSL: "True"
  AIRFLOW__CELERY__SSL_ACTIVE: "True"
  AIRFLOW__CELERY__SSL_CERT_REQS: "CERT_REQUIRED"
  AIRFLOW__CELERY__SSL_CA_CERTS: "/path/to/ca_cert.pem"
  AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
  AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARNING
  AIRFLOW__LOGGING__REMOTE_LOGGING: "False"
  AIRFLOW__LOGGING__BASE_LOG_FOLDER: "/opt/airflow/logs"
  AIRFLOW__LOGGING__WORKER_LOG_SERVER_PORT: "8793"
  AIRFLOW__API__AUTH_BACKEND: airflow.api.auth.backend.basic_auth
  AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "True"
zking2000 commented 2 weeks ago
FROM apache/airflow:2.5.0
# Install as the airflow user; the official image rejects pip installs run as root.
RUN pip install --no-cache-dir apache-airflow-providers-google google-cloud-storage

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-config
  namespace: airflow
data:
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@127.0.0.1:5432/airflow
  AIRFLOW__CELERY__BROKER_URL: rediss://:${REDIS_AUTH}@${REDIS_HOST}:${REDIS_PORT}/0
  AIRFLOW__CELERY__BROKER_USE_SSL: "True"
  AIRFLOW__CELERY__SSL_ACTIVE: "True"
  AIRFLOW__CELERY__SSL_CERT_REQS: "CERT_REQUIRED"
  AIRFLOW__CELERY__SSL_CA_CERTS: "/path/to/ca_cert.pem"
  AIRFLOW__CORE__LOAD_EXAMPLES: "False"
  AIRFLOW__WEBSERVER__EXPOSE_CONFIG: "False"
  AIRFLOW__LOGGING__LOGGING_LEVEL: INFO
  AIRFLOW__LOGGING__FAB_LOGGING_LEVEL: WARNING
  AIRFLOW__API__AUTH_BACKEND: airflow.api.auth.backend.basic_auth
  AIRFLOW__LOGGING__REMOTE_LOGGING: "True"
  AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER: gs://${GCS_BUCKET}/airflow/logs
  AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID: google_cloud_default
zking2000 commented 2 weeks ago

RUN pip install --no-cache-dir apache-airflow-providers-google google-cloud-storage

zking2000 commented 1 week ago
# sonar_operator.py
from airflow.models import BaseOperator
import subprocess

class SonarScanOperator(BaseOperator):

    def __init__(
        self,
        project_key,
        project_name,
        sonar_host_url,
        sonar_login,
        sources_path,
        java_binaries=None,
        additional_properties=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.project_key = project_key
        self.project_name = project_name
        self.sonar_host_url = sonar_host_url
        self.sonar_login = sonar_login
        self.sources_path = sources_path
        self.java_binaries = java_binaries
        self.additional_properties = additional_properties or {}

    def execute(self, context):
        command = [
            "sonar-scanner",
            f"-Dsonar.projectKey={self.project_key}",
            f"-Dsonar.projectName={self.project_name}",
            f"-Dsonar.host.url={self.sonar_host_url}",
            f"-Dsonar.login={self.sonar_login}",
            f"-Dsonar.sources={self.sources_path}"
        ]

        if self.java_binaries:
            command.append(f"-Dsonar.java.binaries={self.java_binaries}")

        for key, value in self.additional_properties.items():
            command.append(f"-D{key}={value}")

        self.log.info(f"执行SonarQube扫描: {' '.join(command)}")

        try:
            result = subprocess.run(command, check=True, capture_output=True, text=True)
            self.log.info(f"SonarQube扫描成功完成: \n{result.stdout}")
        except subprocess.CalledProcessError as e:
            self.log.error(f"SonarQube扫描失败: \n{e.stderr}")
            raise
# sonar_scan_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from sonar_operator import SonarScanOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(
    'sonar_scan_example',
    default_args=default_args,
    description='A simple DAG to run SonarQube scan',
    schedule_interval=None,
)

scan_task = SonarScanOperator(
    task_id='run_sonar_scan',
    project_key='your_project_key',
    project_name='Your Project Name',
    sonar_host_url='http://your-sonarqube-url:9000',
    sonar_login='your_sonar_token',
    sources_path='/path/to/your/source/code',
    java_binaries='/path/to/your/compiled/classes',
    additional_properties={
        'sonar.java.libraries': '/path/to/your/dependencies/*.jar',
        'sonar.coverage.jacoco.xmlReportPaths': '/path/to/your/coverage/report.xml'
    },
    dag=dag,
)

# nexus_operator.py
from airflow.models import BaseOperator
import requests
from requests.auth import HTTPBasicAuth

class NexusDockerOperator(BaseOperator):

    def __init__(
        self,
        nexus_url,
        nexus_username,
        nexus_password,
        repository,
        image_name,
        image_tag,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.nexus_url = nexus_url
        self.nexus_username = nexus_username
        self.nexus_password = nexus_password
        self.repository = repository
        self.image_name = image_name
        self.image_tag = image_tag

    def execute(self, context):
        auth = HTTPBasicAuth(self.nexus_username, self.nexus_password)

        # Build the API URL
        api_url = f"{self.nexus_url}/service/rest/v1/components?repository={self.repository}"

        # Prepare the upload payload
        data = {
            "name": self.image_name,
            "version": self.image_tag,
            "assets": [
                {
                    "docker.imageName": self.image_name,
                    "docker.imageTag": self.image_tag
                }
            ]
        }

        # Send the POST request to upload the image
        response = requests.post(api_url, auth=auth, json=data)

        if response.status_code == 204:
            self.log.info(f"Successfully uploaded image {self.image_name}:{self.image_tag} to {self.repository}")
        else:
            self.log.error(f"Upload failed: {response.text}")
            raise Exception("Image upload failed")
# nexus_upload_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from nexus_operator import NexusDockerOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(
    'nexus_upload_example',
    default_args=default_args,
    description='A simple DAG to upload Docker image to Nexus',
    schedule_interval=None,
)

upload_task = NexusDockerOperator(
    task_id='upload_to_nexus',
    nexus_url='http://your-nexus-url:8081',
    nexus_username='your_username',
    nexus_password='your_password',
    repository='your-docker-repo',
    image_name='your-image-name',
    image_tag='latest',
    dag=dag,
)

# nexus_iq_operator.py
from airflow.models import BaseOperator
import subprocess

class NexusIQScanOperator(BaseOperator):

    def __init__(
        self,
        nexus_iq_url,
        username,
        password,
        application_id,
        stage,
        scan_target,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.nexus_iq_url = nexus_iq_url
        self.username = username
        self.password = password
        self.application_id = application_id
        self.stage = stage
        self.scan_target = scan_target

    def execute(self, context):
        command = [
            "java", "-jar", "nexus-iq-cli.jar",
            "-s", self.nexus_iq_url,
            "-a", f"{self.username}:{self.password}",
            "-i", self.application_id,
            "-t", self.stage,
            self.scan_target
        ]

        self.log.info(f"执行Nexus IQ扫描: {' '.join(command)}")

        try:
            result = subprocess.run(command, check=True, capture_output=True, text=True)
            self.log.info(f"Nexus IQ扫描成功完成: \n{result.stdout}")
        except subprocess.CalledProcessError as e:
            self.log.error(f"Nexus IQ扫描失败: \n{e.stderr}")
            raise

        self.log.info(f"Nexus IQ扫描完成,目标: {self.scan_target}")
# git_clone_operator.py
from airflow.models import BaseOperator
import subprocess
import os

class GitCloneOperator(BaseOperator):

    def __init__(
        self,
        repo_url,
        branch='main',
        clone_path='/tmp/repo',
        github_token=None,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.repo_url = repo_url
        self.branch = branch
        self.clone_path = clone_path
        self.github_token = github_token

    def execute(self, context):
        if os.path.exists(self.clone_path):
            self.log.info(f"Removing existing directory: {self.clone_path}")
            subprocess.run(['rm', '-rf', self.clone_path], check=True)

        repo_url = self.repo_url
        if self.github_token:
            # Embed the token in the clone URL for authenticated access.
            repo_url = self.repo_url.replace("https://github.com/", f"https://{self.github_token}@github.com/")

        clone_command = ['git', 'clone', '-b', self.branch, repo_url, self.clone_path]

        self.log.info(f"Cloning repository: {self.repo_url} (branch: {self.branch})")
        try:
            result = subprocess.run(clone_command, check=True, capture_output=True, text=True)
            self.log.info(f"Git clone completed successfully:\n{result.stdout}")
        except subprocess.CalledProcessError as e:
            self.log.error(f"Git clone failed:\n{e.stderr}")
            raise

        self.log.info(f"Code cloned to: {self.clone_path}")
# github_ci_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from git_clone_operator import GitCloneOperator
from sonar_operator import SonarScanOperator
from nexus_iq_operator import NexusIQScanOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(
    'github_ci_example',
    default_args=default_args,
    description='A DAG to clone GitHub repo, run SonarQube scan, and Nexus IQ scan',
    schedule_interval=None,
)

clone_task = GitCloneOperator(
    task_id='clone_repo',
    repo_url='https://github.com/your-username/your-repo.git',
    branch='main',
    clone_path='/tmp/your-repo',
    github_token='{{ var.value.github_token }}',
    dag=dag,
)

sonar_scan_task = SonarScanOperator(
    task_id='run_sonar_scan',
    project_key='your_project_key',
    project_name='Your Project Name',
    sonar_host_url='http://your-sonarqube-url:9000',
    sonar_login='{{ var.value.sonar_token }}',
    sources_path='/tmp/your-repo',
    java_binaries='/tmp/your-repo/target/classes',
    additional_properties={
        'sonar.java.libraries': '/tmp/your-repo/target/dependency/*.jar',
        'sonar.coverage.jacoco.xmlReportPaths': '/tmp/your-repo/target/site/jacoco/jacoco.xml'
    },
    dag=dag,
)

nexus_iq_scan_task = NexusIQScanOperator(
    task_id='run_nexus_iq_scan',
    nexus_iq_url='http://your-nexus-iq-url:8070',
    username='{{ var.value.nexus_iq_username }}',
    password='{{ var.value.nexus_iq_password }}',
    application_id='your-application-id',
    stage='build',
    scan_target='/tmp/your-repo',
    dag=dag,
)

clone_task >> [sonar_scan_task, nexus_iq_scan_task]
zking2000 commented 1 week ago
# nexus_iq_scan_dag.py
from airflow import DAG
from airflow.utils.dates import days_ago
from nexus_iq_operator import NexusIQScanOperator
from git_clone_operator import GitCloneOperator

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
}

dag = DAG(
    'nexus_iq_scan_example',
    default_args=default_args,
    description='A DAG to clone a repo and run Nexus IQ scan',
    schedule_interval=None,
)

# Clone the repository
clone_task = GitCloneOperator(
    task_id='clone_repo',
    repo_url='https://github.com/your-username/your-repo.git',
    branch='main',
    clone_path='/tmp/your-repo',
    github_token='{{ var.value.github_token }}',
    dag=dag,
)

# Nexus IQ scan task
nexus_iq_scan_task = NexusIQScanOperator(
    task_id='run_nexus_iq_scan',
    nexus_iq_url='http://your-nexus-iq-url:8070',
    username='{{ var.value.nexus_iq_username }}',
    password='{{ var.value.nexus_iq_password }}',
    application_id='your-application-id',
    stage='build',
    scan_target='/tmp/your-repo',
    dag=dag,
)

# Set the task dependency
clone_task >> nexus_iq_scan_task