Open kirin-ri opened 8 months ago
basePath: /v1
definitions:
Bytelength3_10:
description: 'byteLength用parameter
3byteから10byteの文字を許容する
'
example: Test"Data
type: string
x-validations:
byteLength:
max: 10
min: 3
DataCount:
description: 取得対象データ件数
example: 1
type: integer
GetResponse:
properties:
data_count:
$ref: '#/definitions/DataCount'
response_body:
description: レスポンスデータ本体を格納するノード。
items:
$ref: '#/definitions/GetTrn'
type: array
response_limit:
$ref: '#/definitions/ResponseLimit'
result_code:
$ref: '#/definitions/ResultCode'
result_msg:
$ref: '#/definitions/ResultMsg'
required:
- result_code
- result_msg
- data_count
type: object
GetTrn:
properties:
data:
description: JSON形式のデータ(構成内容は問わない)
example:
address: 東京都江東区豊洲3-3-3 豊洲センタービル
company: NTTデータ
type: object
received_time:
$ref: '#/definitions/ReceivedTime'
registered_datetime:
$ref: '#/definitions/RegisteredDateTime'
rowkey:
$ref: '#/definitions/RowKey'
sent_from_id:
$ref: '#/definitions/SentFromId'
sent_from_ip:
$ref: '#/definitions/SentFromIp'
sent_time:
$ref: '#/definitions/SentTime'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
transaction_type:
$ref: '#/definitions/TransactionType'
type: object
Max10:
example: 8
maximum: 10
type: integer
Max15.5:
example: 8.7
format: double
maximum: 15.5
type: number
Min2.2:
example: 4.3
format: double
minimum: 2.2
type: number
Min3:
example: 5
minimum: 3
type: integer
Overwrite:
description: 'データ上書判定フラグ
Trueの場合、同一トランザクションIDのデータが存在したときにデータを上書きする。
'
example: false
type: boolean
ReceivedTime:
description: 登録されている受信日時
example: '2016-02-22T12:30:00.000Z'
type: string
RegisteredDateTime:
description: データレイク登録時刻 ※ISO 8601形式でUTC時刻
example: '2016-02-22T12:30:00.000Z'
type: string
ResponseBody:
properties:
result_code:
$ref: '#/definitions/ResultCode'
result_msg:
$ref: '#/definitions/ResultMsg'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- result_code
- result_msg
- transaction_id
type: object
ResponseLimit:
description: データ取得件数の閾値
example: 5000
type: integer
ResultCode:
description: '0: OK, 0以外の数値は[エラーコード]'
format: int32
type: integer
ResultMsg:
description: レスポンスメッセージ。
example: OK
type: string
RowKey:
description: インプットのユニークキー(データレイクでの主キー)
type: string
SentFromId:
description: 登録されている送信者ID
example: TESTUSER
type: string
SentFromIp:
description: 登録されている送信元IP
example: 255.255.255.255
type: string
SentTime:
description: 登録されている送信日時
example: '2016-02-22T12:30:00.000Z'
type: string
Timestamp:
description: データ生成時刻 ※ISO 8601形式でUTC時刻
example: '2016-02-22T12:30:00.000Z'
pattern: ^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z$
type: string
TransactionId:
description: レスポンスのユニークID(問い合わせ時の調査用)
example: 30-1473940241740
type: string
TransactionType:
description: トランザクションの種類(INSERT/UPDATE/DELETE)
example: INS
type: string
delReq:
properties:
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- timestamp
- transaction_id
type: object
noValidationPostReq:
properties:
data:
description: JSON形式のデータ(構成内容は問わない)
example:
address: 東京都江東区豊洲3-3-3 豊洲センタービル
company: NTTデータ
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
type: object
noValidationPutReq:
properties:
data:
description: JSON形式のデータ(構成内容は問わない)
example:
address: 東京都江東区豊洲3-3-3 豊洲センタービル
company: NTTデータ
type: object
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- transaction_id
type: object
validationData:
properties:
arr_item:
example:
- 1.0
- 2.0
- 3.0
items:
format: double
type: number
type: array
bytelength3_10:
$ref: '#/definitions/Bytelength3_10'
max10:
$ref: '#/definitions/Max10'
max15.5:
$ref: '#/definitions/Max15.5'
min2.2:
$ref: '#/definitions/Min2.2'
min3:
$ref: '#/definitions/Min3'
obj_item:
description: JSON形式のデータ(構成内容は問わない)
example:
Object: Item
boolean: false
type: object
type: object
validationPostReq:
properties:
data:
$ref: '#/definitions/validationData'
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
validationPutReq:
properties:
data:
$ref: '#/definitions/validationData'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- transaction_id
- data
type: object
host: dnasb.stg.dna.prd.is.a.i4square.info
info:
title: デモアプリ
version: 2.0.0
parameters:
DatekeyParam:
description: '参照したい年月を指定します
※ISO 8601形式でUTC時刻
'
in: query
name: datekey
required: false
type: string
DeleteReqParam:
description: data部のValidationチェックは行わない
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/delReq'
FromTimestampParam:
description: '参照したい期間(FROM)を指定します
※ISO 8601形式でUTC時刻
'
in: query
name: from_timestamp
required: false
type: string
LimitByTimestampParam:
description: 'Timestampでソートをした際の取得最大件数を指定します
'
in: query
name: limit_by_timestamp
required: false
type: string
NoValidationPostReqParam:
description: data部のValidationチェックは行わない
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/noValidationPostReq'
NoValidationPutReqParam:
description: data部のValidationチェックは行わない
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/noValidationPutReq'
RowKeyParam:
description: '参照したいRowKeyを指定します
'
in: query
name: rowkey
required: true
type: string
SentFromIdParam:
description: '参照したい送信者IDを指定します。
'
in: query
name: sent_from_id
required: false
type: string
ToTimestampParam:
description: '参照したい期間(TO)を指定します
※ISO 8601形式でUTC時刻
'
in: query
name: to_timestamp
required: false
type: string
TransactionIdParam:
description: '参照したいトランザクションIDを指定します
'
in: query
name: transaction_id
required: false
type: string
TransactionTypeParam:
description: '参照したいトランザクションタイプを指定します
'
in: query
name: transaction_type
required: false
type: string
ValidationPostReqParam:
description: validationチェック用
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/validationPostReq'
ValidationPutReqParam:
description: validationチェック用
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/validationPutReq'
paths:
/dnasb/components_master:
get:
consumes:
- application/json
parameters:
- $ref: '#/parameters/RowKeyParam'
- $ref: '#/parameters/DatekeyParam'
- $ref: '#/parameters/FromTimestampParam'
- $ref: '#/parameters/ToTimestampParam'
- $ref: '#/parameters/LimitByTimestampParam'
- $ref: '#/parameters/TransactionIdParam'
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/GetResponse'
'400':
description: Bad Request
'404':
description: Not Found
'500':
description: Internal Server Error
summary: 構成品目マスタ_BOM取得
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
COMPANY_CODE_TEST:
description: 会社コード
required: true
type: string
x-validations:
byteLength:
max: 255
COMPONENT:
description: 構成品目_子品目
required: true
type: string
x-validations:
byteLength:
max: 255
ITEM_CODE:
description: 品目コード_親品目
required: true
type: string
x-validations:
byteLength:
max: 255
ITEM_CONFIGURATION_LEVEL:
description: 品目構成レベル
required: true
type: string
x-validations:
byteLength:
max: 255
ORDER_RATIO:
description: 発注比率
type: string
x-validations:
byteLength:
max: 255
PARTS_NUMBER:
description: 員数
format: int32
required: true
type: integer
SUPPLIER_CODE:
description: 取引先コード_サプライヤ
required: true
type: string
x-validations:
byteLength:
max: 255
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: 構成品目マスタ_BOM登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
import requests
import yaml
import json
from jsonschema import validate
from jsonschema.exceptions import ValidationError
# GitHub 上的 YAML 文件 URL
yaml_url = "https://raw.githubusercontent.com/yourusername/yourrepo/yourbranch/yourfile.yml"
def get_yaml_from_github(url):
response = requests.get(url)
if response.status_code == 200:
return yaml.safe_load(response.text)
else:
raise Exception(f"Failed to fetch YAML file: HTTP {response.status_code}")
def convert_yaml_to_json_schema(yaml_rules):
# 这里添加逻辑将 YAML 规则转换为 JSON Schema
# 具体实现取决于 YAML 规则的格式
# 以下是一个简化的示例转换,您可能需要根据实际情况调整
return yaml_rules # 假设 YAML 规则已经是 JSON Schema 兼容格式
def validate_json_data(data, schema):
try:
validate(instance=data, schema=schema)
return True, None # 有效数据
except ValidationError as e:
return False, e.message # 验证失败
def main():
try:
# 从 GitHub 获取 YAML 规则
yaml_rules = get_yaml_from_github(yaml_url)
# 转换 YAML 规则为 JSON Schema
json_schema = convert_yaml_to_json_schema(yaml_rules)
# 假设我们有一些要验证的 JSON 数据
json_data = {
# 这里放入您的测试数据
}
# 验证 JSON 数据
is_valid, error = validate_json_data(json_data, json_schema)
if is_valid:
print("Data is valid.")
# 这里可以继续将数据导入 Snowflake
else:
print(f"Data validation failed: {error}")
# 数据不合规,处理或丢弃
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
main()
/dnasb/t1_priprt_shpt:
get:
consumes:
- application/json
parameters:
- $ref: '#/parameters/RowKeyParam'
- $ref: '#/parameters/DatekeyParam'
- $ref: '#/parameters/FromTimestampParam'
- $ref: '#/parameters/ToTimestampParam'
- $ref: '#/parameters/LimitByTimestampParam'
- $ref: '#/parameters/TransactionIdParam'
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/GetResponse'
'400':
description: Bad Request
'404':
description: Not Found
'500':
description: Internal Server Error
summary: 一次サプライヤ_一次部品_出荷実績取得
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
EXEC_DATE:
description: 処理日付
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
FACTORY_NAME:
description: 出荷先:生産拠点
required: true
type: string
x-validations:
byteLength:
max: 255
PARTS_NO:
description: 一次部品_部品番号
required: true
type: string
x-validations:
byteLength:
max: 255
PURCHASE_ORDER_NO:
description: PO_注文No
required: true
type: string
x-validations:
byteLength:
max: 255
QTY:
description: 数量
format: int32
required: true
type: integer
SHIPMENT_ATD:
description: 納入日
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
SHIP_DATE:
description: 出荷日
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
SUPPLIER_NAME:
description: 出荷元:一次サプライヤ
required: true
type: string
x-validations:
byteLength:
max: 255
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: 一次サプライヤ_一次部品_出荷実績登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
# 受信ファイル読み込み
with open(tmpFilePath) as f:
data = json.load(f)
dataList = []
for item in data:
import json
from marshmallow import ValidationError
# 假设你已经有了动态生成的Schema: DynamicSchema
# 受信文件路径
tmpFilePath = 'path_to_your_json_file.json'
# 读取文件
with open(tmpFilePath) as f:
data = json.load(f)
# 创建一个空列表来收集验证通过的数据
dataList = []
# 创建Schema实例
schema = DynamicSchema()
# 遍历文件中的每个项目
for item in data:
try:
# 验证数据
validated_data = schema.load(item)
# 如果数据验证通过,添加到dataList中
dataList.append(validated_data)
except ValidationError as err:
# 如果验证失败,打印错误信息
print(f"Validation error for item {item}: {err.messages}")
# 现在,dataList包含了所有验证通过的项目
try:
# yaml読み込み
to_path = os.getenv("TO_PATH")
with open(f"{to_path}/input.yml") as file:
obj = yaml.safe_load(file)
# 假设你已经有了动态生成的Schema: DynamicSchema
# jsonファイル読み込み
with open(tmpFilePath) as f:
data = json.load(f)
# 创建一个空列表来收集验证通过的数据
dataList = []
# 创建Schema实例
schema = DynamicSchema()
# 遍历文件中的每个项目
for item in data:
try:
# 验证数据
validated_data = schema.load(item)
# 如果数据验证通过,添加到dataList中
dataList.append(validated_data)
except ValidationError as err:
# 如果验证失败,打印错误信息
print(f"Validation error for item {item}: {err.messages}")
basePath: /v1
definitions:
Bytelength3_10:
description: 'byteLength用parameter
3byteから10byteの文字を許容する
'
example: Test"Data
type: string
x-validations:
byteLength:
max: 10
min: 3
DataCount:
description: 取得対象データ件数
example: 1
type: integer
GetResponse:
properties:
data_count:
$ref: '#/definitions/DataCount'
response_body:
description: レスポンスデータ本体を格納するノード。
items:
$ref: '#/definitions/GetTrn'
type: array
response_limit:
$ref: '#/definitions/ResponseLimit'
result_code:
$ref: '#/definitions/ResultCode'
result_msg:
$ref: '#/definitions/ResultMsg'
required:
- result_code
- result_msg
- data_count
type: object
GetTrn:
properties:
data:
description: JSON形式のデータ(構成内容は問わない)
example:
address: 東京都江東区豊洲3-3-3 豊洲センタービル
company: NTTデータ
type: object
received_time:
$ref: '#/definitions/ReceivedTime'
registered_datetime:
$ref: '#/definitions/RegisteredDateTime'
rowkey:
$ref: '#/definitions/RowKey'
sent_from_id:
$ref: '#/definitions/SentFromId'
sent_from_ip:
$ref: '#/definitions/SentFromIp'
sent_time:
$ref: '#/definitions/SentTime'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
transaction_type:
$ref: '#/definitions/TransactionType'
type: object
Max10:
example: 8
maximum: 10
type: integer
Max15.5:
example: 8.7
format: double
maximum: 15.5
type: number
Min2.2:
example: 4.3
format: double
minimum: 2.2
type: number
Min3:
example: 5
minimum: 3
type: integer
Overwrite:
description: 'データ上書判定フラグ
Trueの場合、同一トランザクションIDのデータが存在したときにデータを上書きする。
'
example: false
type: boolean
ReceivedTime:
description: 登録されている受信日時
example: '2016-02-22T12:30:00.000Z'
type: string
RegisteredDateTime:
description: データレイク登録時刻 ※ISO 8601形式でUTC時刻
example: '2016-02-22T12:30:00.000Z'
type: string
ResponseBody:
properties:
result_code:
$ref: '#/definitions/ResultCode'
result_msg:
$ref: '#/definitions/ResultMsg'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- result_code
- result_msg
- transaction_id
type: object
ResponseLimit:
description: データ取得件数の閾値
example: 5000
type: integer
ResultCode:
description: '0: OK, 0以外の数値は[エラーコード]'
format: int32
type: integer
ResultMsg:
description: レスポンスメッセージ。
example: OK
type: string
RowKey:
description: インプットのユニークキー(データレイクでの主キー)
type: string
SentFromId:
description: 登録されている送信者ID
example: TESTUSER
type: string
SentFromIp:
description: 登録されている送信元IP
example: 255.255.255.255
type: string
SentTime:
description: 登録されている送信日時
example: '2016-02-22T12:30:00.000Z'
type: string
Timestamp:
description: データ生成時刻 ※ISO 8601形式でUTC時刻
example: '2016-02-22T12:30:00.000Z'
pattern: ^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z$
type: string
TransactionId:
description: レスポンスのユニークID(問い合わせ時の調査用)
example: 30-1473940241740
type: string
TransactionType:
description: トランザクションの種類(INSERT/UPDATE/DELETE)
example: INS
type: string
delReq:
properties:
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- timestamp
- transaction_id
type: object
noValidationPostReq:
properties:
data:
description: JSON形式のデータ(構成内容は問わない)
example:
address: 東京都江東区豊洲3-3-3 豊洲センタービル
company: NTTデータ
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
type: object
noValidationPutReq:
properties:
data:
description: JSON形式のデータ(構成内容は問わない)
example:
address: 東京都江東区豊洲3-3-3 豊洲センタービル
company: NTTデータ
type: object
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- transaction_id
type: object
validationData:
properties:
arr_item:
example:
- 1.0
- 2.0
- 3.0
items:
format: double
type: number
type: array
bytelength3_10:
$ref: '#/definitions/Bytelength3_10'
max10:
$ref: '#/definitions/Max10'
max15.5:
$ref: '#/definitions/Max15.5'
min2.2:
$ref: '#/definitions/Min2.2'
min3:
$ref: '#/definitions/Min3'
obj_item:
description: JSON形式のデータ(構成内容は問わない)
example:
Object: Item
boolean: false
type: object
type: object
validationPostReq:
properties:
data:
$ref: '#/definitions/validationData'
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
validationPutReq:
properties:
data:
$ref: '#/definitions/validationData'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- transaction_id
- data
type: object
host: dnasb.stg.dna.prd.is.a.i4square.info
info:
title: デモアプリ
version: 2.0.0
parameters:
DatekeyParam:
description: '参照したい年月を指定します
※ISO 8601形式でUTC時刻
'
in: query
name: datekey
required: false
type: string
DeleteReqParam:
description: data部のValidationチェックは行わない
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/delReq'
FromTimestampParam:
description: '参照したい期間(FROM)を指定します
※ISO 8601形式でUTC時刻
'
in: query
name: from_timestamp
required: false
type: string
LimitByTimestampParam:
description: 'Timestampでソートをした際の取得最大件数を指定します
'
in: query
name: limit_by_timestamp
required: false
type: string
NoValidationPostReqParam:
description: data部のValidationチェックは行わない
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/noValidationPostReq'
NoValidationPutReqParam:
description: data部のValidationチェックは行わない
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/noValidationPutReq'
RowKeyParam:
description: '参照したいRowKeyを指定します
'
in: query
name: rowkey
required: true
type: string
SentFromIdParam:
description: '参照したい送信者IDを指定します。
'
in: query
name: sent_from_id
required: false
type: string
ToTimestampParam:
description: '参照したい期間(TO)を指定します
※ISO 8601形式でUTC時刻
'
in: query
name: to_timestamp
required: false
type: string
TransactionIdParam:
description: '参照したいトランザクションIDを指定します
'
in: query
name: transaction_id
required: false
type: string
TransactionTypeParam:
description: '参照したいトランザクションタイプを指定します
'
in: query
name: transaction_type
required: false
type: string
ValidationPostReqParam:
description: validationチェック用
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/validationPostReq'
ValidationPutReqParam:
description: validationチェック用
in: body
name: bodyStr
required: true
schema:
$ref: '#/definitions/validationPutReq'
paths:
/dnasb/fg_prd_m_plan:
get:
consumes:
- application/json
parameters:
- $ref: '#/parameters/RowKeyParam'
- $ref: '#/parameters/DatekeyParam'
- $ref: '#/parameters/FromTimestampParam'
- $ref: '#/parameters/ToTimestampParam'
- $ref: '#/parameters/LimitByTimestampParam'
- $ref: '#/parameters/TransactionIdParam'
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/GetResponse'
'400':
description: Bad Request
'404':
description: Not Found
'500':
description: Internal Server Error
summary: 完成品生産計画_月次取得
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
EXEC_DATE:
description: 処理日付
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
FACTORY_NAME:
description: 工場_生産拠点
required: true
type: string
x-validations:
byteLength:
max: 255
FINAL_PRODUCT:
description: 完成品
required: true
type: string
x-validations:
byteLength:
max: 255
PRODUCTION_DATE:
description: 生産日
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
PRODUCTION_QTY:
description: 生産数量
format: int32
required: true
type: integer
PRODUCTION_REF_DATE:
description: 生産基準日
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: 完成品生産計画_月次登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
/dnasb/test_xx:
get:
consumes:
- application/json
parameters:
- $ref: '#/parameters/RowKeyParam'
- $ref: '#/parameters/DatekeyParam'
- $ref: '#/parameters/FromTimestampParam'
- $ref: '#/parameters/ToTimestampParam'
- $ref: '#/parameters/LimitByTimestampParam'
- $ref: '#/parameters/TransactionIdParam'
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/GetResponse'
'400':
description: Bad Request
'404':
description: Not Found
'500':
description: Internal Server Error
summary: testakiyama取得
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
TEST:
description: TEST
type: string
x-validations:
byteLength:
max: 255
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: testakiyama登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
/dnasb/testakiyama_err:
get:
consumes:
- application/json
parameters:
- $ref: '#/parameters/RowKeyParam'
- $ref: '#/parameters/DatekeyParam'
- $ref: '#/parameters/FromTimestampParam'
- $ref: '#/parameters/ToTimestampParam'
- $ref: '#/parameters/LimitByTimestampParam'
- $ref: '#/parameters/TransactionIdParam'
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/GetResponse'
'400':
description: Bad Request
'404':
description: Not Found
'500':
description: Internal Server Error
summary: testakiyama取得
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
TEST:
description: TEST
format: float
type: number
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: testakiyama登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
/dnasb/verification_raise:
get:
consumes:
- application/json
parameters:
- $ref: '#/parameters/RowKeyParam'
- $ref: '#/parameters/DatekeyParam'
- $ref: '#/parameters/FromTimestampParam'
- $ref: '#/parameters/ToTimestampParam'
- $ref: '#/parameters/LimitByTimestampParam'
- $ref: '#/parameters/TransactionIdParam'
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/GetResponse'
'400':
description: Bad Request
'404':
description: Not Found
'500':
description: Internal Server Error
summary: 動作検証API_raise_test2取得
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
TEST:
description: TEST
format: int32
type: integer
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: 動作検証API_raise_test2登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
schemes:
- http
swagger: '2.0'
x-instance: 1
x-memory: 2048M
import os
import yaml
from marshmallow import Schema, fields, validates_schema, ValidationError, validate
# 假设你已经有了从YAML文件加载的内容
yaml_file_path = 'your_yaml_file_path.yml'
sample = 'fg_prd_m_plan'
with open(yaml_file_path, 'r') as file:
yaml_content = yaml.safe_load(file)
# 定位到特定路径下的POST请求的内容
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
# 提取请求体(schema)定义
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
# 动态生成Schema类
class DynamicSchema(Schema):
pass
# 根据YAML文件动态添加字段到Schema类
for field_name, field_props in schema_definitions.items():
field_type = field_props['type']
validations = field_props.get('x-validations', {})
if field_type == 'string':
max_length = validations.get('byteLength', {}).get('max')
min_length = validations.get('byteLength', {}).get('min')
field_instance = fields.Str(validate=validate.Length(min=min_length, max=max_length))
elif field_type == 'integer':
# 根据需要添加更多类型的处理逻辑
field_instance = fields.Integer()
elif field_type == 'number':
field_instance = fields.Float()
# 将字段添加到DynamicSchema类
setattr(DynamicSchema, field_name, field_instance)
# 现在DynamicSchema已经根据YAML文件中的定义动态生成,可以用来验证数据
def _validationChecks(tmpFilePath: str):
try:
sample = 'fg_prd_m_plan'
# yaml読み込み
to_path = os.getenv("TO_PATH")
with open(f"{to_path}/input.yml") as file:
yaml_content = yaml.safe_load(file)
# 定位到特定路径下的POST请求的内容
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
# 提取请求体(schema)定义
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
# 动态生成Schema类
class DynamicSchema(Schema):
pass
# 根据YAML文件动态添加字段到Schema类
for field_name, field_props in schema_definitions.items():
field_type = field_props['type']
validations = field_props.get('x-validations', {})
if field_type == 'string':
max_length = validations.get('byteLength', {}).get('max')
min_length = validations.get('byteLength', {}).get('min')
field_instance = fields.Str(
validate=validate.Length(min=min_length, max=max_length))
elif field_type == 'integer':
# 根据需要添加更多类型的处理逻辑
field_instance = fields.Integer()
elif field_type == 'number':
field_instance = fields.Float()
# 将字段添加到DynamicSchema类
setattr(DynamicSchema, field_name, field_instance)
# jsonファイル読み込み
with open(tmpFilePath) as f:
data = json.load(f)
# 创建一个空列表来收集验证通过的数据
dataList = []
# 创建Schema实例
schema = DynamicSchema()
# 遍历文件中的每个项目
for item in data:
try:
# 验证数据
validated_data = schema.load(item)
# 如果数据验证通过,添加到dataList中
dataList.append(validated_data)
except ValidationError as err:
# 如果验证失败,打印错误信息
print(f"Validation error for item {item}: {err.messages}")
except Exception as e:
raise e
def _validationChecks(tmpFilePath: str):
try:
sample = 'fg_prd_m_plan'
# YAML文件读取
to_path = os.getenv("TO_PATH")
with open(f"{to_path}/input.yml") as file:
yaml_content = yaml.safe_load(file)
# 定位到特定路径下的POST请求的内容
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
# 提取请求体(schema)定义
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
# 动态生成Schema类
class DynamicSchema(Schema):
pass
# 根据YAML文件动态添加字段到Schema类
for field_name, field_props in schema_definitions.items():
field_type = field_props['type']
validations = field_props.get('x-validations', {})
if field_type == 'string':
max_length = validations.get('byteLength', {}).get('max')
min_length = validations.get('byteLength', {}).get('min')
field_instance = fields.Str(validate=validate.Length(min=min_length, max=max_length))
elif field_type == 'integer':
field_instance = fields.Integer()
elif field_type == 'number':
field_instance = fields.Float()
# 将字段添加到DynamicSchema类
setattr(DynamicSchema, field_name, field_instance)
# JSON文件读取
with open(tmpFilePath) as f:
data = json.load(f)
# 创建Schema实例
schema = DynamicSchema()
# 遍历文件中的每个项目并进行验证
for index, item in enumerate(data):
try:
validated_data = schema.load(item)
# 如果数据验证通过,添加到dataList中
print(f"Validation passed for item at index {index}: {validated_data}")
except ValidationError as err:
# 如果验证失败,打印错误信息及数据项标识
item_id = item.get("id", "Unknown ID") # 假设每个数据项都有一个'id'字段作为标识
print(f"Validation error for item at index {index} (ID: {item_id}): {err.messages}")
except Exception as e:
raise e
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
KeyError: '/dnasb/fg_prd_m_plan'
if 'paths' in yaml_content:
print("Available paths in the YAML file:")
for path in yaml_content['paths']:
print(path)
Validation error for item at index 20 (ID: Unknown ID): {'PRODUCTION_QTY': ['Unknown field.'], 'PRODUCTION_DATE': ['Unknown field.'], 'FACTORY_NAME': ['Unknown field.'], 'PRODUCTION_REF_DATE': ['Unknown field.'], 'EXEC_DATE': ['Unknown field.'], 'FINAL_PRODUCT': ['Unknown field.']}
import requests
def download_file_from_github(url: str, destination_path: str):
response = requests.get(url)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print(f"File downloaded successfully to {destination_path}")
else:
print(f"Failed to download file. Status code: {response.status_code}")
# GitHub文件的直链URL
file_url = 'https://raw.githubusercontent.com/user/repository/branch/path/to/file.yml'
# 目标路径
destination = 'path/to/save/file.yml'
download_file_from_github(file_url, destination)
import requests
def download_private_file_from_github(url: str, destination_path: str, token: str):
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print("File downloaded successfully.")
else:
print(f"Failed to download file. Status code: {response.status_code} - {response.text}")
# GitHub文件的URL,注意这应该是API的URL格式,不是直接的文件链接
file_url = 'https://api.github.com/repos/user/repository/contents/path/to/file.yml?ref=branch'
# 本地保存路径
destination = 'your/local/path/file.yml'
# 你的GitHub Personal Access Token
git_token = 'your_github_token_here'
download_private_file_from_github(file_url, destination, git_token)
https://github.com/qmonus-test/i4-dpb_yaml/blob/master/ind/dnasb/input.yml
https://api.github.com/repos/qmonus-test/i4-dpb_yaml/contents/ind/dnasb/input.yml
Failed to download file. Status code: 401 - {"message":"Bad credentials","documentation_url":"https://docs.github.com/rest"}
2024-04-02 05:17:04,477 ERROR -20240402-c6f4c40e-485d-497a-a452-ab9080c199bf [Errno 2] No such file or directory: 'tmp/i4-dpb/input_test.yml'
def download_private_file_from_github(url: str, destination_path: str, token: str):
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print("File downloaded successfully.")
else:
print(
f"Failed to download file. Status code: {response.status_code} - {response.text}")
# バリデーションチェック
def _validationChecks(tmpFilePath: str):
try:
to_path = os.getenv("TO_PATH")
# GitHub文件的URL,注意这应该是API的URL格式,不是直接的文件链接
file_url = 'https://api.github.com/repos/qmonus-test/i4-dpb_yaml/contents/ind/dnasb/input.yml'
# 本地保存路径
destination = f"{to_path}/input_test.yml"
# 你的GitHub Personal Access Token
git_token = os.getenv("GIT_TOKEN")
download_private_file_from_github(file_url, destination, git_token)
sample = 'fg_prd_d_plan'
# YAML文件读取
with open(f"{to_path}/input_test.yml") as file:
yaml_content = yaml.safe_load(file)
if 'paths' in yaml_content:
print("Available paths in the YAML file:")
for path in yaml_content['paths']:
print(path)
# 定位到特定路径下的POST请求的内容
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
# 提取请求体(schema)定义
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
# 动态生成Schema类
class DynamicSchema(Schema):
pass
# 根据YAML文件动态添加字段到Schema类
for field_name, field_props in schema_definitions.items():
field_type = field_props['type']
validations = field_props.get('x-validations', {})
if field_type == 'string':
max_length = validations.get('byteLength', {}).get('max')
min_length = validations.get('byteLength', {}).get('min')
field_instance = fields.Str(
validate=validate.Length(min=min_length, max=max_length))
elif field_type == 'integer':
field_instance = fields.Integer()
elif field_type == 'number':
field_instance = fields.Float()
# 将字段添加到DynamicSchema类
setattr(DynamicSchema, field_name, field_instance)
# JSON文件读取
with open(tmpFilePath) as f:
data = json.load(f)
# 创建Schema实例
schema = DynamicSchema()
# 遍历文件中的每个项目并进行验证
for index, item in enumerate(data):
try:
validated_data = schema.load(item)
# 如果数据验证通过,添加到dataList中
print(
f"Validation passed for item at index {index}: {validated_data}")
except ValidationError as err:
# 如果验证失败,打印错误信息及数据项标识
item_id = item.get("id", "Unknown ID") # 假设每个数据项都有一个'id'字段作为标识
print(
f"Validation error for item at index {index} (ID: {item_id}): {err.messages}")
except Exception as e:
raise e
post:
consumes:
- application/json
parameters:
- description: validationチェック用
in: body
name: bodyStr
required: true
schema:
properties:
data:
properties:
EXEC_DATE:
description: 処理日付
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
FACTORY_NAME:
description: 工場_生産拠点
required: true
type: string
x-validations:
byteLength:
max: 255
FINAL_PRODUCT:
description: 完成品
required: true
type: string
x-validations:
byteLength:
max: 255
PRODUCTION_DATE:
description: 生産日
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
PRODUCTION_QTY:
description: 生産数量
format: int32
required: true
type: integer
PRODUCTION_REF_DATE:
description: 生産基準日
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
TEST_COL_A:
description: テスト項目_A
type: string
x-validations:
byteLength:
max: 255
type: object
overwrite:
$ref: '#/definitions/Overwrite'
rowkey:
$ref: '#/definitions/RowKey'
timestamp:
$ref: '#/definitions/Timestamp'
transaction_id:
$ref: '#/definitions/TransactionId'
required:
- rowkey
- data
type: object
produces:
- application/json
responses:
'200':
description: OK
schema:
$ref: '#/definitions/ResponseBody'
'400':
description: Bad Request
'500':
description: Internal Server Error
summary: 完成品生産計画_日次登録
x-edw:
span: 1
withHeader: true
x-object: true
x-parallels: 10
def generate_field(field_props):
# 基于字段属性生成Marshmallow字段
field_type = field_props['type']
if field_type == 'string':
max_length = field_props.get('x-validations', {}).get('byteLength', {}).get('max')
min_length = field_props.get('x-validations', {}).get('byteLength', {}).get('min')
pattern = field_props.get('pattern')
validators = []
if min_length or max_length:
validators.append(validate.Length(min=min_length, max=max_length))
if pattern:
validators.append(validate.Regexp(regex=pattern))
return fields.Str(validate=validators) if validators else fields.Str()
elif field_type == 'integer':
return fields.Integer()
elif field_type == 'number':
return fields.Float()
# 添加其他类型的处理逻辑
return None
def dynamic_schema_generator(schema_definitions):
class DynamicSchema(Schema):
pass
for field_name, field_props in schema_definitions.items():
field_instance = generate_field(field_props)
if field_instance:
setattr(DynamicSchema, field_name, field_instance)
return DynamicSchema
def _validationChecks(tmpFilePath: str):
try:
# 设置和下载文件
to_path = os.getenv("TO_PATH")
file_url = 'https://api.github.com/repos/qmonus-test/i4-dpb_yaml/contents/ind/dnasb/input.yml'
destination = f"{to_path}/input_test.yml"
git_token = os.getenv("GIT_TOKEN")
download_private_file_from_github(file_url, destination, git_token)
# 读取YAML定义并生成动态Schema
sample = 'fg_prd_d_plan'
with open(destination) as file:
yaml_content = yaml.safe_load(file)
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
DynamicSchema = dynamic_schema_generator(schema_definitions)
# 数据验证
with open(tmpFilePath) as f:
data = json.load(f)
schema = DynamicSchema()
for index, item in enumerate(data):
try:
validated_data = schema.load(item)
print(f"Validation passed for item at index {index}: {validated_data}")
except ValidationError as err:
print(f"Validation error for item at index {index}: {err.messages}")
except Exception as e:
print(f"An error occurred: {e}")
def generate_field(field_props):
field_type = field_props['type']
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
if 'x-validations' in field_props:
byteLength = field_props['x-validations'].get('byteLength', {})
if byteLength:
validators.append(validate.Length(min=byteLength.get('min'), max=byteLength.get('max')))
if field_type == 'string':
return fields.Str(validate=validators) if validators else fields.Str()
elif field_type == 'integer':
return fields.Integer()
elif field_type == 'number':
return fields.Float()
return None # for unsupported field types
def dynamic_schema_generator(schema_definitions):
class DynamicSchema(Schema):
pass
for field_name, field_props in schema_definitions.items():
field = generate_field(field_props)
if field:
setattr(DynamicSchema, field_name, field)
return DynamicSchema
for field_name, field_instance in DynamicSchema().fields.items():
print(f"Field Name: {field_name}, Field Instance: {field_instance}")
{'EXEC_DATE': {'description': '処理日付', 'pattern': '^[0-9]{4}-[0-9]{2}-[0-9]{2}$', 'required': True, 'type': 'string'}, 'FACTORY_NAME': {'description': '工場_生産拠点', 'required': True, 'type': 'string', 'x-validations': {'byteLength': {'max': 255}}}, 'FINAL_PRODUCT': {'description': '完成品', 'required': True, 'type': 'string', 'x-validations': {'byteLength': {'max': 255}}}, 'PRODUCTION_DATE': {'description': '生産日', 'pattern': '^[0-9]{4}-[0-9]{2}-[0-9]{2}$', 'required': True, 'type': 'string'}, 'PRODUCTION_QTY': {'description': '生産数量', 'format': 'int32', 'required': True, 'type': 'integer'}, 'PRODUCTION_REF_DATE': {'description': '生産基準日', 'pattern': '^[0-9]{4}-[0-9]{2}-[0-9]{2}$', 'required': True, 'type': 'string'}, 'TEST_COL_A': {'description': 'テスト項目_A', 'type': 'string', 'x-validations': {'byteLength': {'max': 255}}}}
Validation error for item at index 0: {'EXEC_DATE': ['Unknown field.'], 'PRODUCTION_REF_DATE': ['Unknown field.'], 'PRODUCTION_DATE': ['Unknown field.'], 'FACTORY_NAME': ['Unknown field.'], 'FINAL_PRODUCT': ['Unknown field.'], 'PRODUCTION_QTY': ['Unknown field.']}
from marshmallow import Schema, fields, validate
def generate_field(field_name, field_props):
validators = []
# 处理 pattern 属性
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
# 处理 string 类型的字段,并考虑 max 验证
if field_props['type'] == 'string':
if 'x-validations' in field_props and 'byteLength' in field_props['x-validations']:
max_length = field_props['x-validations']['byteLength'].get('max')
if max_length is not None:
validators.append(validate.Length(max=max_length))
return fields.Str(validate=validators) if validators else fields.Str()
# 处理 integer 类型的字段
elif field_props['type'] == 'integer':
return fields.Integer()
# 处理 number 类型的字段,这里假设 number 对应于 float
elif field_props['type'] == 'number':
return fields.Float()
# 可以继续添加对其他类型字段的支持...
return None # 如果没有匹配的类型,返回 None
# 动态生成 Schema 类
def dynamic_schema_generator(schema_definitions):
class DynamicSchema(Schema):
pass
for field_name, field_props in schema_definitions.items():
field_instance = generate_field(field_name, field_props)
if field_instance:
setattr(DynamicSchema, field_name, field_instance)
return DynamicSchema
from marshmallow import Schema, fields, validate
def generate_field(field_name, field_props):
field_type = field_props['type']
validators = []
# 处理 pattern 属性
pattern = field_props.get('pattern')
if pattern:
validators.append(validate.Regexp(regex=pattern))
# 根据字段类型决定使用哪个 Marshmallow 字段
if field_type == 'string':
# 仅当类型为 string 时处理最大长度
max_length = field_props.get('x-validations', {}).get('byteLength', {}).get('max') if 'x-validations' in field_props else None
if max_length:
validators.append(validate.Length(max=max_length))
return fields.Str(validate=validators) if validators else fields.Str()
elif field_type == 'integer':
# 对于 integer 类型,目前没有特定的 max 长度处理,直接返回 Integer 字段
return fields.Integer()
elif field_type == 'number':
# 对于 number 类型,这里假定它对应于 float,同样没有特定的 max 长度处理
return fields.Float()
# 如果未来需要,可以在此添加对其他类型字段的处理
return None
# 动态生成 Schema 类
def dynamic_schema_generator(schema_definitions):
class DynamicSchema(Schema):
pass
for field_name, field_props in schema_definitions.items():
field_instance = generate_field(field_name, field_props)
if field_instance:
setattr(DynamicSchema, field_name, field_instance)
return DynamicSchema
# 假设schema_definitions是您从YAML文件中解析得到的字段定义
schema_definitions = {
'EXEC_DATE': {'type': 'string', 'pattern': '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'},
'FACTORY_NAME': {'type': 'string', 'x-validations': {'byteLength': {'max': 255}}},
# 添加其他字段定义...
}
# 使用之前提供的dynamic_schema_generator函数来生成DynamicSchema
DynamicSchema = dynamic_schema_generator(schema_definitions)
# 假设您有以下待验证的数据
data_to_validate = {
'EXEC_DATE': '2023-04-01',
'FACTORY_NAME': 'Factory 1',
# 确保包含所有必要的字段...
}
# 创建DynamicSchema的一个实例并进行验证
schema = DynamicSchema()
errors = schema.validate(data_to_validate)
if errors:
print("Validation errors:", errors)
else:
print("Validation successful.")
{'EXEC_DATE': {'type': 'string', 'pattern': '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'}}
Validation errors: {'EXEC_DATE': ['Unknown field.']}
如果您遇到 {'EXEC_DATE': ['Unknown field.']} 这样的验证错误,尽管您确信已经在 DynamicSchema 中正确添加了该字段,那么问题很可能与如何动态地向 Marshmallow Schema 添加字段有关。这通常意味着在创建 Schema 类的过程中,字段没有被正确注册。
为了确保字段能被 Marshmallow 正确识别,我们需要在定义 Schema 类时,直接在类的定义中声明字段。由于您是动态添加字段,这里有一个方法确保这些动态字段被正确处理:
确保字段被正确添加到 DynamicSchema
请确保使用 type 函数动态创建带有字段的类,因为仅仅使用 setattr 可能不足以让 Marshmallow 在内部正确注册这些字段。下面是一个示例,展示如何使用 type 动态定义 Schema 类:
python
Copy code
from marshmallow import Schema, fields, validate
# 动态生成字段的函数
def generate_field(field_props):
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
field_type = field_props['type']
if field_type == 'string':
return fields.Str(validate=validators)
# 可以添加对其他类型的支持...
# 使用type动态创建带有字段的Schema类
def dynamic_schema_generator(schema_definitions):
# 准备一个字典来收集类属性(即字段)
attrs = {}
for field_name, field_props in schema_definitions.items():
# 根据字段属性生成字段实例
field_instance = generate_field(field_props)
if field_instance:
attrs[field_name] = field_instance
# 使用type动态创建类
return type('DynamicSchema', (Schema,), attrs)
def extract_schema_definitions(yaml_content, path):
try:
post_content = yaml_content['paths'][path]['post']
for param in post_content['parameters']:
if param['in'] == 'body':
return param['schema']['properties']
except KeyError as e:
print(f"Error extracting schema definitions: {e}")
return {}
path = '/your/api/path' # 根据你的YAML文件调整路径
schema_definitions = extract_schema_definitions(yaml_content, path)
def download_private_file_from_github(url: str, destination_path: str, token: str):
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print("File downloaded successfully.")
else:
print(
f"Failed to download file. Status code: {response.status_code} - {response.text}")
# 动态生成字段的函数
def generate_field(field_props):
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
field_type = field_props['type']
if field_type == 'string':
return fields.Str(validate=validators)
# 可以添加对其他类型的支持...
# 使用type动态创建带有字段的Schema类
def dynamic_schema_generator(schema_definitions):
# 准备一个字典来收集类属性(即字段)
attrs = {}
for field_name, field_props in schema_definitions.items():
# 根据字段属性生成字段实例
field_instance = generate_field(field_props)
if field_instance:
attrs[field_name] = field_instance
# 使用type动态创建类
return type('DynamicSchema', (Schema,), attrs)
def extract_schema_definitions(yaml_content, id):
try:
post_content = yaml_content['paths'][id]['post']
for param in post_content['parameters']:
if param['in'] == 'body':
return param['schema']['properties']
except KeyError as e:
print(f"Error extracting schema definitions: {e}")
return {}
# バリデーションチェック
def _validationChecks(tmpFilePath: str):
try:
to_path = os.getenv("TO_PATH")
# GitHub文件的URL,注意这应该是API的URL格式,不是直接的文件链接
file_url = 'https://api.github.com/repos/qmonus-test/i4-dpb_yaml/contents/ind/dnasb/input.yml'
# 本地保存路径
destination = f"{to_path}/input_test.yml"
# 你的GitHub Personal Access Token
git_token = os.getenv("GIT_TOKEN")
download_private_file_from_github(file_url, destination, git_token)
sample = 'fg_prd_d_plan'
# YAML文件读取
with open(f"{to_path}/input_test.yml") as file:
yaml_content = yaml.safe_load(file)
extract_schema = extract_schema_definitions(
yaml_content, "/dna/fg_prd_d_plan")
DynamicSchema = dynamic_schema_generator(extract_schema)
# JSON文件读取
with open(tmpFilePath) as f:
data = json.load(f)
# 创建Schema实例
schema = DynamicSchema()
# 遍历文件中的每个项目并进行验证
for index, item in enumerate(data):
try:
validated_data = schema.load(item)
print(
f"Validation OK: {validated_data}")
except ValidationError as err:
print(
f"Validation NG: {err.messages}")
except Exception as e:
raise e
def generate_field(field_props):
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
field_type = field_props['type']
if field_type == 'string':
return fields.Str(validate=validators)
# 可以添加对其他类型的支持...
# 使用type动态创建带有字段的Schema类
def dynamic_schema_generator(schema_definitions):
# 准备一个字典来收集类属性(即字段)
attrs = {}
for field_name, field_props in schema_definitions.items():
# 根据字段属性生成字段实例
field_instance = generate_field(field_props)
if field_instance:
attrs[field_name] = field_instance
# 使用type动态创建类
return type('DynamicSchema', (Schema,), attrs)
{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-20', 'PRODUCTION_QTY': '70', 'PRODUCTION_REF_DATE': '2022-11-20'}
Validation passed for item at index 0: {'PRODUCTION_QTY': ['Unknown field.']}
{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-21', 'PRODUCTION_QTY': '工場A', 'PRODUCTION_REF_DATE': '2022-11-20'}
Validation passed for item at index 1: {'PRODUCTION_QTY': ['Unknown field.']}
{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-22', 'PRODUCTION_QTY': '70', 'PRODUCTION_REF_DATE': '2022-11-20'}
# download from github
def download_private_file_from_github(url: str, destination_path: str, token: str):
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print("File downloaded successfully.")
else:
print(
f"Failed to download file. Status code: {response.status_code} - {response.text}")
# バリデーションチェック
def _validationChecks(tmpFilePath: str):
try:
to_path = os.getenv("TO_PATH")
# GitHub文件的URL,注意这应该是API的URL格式,不是直接的文件链接
file_url = 'https://api.github.com/repos/qmonus-test/i4-dpb_yaml/contents/ind/dnasb/input.yml'
# 本地保存路径
destination = f"{to_path}/input_test.yml"
# 你的GitHub Personal Access Token
git_token = os.getenv("GIT_TOKEN")
download_private_file_from_github(file_url, destination, git_token)
# 读取YAML定义并生成动态Schema
sample = 'fg_prd_d_plan'
with open(destination) as file:
yaml_content = yaml.safe_load(file)
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
DynamicSchema = dynamic_schema_generator(schema_definitions)
# 数据验证
with open(tmpFilePath) as f:
data = json.load(f)
schema = DynamicSchema()
# 遍历文件中的每个项目并进行验证
for index, item in enumerate(data):
try:
print(item)
validated_data = schema.validate(item)
print(
f"Validation passed for item at index {index}: {validated_data}")
except ValidationError as err:
print(
f"Validation error for item at index {index}: {err.messages}")
except Exception as e:
print(f"An error occurred: {e}")
# 非同期処理
def _execJob(table: str, fileName: str, request_id: str, jobId: str):
# サンプルデータ挿入
try:
updateJobPhase(jobId, 1, "データ登録")
logger.info(f"{request_id} ----- copyIntoTable start")
copyIntoTable(table, fileName)
logger.info(f"{request_id} ----- copyIntoTable end")
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"データ登録エラー",
"データ登録処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
# 登録済JSONファイル削除
try:
updateJobPhase(jobId, 2, "登録済ファイル削除")
logger.info("----- deleteUploadFile start")
_deleteUploadFile(request_id)
logger.info("----- deleteUploadFile end")
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"登録済ファイル削除エラー",
"登録済ファイル削除処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
finishJobStatus(jobId)
def postCopyIntoApiData(data: json, file: werkzeug.datastructures.FileStorage):
request_id = request.request_id
id: str = data["id"]
method = data["method"]
filename = data["filename"]
table = f"J_{PROJECT.upper()}_{id.upper()}"
logger.info(f"id: {id}")
# テーブル存在チェック
try:
_tableExistenceCheck(table)
logicalName = _getApiLogicalName(id)
except tableNotExistException as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'データ投入先のAPIが存在しません。画面を更新の上、再度実行してください。'}
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'snowflake接続に失敗しました。サポートへお問い合わせください。'}
# 受信したファイルの保存
try:
logger.info("----- mkdirAndSaveFile start")
tmpFilePath = _mkdirAndSaveFile(request_id, file)
logger.info("----- mkdirAndSaveFile end")
except incorrectFileExtensionExceprion as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ファイル拡張子はcsvまたはjsonのみとなります。ファイルを確認の上、再度実行してください。'}
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'アップロードファイルの保存に失敗しました。サポートへお問い合わせください。'}
# バリデーションチェック
try:
logger.info("----- check start")
_validationChecks(tmpFilePath)
logger.info("----- check end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'バリデーションチェックエラーです。'}
# ステージ作成
try:
logger.info("----- createInternalStage start")
createInternalStage()
logger.info("----- createInternalStage end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ステージ作成に失敗しました。サポートへお問い合わせください。'}
# 登録用JSONファイル作成
try:
logger.info("----- createUploadFile start")
filePath, fileName, rowkey = _createUploadFile(tmpFilePath,
request_id)
logger.info("----- createUploadFile end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": '登録用ファイル作成に失敗しました。サポートへお問い合わせください。'}
# ファイルのステージング
try:
logger.info("----- putDataFile start")
putDataFile(filePath)
logger.info("----- putDataFile end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ファイルの登録に失敗しました。サポートへお問い合わせください。'}
# 登録済データ削除
if method == 'update':
try:
_truncateTable(table)
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": '既存データ削除に失敗しました。サポートへお問い合わせください。'}
# ジョブ開始
processTitle = "サンプルデータ挿入"
processName = f"{processTitle}処理 {logicalName}:{filename}"
externalQuery = {"apiTablePhy": id.upper(),
"putApiDataTran": rowkey}
jobId = beginJobStatus(processTitle, processName, 2, externalQuery)
# 非同期処理開始
fire_and_forget(_execJob, table, fileName, request_id, jobId)
return {"message": "COMPLETE", "jobId": jobId}
def generate_field(field_props):
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
field_type = field_props['type']
if field_type == 'string':
return fields.Str(validate=validators)
# 可以添加对其他类型的支持...
# 使用type动态创建带有字段的Schema类
def dynamic_schema_generator(schema_definitions):
# 准备一个字典来收集类属性(即字段)
attrs = {}
for field_name, field_props in schema_definitions.items():
# 根据字段属性生成字段实例
field_instance = generate_field(field_props)
if field_instance:
attrs[field_name] = field_instance
# 使用type动态创建类
return type('DynamicSchema', (Schema,), attrs)
{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-20', 'PRODUCTION_QTY': '70', 'PRODUCTION_REF_DATE': '2022-11-20'}
Validation passed for item at index 0: {}
{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-21', 'PRODUCTION_QTY': '工場A', 'PRODUCTION_REF_DATE': '2022-11-20'}
Validation passed for item at index 1: {'PRODUCTION_QTY': ['Not a valid integer.']}
{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-22', 'PRODUCTION_QTY': '70', 'PRODUCTION_REF_DATE': '2022-11-20'}
Validation passed for item at index 2: {}
# 遍历文件中的每个项目并进行验证
for index, item in enumerate(data):
try:
print(item)
validated_data = schema.validate(item)
print(
f"Validation passed for item at index {index}: {validated_data}")
except ValidationError as err:
print(
f"Validation error for item at index {index}: {err.messages}")
except Exception as e:
print(f"An error occurred: {e}")
# download from github
def download_private_file_from_github(url: str, destination_path: str, token: str):
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print("File downloaded successfully.")
else:
print(
f"Failed to download file. Status code: {response.status_code} - {response.text}")
# バリデーションチェック
def _validationChecks(tmpFilePath: str):
try:
to_path = os.getenv("TO_PATH")
# GitHub文件的URL,注意这应该是API的URL格式,不是直接的文件链接
file_url = 'https://api.github.com/repos/qmonus-test/i4-dpb_yaml/contents/ind/dnasb/input.yml'
# 本地保存路径
destination = f"{to_path}/input_test.yml"
# 你的GitHub Personal Access Token
git_token = os.getenv("GIT_TOKEN")
download_private_file_from_github(file_url, destination, git_token)
# 读取YAML定义并生成动态Schema
sample = 'fg_prd_d_plan'
with open(destination) as file:
yaml_content = yaml.safe_load(file)
post_content = yaml_content['paths'][f'/dnasb/{sample}']['post']
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
print(schema_definitions)
DynamicSchema = dynamic_schema_generator(schema_definitions)
# 数据验证
with open(tmpFilePath) as f:
data = json.load(f)
schema = DynamicSchema()
# 遍历文件中的每个项目并进行验证
for index, item in enumerate(data):
try:
print(item)
validated_data = schema.validate(item)
if validated_data:
print(
f"Validation NG {index}: {validated_data}")
else:
print(
f"Validation OK")
except ValidationError as e:
print(f"An error occurred: {e}")
except Exception as e:
print(f"An error occurred: {e}")
# 非同期処理
def _execJob(table: str, fileName: str, request_id: str, jobId: str):
# サンプルデータ挿入
try:
updateJobPhase(jobId, 1, "データ登録")
logger.info(f"{request_id} ----- copyIntoTable start")
copyIntoTable(table, fileName)
logger.info(f"{request_id} ----- copyIntoTable end")
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"データ登録エラー",
"データ登録処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
# 登録済JSONファイル削除
try:
updateJobPhase(jobId, 2, "登録済ファイル削除")
logger.info("----- deleteUploadFile start")
_deleteUploadFile(request_id)
logger.info("----- deleteUploadFile end")
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"登録済ファイル削除エラー",
"登録済ファイル削除処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
finishJobStatus(jobId)
def postCopyIntoApiData(data: json, file: werkzeug.datastructures.FileStorage):
request_id = request.request_id
id: str = data["id"]
method = data["method"]
filename = data["filename"]
table = f"J_{PROJECT.upper()}_{id.upper()}"
logger.info(f"id: {id}")
# テーブル存在チェック
try:
_tableExistenceCheck(table)
logicalName = _getApiLogicalName(id)
except tableNotExistException as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'データ投入先のAPIが存在しません。画面を更新の上、再度実行してください。'}
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'snowflake接続に失敗しました。サポートへお問い合わせください。'}
# 受信したファイルの保存
try:
logger.info("----- mkdirAndSaveFile start")
tmpFilePath = _mkdirAndSaveFile(request_id, file)
logger.info("----- mkdirAndSaveFile end")
except incorrectFileExtensionExceprion as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ファイル拡張子はcsvまたはjsonのみとなります。ファイルを確認の上、再度実行してください。'}
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'アップロードファイルの保存に失敗しました。サポートへお問い合わせください。'}
# バリデーションチェック
try:
logger.info("----- check start")
_validationChecks(tmpFilePath)
logger.info("----- check end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'バリデーションチェックエラーです。'}
# ステージ作成
try:
logger.info("----- createInternalStage start")
createInternalStage()
logger.info("----- createInternalStage end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ステージ作成に失敗しました。サポートへお問い合わせください。'}
# 登録用JSONファイル作成
try:
logger.info("----- createUploadFile start")
filePath, fileName, rowkey = _createUploadFile(tmpFilePath,
request_id)
logger.info("----- createUploadFile end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": '登録用ファイル作成に失敗しました。サポートへお問い合わせください。'}
# ファイルのステージング
try:
logger.info("----- putDataFile start")
putDataFile(filePath)
logger.info("----- putDataFile end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ファイルの登録に失敗しました。サポートへお問い合わせください。'}
# 登録済データ削除
if method == 'update':
try:
_truncateTable(table)
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": '既存データ削除に失敗しました。サポートへお問い合わせください。'}
# ジョブ開始
processTitle = "サンプルデータ挿入"
processName = f"{processTitle}処理 {logicalName}:{filename}"
externalQuery = {"apiTablePhy": id.upper(),
"putApiDataTran": rowkey}
jobId = beginJobStatus(processTitle, processName, 2, externalQuery)
# 非同期処理開始
fire_and_forget(_execJob, table, fileName, request_id, jobId)
return {"message": "COMPLETE", "jobId": jobId}
def generate_field(field_props):
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
field_type = field_props['type']
if field_type == 'string':
return fields.Str(validate=validators)
elif field_type == 'integer':
return fields.Integer()
# 可以添加对其他类型的支持...
# 使用type动态创建带有字段的Schema类
def dynamic_schema_generator(schema_definitions):
# 准备一个字典来收集类属性(即字段)
attrs = {}
for field_name, field_props in schema_definitions.items():
# 根据字段属性生成字段实例
field_instance = generate_field(field_props)
if field_instance:
attrs[field_name] = field_instance
# 使用type动态创建类
return type('DynamicSchema', (Schema,), attrs)
def _createUploadFile(tmpFilePath: str, reqId: str, error_data):
# パラメータ作成
rowkey = _randomname(32)
transaction_id = f"{_randomnum(2)}-{_randomnum(13)}"
date = datetime.datetime.now()
tmStamp = f"{date.isoformat(timespec='milliseconds')}Z"
yyyymm = date.strftime("%Y%m")
try:
# 受信ファイル読み込み
with open(tmpFilePath) as f:
data = json.load(f)
dataList = []
for item in data:
tmp = {}
tmp['data'] = item
tmp["datekey"] = yyyymm
tmp["overwrite"] = False
tmp["received_time"] = tmStamp
tmp["registered_datetime"] = None
tmp["rowkey"] = rowkey
tmp["sent_from_id"] = None
tmp["sent_from_ip"] = None
tmp["sent_time"] = tmStamp
tmp["timestamp"] = tmStamp
tmp["transaction_id"] = transaction_id
tmp["transaction_type"] = "INS"
tmp["yyyymm"] = yyyymm
dataList.append(tmp)
# アップロード用JSONファイル作成
fileName = f"{rowkey}.json"
filePath = f'{UPLOAD_FILE_PATH}/{reqId}/{fileName}'
with open(filePath, 'w') as f:
json.dump(dataList, f, indent=2, ensure_ascii=False)
except Exception as e:
raise e
return filePath, fileName, rowkey
[{'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-21', 'PRODUCTION_QTY': '工場A', 'PRODUCTION_REF_DATE': '2022-11-20'}, {'EXEC_DATE': '2022-11-17', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-21', 'PRODUCTION_QTY': '0', 'PRODUCTION_REF_DATE': '2022-11-20場'}]
error_data = []
for index, item in enumerate(data):
try:
validated_data = schema.validate(item)
if validated_data:
error_data.append(item)
print(
f"Validation NG {index}: {validated_data}")
except ValidationError as e:
print(f"An error occurred: {e}")
except Exception as e:
print(f"An error occurred: {e}")
print(error_data)
return error_data
2024-04-02 09:43:34,007 ERROR - 20240402-bfc83600-c916-484b-bc6b-21f5443406e5 list indices must be integers or slices, not str Traceback (most recent call last): File "/home/uenv/q_li/Desktop/catalog-web-app/server/ap/postCopyIntoApiData.py", line 301, in _execJob dumpmsg = f'''{dumpmsg}\nエラーレコード{error_data["line"]}行目: {error_data["error"]}'''.replace( TypeError: list indices must be integers or slices, not str
validated_data = schema.validate(item)
print(validated_data)
if validated_data:
error_data.append({"line": index, "error": item})
{}
{}
{}
{'PRODUCTION_REF_DATE': ['String does not match expected pattern.']}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
error_data以下です。
[{'line': 3, 'error': {'EXEC_DATE': '2022-11-24', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-27', 'PRODUCTION_QTY': 300, 'PRODUCTION_REF_DATE': '2022-11-27工場A'}}, {'line': 5, 'error': {'EXEC_DATE': '2022-11-24工場A', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-27', 'PRODUCTION_QTY': 300, 'PRODUCTION_REF_DATE': '2022-11-27'}}]
例えば、以下の処理があります。
dataList = []
for index, item in enumerate(data):
if item not in error_data["error"]:
print(item)
dataは{'EXEC_DATE': '2022-11-24', 'FACTORY_NAME': '工場A', 'FINAL_PRODUCT': 'FG-A1-001', 'PRODUCTION_DATE': '2022-11-27', 'PRODUCTION_QTY': 300, 'PRODUCTION_REF_DATE': '2022-11-27工場A'}
こちらのif文はどう書くべき?今のが違います。
import datetime
import os
import random
import shutil
import string
import json
import logging
import werkzeug
import pandas as pd
import gc
import yaml
import requests
from werkzeug.utils import secure_filename
from flask import request
from tools.snowflakeAccessor import SnowflakeAccessor
from tools.snowflakeUtils import createInternalStage
from tools.snowflakeUtils import putDataFile
from tools.snowflakeUtils import copyIntoTable
from tools.asyncUtils import fire_and_forget
from tools.jobStatus import beginJobStatus, updateJobWarning
from tools.jobStatus import finishJobStatus
from tools.jobStatus import updateJobError
from tools.jobStatus import updateJobPhase
from marshmallow import ValidationError, Schema, fields, validate
ENV = os.getenv("ENV")
PROJECT = os.getenv("PROJECT")
TENANT = os.getenv("TENANT")
DPL_HOST = os.getenv("DPL_HOST")
DPB_USER = os.getenv("DPB_USER")
DPB_USER_DATABASE = os.getenv("DPB_USER_DATABASE")
DPB_USER_SCHEMA = os.getenv("DPB_USER_SCHEMA")
SNOW_WH = os.getenv("SNOW_WH")
SNOW_DB = os.getenv("SNOW_DB")
SNOW_SCHEMA = os.getenv("SNOW_SCHEMA")
GIT_ORGANIZATION = os.getenv("GIT_ORGANIZATION")
UPLOAD_FILE_PATH = "tmp/dataFile"
ALLOWED_EXTENSIONS = {'csv', 'json'}
DPB_DB_NAME = f"{PROJECT}_{ENV}".upper()
DEFAULT_USER_ACTION = "本画面のスクリーンショットを取得し、サポートへお問い合わせください。"
TO_PATH = os.getenv("TO_PATH")
GIT_URL_DPB_YAML = f'https://api.github.com/repos/{GIT_ORGANIZATION}/i4-dpb_yaml/contents/{TENANT}/{PROJECT}/input.yml'
# logger初期化
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
class tableNotExistException(Exception):
pass
class incorrectFileExtensionExceprion(Exception):
pass
def _randomname(n):
return "".join(random.choices(string.ascii_letters + string.digits, k=n))
def _randomnum(n):
return "".join(random.choices(string.digits, k=n))
# 対象テーブル存在チェック
def _tableExistenceCheck(table: str):
try:
obj = SnowflakeAccessor(
database=DPB_DB_NAME,
schema="PUBLIC",
role="ACCOUNTADMIN",
)
query = f"""
SHOW TABLES LIKE '{table}';
"""
res = obj.execute(query)
tableData = res.fetchall()[0][0]
if not tableData:
raise tableNotExistException(f"{table} is not exist.")
except Exception as e:
raise e
finally:
obj.close()
# 受信ファイル保存
def _mkdirAndSaveFile(reqId: str, file: werkzeug.datastructures.FileStorage):
# サニタイズ処理
filename = secure_filename(file.filename)
# 拡張子チェック
splitFilename = filename.rsplit('.', 1)
targetExtention = splitFilename[1].lower()
resultCheckExtentions = '.' in filename and targetExtention in ALLOWED_EXTENSIONS
if not resultCheckExtentions:
raise incorrectFileExtensionExceprion(
f"file extention is csv or json only. upload file extention:{targetExtention}")
# 格納用ディレクトリ作成
tmpPath = f"{UPLOAD_FILE_PATH}/{reqId}/rawdata/"
try:
os.makedirs(tmpPath)
except Exception as e:
logger.error(f"Warning:{e}", exc_info=True)
# ファイル作成
try:
tmpFilePath = os.path.join(tmpPath, filename)
file.save(tmpFilePath)
# 受信ファイルがCSVの場合
if targetExtention == 'csv':
tmpFilePath = _convertCsv2Json(tmpFilePath,
tmpPath,
f'{splitFilename[0]}.json')
except Exception as e:
raise e
return tmpFilePath
# アップロード用JSONファイル作成
def _createUploadFile(tmpFilePath: str, reqId: str, error_data, error_count):
# パラメータ作成
rowkey = _randomname(32)
transaction_id = f"{_randomnum(2)}-{_randomnum(13)}"
date = datetime.datetime.now()
tmStamp = f"{date.isoformat(timespec='milliseconds')}Z"
yyyymm = date.strftime("%Y%m")
try:
# 受信ファイル読み込み
with open(tmpFilePath) as f:
data = json.load(f)
dataList = []
error_recordNum = []
count = 0
for index, item in enumerate(data):
count = count + 1
if item not in error_data:
tmp = {}
tmp['data'] = item
tmp["datekey"] = yyyymm
tmp["overwrite"] = False
tmp["received_time"] = tmStamp
tmp["registered_datetime"] = None
tmp["rowkey"] = rowkey
tmp["sent_from_id"] = None
tmp["sent_from_ip"] = None
tmp["sent_time"] = tmStamp
tmp["timestamp"] = tmStamp
tmp["transaction_id"] = transaction_id
tmp["transaction_type"] = "INS"
tmp["yyyymm"] = yyyymm
dataList.append(tmp)
else:
error_recordNum.append(count)
print(error_recordNum)
# アップロード用JSONファイル作成
fileName = f"{rowkey}.json"
filePath = f'{UPLOAD_FILE_PATH}/{reqId}/{fileName}'
with open(filePath, 'w') as f:
json.dump(dataList, f, indent=2, ensure_ascii=False)
except Exception as e:
raise e
return filePath, fileName, rowkey, error_recordNum
# CSV -> JSON
def _convertCsv2Json(filePath: str, dirName: str, fileName: str):
try:
# pandasでcsvファイルをデータフレーム形式で読み込み
df = pd.read_csv(filePath)
tmpFilePath = os.path.join(dirName, fileName)
with open(tmpFilePath, 'w') as f:
# dfをJSONに変換
wData = json.loads(df.to_json(orient='records'))
json.dump(wData, f, indent=2, ensure_ascii=False)
# dfを破棄し、メモリを解放する
del df
gc.collect()
except Exception as e:
raise e
return tmpFilePath
# API論理名取得
def _getApiLogicalName(id: str):
try:
obj = SnowflakeAccessor()
query = f"""
SELECT
API_LOGICAL_NAME
FROM
API_META_INFO
WHERE
API_PHYSICAL_NAME = '{id}'
LIMIT 1;
"""
res = obj.execute(query)
id_logical_name = res.fetchall()[0][0]
if not id_logical_name:
raise tableNotExistException(f"{id}:meta data is not exist.")
except Exception as e:
raise e
finally:
obj.close()
return id_logical_name
# 既存データ削除処理
def _truncateTable(table: str):
try:
i4sf = SnowflakeAccessor(
database=DPB_DB_NAME,
schema="PUBLIC",
role="ACCOUNTADMIN",
)
fquery = f"""
TRUNCATE TABLE
"{table}"
;
"""
i4sf.execute(fquery)
except Exception as e:
raise e
finally:
i4sf.close()
# 受信ファイル、アップロード済みファイル削除処理
def _deleteUploadFile(reqId: str):
try:
shutil.rmtree(f'{UPLOAD_FILE_PATH}/{reqId}')
except Exception as e:
raise e
# input.ymlファイル削除処理
def _deleteYmlFile(destination_path: str):
try:
os.remove(destination_path)
except Exception as e:
raise e
# download from github
def download_private_file_from_github(url: str, destination_path: str, token: str):
try:
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
print("File downloaded successfully.")
else:
print(
f"Failed to download file. Status code: {response.status_code} - {response.text}")
except Exception as e:
raise e
# バリデーションチェック
def _validationChecks(tmpFilePath: str, id: str):
try:
file_url = GIT_URL_DPB_YAML
# 本地保存路径
destination = f"{TO_PATH}/input_test.yml"
# 你的GitHub Personal Access Token
git_token = os.getenv("GIT_TOKEN")
download_private_file_from_github(file_url, destination, git_token)
# 读取YAML定义并生成动态Schema
with open(destination) as file:
yaml_content = yaml.safe_load(file)
post_content = yaml_content['paths'][f'/dnasb/{id}']['post']
schema_definitions = post_content['parameters'][0]['schema']['properties']['data']['properties']
DynamicSchema = dynamic_schema_generator(schema_definitions)
# 数据验证
with open(tmpFilePath) as f:
data = json.load(f)
schema = DynamicSchema()
# 遍历文件中的每个项目并进行验证
error_data = []
error_count = []
for index, item in enumerate(data):
errors = schema.validate(item)
if errors:
error_count.append(index)
error_data.append(item)
# delete input.yml
_deleteYmlFile(destination)
except Exception as e:
raise e
return error_data, error_count
# 非同期処理
def _execJob(table: str, fileName: str, request_id: str, jobId: str, error_data, error_recordNum):
try:
dumpmsg = ""
message = "データバリデーションエラー"
description = "必要な情報が欠落しているか、データが規定の形式に適合していません"
userAction = "入力内容を確認して再度お試しください"
for num, data in zip(error_recordNum, error_data):
dumpmsg = f'''{dumpmsg}\nエラーレコード{num}行目: {data}'''.replace(
"'", "")
if error_data:
updateJobWarning(jobId, message, description, userAction, dumpmsg)
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"データ登録エラー",
"データ登録処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
# サンプルデータ挿入
try:
updateJobPhase(jobId, 1, "データ登録")
logger.info(f"{request_id} ----- copyIntoTable start")
copyIntoTable(table, fileName)
logger.info(f"{request_id} ----- copyIntoTable end")
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"データ登録エラー",
"データ登録処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
# 登録済JSONファイル削除
try:
updateJobPhase(jobId, 2, "登録済ファイル削除")
logger.info("----- deleteUploadFile start")
_deleteUploadFile(request_id)
logger.info("----- deleteUploadFile end")
except Exception as e:
logger.error(f"{request_id} {e}", exc_info=True)
updateJobError(jobId, e,
"登録済ファイル削除エラー",
"登録済ファイル削除処理に失敗しました。",
DEFAULT_USER_ACTION)
raise e
finishJobStatus(jobId)
def postCopyIntoApiData(data: json, file: werkzeug.datastructures.FileStorage):
request_id = request.request_id
id: str = data["id"]
method = data["method"]
filename = data["filename"]
table = f"J_{PROJECT.upper()}_{id.upper()}"
logger.info(f"id: {id}")
# テーブル存在チェック
try:
_tableExistenceCheck(table)
logicalName = _getApiLogicalName(id)
except tableNotExistException as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'データ投入先のAPIが存在しません。画面を更新の上、再度実行してください。'}
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'snowflake接続に失敗しました。サポートへお問い合わせください。'}
# 受信したファイルの保存
try:
logger.info("----- mkdirAndSaveFile start")
tmpFilePath = _mkdirAndSaveFile(request_id, file)
logger.info("----- mkdirAndSaveFile end")
except incorrectFileExtensionExceprion as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ファイル拡張子はcsvまたはjsonのみとなります。ファイルを確認の上、再度実行してください。'}
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'アップロードファイルの保存に失敗しました。サポートへお問い合わせください。'}
# バリデーションチェック
try:
logger.info("----- check start")
error_data, error_count = _validationChecks(tmpFilePath, id)
logger.info("----- check end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'バリデーションチェックエラーです。'}
# ステージ作成
try:
logger.info("----- createInternalStage start")
createInternalStage()
logger.info("----- createInternalStage end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ステージ作成に失敗しました。サポートへお問い合わせください。'}
# 登録用JSONファイル作成
try:
logger.info("----- createUploadFile start")
filePath, fileName, rowkey, error_recordNum = _createUploadFile(tmpFilePath,
request_id, error_data, error_count)
logger.info("----- createUploadFile end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": '登録用ファイル作成に失敗しました。サポートへお問い合わせください。'}
# ファイルのステージング
try:
logger.info("----- putDataFile start")
putDataFile(filePath)
logger.info("----- putDataFile end")
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": 'ファイルの登録に失敗しました。サポートへお問い合わせください。'}
# 登録済データ削除
if method == 'update':
try:
_truncateTable(table)
except Exception as e:
logger.error(f"{e}", exc_info=True)
return {"errorMsg": '既存データ削除に失敗しました。サポートへお問い合わせください。'}
# ジョブ開始
processTitle = "サンプルデータ挿入"
processName = f"{processTitle}処理 {logicalName}:{filename}"
externalQuery = {"apiTablePhy": id.upper(),
"putApiDataTran": rowkey}
jobId = beginJobStatus(processTitle, processName, 2, externalQuery)
# 非同期処理開始
fire_and_forget(_execJob, table, fileName, request_id,
jobId, error_data, error_recordNum)
return {"message": "COMPLETE", "jobId": jobId}
def generate_field(field_props):
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
field_type = field_props['type']
if field_type == 'string':
return fields.Str(validate=validators)
elif field_type == 'integer':
return fields.Integer()
# 可以添加对其他类型的支持...
# 使用type动态创建带有字段的Schema类
def dynamic_schema_generator(schema_definitions):
# 准备一个字典来收集类属性(即字段)
attrs = {}
for field_name, field_props in schema_definitions.items():
# 根据字段属性生成字段实例
field_instance = generate_field(field_props)
if field_instance:
attrs[field_name] = field_instance
# 使用type动态创建类
return type('DynamicSchema', (Schema,), attrs)
# DPLのinput.ymlをダウンロード
def download_private_file_from_github(url: str, destination_path: str, token: str):
try:
headers = {
'Authorization': f'token {token}',
'Accept': 'application/vnd.github.v3.raw',
}
response = requests.get(url, headers=headers)
if response.status_code == 200:
with open(destination_path, 'wb') as file:
file.write(response.content)
shutil.copyfile(
f"{to_path}/{tenant}/input_tpl.yml", f"{to_path}/input.yml")
2024-04-04 05:30:36,088 ERROR -20240404-960bc418-ae11-4cd2-b34c-ff357b295b10 [Errno 2] No such file or directory: 'tmp/i4-dpb/ind/input_test.yml'
Traceback (most recent call last):
File "/app/ap/postCopyIntoApiData.py", line 395, in postCopyIntoApiData
error_data = _validationChecks(tmpFilePath, id)
File "/app/ap/postCopyIntoApiData.py", line 306, in _validationChecks
raise e
File "/app/ap/postCopyIntoApiData.py", line 283, in _validationChecks
download_private_file_from_github(file_url, destination, GIT_TOKEN)
File "/app/ap/postCopyIntoApiData.py", line 273, in download_private_file_from_github
raise e
File "/app/ap/postCopyIntoApiData.py", line 266, in download_private_file_from_github
with open(destination_path, 'wb') as file:
FileNotFoundError: [Errno 2] No such file or directory: 'tmp/i4-dpb/ind/input_test.yml'
# 确保目标路径的文件夹存在
os.makedirs(os.path.dirname(destination_path), exist_ok=True)
uring handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/requests/adapters.py", line 489, in send
resp = conn.urlopen(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 787, in urlopen
retries = retries.increment(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/util/retry.py", line 550, in increment
raise six.reraise(type(error), error, _stacktrace)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/packages/six.py", line 769, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 703, in urlopen
httplib_response = self._make_request(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 449, in _make_request
six.raise_from(e, None)
File "<string>", line 3, in raise_from
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/connectionpool.py", line 444, in _make_request
httplib_response = conn.getresponse()
File "/usr/local/lib/python3.9/http/client.py", line 1377, in getresponse
response.begin()
File "/usr/local/lib/python3.9/http/client.py", line 320, in begin
version, status, reason = self._read_status()
File "/usr/local/lib/python3.9/http/client.py", line 281, in _read_status
line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1")
File "/usr/local/lib/python3.9/socket.py", line 704, in readinto
return self._sock.recv_into(b)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/contrib/pyopenssl.py", line 334, in recv_into
return self.recv_into(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/urllib3/contrib/pyopenssl.py", line 324, in recv_into
raise SocketError(str(e))
snowflake.connector.vendored.urllib3.exceptions.ProtocolError: ('Connection aborted.', OSError("(104, 'ECONNRESET')"))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/network.py", line 1018, in _request_exec
raw_ret = session.request(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/requests/sessions.py", line 587, in request
resp = self.send(prep, **send_kwargs)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/requests/sessions.py", line 701, in send
r = adapter.send(request, **kwargs)
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/vendored/requests/adapters.py", line 547, in send
raise ConnectionError(err, request=request)
snowflake.connector.vendored.requests.exceptions.ConnectionError: ('Connection aborted.', OSError("(104, 'ECONNRESET')"))
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/network.py", line 837, in _request_exec_wrapper
return_object = self._request_exec(
File "/usr/local/lib/python3.9/site-packages/snowflake/connector/network.py", line 1105, in _request_exec
raise RetryRequest(err)
snowflake.connector.network.RetryRequest: ('Connection aborted.', OSError("(104, 'ECONNRESET')"))
Field may not be null