Closed sakamomo554101 closed 2 years ago
これやりたい。
次はBQ対応進めようかな
https://github.com/googleapis/python-bigquery 上記に色々とサンプルがある
https://googleapis.dev/python/bigquery/latest/index.html 上記がBQのAPI情報とかある
BQだと、project_id -> dataset -> tableという階層になっている。 なので、database = datasetとすれば、現状のmysqlのAPIと構造が一致するな。
BQのテーブル作成をどうやるか。 現状はテーブルのスキーマ情報をコードにベタ書き(SQLAlchemyのモデルクラスにベタ書き)している。 上記スキーマ情報をどうBQのAPIに渡すか。
https://www.python.ambitious-engineer.com/archives/1481 上記を参考にしつつ、SQLAlchemyのスキーマ情報をBQ側に反映させる
BigQueryだと、primary keyやindexの概念がなさそう
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.mode BQのmode(値の扱いをどうするか)は上記。
required, nullableは良いとして、REPEATEDってなんぞ?
https://programwiz.org/2021/05/29/bigquery-difference-datetime-and-timestamp/ BQだと、datetimeとtimestampの型がある。 違いは、タイムゾーンを意識するかどうか。timestampは意識している。
どっち使えば良いかなー。
https://docs.sqlalchemy.org/en/14/tutorial/metadata.html#setting-up-the-registry registry.metadataで、テーブルのスキーマを格納しているmetadataが取れる
あ、BQだと、autoincrementなカラムはどう扱えばいいんだろ?
https://cloud-textbook.com/3635/#auto_increment
そうか、、根本的に直す必要がありそう・・
UUIDをgenerateさせるのはありか
下記エラーがテスト中に出てきた。 なんだろ?
======================================================================
ERROR: setUpClass (__main__.TestBigQueryDBWrapper)
----------------------------------------------------------------------
Traceback (most recent call last):
File "test_bq_db_wrapper.py", line 20, in setUpClass
cls.db_instance.create_all_tables_if_needed()
File "../db_wrapper.py", line 372, in create_all_tables_if_needed
if bigquery_util.exist_table(self.__bq_client, table_name):
File "../bigquery_util.py", line 35, in exist_table
client.get_table(table_id)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 1010, in get_table
table_ref = _table_arg_to_table_ref(table, default_project=self.project)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 2666, in _table_arg_to_table_ref
value = TableReference.from_string(value, default_project=default_project)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/table.py", line 252, in from_string
) = _helpers._parse_3_part_id(
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/_helpers.py", line 848, in _parse_3_part_id
raise ValueError(
ValueError: table_id must be a fully-qualified ID in standard SQL format, e.g., "project.dataset.table_id", got body-info
あー、そうか。。正式なパスでテーブル名を指定する必要があるんだ・・。
この場合だと、project_name.dataset_name.table_nameだな
かきのようなエラーが出て、insertが失敗する。
ValueError: Could not determine schema for table 'Table(TableReference(DatasetReference('youyaku-ai', 'test_youyaku_ai_db'), 'body_info'))'. Call client.get_table() or pass in a list of schema fields to the selected_fields argument.
ふーむ、insert_rowsのselected_fieldsを明示的に指定しないとダメっぽい。 ※つまり、テーブルのスキーマ情報を与えて、insert処理をする感じ。
うーむ、dictにして、keyとcolumn名があってればいけると思ったが、違うのか?
https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client.query queryのAPIみると、下記のようなjobとしてレスポンスが返ってくる。 ※同期的ではなく、非同期的っぽい。
JobのAPIを見ると、add_done_callbackで、終了後の処理をかける。
https://github.com/googleapis/python-bigquery/tree/main/samples 上記にQueryのサンプルも色々ありそう
SQLインジェクション的なもんだいは大丈夫かなー
query発行したら以下のエラー
google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/youyaku-ai/jobs?prettyPrint=false: Access Denied: Project youyaku-ai: User does not have bigquery.jobs.create permission in project youyaku-ai.
うーむ、テーブル作成やデータセット作成を都度やると、結構テストが遅くなるし、そもそも失敗したりする(テーブルの使えるタイミングがよくわからん)
そのため、テスト終了後にテーブルの中身を削除するか。 https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#delete_statement
削除処理実装して試してみたら、何回かは大丈夫だったが、以下のエラーが発生したら、全くインサートできなくなったぞ・・
Traceback (most recent call last):
File "test_bq_db_wrapper.py", line 43, in test_insert_body_infos
ids = self.db_instance.insert_body_infos(body_infos=body_infos)
File "../db_wrapper.py", line 429, in insert_body_infos
errors = self.__bq_client.insert_rows(table=table_full_name, rows=chunked_items, selected_fields=bq_schema_list)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 3410, in insert_rows
return self.insert_rows_json(table, json_rows, **kwargs)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 3589, in insert_rows_json
response = self._call_api(
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
return call()
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
return retry_target(
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/api_core/retry.py", line 189, in retry_target
return target()
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/_http.py", line 479, in api_request
raise exceptions.from_http_response(response)
google.api_core.exceptions.NotFound: 404 POST https://bigquery.googleapis.com/bigquery/v2/projects/youyaku-ai/datasets/test_youyaku_ai_db/tables/body_info/insertAll?prettyPrint=false: Table is truncated.
https://cloud.google.com/bigquery/docs/managing-table-schemas まじか。スキーマのデータ種別を変更したりする場合は、手動対応になりそう
下記のエラーが出て困ってる・・。 データセット、テーブルを削除して再生成しても、同様のエラーなんだが、、。
Traceback (most recent call last):
File "test_bq_db_wrapper.py", line 24, in setUpClass
cls.db_instance.create_all_tables_if_needed()
File "../db_wrapper.py", line 398, in create_all_tables_if_needed
bigquery_util.create_table(self.__bq_client, dataset_name, table_name, schemas)
File "../bigquery_util.py", line 48, in create_table
client.create_table(table)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 725, in create_table
api_response = self._call_api(
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 759, in _call_api
return call()
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/api_core/retry.py", line 286, in retry_wrapped_func
return retry_target(
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/api_core/retry.py", line 189, in retry_target
return target()
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/google/cloud/_http.py", line 479, in api_request
raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/youyaku-ai/datasets/test_youyaku_ai_db/tables?prettyPrint=false: Field id already exists in schema
↓
uuidをとるカラムを同一名(id)で設定してたから、スキーマがかぶっていた・・。コーディングミス
現状、タイムゾーンの設定をちゃんとしてないので、別途対応をするissueで行う。 現状はタイムゾーン度外視で対応する(BQのカラムもTIMESTAMPから変更)
job_infoのupdate処理だが、in句を使えるかを確認して、対応する。
試しに、google consoleで、BQに対して、summarize_job_infoに対してupdate処理を実行したら、以下のエラーが出てきた。
UPDATE or DELETE statement over table youyaku-ai.test_youyaku_ai_db.summarize_job_info would affect rows in the streaming buffer, which is not supported
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-manipulation-language#limitations うーわー、なるほど。。即時updateはNGということね・・。
そもそも、現状のサービスでBQ使って良いの?という話は別である。 基本insertで完結させるのが正しい。
取れる選択肢は下記。
job_info周りについて整理。
これinsert->updateじゃなくて 、insert->insert(別テーブル)で行けそうな気がする
SummarizeResultも将来的に修正が必要かな。 (まぁ、これは30分に引っかからないようにバッチ修正でもいいんだけども)
うーむ?job_logにprimary_key設定してるんだが、下記エラーに遭遇。
Traceback (most recent call last):
File "test_bq_db_wrapper.py", line 7, in <module>
from db_wrapper import BodyInfo, DBConfig, DBFactory, DBUtil, SummarizeJobInfo, SummarizeResult
File "../db_wrapper.py", line 170, in <module>
class SummarizeJobLog:
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/decl_api.py", line 850, in mapped
_as_declarative(self, cls, cls.__dict__)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py", line 126, in _as_declarative
return _MapperConfig.setup_mapping(registry, cls, dict_, None, {})
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py", line 177, in setup_mapping
return cfg_cls(registry, cls_, dict_, table, mapper_kw)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py", line 314, in __init__
self._early_mapping(mapper_kw)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py", line 200, in _early_mapping
self.map(mapper_kw)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/decl_base.py", line 979, in map
mapper_cls(self.cls, self.local_table, **self.mapper_args),
File "<string>", line 2, in __init__
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/util/deprecations.py", line 298, in warned
return fn(*args, **kwargs)
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/mapper.py", line 686, in __init__
self._configure_pks()
File "/Users/shotasakamoto/.pyenv/versions/3.8.3/lib/python3.8/site-packages/sqlalchemy/orm/mapper.py", line 1328, in _configure_pks
raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: Mapper mapped class SummarizeJobLog->summarize_job_log could not assemble any primary key columns for mapped table 'summarize_job_log'
上記は、dataclassをSummarizeJobLogに設定してなかったのが、原因。
試しに、docker-composeで、全コンテナ起動してみたが、dashboardが立ち上がらない。
The dash_core_components package is deprecated. Please replace
`import dash_core_components as dcc` with `from dash import dcc`
import dash_core_components as dcc
Traceback (most recent call last):
File "dashboard/app.py", line 147, in <module>
dashboard.initialize()
File "dashboard/app.py", line 30, in initialize
input_text_area = dbc.FormGroup([
File "/usr/local/lib/python3.8/site-packages/dash_bootstrap_components/__init__.py", line 52, in __getattr__
raise AttributeError(
AttributeError: FormGroup was deprecated in dash-bootstrap-components version 1.0.0. You are using 1.0.0. For more details please see the migration guide: https://dbc-v1.herokuapp.com/migration-guide/
https://dash-bootstrap-components.opensource.faculty.ai/migration-guide/ 上記見ると、結構deprecated(break)が多いな・・。
FormGroupがまずbreakされてえる
Card、FormGroupを差し替えるのが必要そう。
うーむ、ちょっと対応が大変そうなので、一旦v1.0への上げるのは諦めて、 下記のようにversionを制限する。
dash-bootstrap-components < 1
とりあえず、ローカル環境(DBはMySQL)では要約サービスが動いていそう。 次にBQで試す。
BQにすると、api_gatewayで下記エラーが起きてる。
Traceback (most recent call last):
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.8/site-packages/uvicorn/subprocess.py", line 76, in subprocess_started
target(sockets=sockets)
File "/usr/local/lib/python3.8/site-packages/uvicorn/server.py", line 68, in run
return asyncio.run(self.serve(sockets=sockets))
File "/usr/local/lib/python3.8/asyncio/runners.py", line 43, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/usr/local/lib/python3.8/site-packages/uvicorn/server.py", line 76, in serve
config.load()
File "/usr/local/lib/python3.8/site-packages/uvicorn/config.py", line 448, in load
self.loaded_app = import_from_string(self.app)
File "/usr/local/lib/python3.8/site-packages/uvicorn/importer.py", line 21, in import_from_string
module = importlib.import_module(module_str)
File "/usr/local/lib/python3.8/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 783, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "./api_gateway/main.py", line 31, in <module>
queue_initializer.initialize()
File "./api_gateway/../queue/kafka_client.py", line 22, in initialize
create_kafka_topics_if_needed(topics=[topic_name], client_id=self.__client_id, config=self._config)
File "./api_gateway/../queue/kafka_helper.py", line 13, in create_kafka_topics_if_needed
client = KafkaAdminClient(
File "/usr/local/lib/python3.8/site-packages/kafka/admin/client.py", line 208, in __init__
self._client = KafkaClient(metrics=self._metrics,
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 244, in __init__
self.config['api_version'] = self.check_version(timeout=check_timeout)
File "/usr/local/lib/python3.8/site-packages/kafka/client_async.py", line 900, in check_version
raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable
MySQLでは再現しなさそう。
まさか、DBの初期化対応が早すぎて、Queueが立ち上がりが間に合ってないのかも。 Queueの初期化もリトライすれば良さそう。
多分、解決。 次は以下の問題。
INFO: youyaku_ai_summarizer : [2021/10/24 15:08:28] input text is {'id': '5f6ca377-b725-41d7-b28d-158252c018ab', 'body': 'これはテストです!ほんとです!'}
Traceback (most recent call last):
File "summarizer/summarizer_process.py", line 128, in <module>
main()
File "summarizer/summarizer_process.py", line 56, in main
result = loop_process(summarizer=summarizer,
File "summarizer/summarizer_process.py", line 115, in loop_process
db_instance.insert_summarize_results(result_infos=summarize_results)
File "/workspace/summarizer/../db/db_wrapper.py", line 490, in insert_summarize_results
self.__insert_infos(items, SummarizeResult)
File "/workspace/summarizer/../db/db_wrapper.py", line 521, in __insert_infos
raise DBError("insert error! error details is {}".format(errors))
db_wrapper.DBError: insert error! error details is [{'index': 0, 'errors': [{'reason': 'invalid', 'location': '', 'debugInfo': '', 'message': 'Missing required field: Msg_0_CLOUD_QUERY_TABLE.body_id.'}]}]
body_idがNoneになっている
SQLAlchemyの仕組みで、BodyInfoをインサートした後に勝手にidが入ることを期待していたが、BQではuuidを後追いで入れ直しているため、処理に整合性がなくなっている。
これは、もうuuid化した方が早いかも。
summarizer_process.loop_processを見ると、body_infoのインサート処理をどう使ってるかがわかる。
概要
DBをBQに保存するように対応 現状は、(別コンテナ化しているが)MySQLへ保存するように対応している。
上記のDBヘルパー的なインスタンスをBQ用にも作成し、データを保存できるようにする。