sakamoto-family-smile / line_sakamomo_family_api

0 stars 0 forks source link

デイリーでEDINETからデータを取得するジョブの生成 #35

Open sakamoto-family-smile opened 1 month ago

sakamoto-family-smile commented 1 month ago

概要

デイリーでEDINETからデータを取得するジョブの生成を行う。 ※デイリー実行としては、CloudRun Jobを使えば良さそう

sakamoto-family-smile commented 3 weeks ago

これやりたいな

sakamoto-family-smile commented 3 weeks ago

選択肢は下記

sakamoto-family-smile commented 3 weeks ago

とりあえずdocker化しておけば、API化もむずくないから、CloudRun Jobで作るかな

sakamoto-family-smile commented 3 weeks ago

https://blog.g-gen.co.jp/entry/dataflow-explained#%E4%BE%8B2--%E3%83%87%E3%83%BC%E3%82%BF%E3%82%B9%E3%83%88%E3%82%A2%E9%96%93%E3%81%AE%E3%83%87%E3%83%BC%E3%82%BF%E7%A7%BB%E8%A1%8C

dataflowとかを将来的にパイプラインで使うなら、コンポーネント(ファイル単位)ではちゃんと呼びやすくしておくと良さそう

sakamoto-family-smile commented 3 weeks ago

あ、もうjobは作ってたのか

sakamoto-family-smile commented 3 weeks ago

うまく重複削除をしたいと思っていたが、下記は良さそう ※geminiに聞いた

from google.cloud import bigquery

client = bigquery.Client()
table_id = 'your-project.your_dataset.your_table'

# 重複を排除する条件を指定した MERGE 文
merge_query = f"""
    MERGE `{table_id}` T
    USING (
        SELECT * FROM UNNEST(@df)
    ) S
    ON T.id = S.id  -- 重複を判定するカラムを指定
    WHEN MATCHED THEN
        UPDATE SET ...  -- 更新するカラムを指定 (必要があれば)
    WHEN NOT MATCHED THEN
        INSERT ROW  -- 挿入するカラムを指定
"""

# dataframe をクエリパラメータとして渡す
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter('df', 'STRUCT<...>', df.to_dict('records'))
    ]
)

# クエリを実行
query_job = client.query(merge_query, job_config=job_config)
query_job.result()
sakamoto-family-smile commented 3 weeks ago

手作業で調べた感じ、とりあえず下記を重複のキーとしてみるか ※適当に見てるので、本来は正しくないかもしれない

docID,
  submitDateTime,
  docDescription,
  docInfoEditStatus,
  issuerEdinetCode,
  formCode,
  docTypeCode,
  parentDocID
sakamoto-family-smile commented 3 weeks ago

ジョブがエラーになるが、10分間の処理時間をオーバーしているっぽい。 タイムアウトを1時間に伸ばしてみる

sakamoto-family-smile commented 3 weeks ago

https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_job_timeout_ms クエリのタイムアウトに引っかかっていそう

job_timeout_msで伸ばす

sakamoto-family-smile commented 1 week ago

日数を削減したら、エラーが変わった

"Traceback (most recent call last):
  File "/work/app/main.py", line 93, in <module>
    main(duration_days=duration_days, api_key=api_key, table_id=table_id)
  File "/work/app/main.py", line 78, in main
    query_job = client.query(merge_query, job_config=job_config, timeout=3600)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 3496, in query
    return _job_helpers.query_jobs_insert(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/_job_helpers.py", line 159, in query_jobs_insert
    future = do_query()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/_job_helpers.py", line 136, in do_query
    query_job._begin(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/query.py", line 1383, in _begin
    super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py", line 746, in _begin
    api_response = client._call_api(
  File "/usr/local/lib/python3.10/site-packages/google/cloud/bigquery/client.py", line 837, in _call_api
    return call()
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 293, in retry_wrapped_func
    return retry_target(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 153, in retry_target
    _retry_error_helper(
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_base.py", line 212, in _retry_error_helper
    raise final_exc from source_exc
  File "/usr/local/lib/python3.10/site-packages/google/api_core/retry/retry_unary.py", line 144, in retry_target
    result = target()
  File "/usr/local/lib/python3.10/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/youyaku-ai/jobs?prettyPrint=false: Invalid value for type: STRUCT<seqNumber INT64, docID STRING, edinetCode STRING, secCode STRING, JCN STRING, filerName STRING, fundCode STRING, ordinanceCode STRING, formCode STRING, docTypeCode STRING, periodStart STRING, periodEnd STRING, submitDateTime STRING, docDescription STRING, issuerEdinetCode STRING, subjectEdinetCode STRING, subsidiaryEdinetCode STRING, currentReportReason STRING, parentDocID STRING, opeDateTime STRING, withdrawalStatus STRING, docInfoEditStatus STRING, disclosureStatus STRING, xbrlFlag STRING, pdfFlag STRING, attachDocFlag STRING, englishDocFlag STRING, csvFlag STRING, legalStatus STRING> is not a valid value"
sakamoto-family-smile commented 1 week ago

pandasの場合、文字列はobject型として格納される。

sakamoto-family-smile commented 6 days ago

うまく動かないので、以下のフローに直して、修正を試す

sakamoto-family-smile commented 6 days ago

調べてみると、submitDateTimeが、EDINET APIの日付情報とリンクしてそうなので、 submitDateTimeをbq上でも日付情報として格納するようにして、フィルタできるようにする

sakamoto-family-smile commented 6 days ago

問題なく動いてそうなので、次にデイリージョブを作る https://cloud.google.com/run/docs/execute/jobs-on-schedule?hl=ja#command-line

sakamoto-family-smile commented 6 days ago

実装は完了。 あとは定期実行が動いていたら、issueはクローズ予定。

sakamoto-family-smile commented 5 days ago

実行自体はされているが、環境変数に下記が設定されていない。 CloudScheduler経由でCloudRun Jobを実行した際に変数が渡ってないかも