langgenius / dify

Dify is an open-source LLM app development platform. Dify's intuitive interface combines AI workflow, RAG pipeline, agent capabilities, model management, observability features and more, letting you quickly go from prototype to production.
https://dify.ai
Other
45.3k stars 6.36k forks source link

longrun workflow api call got cut off by Dify's Cloudflare gateway #5982

Closed OCboy5 closed 2 months ago

OCboy5 commented 2 months ago

Self Checks

Dify version

0.6.11

Cloud or Self Hosted

Cloud

Steps to reproduce

mylog.txt My call to https://api.dify.ai/v1/workflows/run is a longrun call, typically lasts about 130s. I used to be able do that call successfully, but today I got cut off by the gateway after about 100 seconds. Here is the response record in my logfile. Apparently it's the Cloudflare who cut off the connection. Is this a new configuration change or an unexpected event?

✔️ Expected Behavior

the llm-based workflow takes a long time to complete. I would like to be able to do longrun call to the workflow api

❌ Actual Behavior

the longrun call was cut off in about 100 seconds by Dify's Cloudflare gateway.

dosubot[bot] commented 2 months ago

Hey @OCboy5! 👋 I'm here to help you with bugs, answer questions, and even chat about becoming a contributor. Let's tackle this issue together!

The cutoff of your long-running API call to https://api.dify.ai/v1/workflows/run by the Cloudflare gateway after approximately 100 seconds is due to a known configuration restriction. According to the documentation, Cloudflare imposes a restriction that interrupts requests after 100 seconds [1][2].

For handling long-running tasks, Dify provides asynchronous processing capabilities using Celery for task queueing. This can help you manage tasks that require more time to complete without being interrupted by the gateway [3][4][5].

To continue talking to Dosu, mention @dosu.

OCboy5 commented 2 months ago

Thanks! Can you show me an example of "asynchronous processing capabilities using Celery for task queueing" in Dify? @dosu

dosubot[bot] commented 2 months ago

To handle long-running API calls using Celery for asynchronous processing in Dify, you can follow these steps:

Step 1: Install Celery and Redis

Ensure you have Celery and Redis installed. You can add them to your pyproject.toml or install them directly:

poetry add celery redis

Step 2: Configure Celery

Create a celery.py file in your project to configure Celery:

from celery import Celery

app = Celery('dify_tasks', broker='redis://localhost:6379/0')

app.conf.update(
    result_backend='redis://localhost:6379/0',
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
)

Step 3: Define a Celery Task

Create a task to handle the long-running API call. For example, in a file named tasks.py:

import requests
from celery import shared_task

@shared_task
def run_workflow():
    url = 'https://api.dify.ai/v1/workflows/run'
    headers = {
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json'
    }
    data = {
        # Your payload here
    }
    response = requests.post(url, headers=headers, json=data)
    return response.json()

Step 4: Call the Celery Task

In your application, call the Celery task asynchronously:

from tasks import run_workflow

# Trigger the task
result = run_workflow.delay()

# You can check the status or result later
print(result.status)
print(result.result)

Step 5: Start the Celery Worker

Start the Celery worker as described in the document:

poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace

This setup will allow you to handle long-running API calls asynchronously using Celery, avoiding the Cloudflare timeout issue [1].

To continue talking to Dosu, mention @dosu.

OCboy5 commented 2 months ago

@dosu thanks. Do I have to use an asynchronous HTTP client like aiohttp or httpx for doing requests.post to dify api server? otherwise it will still wait on the api connection and get cut off eventually.

dosubot[bot] commented 2 months ago

You don't necessarily have to use an asynchronous HTTP client like aiohttp or httpx for making requests to the Dify API server. However, using an asynchronous HTTP client can be beneficial for handling long-running tasks more efficiently, as it allows your application to perform other operations while waiting for the API response.

If you choose to use an asynchronous HTTP client, here's an example using httpx with Celery:

Step 1: Install httpx

Ensure you have httpx installed:

poetry add httpx

Step 2: Define a Celery Task with httpx

Create a task to handle the long-running API call using httpx. For example, in a file named tasks.py:

import httpx
from celery import shared_task

@shared_task
async def run_workflow():
    url = 'https://api.dify.ai/v1/workflows/run'
    headers = {
        'Authorization': 'Bearer YOUR_API_KEY',
        'Content-Type': 'application/json'
    }
    data = {
        # Your payload here
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(url, headers=headers, json=data)
    return response.json()

Step 3: Call the Celery Task

In your application, call the Celery task asynchronously:

from tasks import run_workflow

# Trigger the task
result = run_workflow.delay()

# You can check the status or result later
print(result.status)
print(result.result)

Step 4: Start the Celery Worker

Start the Celery worker as described in the document:

poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace

Using httpx in this manner allows your Celery tasks to handle long-running API calls more efficiently, reducing the risk of being cut off by the gateway.

To continue talking to Dosu, mention @dosu.

OCboy5 commented 2 months ago

thank you. is gevent necessary for making celery work on windows VSCode virtual env? @dosu

dosubot[bot] commented 2 months ago

No, gevent is not strictly necessary for making Celery work on a Windows VSCode virtual environment. While gevent is commonly used for concurrency, you can configure Celery to use other worker classes that are compatible with Windows. For Windows, you should use the solo worker class instead of gevent.

Here is an example configuration for running Celery on Windows:

{
    "name": "Python: Celery",
    "type": "debugpy",
    "request": "launch",
    "python": "${workspaceFolder}/api/.venv/bin/python",
    "cwd": "${workspaceFolder}/api",
    "module": "celery",
    "justMyCode": true,
    "envFile": ".env",
    "console": "integratedTerminal",
    "env": {
        "FLASK_APP": "app.py",
        "FLASK_DEBUG": "1",
        "GEVENT_SUPPORT": "True"
    },
    "args": [
        "-A",
        "app.celery",
        "worker",
        "-P",
        "solo",  // Use "solo" for Windows
        "-c",
        "1",
        "--loglevel",
        "info",
        "-Q",
        "dataset,generation,mail,ops_trace"
    ]
}

Additionally, you should include the --without-gossip and --without-mingle options for better compatibility on Windows:

celery -A app.celery worker -P solo --without-gossip --without-mingle -Q dataset,generation,mail --loglevel INFO

This configuration uses the solo worker class, making it compatible with Windows [1][2][3][4][5].

To continue talking to Dosu, mention @dosu.

OCboy5 commented 2 months ago

@dosu thanks. further question, if I want to implement a polling mechanism to check the api request status, and after it reports back SUCCESSFUL then retrieve the result. how should I make that happen?

dosubot[bot] commented 2 months ago

To implement a polling mechanism to check the status of the API request and retrieve the result once it is successful, you can use the crawl_url and check_crawl_status methods provided in the FirecrawlApp class. Here is how you can do it:

Step 1: Define the FirecrawlApp Class

import time
import requests
import json

class FirecrawlApp:
    def __init__(self, api_key=None, base_url=None):
        self.api_key = api_key
        self.base_url = base_url or 'https://api.firecrawl.dev'
        if self.api_key is None and self.base_url == 'https://api.firecrawl.dev':
            raise ValueError('No API key provided')

    def crawl_url(self, url, params=None) -> str:
        headers = self._prepare_headers()
        json_data = {'url': url}
        if params:
            json_data.update(params)
        response = self._post_request(f'{self.base_url}/v0/crawl', json_data, headers)
        if response.status_code == 200:
            job_id = response.json().get('jobId')
            return job_id
        else:
            self._handle_error(response, 'start crawl job')

    def check_crawl_status(self, job_id) -> dict:
        headers = self._prepare_headers()
        response = self._get_request(f'{self.base_url}/v0/crawl/status/{job_id}', headers)
        if response.status_code == 200:
            crawl_status_response = response.json()
            if crawl_status_response.get('status') == 'completed':
                total = crawl_status_response.get('total', 0)
                if total == 0:
                    raise Exception('Failed to check crawl status. Error: No page found')
                data = crawl_status_response.get('data', [])
                url_data_list = []
                for item in data:
                    if isinstance(item, dict) and 'metadata' in item and 'markdown' in item:
                        url_data = {
                            'title': item.get('metadata').get('title'),
                            'description': item.get('metadata').get('description'),
                            'source_url': item.get('metadata').get('sourceURL'),
                            'markdown': item.get('markdown')
                        }
                        url_data_list.append(url_data)
                return {
                    'status': 'completed',
                    'total': crawl_status_response.get('total'),
                    'current': crawl_status_response.get('current'),
                    'data': url_data_list
                }
            else:
                return {
                    'status': crawl_status_response.get('status'),
                    'total': crawl_status_response.get('total'),
                    'current': crawl_status_response.get('current'),
                    'data': []
                }
        else:
            self._handle_error(response, 'check crawl status')

    def _prepare_headers(self):
        return {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self.api_key}'
        }

    def _post_request(self, url, data, headers, retries=3, backoff_factor=0.5):
        for attempt in range(retries):
            response = requests.post(url, headers=headers, json=data)
            if response.status_code == 502:
                time.sleep(backoff_factor * (2 ** attempt))
            else:
                return response
        return response

    def _get_request(self, url, headers, retries=3, backoff_factor=0.5):
        for attempt in range(retries):
            response = requests.get(url, headers=headers)
            if response.status_code == 502:
                time.sleep(backoff_factor * (2 ** attempt))
            else:
                return response
        return response

    def _handle_error(self, response, action):
        error_message = response.json().get('error', 'Unknown error occurred')
        raise Exception(f'Failed to {action}. Status code: {response.status_code}. Error: {error_message}')

Step 2: Use the FirecrawlApp Class to Implement Polling

api_key = 'your_api_key'
firecrawl_app = FirecrawlApp(api_key=api_key)

# Start the crawl job
job_id = firecrawl_app.crawl_url('https://example.com')

# Poll the status until the job is completed
while True:
    status = firecrawl_app.check_crawl_status(job_id)
    if status['status'] == 'completed':
        print(status['data'])
        break
    else:
        print(f"Current status: {status['status']}, Progress: {status['current']}/{status['total']}")
        time.sleep(5)  # Wait for 5 seconds before checking again

This setup will start the crawl job and keep polling the status every 5 seconds until the job is completed. Once completed, it will return the result [1][2][3].

To continue talking to Dosu, mention @dosu.

OCboy5 commented 2 months ago

@dosu thanks. what would the code be like if I want to implement the polling mechanism in javascript code?

dosubot[bot] commented 2 months ago

To implement a polling mechanism in JavaScript to check the status of an API request and retrieve the result once it is successful, you can use the fetch API along with setInterval to periodically check the status. Here is an example:

const POLLING_INTERVAL = 5000; // 5 seconds

async function checkStatus(url) {
  try {
    const response = await fetch(url, {
      method: 'GET',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': 'Bearer YOUR_API_KEY'
      },
    });

    if (!response.ok) {
      throw new Error('Network response was not ok');
    }

    const data = await response.json();

    if (data.status === 'completed') {
      console.log('Request completed:', data);
      return data;
    } else {
      console.log('Request still in progress...');
      return null;
    }
  } catch (error) {
    console.error('Error checking status:', error);
    return null;
  }
}

function pollStatus(url) {
  const intervalId = setInterval(async () => {
    const result = await checkStatus(url);
    if (result) {
      clearInterval(intervalId);
      // Handle the completed result here
      console.log('Final result:', result);
    }
  }, POLLING_INTERVAL);
}

// Usage
const apiUrl = 'https://api.example.com/status';
pollStatus(apiUrl);

In this example, checkStatus is an asynchronous function that makes a GET request to the provided URL and checks if the status of the request is 'completed'. The pollStatus function uses setInterval to call checkStatus every 5 seconds until the request is completed [1].

To continue talking to Dosu, mention @dosu.

OCboy5 commented 2 months ago

@dosu thanks. what is the exact url to use to check the status of certain dify api request of a task_id?

dosubot[bot] commented 2 months ago

The exact URL to use to check the status of a specific Dify API request using a task_id is:

https://api.dify.ai/v1/datasets/{dataset_id}/documents/{batch}/indexing-status

Replace {dataset_id} with the specific dataset ID and {batch} with the task ID [1].

To continue talking to Dosu, mention @dosu.