Open kirin-ri opened 1 year ago
import React, { ChangeEvent, useEffect, useState } from "react";
import { useForm } from 'react-hook-form';
import { Spacer } from "../spacer";
import { commonAjax } from '../../../components/commonAjax';
import * as Defs from './apiDefs';
// Message Modal
const ApiUpdateResultModal = (props: { val: Defs.Api}) => {
const tmp = JSON.parse(JSON.stringify(props.val))
const [editAPI, setEditAPI] = useState<Defs.Api>(tmp);
return (
<>
{/* Content Header (Page header) */}
{/* <div className="modal fade" id={"MessageModal"}> */}
<div className="modal fade" data-backdrop="static" id={editAPI.physical_name+"MessageModal"}>
<div className="modal-dialog modal-xl">
<div className="modal-content">
<div className="modal-header">
<h4 className="modal-title">サンプルデータ挿入 / API連携項目追加</h4>
<button type="button" className="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">×</span>
</button>
</div>
<div className="modal-body">
<div className="name-disp-area">
<div className="api-name">
<div className="dtl-content-width">API 論理名</div>
<div className="dtl-modal-font-color">{editAPI.logical_name}</div>
</div>
<div className="api-name">
<div className="dtl-content-width">API 物理名</div>
<div className="dtl-modal-font-color">{editAPI.physical_name}</div>
</div>
</div>
</div>
<div className="modal-body">
<div className="dividing-line"/>
</div>
<div className="modal-body">
<div className="text-center">
<div>API項目が更新されました。</div>
<div>計算ロジックが崩れるため、関連メトリクスの改修が必要です。</div>
</div>
{Spacer({"size":50})}
<div className="modal-body">
<div>以下のリンクをクリックし、編集してください。</div>
{editAPI.related_metrics.map((item, index) => (
<div key={index} className="list-contents-grid">
<p>-<a href={"/#/scm-metrics-edit/" + item} target="_blank">{item}</a></p>
{/* <div>- {item}</div> */}
</div>
))}
</div>
{Spacer({"size":50})}
<div className="text-center">
<div className="centering-btn">
<div style={{color : "red"}}>
※改修が完了するまで、このページを閉じないでください。
</div>
{Spacer({"size":25})}
<button className="btn btn-primary" data-dismiss="modal">
OK
</button>
</div>
</div>
{Spacer({"size":25})}
</div>
</div>
{/* <!-- /.modal-content --> */}
</div>
{/* <!-- /.modal-dialog --> */}
</div>
{/* <!-- /.modal --> */}
{/* /.content */}
</>
)
}
export default ApiUpdateResultModal;
以下のリンクをクリックし、編集してください。 -基準在庫(在庫数・金額):年次
-販売計画誤差率:年次
<div key={index} className="text-left">
-<a href={"/#/scm-metrics-edit/" + item} target="_blank">{item}</a>
{/* <div>- {item}</div> */}
</div>
<span style={{ marginLeft: `${index * 20}px` }}>-</span>
<div>
<div>API項目が更新中です。</div>
<div>更新状況についてはジョブステータス管理画面より確認できます。</div>
<div>更新完了後、以下のメトリクスを再設定してください。</div>
</div>
style="text-decoration: underline;"
型 'string' には型 'Properties<string | number, string & {}>' と共通のプロパティがありません。ts(2559)
import React, { ChangeEvent, useEffect, useState } from "react";
import { useForm } from 'react-hook-form';
import { Spacer } from "../spacer";
import { commonAjax } from '../../../components/commonAjax';
import * as Defs from './apiDefs';
// Message Modal
const ApiUpdateResultModal = (props: { val: Defs.Api}) => {
const tmp = JSON.parse(JSON.stringify(props.val))
const [editAPI, setEditAPI] = useState<Defs.Api>(tmp);
return (
<>
<div className="modal fade" data-backdrop="static" id={editAPI.physical_name+"MessageModal"}>
<div className="modal-dialog modal-xl">
<div className="modal-content">
<div className="modal-header">
<h4 className="modal-title">サンプルデータ挿入 / API連携項目追加</h4>
<button type="button" className="close" data-dismiss="modal" aria-label="Close">
<span aria-hidden="true">×</span>
</button>
</div>
<div className="modal-body">
<div className="name-disp-area">
<div className="api-name">
<div className="dtl-content-width">API 論理名</div>
<div className="dtl-modal-font-color">{editAPI.logical_name}</div>
</div>
<div className="api-name">
<div className="dtl-content-width">API 物理名</div>
<div className="dtl-modal-font-color">{editAPI.physical_name}</div>
</div>
</div>
</div>
<div className="modal-body">
<div className="dividing-line"/>
</div>
<div className="modal-body">
<div>
<div>API項目が更新中です。</div>
<div>更新状況については
<a href={"http://www.google.com"} target="_blank" style="text-decoration: underline;">ジョブステータス管理画面</a>
より確認できます。</div>
<div>更新完了後、以下のメトリクスを再設定してください。</div>
</div>
{Spacer({"size":15})}
<div className="modal-body">
<div>
{editAPI.related_metrics.map((item, index) => (
<div key={index} className="text-left">
・ <a href={"/#/scm-metrics-edit/" + item} target="_blank">{item}</a>
{/* <div>- {item}</div> */}
</div>
))}
</div>
</div>
{Spacer({"size":25})}
<div>
<div className="centering-btn">
<div style={{color : "red"}}>
※メトリクスの再設定が完了するまでこのページを閉じないでください。
</div>
{Spacer({"size":25})}
<div className="text-center">
<button className="btn btn-primary" data-dismiss="modal">
OK
</button>
</div>
</div>
</div>
</div>
</div>
{/* <!-- /.modal-content --> */}
</div>
{/* <!-- /.modal-dialog --> */}
</div>
{/* <!-- /.modal --> */}
{/* /.content */}
</>
)
}
export default ApiUpdateResultModal;
if (editAPI.related_metrics != null){
$("#" + editAPI.physical_name + "-dtl-modal")
.modal("hide")
.on("hidden.bs.modal", () => {
setTimeout(() =>
$("#" + editAPI.physical_name + "MessageModal")
.modal("show"))
$("#" + editAPI.physical_name + "-dtl-modal").off("hidden.bs.modal")
})
}
commonAjax
.axios({swalFire: true, loading: true})
.put(`/api/api/${editAPI.physical_name}`, apiList.outinfo[0].column)
.then((res) => {
apiList.outinfo[0].column.map((col:Defs.EditColumn)=>{
col["delete"] = false;
col["editing"] = false;
col["edited"] = false;
})
setEditAPI(apiList)
setUpdated(true)
});
})();
SELECT
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS,
array_agg(
B.ID
)
WITHIN
GROUP(
ORDER BY B.ID
) RELATED_METRICS
FROM
API_META_INFO A
LEFT OUTER JOIN(
SELECT
tmp.ID,
item.value API_NAME
FROM
CATALOG_META_INFO tmp,
LATERAL FLATTEN(input => tmp.API_NAME) item
) B
ON
A.API_PHYSICAL_NAME = B.API_NAME
GROUP BY
A.API_PHYSICAL_NAME,
A.API_LOGICAL_NAME,
A.PROVIDE,
A.API_LINKAGE_ITEMS,
A.TAGS
ORDER BY
A.API_PHYSICAL_NAME
API_PHYSICAL_NAME: APIの物理的な名前 API_LOGICAL_NAME: APIの論理的な名前 PROVIDE: APIが提供している情報 API_LINKAGE_ITEMS: APIがリンクしているアイテム TAGS: APIに関連するタグ RELATED_METRICS: APIに関連するカタログメタ情報のメトリクスIDのリスト そして、これらの情報をAPIの物理的名前(API_PHYSICAL_NAME)でソートした結果を提供します。
APIの物理的名前が同じデータについては、同じ行にまとめられ、関連するメトリクスIDは配列としてまとめられ、ID順にソートされています。つまり、それぞれのAPIに関する情報と、それに関連するメトリクスIDのリストが得られます。
最後に、LEFT OUTER JOINを使っているため、API_META_INFOテーブルの全ての行が表示され、一致するカタログメタ情報がない場合はNULL値が表示されます。
これにより、私たちはそれぞれのAPIが何を提供し、どのアイテムとリンクしているのか、どのようなタグが付けられているのか、そしてそれに関連するメトリクスのIDが何であるかを一覧で確認することができます。
「カタログメタ情報」と「カタログ提供情報」を「ID」をキーに結合し、「ID」「カテゴリ」「IN提供情報」「OUT提供情報」を取得する
メトリクス毎に「IN提供情報」と「OUT提供情報」の「提供済みフラグ」がすべて真の場合は、返却値の「提供済みフラグ」に真を、それ以外の場合は偽を設定する
返却値に「ID」「カテゴリ」「提供済みフラグ」をメトリクス毎に設定する
このSQLは、APIの物理名をIDと見立て、APIのメタ情報とそれに関連するカタログメタ情報を結合します。APIの論理名、提供情報、リンクアイテム、タグをカテゴリとし、関連する全てのカタログメタ情報IDが存在する場合は「提供済みフラグ」を真、そうでなければ偽とします。結果として、各APIに対して「ID」、「カテゴリ」、「提供済みフラグ」が設定されたリストが生成されます。
<html>
<body>
<!--StartFragment-->
py | jobInfo.py
-- | --
URI | [GET] /api/jobs/:id
IN | id | string | ジョブID
OUT | job | object | ジョブ情報
| id | string | ジョブIDJenkinsと同じ連番にする想定
| type | string | ジョブ種別API追加API更新メトリクスデプロイ新規メトリクス追加サンプルデータ挿入個社環境コピー個社環境削除デモ環境初期化
| name | string | 処理名例:完成品販売計画_月次 サンプルデータ挿入処理
| currPhase | number | 現在の処理フェーズ番号
| totalPhase | number | 合計処理フェーズ数
| phaseDesc | string | 実行中フェーズの処理内容例:scm-metricsリポジトリ取得中
| execDate | string | 実行日時YYYY-MM-DD hh:mm:ss
| endDate | string | 処理終了日時YYYY-MM-DD hh:mm:ss
| status | string | “running” - 実行中“finished” - 正常終了“error” - 異常終了
| errorInfo | object | エラー情報正常時はnull
| title | string | エラー名
| description | string | エラー詳細(HTML可)
| userAction | string | アクション(HTML可)
| errorMsg | string | 出力されたエラーメッセージ
| dumpData | string | その他の出力情報(スタックトレース等)
<!--EndFragment-->
</body>
</html>
ジョブ情報のポーリング方法:
ジョブIDを指定して、定期的にジョブ情報を取得することでポーリングを実現します。 ジョブIDを指定してジョブ情報を取得するエンドポイントは、[GET] /api/jobs/:id の形式です。 ポーリングの頻度:
ポーリングの頻度は、ジョブの実行時間やジョブの進行状況に応じて適切に設定します。 短すぎる間隔だとサーバーに過剰な負荷をかける可能性がありますが、長すぎる間隔だとジョブの進行状況をリアルタイムで把握できない場合があります。 通常は、ジョブが実行中の場合は短めのポーリング間隔(例:数秒から数十秒)、ジョブが終了した場合は長めのポーリング間隔(例:数分から数十分)を設定します。 ポーリング終了条件:
ジョブ情報に含まれる "status" フィールドをチェックして、ジョブの実行状態を確認します。 "status" フィールドが "running" の場合はジョブが実行中として、引き続きポーリングを継続します。 "status" フィールドが "finished" または "error" の場合は、ジョブが正常終了または異常終了したとして、ポーリングを終了します。 ポーリング結果の処理:
ポーリング結果として取得したジョブ情報に含まれるフィールドを利用して、ユーザーに適切な情報を提供します。 ジョブが正常終了した場合は "phaseDesc" や "execDate" を表示して処理の進行状況や実行日時を示します。 ジョブが異常終了した場合は "errorInfo" フィールドをチェックし、エラー名やエラー詳細を表示します。 その他の出力情報については、必要に応じて表示するかログとして記録します。 エラーハンドリング:
ポーリング中にエラーが発生した場合は、エラーメッセージを表示し、ユーザーに再試行を促すか、システム管理者に問い合わせる方法を提供します。
データ挿入リクエストの送信: データの挿入リクエストをサーバーに送信します。
ポーリング開始: データ挿入リクエストが受け付けられた後にポーリングを開始します。
ポーリングの制御: ポーリングの間隔や回数を適切に制御します。短すぎる間隔だとサーバーに過剰な負荷をかける可能性がありますし、長すぎるとユーザー体験に影響を及ぼすことがあります。
結果の確認: ポーリングによってデータ挿入の結果を確認します。挿入が成功したか、エラーが発生したかを判断します。
成功時の処理: データ挿入が成功した場合は、成功メッセージを表示したり、必要な追加の処理を行ったりします。
エラー時の処理: データ挿入がエラーとなった場合は、エラーメッセージを表示したり、ユーザーに再試行を促したりします。
ポーリングの終了: データ挿入の結果が確認できた場合や、一定回数のポーリングが終了した場合にポーリングを終了します。
データ挿入リクエストの送信: データの挿入リクエストをサーバーに送信します。
ポーリング開始: データ挿入リクエストが受け付けられた後にポーリングを開始します。
ポーリングの制御: ポーリングの間隔や回数を適切に制御します。短すぎる間隔だとサーバーに過剰な負荷をかける可能性がありますし、長すぎるとユーザー体験に影響を及ぼすことがあります。
結果の確認: ポーリングによってデータ挿入の結果を確認します。挿入が成功したか、エラーが発生したかを判断します。
成功時の処理: データ挿入が成功した場合は、成功メッセージを表示したり、必要な追加の処理を行ったりします。
エラー時の処理: データ挿入がエラーとなった場合は、エラーメッセージを表示したり、ユーザーに再試行を促したりします。
ポーリングの終了: データ挿入の結果が確認できた場合や、一定回数のポーリングが終了した場合にポーリングを終了します。
データ挿入リクエストを送信: データの挿入リクエストをサーバーに送信します。
ポーリング開始: データ挿入リクエストがサーバーに受け付けられたときに、ポーリングを開始します。この時点で、データ挿入のジョブIDなどの識別子を取得します。
ポーリングの実行: データ挿入のジョブIDを使用して、一定の間隔でデータ挿入の結果を問い合わせます。これには、ジョブ情報を取得するAPIエンドポイントを使用します。例えば、上記の情報に示されているように、[GET] /api/jobs/:id のエンドポイントを使って、ジョブの状態やエラー情報を取得します。
結果の確認: ポーリングによって取得したジョブ情報を解析して、データの挿入が成功したかどうかを判断します。
ポーリングの制御: ポーリングの頻度や回数を適切に制御します。短すぎる間隔だとサーバーに過剰な負荷をかける可能性がありますし、長すぎるとユーザー体験に影響を及ぼすことがあります。ポーリングの間隔や回数は、システムの性能やデータの挿入にかかる時間に合わせて慎重に設定します。
ポーリング終了条件: データ挿入の結果が確認できた場合や、一定回数のポーリングが終了した場合に、ポーリングを終了します。
必要な情報が欠落しているか、データが規定の形式に適合していません。入力内容を確認して再度お試しください。"
if (method == "update"):
dpbDbNm = f"{PROJECT}_{ENV}".upper()
i4sf = SnowflakeAccessor(
warehouse=SNOW_WH,
database=dpbDbNm,
schema='PUBLIC',
role='ACCOUNTADMIN'
)
fquery = f'''
TRUNCATE TABLE "J_{tablename}_{id}"
'''
i4sf.execute(fquery)
i4sf.close()
import os
import json
from tools.i4dplAccsecesor import post
from tools.snowflakeAccessor import SnowflakeAccessor
SNOW_WH = os.getenv("SNOW_WH")
ENV = os.getenv("ENV")
PROJECT = os.getenv("PROJECT")
def postApiData(data: json):
env = os.getenv("ENV")
username = os.getenv("DPB_USER")
project = os.getenv("PROJECT")
host = f"{project}.{env}.ind.prd.is.a.i4square.info"
method = data["method"]
id: str = data["id"]
path = f"/v1/{project}/{id}"
id = id.upper()
tablename = project.upper()
print("id:", id)
try:
# 洗い替えはpostに対応していないので元のテーブルをdeleteしてからinsertをする
if (method == "update"):
dpbDbNm = f"{PROJECT}_{ENV}".upper()
i4sf = SnowflakeAccessor(
warehouse=SNOW_WH,
database=dpbDbNm,
schema='PUBLIC',
role='ACCOUNTADMIN'
)
fquery = f'''
TRUNCATE TABLE "J_{tablename}_{id}"
'''
i4sf.execute(fquery)
i4sf.close()
print("method:", method)
# ind-userのsecretを取得する
query = f"""
SELECT
SECRET
FROM
DPB_USER
WHERE
USER = '{username}'
"""
i4sf = SnowflakeAccessor(
database="INDUSTRIAL_DNA_DB", schema="INDUSTRIAL_DNA_SCHEMA"
)
result = i4sf.execute(query)
for items in result:
secret = items[0]
print("secret:", secret)
finally:
i4sf.close()
for item in data["json"]:
print("data:", item)
post(
username,
secret,
host,
path,
json={
"rowkey": "string",
"timestamp": "2018-01-01T00:00:00.000Z",
"transaction_id": "30-1473940241740",
"data": item,
})
return {"message": "COMPLETE"}
<html>
<body>
<!--StartFragment-->
| 項目名 | TYPE | 詳細
-- | -- | -- | --
py | jobInfo.py
URI | [GET] /api/jobs/:id
IN | id | string | ジョブID
OUT | job | object | ジョブ情報
| id | string | ジョブIDJenkinsと同じ連番にする想定
| type | string | ジョブ種別API追加API更新メトリクスデプロイ新規メトリクス追加サンプルデータ挿入個社環境コピー個社環境削除デモ環境初期化
| name | string | 処理名例:完成品販売計画_月次 サンプルデータ挿入処理
| currPhase | number | 現在の処理フェーズ番号
| totalPhase | number | 合計処理フェーズ数
| phaseDesc | string | 実行中フェーズの処理内容例:scm-metricsリポジトリ取得中
| execDate | string | 実行日時YYYY-MM-DD hh:mm:ss
| endDate | string | 処理終了日時YYYY-MM-DD hh:mm:ss
| status | string | “running” - 実行中“finished” - 正常終了“error” - 異常終了
| errorInfo | object | エラー情報正常時はnull
| message | string | エラーメッセージ
| description | string | エラー詳細説明(HTML可)
| userAction | string | アクション(HTML可)
| dumpMsg | string | 出力されたエラーメッセージ
| dumpData | string | その他の出力情報(スタックトレース等)
<!--EndFragment-->
</body>
</html>
ID string ジョブID (Jenkins連番) TYPE string ジョブ種別 NAME string 処理名 CURRPHASE number 現在の処理フェーズ番号 TOTALPHASE number 合計処理フェーズ数 PHASEDESC string 実行中フェーズの処理内容 EXECDATE string 実行日時 (YYYY-MM-DD hh:mm:ss) ENDDATE string 処理終了日時 (YYYY-MM-DD hh:mm:ss) STATUS string ジョブのステータス ("running", "finished", "error") ERRORINFO object エラー情報 (JSONオブジェクト) MESSAGE string エラーメッセージ DESCRIPTION string エラー詳細説明(HTML可) USERACTION string アクション(HTML可) DUMPMSG string 出力されたエラーメッセージ DUMPDATA string その他の出力情報(スタックトレース等)
ID ジョブID string ジョブID (Jenkins連番) TYPE ジョブ種別 string ジョブ種別 NAME 処理名 string 処理名 CURRPHASE 現在のフェーズ番号 number 現在の処理フェーズ番号 TOTALPHASE 合計フェーズ数 number 合計処理フェーズ数 PHASEDESC フェーズの説明 string 実行中フェーズの処理内容 EXECDATE 実行日時 string 実行日時 (YYYY-MM-DD hh:mm:ss) ENDDATE 終了日時 string 処理終了日時 (YYYY-MM-DD hh:mm:ss) STATUS ステータス string ジョブのステータス ("running", "finished", "error") ERRORINFO エラー情報 object エラー情報 (JSONオブジェクト) MESSAGE エラーメッセージ string エラーメッセージ DESCRIPTION エラー詳細説明 string エラー詳細説明(HTML可) USERACTION ユーザーアクション string アクション(HTML可) DUMPMSG 出力エラーメッセージ string 出力されたエラーメッセージ DUMPDATA その他出力情報 string その他の出力情報(スタックトレース等)
エラーハンドリング設計&実装:サンプルデータ打ち上げ
1.参照/更新テーブル
• ジョブステータス情報(JOB_STATUS_INFO)
論理名 物理名 TYPE 必須 備考
1 ジョブID ID STRING 〇 ジョブID (連番)
2 ジョブ種別 TYPE STRING 〇 ジョブ種別
3 処理名 NAME STRING 〇 処理名
4 現在フェーズ番号 CURRPHASE NUMBER 現在の処理フェーズ番号
5 合計フェーズ数 TOTALPHASE NUMBER 合計処理フェーズ数
6 フェーズ説明 PHASEDESC STRING 実行中フェーズの処理内容
7 実行日時 EXECDATE STRING 実行日時 (YYYY-MM-DD hh:mm:ss)
8 終了日時 ENDDATE STRING 終了日時 (YYYY-MM-DD hh:mm:ss)
9 ステータス STATUS STRING • “running” - 実行中
• “finished” - 正常終了
• “error” - 異常終了
10 エラー情報 ERRORINFO Object[] エラー情報
正常時はnull
11 エラーメッセージ MESSAGE STRING エラーメッセージ
12 エラー詳細説明 DESCRIPTION STRING エラー詳細説明
13 ユーザーアクション USERACTION STRING アクション
14 出力エラーメッセージ DUMPMSG STRING 出力されたエラーメッセージ
15 その他出力情報 DUMPDATA STRING その他の出力情報
2.発生しうるエラーおよび画面に表示させるメッセージの検討
エラーメッセージ エラー詳細 ユーザアクション 備考
ファイル拡張子誤り ファイル拡張子が正しくありません。アップロード可能なファイルはCSVかJSONのみです。 再度ファイルをアップロードしてください。 既に実装済み
既存のままで?
データバリデーションエラー 必要な情報が欠落しているか、データが規定の形式に適合していません。 入力内容を確認して再度お試しください。 現状、1件落ちても、処理は続く
インターナルサーバーエラー サーバーで問題が発生しています。 しばらくしてから再度お試しください。問題が解決しない場合は、サポートへお問い合わせください。
snowflakeへの登録失敗 データの登録が失敗しました。 しばらくしてから再度お試しください。問題が解決しない場合は、サポートへお問い合わせください。
既存データ削除エラー 既存テーブルの削除が失敗しました。 しばらくしてから再度お試しください。問題が解決しない場合は、サポートへお問い合わせください。
タイムアウトエラー(15分以上) タイムアウトしました。 しばらくしてから再度お試しください。問題が解決しない場合は、サポートへお問い合わせください。
2.テーブル挿入結果ポーリング処理の設計
4.処理完了後、ポーリング処理を停止し、以下をジョブステータス情報テーブルに格納。
論理名 物理名 TYPE 備考
1 フェーズ説明 PHASEDESC STRING • ”処理終了”
2 終了日時 ENDCDATE STRING 終了日時 (YYYY-MM-DD hh:mm:ss)
3 ステータス STATUS STRING • “finished”
5.その他エラー
処理中にインターナルサーバーエラーが発生した場合、以下をジョブステータス情報テーブルに格納。
1 終了日時 ENDCDATE STRING 終了日時 (YYYY-MM-DD hh:mm:ss)
2 ステータス STATUS STRING • “error”
3 エラー情報 ERRORINFO Object[]
4 エラーメッセージ MESSAGE STRING • ”インターナルサーバーエラー”
5 エラー詳細説明 DESCRIPTION STRING • ”サーバーで問題が発生しています。”
6 ユーザーアクション USERACTION STRING • ”しばらくしてから再度お試しください。問題が解決しない場合は、サポートへお問い合わせください”
7 出力エラーメッセージ DUMPMSG STRING 出力されたエラーメッセージ
8 その他出力情報 DUMPDATA STRING その他の出力情報
Jsonを扱うのは面倒なので全部フラットな列
import datetime
import os
import random
import string
import json
from tools.i4dplAccsecesor import post
from tools.snowflakeAccessor import SnowflakeAccessor
ENV = os.getenv("ENV")
PROJECT = os.getenv("PROJECT")
DPL_HOST = os.getenv("DPL_HOST")
DPB_USER = os.getenv("DPB_USER")
DPB_USER_DATABASE = os.getenv("DPB_USER_DATABASE")
DPB_USER_SCHEMA = os.getenv("DPB_USER_SCHEMA")
def postApiData(data: json):
method = data["method"]
id: str = data["id"]
path = f"/v1/{PROJECT}/{id}"
id = id.upper()
tablename = PROJECT.upper()
print("id:", id)
try:
# 洗い替えはpostに対応していないので元のテーブルをTRUNCATEしてからinsertをする
if method == "update":
dpbDbNm = f"{PROJECT}_{ENV}".upper()
i4sf = SnowflakeAccessor(
database=dpbDbNm,
schema="PUBLIC",
role="ACCOUNTADMIN",
)
fquery = f"""
TRUNCATE TABLE "J_{tablename}_{id}"
"""
i4sf.execute(fquery)
i4sf.close()
print("method:", method)
# ind-userのsecretを取得する
query = f"""
SELECT
SECRET
FROM
DPB_USER
WHERE
USER = '{DPB_USER}'
"""
i4sf = SnowflakeAccessor(database=DPB_USER_DATABASE, schema=DPB_USER_SCHEMA)
result = i4sf.execute(query)
for items in result:
secret = items[0]
print("secret:", secret)
finally:
i4sf.close()
rowkey = randomname(32)
for item in data["json"]:
print("data:", item)
num_2 = randomnum(2)
num_13 = randomnum(13)
transaction_id = f"{num_2}-{num_13}"
dt = datetime.datetime.now().isoformat(timespec="milliseconds")
timestamp = f"{dt}Z"
post(
DPB_USER,
secret,
DPL_HOST,
path,
json={
"rowkey": rowkey,
"timestamp": timestamp,
"transaction_id": transaction_id,
"data": item,
},
)
return {"message": "COMPLETE", "rowkey": rowkey}
def randomname(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))
def randomnum(n):
return "".join(random.choices(string.digits, k=n))
サンプルデータ挿入
データ削除(洗い替えの場合)
Snowflake接続
データ挿入処理
DPL待ち(ポーリング)
申し訳ありません、データバリデーションとエラーハンドリングの設計に不足がありました。データバリデーションがエラーを返した場合でもポーリングを終了する必要があることを理解しました。
以下は修正されたデータバリデーションとエラーハンドリングを考慮したポーリングの設計です:
ポーリング処理の目的: データベース内のテーブルのレコード数を定期的に確認し、新しいレコードが増えなくなった場合に処理を終了するかエラーとして扱う。
ポーリングの流れ: a. データベースに接続する。 b. 指定されたテーブルのレコード数を取得する。 c. 前回のポーリング結果と比較し、レコード数が増えなくなったかどうかを判定する。 d. レコード数が増えなくなった場合はポーリングを終了し、成功として処理を完了する。 e. レコード数が増え続けている場合は次のポーリングまで待機する。 f. 最大試行回数に達した場合、エラーメッセージを作成して処理を中止する。
使用する変数と値:
poll_interval
:ポーリングの間隔(秒)。max_attempts
:最大ポーリング回数。previous_record_count
:前回のポーリング結果で取得したレコード数。error_messages
:エラーメッセージを格納するリスト。データバリデーション:
ポーリング中にデータバリデーションを行う場合、各ポーリングごとにデータの妥当性を確認する処理を追加します。データバリデーションがエラーを返した場合、ポーリングを中止し、エラーメッセージをerror_messages
リストに追加します。
エラーハンドリング:
post()
関数がエラーを返した場合もポーリングを中止し、エラーメッセージをerror_messages
リストに追加します。
この設計に基づいて、修正されたポーリング処理を実装してください。データバリデーションやエラーハンドリングの処理を組み込むことで、より信頼性のあるポーリング処理が実現されます。設計書作成の際には、関数の仕様やエラーメッセージの内容を詳細に記述して、理解しやすい設計となるようにしてください。
def post(username, secret, host, path, query="", json={}, target_status_code=200):
url = get_url(protocol, host, path, query)
headers = {
"X_Consumer_Username": "test",
"Content-Type": "application/json",
"Host": host,
}
auth = KongHMAC(username, secret)
try:
# print("target URL:", get_url(protocol, host, path, query))
# print("json:",json)
res = requests.post(url, auth=auth, headers=headers, json=json, timeout=TIMEOUT)
result = "[{}] status:{} body:{}".format(
datetime.datetime.now(), res.status_code, res.text
)
if res.status_code != 200:
print("api post error")
print(result)
raise InternalServerError
except Exception as e:
print("[{}] {}".format(datetime.datetime.now(), e.args))
ジョブIDを採番する。採番にはシーケンスを使用する。
CREATE SEQUENCE JOB_STATUS_INFO_ID; ステータスに「running」、フェーズ番号に0、実行日時に現在日時を設定しレコードを生成する
合計処理フェーズ数もあらかじめ設定する
機能の本処理を非同期で起動し、ジョブIDを画面に返却する
1.POST AP後、以下の内容をジョブステータス情報テーブルに格納。
import datetime
import os
import random
import string
import json
from tools.i4dplAccsecesor import post
from tools.snowflakeAccessor import SnowflakeAccessor
ENV = os.getenv("ENV")
PROJECT = os.getenv("PROJECT")
DPL_HOST = os.getenv("DPL_HOST")
DPB_USER = os.getenv("DPB_USER")
DPB_USER_DATABASE = os.getenv("DPB_USER_DATABASE")
DPB_USER_SCHEMA = os.getenv("DPB_USER_SCHEMA")
def postApiData(data: json):
method = data["method"]
id: str = data["id"]
path = f"/v1/{PROJECT}/{id}"
id = id.upper()
tablename = PROJECT.upper()
print("id:", id)
try:
# snowflake connection
obj = SnowflakeAccessor(warehouse=SNOW_WH,
database=SNOW_DB,
schema=SNOW_SCHEMA)
#ジョブID生成
max_job_id_query = "SELECT MAX(ID) FROM JOB_STATUS_INFO"
result = obj.execute(max_job_id_query)
max_job_id = [0][0]
if max_job_id is not None:
job_id =max_job_id + 1
else:
job_id = 1
#insert
job_type = "サンプルデータ挿入"
process_name = id + "サンプルデータ挿入処理"
current_phase = 0
total_phases = 4
execution_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
status = "running"
insert_start_query = f"""
INSERT INTO JOB_STATUS_INFO (
ID,TYPE,NAME,CURRPHASE,TOTALPHASE,EXECDATE,STATUS
)
SELECT
'{job_id}',
'{job_type}',
'{process_name}',
'{current_phase}',
'{total_phases}',
'{execution_date}',
'{status}'
;
"""
obj.execute(insert_start_query)
finally:
obj.close()
try:
PHASEDESC = "既存データ削除中"
current_phase = 1
# 洗い替えはpostに対応していないので元のテーブルをTRUNCATEしてからinsertをする
if method == "update":
dpbDbNm = f"{PROJECT}_{ENV}".upper()
i4sf = SnowflakeAccessor(
database=dpbDbNm,
schema="PUBLIC",
role="ACCOUNTADMIN",
)
fquery = f"""
TRUNCATE TABLE "J_{tablename}_{id}"
"""
i4sf.execute(fquery)
i4sf.close()
print("method:", method)
# ind-userのsecretを取得する
query = f"""
SELECT
SECRET
FROM
DPB_USER
WHERE
USER = '{DPB_USER}'
"""
i4sf = SnowflakeAccessor(database=DPB_USER_DATABASE, schema=DPB_USER_SCHEMA)
result = i4sf.execute(query)
for items in result:
secret = items[0]
print("secret:", secret)
finally:
i4sf.close()
rowkey = randomname(32)
for item in data["json"]:
print("data:", item)
num_2 = randomnum(2)
num_13 = randomnum(13)
transaction_id = f"{num_2}-{num_13}"
dt = datetime.datetime.now().isoformat(timespec="milliseconds")
timestamp = f"{dt}Z"
post(
DPB_USER,
secret,
DPL_HOST,
path,
json={
"rowkey": rowkey,
"timestamp": timestamp,
"transaction_id": transaction_id,
"data": item,
},
)
return {"message": "COMPLETE", "rowkey": rowkey}
def randomname(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))
def randomnum(n):
return "".join(random.choices(string.digits, k=n))
import datetime
import os
import random
import string
import json
from tools.i4dplAccsecesor import post
from tools.snowflakeAccessor import SnowflakeAccessor
ENV = os.getenv("ENV")
PROJECT = os.getenv("PROJECT")
DPL_HOST = os.getenv("DPL_HOST")
DPB_USER = os.getenv("DPB_USER")
DPB_USER_DATABASE = os.getenv("DPB_USER_DATABASE")
DPB_USER_SCHEMA = os.getenv("DPB_USER_SCHEMA")
def postApiData(data: json):
method = data["method"]
id: str = data["id"]
path = f"/v1/{PROJECT}/{id}"
id = id.upper()
tablename = PROJECT.upper()
print("id:", id)
try:
# snowflake connection
obj = SnowflakeAccessor(warehouse=SNOW_WH,
database=SNOW_DB,
schema=SNOW_SCHEMA)
#ジョブID生成
max_job_id_query = "SELECT MAX(ID) FROM JOB_STATUS_INFO"
result = obj.execute(max_job_id_query)
max_job_id = [0][0]
if max_job_id is not None:
job_id =max_job_id + 1
else:
job_id = 1
#insert
job_type = "サンプルデータ挿入"
process_name = id + "サンプルデータ挿入処理"
current_phase = 0
total_phases = 4
execution_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
status = "running"
insert_start_query = f"""
INSERT INTO JOB_STATUS_INFO (
ID,TYPE,NAME,CURRPHASE,TOTALPHASE,EXECDATE,STATUS
)
SELECT
'{job_id}',
'{job_type}',
'{process_name}',
'{current_phase}',
'{total_phases}',
'{execution_date}',
'{status}'
;
"""
obj.execute(insert_start_query)
finally:
obj.close()
try:
PHASEDESC = "既存データ削除中"
current_phase = 1
# 洗い替えはpostに対応していないので元のテーブルをTRUNCATEしてからinsertをする
if method == "update":
dpbDbNm = f"{PROJECT}_{ENV}".upper()
i4sf = SnowflakeAccessor(
database=dpbDbNm,
schema="PUBLIC",
role="ACCOUNTADMIN",
)
fquery = f"""
TRUNCATE TABLE "J_{tablename}_{id}"
"""
i4sf.execute(fquery)
print("method:", method)
finally:
i4sf.close()
try:
# ind-userのsecretを取得する
query = f"""
SELECT
SECRET
FROM
DPB_USER
WHERE
USER = '{DPB_USER}'
"""
i4sf = SnowflakeAccessor(database=DPB_USER_DATABASE, schema=DPB_USER_SCHEMA)
result = i4sf.execute(query)
for items in result:
secret = items[0]
print("secret:", secret)
except Exception as e:
print(e)
raise InternalServerError
finally:
i4sf.close()
rowkey = randomname(32)
for item in data["json"]:
print("data:", item)
num_2 = randomnum(2)
num_13 = randomnum(13)
transaction_id = f"{num_2}-{num_13}"
dt = datetime.datetime.now().isoformat(timespec="milliseconds")
timestamp = f"{dt}Z"
post(
DPB_USER,
secret,
DPL_HOST,
path,
json={
"rowkey": rowkey,
"timestamp": timestamp,
"transaction_id": transaction_id,
"data": item,
},
)
return {"message": "COMPLETE", "rowkey": rowkey}
def randomname(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))
def randomnum(n):
return "".join(random.choices(string.digits, k=n))
import datetime
import os
import random
import string
import json
from tools.i4dplAccsecesor import post
from tools.snowflakeAccessor import SnowflakeAccessor
ENV = os.getenv("ENV")
PROJECT = os.getenv("PROJECT")
DPL_HOST = os.getenv("DPL_HOST")
DPB_USER = os.getenv("DPB_USER")
DPB_USER_DATABASE = os.getenv("DPB_USER_DATABASE")
DPB_USER_SCHEMA = os.getenv("DPB_USER_SCHEMA")
def postApiData(data: json):
method = data["method"]
id: str = data["id"]
path = f"/v1/{PROJECT}/{id}"
id = id.upper()
tablename = PROJECT.upper()
print("id:", id)
try:
# snowflake connection
obj = SnowflakeAccessor(warehouse=SNOW_WH,
database=SNOW_DB,
schema=SNOW_SCHEMA)
#ジョブID生成
max_job_id_query = "SELECT MAX(ID) FROM JOB_STATUS_INFO"
result = obj.execute(max_job_id_query)
max_job_id = [0][0]
if max_job_id is not None:
job_id =max_job_id + 1
else:
job_id = 1
#insert
job_type = "サンプルデータ挿入"
process_name = id + "サンプルデータ挿入処理"
current_phase = 0
total_phases = 4
execution_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
status = "running"
insert_start_query = f"""
INSERT INTO JOB_STATUS_INFO (
ID,TYPE,NAME,CURRPHASE,TOTALPHASE,EXECDATE,STATUS
)
SELECT
'{job_id}',
'{job_type}',
'{process_name}',
'{current_phase}',
'{total_phases}',
'{execution_date}',
'{status}'
;
"""
obj.execute(insert_start_query)
finally:
obj.close()
try:
phasedesc = "既存データ削除中"
current_phase = 1
update_status(phasedesc,current_phase)
# 洗い替えはpostに対応していないので元のテーブルをTRUNCATEしてからinsertをする
if method == "update":
dpbDbNm = f"{PROJECT}_{ENV}".upper()
i4sf = SnowflakeAccessor(
database=dpbDbNm,
schema="PUBLIC",
role="ACCOUNTADMIN",
)
fquery = f"""
TRUNCATE TABLE "J_{tablename}_{id}"
"""
i4sf.execute(fquery)
print("method:", method)
except Exception as e:
message = "既存データ削除エラー"
description = "既存データの削除が失敗しました。"
useaction = "本画面のスクリーンショットを取得し、サポートへお問い合わせください。"
dumpmsg = str(e)
dumpdata = ""
handle_error(message,description,useaction,dumpmsg,dumpdata)
finally:
i4sf.close()
try:
phasedesc = "snowflake接続中"
current_phase = 2
update_status(phasedesc,current_phase)
# ind-userのsecretを取得する
query = f"""
SELECT
SECRET
FROM
DPB_USER
WHERE
USER = '{DPB_USER}'
"""
i4sf = SnowflakeAccessor(database=DPB_USER_DATABASE, schema=DPB_USER_SCHEMA)
result = i4sf.execute(query)
for items in result:
secret = items[0]
print("secret:", secret)
except Exception as e:
message = "インターナルサーバーエラー"
description = "サーバーで問題が発生しています。"
useaction = "本画面のスクリーンショットを取得し、サポートへお問い合わせください。"
dumpmsg = str(e)
dumpdata = ""
handle_error(message,description,useaction,dumpmsg,dumpdata)
raise InternalServerError
finally:
i4sf.close()
rowkey = randomname(32)
for item in data["json"]:
print("data:", item)
num_2 = randomnum(2)
num_13 = randomnum(13)
transaction_id = f"{num_2}-{num_13}"
dt = datetime.datetime.now().isoformat(timespec="milliseconds")
timestamp = f"{dt}Z"
post(
DPB_USER,
secret,
DPL_HOST,
path,
json={
"rowkey": rowkey,
"timestamp": timestamp,
"transaction_id": transaction_id,
"data": item,
},
)
return {"message": "COMPLETE", "rowkey": rowkey}
def randomname(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))
def randomnum(n):
return "".join(random.choices(string.digits, k=n))
def update_status(currphase,phasedesc):
try:
update_query = f"""
UPDATE
JOB_STATUS_INFO
SET
CURRPHASE = '{currphase}',
PHASEDESC = ' {phasedesc}',
WHERE
ID = '{job_id}'
;
"""
obj = SnowflakeAccessor(warehouse=SNOW_WH,
database=SNOW_DB,
schema=SNOW_SCHEMA)
obj.execute(update_query)
finally:
obj.close()
def handle_error(message,description,useaction,dumpmsg,dumpdata):
endDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
try:
error_query = f"""
INSERT INTO JOB_STATUS_INFO (
ENDDATE,STATUS,MESSAGE,DESCRIPTION,USERACTION,DUMPMSG,DUMPDATA
)
SELECT
'{endDate}',
'error',
'{message}',
'{description}',
'{useaction}',
'{dumpmsg}',
'{dumpdata}'
;
"""
obj = SnowflakeAccessor(warehouse=SNOW_WH,
database=SNOW_DB,
schema=SNOW_SCHEMA)
obj.execute(error_query)
finally:
obj.close()