langgenius / dify

Dify is an open-source LLM app development platform. Dify's intuitive interface combines AI workflow, RAG pipeline, agent capabilities, model management, observability features and more, letting you quickly go from prototype to production.
https://dify.ai

Why is @shared_task(queue='generation') not in the code? #7428

Closed 401557122 closed 1 month ago

401557122 commented 1 month ago

Self Checks

Dify version

0.7.0

Cloud or Self Hosted

Self Hosted (Docker)

Steps to reproduce

Why is @shared_task(queue='generation') not anywhere in the code?

✔️ Expected Behavior

No response

❌ Actual Behavior

No response

dosubot[bot] commented 1 month ago

The codebase uses @shared_task(queue='dataset') rather than @shared_task(queue='generation'). For example, the task create_segment_to_index_task is assigned to the 'dataset' queue, not the 'generation' queue.

For example, in the document_indexing_update_task function, the @shared_task decorator specifies the 'dataset' queue:

from celery import shared_task

@shared_task(queue='dataset')
def document_indexing_update_task(dataset_id: str, document_id: str):
    """
    Async update document.
    :param dataset_id: dataset id
    :param document_id: document id

    Usage: document_indexing_update_task.delay(dataset_id, document_id)
    """
    # function implementation

Similarly, the create_segment_to_index_task function is also assigned to the 'dataset' queue:

from typing import Optional

from celery import shared_task

@shared_task(queue='dataset')
def create_segment_to_index_task(segment_id: str, keywords: Optional[list[str]] = None):
    # function implementation

Additionally, in the add_annotation_to_index_task function, the @shared_task decorator also specifies the 'dataset' queue:

from celery import shared_task

@shared_task(queue='dataset')
def add_annotation_to_index_task(annotation_id: str, question: str, tenant_id: str, app_id: str,
                                 collection_binding_id: str):
    """
    Add annotation to index.
    :param annotation_id: annotation id
    :param question: question
    :param tenant_id: tenant id
    :param app_id: app id
    :param collection_binding_id: embedding binding id

    Usage: add_annotation_to_index_task.delay(annotation_id, question, tenant_id, app_id, collection_binding_id)
    """
    # function implementation

This consistent use of the 'dataset' queue across different tasks indicates that the 'generation' queue is not used in this context [1][2][3][4][5].
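To see what the queue= argument does conceptually, here is a toy stand-in (not Celery's real implementation): the decorator simply records which queue a task belongs to, and a worker later consumes only the queues it subscribes to. The registry and decorator names below are illustrative, not part of Dify or Celery:

```python
# Toy sketch: how a queue= decorator argument tags tasks by routing destination.
# TASK_REGISTRY and shared_task_sketch are illustrative names, not real Celery APIs.
TASK_REGISTRY = {}

def shared_task_sketch(queue="celery"):
    def decorator(fn):
        TASK_REGISTRY[fn.__name__] = queue  # record the queue this task routes to
        return fn
    return decorator

@shared_task_sketch(queue="dataset")
def create_segment_to_index_task(segment_id, keywords=None):
    return f"indexing segment {segment_id}"

# A worker subscribed to 'dataset' would pick this task up; one subscribed
# only to 'generation' would never see it.
print(TASK_REGISTRY["create_segment_to_index_task"])  # dataset
```

In real Celery, this metadata is what the broker uses to deliver each task message only to workers that were started with that queue in their -Q list.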

To continue talking to Dosu, mention @dosu.

401557122 commented 1 month ago

So why do I need to start the generation Celery worker in entrypoint.sh?

dosubot[bot] commented 1 month ago

You need to start the Celery worker in entrypoint.sh so that workers run with the correct settings, such as concurrency options and queue assignments; 'generation' is simply one of the queues that worker subscribes to. This setup is how background tasks are processed in the application.

The relevant part of the entrypoint.sh script is:

if [[ "${MODE}" == "worker" ]]; then

  # Get the number of available CPU cores
  if [ "${CELERY_AUTO_SCALE,,}" = "true" ]; then
    # Set MAX_WORKERS to the number of available cores if not specified
    AVAILABLE_CORES=$(nproc)
    MAX_WORKERS=${CELERY_MAX_WORKERS:-$AVAILABLE_CORES}
    MIN_WORKERS=${CELERY_MIN_WORKERS:-1}
    CONCURRENCY_OPTION="--autoscale=${MAX_WORKERS},${MIN_WORKERS}"
  else
    CONCURRENCY_OPTION="-c ${CELERY_WORKER_AMOUNT:-1}"
  fi

  exec celery -A app.celery worker -P ${CELERY_WORKER_CLASS:-gevent} $CONCURRENCY_OPTION --loglevel INFO \
    -Q ${CELERY_QUEUES:-dataset,generation,mail,ops_trace,app_deletion}
fi

This section of the script configures and starts the Celery worker with the appropriate concurrency settings and queue assignments, ensuring that tasks are processed efficiently [1].
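The concurrency branch can be exercised on its own. The sketch below reproduces the selection logic with assumed environment values (CELERY_AUTO_SCALE=true, 8 max / 2 min workers) and uses tr instead of bash's ${VAR,,} so it stays portable:

```shell
# Sketch of the concurrency selection in entrypoint.sh, with assumed env values.
CELERY_AUTO_SCALE=true
CELERY_MAX_WORKERS=8
CELERY_MIN_WORKERS=2

# Lower-case the flag portably (the real script uses bash's ${VAR,,} expansion)
AUTO_SCALE=$(echo "${CELERY_AUTO_SCALE:-false}" | tr '[:upper:]' '[:lower:]')

if [ "$AUTO_SCALE" = "true" ]; then
  # Autoscale between MIN and MAX workers; MAX defaults to the core count
  MAX_WORKERS=${CELERY_MAX_WORKERS:-$(nproc)}
  MIN_WORKERS=${CELERY_MIN_WORKERS:-1}
  CONCURRENCY_OPTION="--autoscale=${MAX_WORKERS},${MIN_WORKERS}"
else
  # Fixed worker count, defaulting to 1
  CONCURRENCY_OPTION="-c ${CELERY_WORKER_AMOUNT:-1}"
fi

echo "$CONCURRENCY_OPTION"
```

With these values the worker would be launched with --autoscale=8,2; unset CELERY_AUTO_SCALE and it falls back to a fixed -c count.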


401557122 commented 1 month ago

But 'generation' is not used anywhere in the code.

dosubot[bot] commented 1 month ago

The 'generation' queue is indeed referenced in the Celery worker configuration, but no task in the codebase explicitly uses the @shared_task(queue='generation') decorator. Response generation is instead managed by the MessageBasedAppQueueManager, as seen in the CompletionAppGenerator class.

Here is the relevant part of the code:

# init queue manager
queue_manager = MessageBasedAppQueueManager(
    task_id=application_generate_entity.task_id,
    user_id=application_generate_entity.user_id,
    invoke_from=application_generate_entity.invoke_from,
    conversation_id=conversation.id,
    app_mode=conversation.mode,
    message_id=message.id
)

# new thread
worker_thread = threading.Thread(target=self._generate_worker, kwargs={
    'flask_app': current_app._get_current_object(),
    'application_generate_entity': application_generate_entity,
    'queue_manager': queue_manager,
    'message_id': message.id,
})

worker_thread.start()

The MessageBasedAppQueueManager manages the queue used while generating responses, which is why the 'generation' name appears in the Celery configuration even though no @shared_task currently targets that queue [1].
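The pattern above is an in-process producer/consumer: a background thread publishes generated chunks onto an application-level queue while the caller consumes them. A minimal stdlib sketch of that pattern follows; SimpleQueueManager and generate_worker are illustrative stand-ins, not Dify's actual MessageBasedAppQueueManager or _generate_worker:

```python
import queue
import threading

class SimpleQueueManager:
    """Illustrative stand-in for MessageBasedAppQueueManager."""
    def __init__(self):
        self._queue = queue.Queue()

    def publish(self, event):
        self._queue.put(event)

    def listen(self):
        # Yield events until the None sentinel signals generation is finished
        while True:
            event = self._queue.get()
            if event is None:
                break
            yield event

def generate_worker(queue_manager, chunks):
    # Simulates the model producing chunks on a background thread
    for chunk in chunks:
        queue_manager.publish(chunk)
    queue_manager.publish(None)

qm = SimpleQueueManager()
t = threading.Thread(target=generate_worker, args=(qm, ["Hello", ", ", "world"]))
t.start()
output = "".join(qm.listen())
t.join()
print(output)  # Hello, world
```

Because the producer is a thread inside the API process rather than a Celery worker, no @shared_task is needed for this path, which is consistent with the decorator being absent from the codebase.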
