Closed sakamomo554101 closed 3 years ago
だいたい出来上がっているが、あと何が足りないかを確認
API Gatewayが稼働しない。 下記のパス問題を解決する必要がある。
ERROR: Error loading ASGI app. Could not import module "api_gateway/main".
下記のissueが参考になりそう。 https://github.com/tiangolo/fastapi/issues/1495
今度はSummarizerのプロセスがしんだ。
Traceback (most recent call last):
File "summarizer/summarizer_process.py", line 128, in <module>
main()
File "summarizer/summarizer_process.py", line 52, in main
db_instance = DBFactory.get_db_instance(db_config=db_config, log_instance=logger)
File "summarizer/../db/db_wrapper.py", line 160, in get_db_instance
return MySQLDB(config=db_config, log_instance=log_instance)
File "summarizer/../db/db_wrapper.py", line 169, in __init__
self.__db_engine = create_engine("mysql+mysqlconnector://{}:{}@{}:{}/{}?charset={}".format(self._config.username,
File "<string>", line 2, in create_engine
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 298, in warned
return fn(*args, **kwargs)
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/create.py", line 518, in create_engine
u = _url.make_url(url)
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/url.py", line 711, in make_url
return _parse_rfc1738_args(name_or_url)
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/url.py", line 774, in _parse_rfc1738_args
return URL.create(name, **components)
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/url.py", line 150, in create
cls._assert_port(port),
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/url.py", line 160, in _assert_port
return int(port)
ValueError: invalid literal for int() with base 10: ''
ダッシュボード以外は動作してそう。 あとはダッシュボードのつなぎ込みを行う。
ダッシュボード側では、urllib.requestを使えば良いか。
下記のエラーが起きてる、、なんぞ、これ。。 loopも新規で作ってるはずなんだがなぁ、、
Traceback (most recent call last):
File "/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py", line 396, in run_asgi
result = await app(self.scope, self.receive, self.send)
File "/usr/local/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 45, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.8/site-packages/fastapi/applications.py", line 199, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.8/site-packages/starlette/applications.py", line 111, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 181, in __call__
raise exc from None
File "/usr/local/lib/python3.8/site-packages/starlette/middleware/errors.py", line 159, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 82, in __call__
raise exc from None
File "/usr/local/lib/python3.8/site-packages/starlette/exceptions.py", line 71, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 566, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 227, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.8/site-packages/starlette/routing.py", line 41, in app
response = await func(request)
File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 201, in app
raw_response = await run_endpoint_function(
File "/usr/local/lib/python3.8/site-packages/fastapi/routing.py", line 148, in run_endpoint_function
return await dependant.call(**values)
File "./api_gateway/api.py", line 73, in request_summarize
return await self.__request_summarize(input_data=input_data)
File "./api_gateway/api.py", line 87, in __request_summarize
id = queue_helper.send_body_into_queue(body_text=input_data.body, producer=self.__queue_producer)
File "./api_gateway/queue_helper.py", line 19, in send_body_into_queue
producer.produce(messages=[message])
File "./api_gateway/../queue/queue_client.py", line 71, in produce
internal_result = loop.run_until_complete(self.__produce(loop=loop, messages=messages))
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 592, in run_until_complete
self._check_running()
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 554, in _check_running
raise RuntimeError(
RuntimeError: Cannot run the event loop while another loop is running
/usr/local/lib/python3.8/site-packages/uvicorn/protocols/http/h11_impl.py:403: RuntimeWarning: coroutine 'KafkaQueueProducer.__produce' was never awaited
self.transport.close()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
event loopがネストしてる? https://rinoguchi.net/2020/11/python-asyncio.html
うーむ、kafkaのテストコードも動かんし、ちょっと調査する。
そうか、、非同期→同期→非同期で、ネストしてる・・。 設計をやり直すのが正しいなぁ。
うーむ、試しに非同期でQueueのAPIを作り直したが、、なんか同じエラーになる・・。 get_running_loopで今動いているループを取得して、AIOKafkaProducerに渡せば良い気がするが、なんか動かん。
いや、非同期のAPIじゃなくて、同期のAPI読んでただけ・・。自分の凡ミス。 次にSummarizerで以下のエラー(テーブルがなんか作られてない)
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/mysql/connector/connection_cext.py", line 506, in cmd_query
self._cmysql.query(query,
_mysql_connector.MySQLInterfaceError: Table 'summarizer_db.body_info' doesn't exist
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1706, in _execute_context
self.dialect.do_execute(
File "/opt/conda/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 717, in do_execute
cursor.execute(statement, parameters)
File "/opt/conda/lib/python3.8/site-packages/mysql/connector/cursor_cext.py", line 269, in execute
result = self._cnx.cmd_query(stmt, raw=self._raw,
File "/opt/conda/lib/python3.8/site-packages/mysql/connector/connection_cext.py", line 510, in cmd_query
raise errors.get_mysql_exception(exc.errno, msg=exc.msg,
mysql.connector.errors.ProgrammingError: 1146 (42S02): Table 'summarizer_db.body_info' doesn't exist
メインの関数群で、誰もテーブル作ってない笑(テストでは作ってたけども) 誰が、DBの初期化をするか。 DBのコンテナ自身がやるのが良い?
テーブルが作成されない、、
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/uvicorn/subprocess.py", line 61, in subprocess_started
target(sockets=sockets)
File "/usr/local/lib/python3.8/site-packages/uvicorn/server.py", line 49, in run
loop.run_until_complete(self.serve(sockets=sockets))
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/uvicorn/server.py", line 56, in serve
config.load()
File "/usr/local/lib/python3.8/site-packages/uvicorn/config.py", line 308, in load
self.loaded_app = import_from_string(self.app)
File "/usr/local/lib/python3.8/site-packages/uvicorn/importer.py", line 20, in import_from_string
module = importlib.import_module(module_str)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "./api_gateway/main.py", line 16, in <module>
db_instance.create_all_tables_if_needed()
File "./api_gateway/../db/db_wrapper.py", line 178, in create_all_tables_if_needed
self.__metadata.create_all(self.__db_engine)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/schema.py", line 4744, in create_all
bind._run_ddl_visitor(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3008, in _run_ddl_visitor
with self.begin() as conn:
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2924, in begin
conn = self.connect(close_with_result=close_with_result)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3096, in connect
return self._connection_cls(self, close_with_result=close_with_result)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 92, in __init__
else engine.raw_connection()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3175, in raw_connection
return self._wrap_pool_connect(self.pool.connect, _connection)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3145, in _wrap_pool_connect
Connection._handle_dbapi_exception_noconnection(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2004, in _handle_dbapi_exception_noconnection
util.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 3142, in _wrap_pool_connect
return fn()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 301, in connect
return _ConnectionFairy._checkout(self)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 761, in _checkout
fairy = _ConnectionRecord.checkout(pool)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 419, in checkout
rec = pool._do_get()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
self._dec_overflow()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/impl.py", line 142, in _do_get
return self._create_connection()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 247, in _create_connection
return _ConnectionRecord(self)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 362, in __init__
self.__connect()
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 605, in __connect
pool.logger.debug("Error on connect(): %s", e)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/pool/base.py", line 599, in __connect
connection = pool._invoke_creator(self)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/create.py", line 578, in connect
return dialect.connect(*cargs, **cparams)
File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 584, in connect
return self.dbapi.connect(*cargs, **cparams)
File "/usr/local/lib/python3.8/site-packages/mysql/connector/__init__.py", line 272, in connect
return CMySQLConnection(*args, **kwargs)
File "/usr/local/lib/python3.8/site-packages/mysql/connector/connection_cext.py", line 85, in __init__
self.connect(**kwargs)
File "/usr/local/lib/python3.8/site-packages/mysql/connector/abstracts.py", line 1003, in connect
self._open_connection()
File "/usr/local/lib/python3.8/site-packages/mysql/connector/connection_cext.py", line 234, in _open_connection
raise errors.get_mysql_exception(msg=exc.msg, errno=exc.errno,
sqlalchemy.exc.DatabaseError: (mysql.connector.errors.DatabaseError) 2003 (HY000): Can't connect to MySQL server on 'youyaku_ai_db:3306' (111)
恐らくだが、DBの立ち上がりが遅いのかも。
api gatewayコンテナから、テストでテーブル作成とかはDBコンテナにできているので、ポートやホスト指定は合ってそう。 ということで、docker-composeでコンテナを立ち上げた際の立ち上がりの差分が出ていそう。
https://docs.sqlalchemy.org/en/14/core/connections.html#sqlalchemy.engine.Engine.connect 試しにconnectできるかで検証すればよい?
とりあえず、テーブル作成をリトライしまくったら、動き出した。
こんどはSummarizer側で下記エラー。
ERROR: youyaku_ai_summarizer : [2021/05/05 02:51:29] Summarize Error is occured! error detail is a bytes-like object is required, not 'str'
INFO: youyaku_ai_summarizer : [2021/05/05 02:51:29] summarize process is end. process time is 0:00:00.022382
INFO: youyaku_ai_summarizer : [2021/05/05 02:51:29] summarize process result is summarizer process is failed because some summarizer error are occured.
INFO: youyaku_ai_summarizer : [2021/05/05 02:51:29] summarize process is started. time is 2021-05-05 02:51:29.681402
キューの中身をリセットするAPIとかが必要(それか、キューから取得して要約する処理のリトライ回数を制限するとか)
キューの中身って、とっても消えんのか?(consumeしても消えないのはおかしい)
https://aiokafka.readthedocs.io/en/stable/consumer.html#reading-transactional-messages
read_commitedモードにすればよさそう?
Consumerに対して、GroupIDを指定すると、下記のエラーが出る。
Group Coordinator Request failed: [Error 15] CoordinatorNotAvailableError
https://qiita.com/sigmalist/items/3b512e2ab49b07271665
Kafkaについての理解を深めた方が良い。
db_wrapperの実装が全然たりなかった・・
db_wrapperの実装を進める。
ダッシュボード側の実装では、推論リクセストを投げた後に定期的に処理ステータスを確認して、待つ必要がある。 今回のデモ向けでは、以下とする。
https://dash.plotly.com/advanced-callbacks なるほど、Dashの仕組みで、callbackからcallbackに値を渡せるっぽい。 これは、独自に非同期処理を書かなくてもいいので、楽だな。
なんかKafkaのBrokerが立ち上がらなくなったぞ・・
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/uvicorn/subprocess.py", line 61, in subprocess_started
target(sockets=sockets)
File "/usr/local/lib/python3.8/site-packages/uvicorn/server.py", line 49, in run
loop.run_until_complete(self.serve(sockets=sockets))
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/uvicorn/server.py", line 56, in serve
config.load()
File "/usr/local/lib/python3.8/site-packages/uvicorn/config.py", line 308, in load
self.loaded_app = import_from_string(self.app)
File "/usr/local/lib/python3.8/site-packages/uvicorn/importer.py", line 20, in import_from_string
module = importlib.import_module(module_str)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "./api_gateway/main.py", line 30, in <module>
queue_initializer.initialize()
File "./api_gateway/../queue/kafka_client.py", line 22, in initialize
create_kafka_topics_if_needed(topics=[topic_name], client_id=self.__client_id, config=self._config)
File "./api_gateway/../queue/kafka_helper.py", line 13, in create_kafka_topics_if_needed
client = KafkaAdminClient(
File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 208, in __init__
self._client = KafkaClient(metrics=self._metrics,
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
うーむ、、dashのcallbackだと、UI変更がトリガーだから、例えば、
って、なった時に(UI更新なしで)値をどう渡せば良いかがわからん・・
https://docs.python.org/ja/3.7/library/queue.html ダッシュボード内で、queue使うか・・。
OK。動作自体は良さそう。
また、入出力結果をDBに入れる