Closed sakamomo554101 closed 3 years ago
https://docs.microsoft.com/ja-jp/azure/architecture/patterns/async-request-reply
上記のようなHTTPの非同期処理パターンが良い気がするな。 ようは、
FastAPI + Queue(Kafkaとか?)で作ってみようかな。
APIとして必要なのは、下記かな。
リクエスト時の必要なパラメーターはjsonフォーマットで渡す感じにするか。
設計として、良さそうなのが以下のどちらか。(多分、後者だけど、一旦は簡易的に前者でやってもいいかな)
いや、API gateway側でDBにアクセスしない方がいいか。 そしたら、一時的なuuidを作成して、Queue側に渡す感じにしよう。
SQS互換なElasticMQを使うか、Kafkaを使うか。 SQSだと、メッセージサイズの上限が256kb(拡張ライブラリであげられる)か。 https://github.com/softwaremill/elasticmq
kafka使うかな。 https://hub.docker.com/r/bitnami/kafka/
自前でコンテナイメージ作ってもいいけど、上記のようにすでにあるイメージを使う。
jsonをメッセージとして渡す場合は、serializer/deserializerを作れば良さそう。
https://aiokafka.readthedocs.io/en/stable/examples/batch_produce.html send_batchを使っても良い(一旦、愚直なforループにする)
https://docs.python.org/ja/3.6/library/asyncio-task.html async def と asyncio.coroutineについて(両者は異なる)
======================================================================
ERROR: test_send_and_receive_body (test.test_queue_helper.TestQueueHelper)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/workspace/queue/test/test_queue_helper.py", line 24, in test_send_and_receive_body
queue_helper.send_body_into_queue(body_text=body_text, config=self.__config)
File "/workspace/queue/queue_helper.py", line 17, in send_body_into_queue
producer.produce(messages=[message])
File "/workspace/queue/queue_client.py", line 58, in produce
loop.run_until_complete(self.__produce(loop=loop, messages=messages))
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/workspace/queue/queue_client.py", line 71, in __produce
await producer.send(self._config.optional_param["topic_name"], message)
File "/usr/local/lib/python3.8/site-packages/aiokafka/producer/producer.py", line 420, in send
await self.client._wait_on_metadata(topic)
File "/usr/local/lib/python3.8/site-packages/aiokafka/client.py", line 628, in _wait_on_metadata
raise UnknownTopicOrPartitionError()
kafka.errors.UnknownTopicOrPartitionError: [Error 3] UnknownTopicOrPartitionError
あれ、事前にtopicは作らないとだっけ?
aiokafkaだと、topic作れないっけ、、ドキュメント探したが、わからん。
======================================================================
FAIL: test_send_and_receive_body (test.test_queue_helper.TestQueueHelper)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/workspace/queue/test/test_queue_helper.py", line 18, in setUp
queue_helper.create_kafka_topics(topics=[self.__config.optional_param["topic_name"]], cliend_id="test", config=self.__config)
File "/workspace/queue/queue_helper.py", line 32, in create_kafka_topics
client = KafkaAdminClient(
File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 218, in __init__
self._refresh_controller_id()
File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 278, in _refresh_controller_id
controller_version = self._client.check_version(controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000))
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 901, in check_version
self._maybe_connect(try_node)
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 372, in _maybe_connect
assert broker, 'Broker id %s not in current metadata' % (node_id,)
AssertionError: Broker id -1 not in current metadata
うーん、topicが上手く作れん。 kafkaのシェルスクリプト経由で作れるかを確認する。
kafkaのコンテナにはいって、下記を実行すると、ちゃんとトピックは作られてそう。
I have no name!@c8b57d77ce69:/$ kafka-topics.sh --zookeeper youyaku_ai_queue_zookeeper --create --replication-factor 1 --partitions 1 --topic test1
Created topic test1.
I have no name!@c8b57d77ce69:/$ kafka-topics.sh --list --zookeeper youyaku_ai_queue_zookeeper
test1
https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html#kafka.KafkaAdminClient KafkaAdminClientの取得処理で失敗してる
KAFKA_CFG_ADVERTISED_LISTENERSの設定が誤っていて、コンテナ経由でのqueueのアクセスが失敗していた。
api gateway部分は正常系は動作するようになった。 summarizer部分をQueueからデータ取得して、動作させるようにする。
https://matsuand.github.io/docs.docker.jp.onthefly/compose/environment-variables/
docker-composeだと、変数定義をenvファイルに設定できる。 これを利用して、DBのホスト名とかを設定しようかな。
単純にFlaskで組むと、余裕で耐えられなくなるので、前段にキューをかますなどの検討が必要。