vsouza / docker-SQS-local

Docker image to run Amazon Simple Queue Service (Amazon SQS) locally.
MIT License

How can I use it with celery? #19

Closed: usamatariq70 closed this issue 2 months ago

usamatariq70 commented 2 months ago

How can I use your docker-SQS-local image with Celery as a message broker in Python?

vsouza commented 2 months ago

@usamatariq70 Please get your docker-machine address and the correct port, and set them in your Celery app.

usamatariq70 commented 2 months ago

I am getting the address, but it is not connecting. I even tried adding sqs:// in front of the IP.

vsouza commented 2 months ago

This doesn't seem to be a problem with the image; I think it would be better to ask for help in a forum.

ab3llini commented 3 weeks ago

I managed to run it with Celery; I hope this can still help you.

Environment:


export AWS_ENDPOINT=http://minio:9000
export AWS_ACCESS_KEY=minio
export AWS_SECRET_KEY=minio123

export S3_BUCKET_NAME=images
export S3_URL_PREFIX=http://localhost:9000

export CELERY_BROKER_URL=sqs://${AWS_ACCESS_KEY}:${AWS_SECRET_KEY}@sqs:9324
export CELERY_SQS_QUEUE_NAME=default

export MINIO_ROOT_USER=minio
export MINIO_ROOT_PASSWORD=minio123

export POSTGRES_USER=postgres
export POSTGRES_PASSWORD=postgres
export POSTGRES_DB=db
export POSTGRES_URI=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB}
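
A note on the broker URL above: if the access key or secret key contains characters such as `/` or `+`, they need to be percent-encoded before being placed inside an `sqs://` URL. A minimal sketch in Python of how that could be done; `build_sqs_broker_url` is a hypothetical helper name, and the host `sqs` and port 9324 are the ones used in the environment above.

```python
from urllib.parse import quote


def build_sqs_broker_url(access_key: str, secret_key: str, host: str, port: int) -> str:
    """Return an sqs:// broker URL with percent-encoded credentials.

    Hypothetical helper for illustration; not part of Celery or of the image.
    """
    # safe="" makes quote() also encode "/" and "+", which may appear in secret keys
    user = quote(access_key, safe="")
    password = quote(secret_key, safe="")
    return f"sqs://{user}:{password}@{host}:{port}"


if __name__ == "__main__":
    # Values from the environment listing: prints sqs://minio:minio123@sqs:9324
    print(build_sqs_broker_url("minio", "minio123", "sqs", 9324))
```

With the simple credentials used here the result is identical to the exported `CELERY_BROKER_URL`; the encoding only matters for keys with special characters.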

Worker settings (the serialization options are only there for pydantic; you can ignore them):


import os

from celery import Celery


class CeleryConfig:
    # Serialization configuration
    task_serializer = "pickle"
    result_serializer = "pickle"
    event_serializer = "json"
    accept_content = ["application/json", "application/x-python-serialize"]
    result_accept_content = ["application/json", "application/x-python-serialize"]

    # SQS configuration
    broker_transport_options = {
        "queue_name_prefix": "celery-",
        "visibility_timeout": 3600,  # 1 hour
        "polling_interval": 1,
    }

    # Other Celery settings
    broker_connection_retry_on_startup = True
    task_default_queue = os.environ["CELERY_SQS_QUEUE_NAME"]
    worker_prefetch_multiplier = 1
    task_acks_late = True

# No broker is passed here: Celery picks up the CELERY_BROKER_URL environment variable
worker = Celery("tasks")
worker.config_from_object(CeleryConfig)
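
Since the config reads `os.environ["CELERY_SQS_QUEUE_NAME"]` directly, a missing variable raises a `KeyError` at import time. A small sketch of a pre-flight check that could run before the worker is created; `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, and the variable names are the ones from the environment listing above.

```python
import os
from typing import Iterable, List, Mapping

# Variable names taken from the environment listing (assumed to be the
# only two the worker settings depend on)
REQUIRED_VARS = ("CELERY_BROKER_URL", "CELERY_SQS_QUEUE_NAME")


def missing_env_vars(
    required: Iterable[str] = REQUIRED_VARS,
    environ: Mapping[str, str] = os.environ,
) -> List[str]:
    """Return the names in `required` that are unset or empty in `environ`."""
    return [name for name in required if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
```

Passing `environ` explicitly keeps the check easy to test without touching the real process environment.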

Compose:


services:
  fastapi:
    container_name: fastapi
    build:
      context: .
      target: dev
    ports:
      - "8000:8000"
    env_file:
      - .envrc
    volumes:
      - ./middleware:/app/middleware
      - ./tests:/app/tests
      - ./alembic:/app/alembic
      - ./alembic.ini:/app/alembic.ini
    working_dir: /app/middleware
    command: 'fastapi dev --host 0.0.0.0 --port 8000 src/app.py'
    tty: true
    restart: always
    depends_on:
      - postgres
      - minio
      - worker

  postgres:
    image: postgres
    container_name: postgres
    ports:
      - "5432:5432"
    env_file:
      - .envrc
    volumes:
      - postgres:/var/lib/postgresql/data
    restart: always

  sqs:
    container_name: sqs
    image: vsouza/sqs-local
    ports:
      - "9324:9324"

  worker:
    container_name: worker
    build:
      context: .
      target: dev
    command: celery -A src.worker worker --loglevel=info
    working_dir: /app/middleware
    user: nobody
    env_file:
      - .envrc
    volumes:
      - ./middleware:/app/middleware
    depends_on:
      - sqs

  minio:
    image: minio/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - minio_data:/data
      - ./minio.sh:/minio.sh
    command: server /data --console-address ":9001"
    env_file:
      - .envrc
    entrypoint: [ "/bin/sh", "/minio.sh" ]

  flower:
    container_name: flower
    build:
      context: .
      target: dev
    command: celery -A src.worker flower --port=5555
    working_dir: /app/middleware
    env_file:
      - .envrc
    volumes:
      - ./middleware:/app/middleware
    ports:
      - "5555:5555"
    depends_on:
      - worker

volumes:
  postgres:
  minio_data:
  sqs:
...
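
One caveat with the Compose file: the short `depends_on` form only controls start order and does not wait for the `sqs` container to accept connections. A hedged sketch of a readiness check the worker could run first; `wait_for_port` is a hypothetical helper, and it only tests that the TCP port is open, not that the SQS API itself is healthy.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # Open and immediately close a connection; success means the port is listening
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Covers refused connections, timeouts and DNS failures
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example call with the service name and port from the Compose file:
#     wait_for_port("sqs", 9324)
```

Inside the Compose network the host is the service name `sqs`; from the host machine it would be `localhost`, since port 9324 is published.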