You are right, I didn't add any failure handling for Kafka and Celery. If either is down, submit() will fail, and repeated failures will open the circuit breaker. To make this less likely, you can add a retry mechanism inside submit() that attempts the failing call a few times before giving up.
Here is an example of how to add a retry mechanism to the submit() method:
import asyncio

@circuit_breaker(name="my_circuit_breaker")
async def submit(img1: UploadFile = UploadFile(filename='img1'), img2: UploadFile = UploadFile(filename='img2'), user_id = Depends(auth_wrapper)) -> str:
    """
    Submit two images to be compared. The images are put into the message
    queue, and the id corresponding to the task is returned.
    Note: One can only send JSON objects from Fastapi. So, the images are
    encoded as base64 strings. On the Celery side, the images are decoded
    back to bytes.
    """
    logger.info(f'user_id {user_id} submitted a post request.')
    if not is_allowed(user_id):
        raise HTTPException(status_code=403, detail='Rate Limit Exceeded')
    # shutil.copyfileobj(img1.file, open(img1.filename, 'wb'))
    img1_contents = await img1.read()
    img2_contents = await img2.read()
    data = {
        "img1": base64.b64encode(img1_contents).decode("utf-8"),
        "img2": base64.b64encode(img2_contents).decode("utf-8"),
    }
    logger.info(f'Sending data to kafka: {data.keys()}')
    producer.send(KAFKA_TOPIC, json.dumps(data).encode("utf-8"))
    # Try to enqueue the Celery task up to 3 times before giving up.
    for _ in range(3):
        try:
            return celery_app.send_task(
                "models.similarity",
                kwargs=data,
            ).id
        except Exception as e:
            logger.error(e)
            # asyncio.sleep instead of time.sleep, so the event loop is not
            # blocked while waiting for the next attempt.
            await asyncio.sleep(1)
    raise HTTPException(status_code=502, detail='Kafka or Celery is down.')
The submit() method will now try to send the Celery task up to 3 times, pausing one second after each failed attempt. If all 3 attempts fail, submit() raises an HTTPException with status code 502.
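Retrying in this way helps your FastAPI microservice ride out brief outages instead of failing on the first error. Note that in this example only the celery_app.send_task() call is retried; producer.send() sits outside the loop, so a failure while sending to Kafka is not retried.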
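If several calls need the same treatment, the retry loop can be pulled out into a small helper. The following is only a sketch under stated assumptions: `retry_async` and its parameters (`attempts`, `base_delay`, `retry_on`) are hypothetical names, not part of FastAPI, Celery, or any Kafka client, and the exponential backoff is a swapped-in variation on the fixed one-second pause used above.

```python
import asyncio
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> T:
    """Await operation() up to `attempts` times (hypothetical helper).

    After each failed attempt except the last, wait base_delay, then
    2 * base_delay, then 4 * base_delay, and so on.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                # Out of attempts: let the caller see the last error.
                raise
            # Non-blocking pause, so other requests keep being served.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Inside submit(), one possible use would be `await retry_async(lambda: asyncio.to_thread(celery_app.send_task, "models.similarity", kwargs=data))`, with the caller catching the final exception and turning it into the 502 response. Running the blocking send_task() call through `asyncio.to_thread` (Python 3.9+) is itself an assumption about how you want to keep the event loop free.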
(This answer is from Bard.)