Open kirin-ri opened 7 months ago
def generate_field(field_props):
field_type = field_props['type']
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
required = field_props.get('required', False)
if 'x-validations' in field_props:
max_length = field_props.get(
'x-validations', {}).get('byteLength', {}).get('max')
if max_length:
validators.append(validate.Length(max=max_length))
if field_type == 'string':
return fields.Str(validate=validators, required=required)
elif field_type == 'integer':
return fields.Integer(validate=validators, required=required)
elif field_type == 'number':
return fields.Float(validate=validators, required=required)
elif field_type == 'boolean':
return fields.Boolean(validate=validators, required=required)
data:
properties:
binary:
description: binary
format: binary
required: true
type: string
boolean:
description: boolean
required: true
type: boolean
byte:
description: byte
format: byte
required: true
type: string
date:
description: date
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}$
required: true
type: string
dateTime:
description: dateTime
pattern: ^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$
required: true
type: string
double:
description: double
format: double
required: true
type: number
float:
description: float
format: float
required: true
type: number
integer:
description: integer
format: int32
required: true
type: integer
long:
description: long
format: int64
required: true
type: integer
string:
description: string
required: true
type: string
x-validations:
byteLength:
max: 255
if field_type == 'string':
return fields.Str(validate=validators, required=required)
elif field_type == 'byte':
validators.append(validate_base64)
return fields.Str(validate=validators, required=required)
elif field_type == 'binary':
# 此处使用Str代表二进制数据,根据实际需求调整
return fields.Str(validate=validators, required=required)
elif field_type == 'boolean':
return fields.Boolean(validate=validators, required=required)
elif field_type in ['double', 'float']:
return fields.Float(validate=validators, required=required)
elif field_type in ['int32', 'integer']:
return fields.Integer(validate=validators, required=required)
elif field_type == 'int64':
validators.append(validate_int64)
return fields.Integer(validate=validators, required=required)
else:
# 如果遇到未知的类型,返回None或根据需要抛出异常
return None
# 自定义验证器:检查Base64编码
def validate_base64(value):
try:
if isinstance(value, str):
base64.b64encode(base64.b64decode(value)).decode() != value
except Exception:
raise ValidationError('Invalid base64 string')
# 自定义验证器:检查数值是否在int64的范围内
def validate_int64(value):
MIN_INT64 = -2**63
MAX_INT64 = 2**63 - 1
if not (MIN_INT64 <= value <= MAX_INT64):
raise ValidationError(f'Value {value} is out of range for int64')
def generate_field(field_props):
field_type = field_props.get('format', field_props['type'])
required = field_props.get('required', False)
validators = []
if 'pattern' in field_props:
validators.append(validate.Regexp(regex=field_props['pattern']))
if 'x-validations' in field_props:
max_length = field_props.get(
'x-validations', {}).get('byteLength', {}).get('max')
if max_length:
validators.append(validate.Length(max=max_length))
if field_type == 'string':
return fields.Str(validate=validators, required=required)
elif field_type == 'byte' or field_type == 'binary':
validators.append(validate_base64)
return fields.Str(validate=validators, required=required)
elif field_type == 'boolean':
return fields.Boolean(validate=validators, required=required)
elif field_type in ['double', 'float']:
return fields.Float(validate=validators, required=required)
elif field_type in ['int32', 'integer']:
return fields.Integer(validate=validators, required=required)
elif field_type == 'int64':
validators.append(validate_int64)
return fields.Integer(validate=validators, required=required)
def validate_base64(value):
try:
if isinstance(value, str):
base64.b64encode(base64.b64decode(value)).decode() != value
except Exception:
raise ValidationError('Invalid base64 string')
def validate_int64(value):
MIN_INT64 = -2**63
MAX_INT64 = 2**63 - 1
if not (MIN_INT64 <= value <= MAX_INT64):
raise ValidationError(f'Value {value} is out of range for int64')
boolean,byte,date,dateTime,double,float,integer,long,string,binary
TRUE,発注元A,2024-01-01,2024-01-01T09:00:00Z,3.1415926,2.71828,2024,2024,発注元A,発注元A
def validate_double(value):
# 例如,您可以为 double 类型定义特定的范围或精度要求
if not (-1e308 <= value <= 1e308):
raise ValidationError('Value is out of range for double')
def validate_float(value):
# 对 float 进行不同的验证,例如限制其为更小的范围
if not (-1e38 <= value <= 1e38):
raise ValidationError('Value is out of range for float')
elif field_type == 'double':
validators.append(validate_double)
return fields.Float(validate=validators)
elif field_type == 'float':
validators.append(validate_float)
return fields.Float(validate=validators)
def validate_int32(value):
MIN_INT32 = -2**31
MAX_INT32 = 2**31 - 1
if not (MIN_INT32 <= value <= MAX_INT32):
raise ValidationError(f'Value {value} is out of range for int32')
if field_type in ['int32', 'integer']:
validators.append(validate_int32) # 为 int32 和 integer 类型添加范围验证
return fields.Integer(validate=validators, required=required)
def validate_int64(value):
MIN_INT64 = -2**63
MAX_INT64 = 2**63 - 1
if not (MIN_INT64 <= value <= MAX_INT64):
raise ValidationError(f'Value {value} is out of range for int64')
def validate_double(value):
if not (-1e308 <= value <= 1e308):
raise ValidationError('Value is out of range for double')
def validate_float(value):
if not (-1e38 <= value <= 1e38):
raise ValidationError('Value is out of range for float')
def validate_int32(value):
MIN_INT32 = -2**31
MAX_INT32 = 2**31 - 1
if not (MIN_INT32 <= value <= MAX_INT32):
raise ValidationError(f'Value {value} is out of range for int32')
boolean
base64 string
string (^[0-9]{4}-[0-9]{2}-[0-9]{2}$)
string (^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$)
float 8バイトの情報量まで
float 4バイトの情報量まで
integer 4バイトの情報量まで
integer 8バイトの情報量まで
string
base64 string
TRUE SGVsbG8= 2024/1/1 2024-01-01T09:00:00Z 3.1415926 2.71828 2024 2024 ValidString SGVsbG8=
2147483648
9223372036854775808
-0.428196834 -2334.188461 -0.428196833699061 -2334.18846139727
0.06517951 293.8348105 0.065179510006247 293.834810504991
1.216788842 11452.11733 1.21678884184228 11452.1173285123
0.880737853 4801.082054 0.880737852867307 4801.08205359479
2.053889672 9259.110457 2.05388967245248 9259.11045734148
1.626373056 15307.02322 1.62637305581117 15307.0232193129
1.535857273 8372.271913 1.53585727254699 8372.27191280868
1.243311405 5604.954242 1.24331140515675 5604.95424248994
-0.273759896 -2576.56081 -0.273759895590889 -2576.56081017426
-0.110280453 -962.3965399 -0.11028045338636 -962.396539926536
-0.475346274 -3645.924643 -0.475346274005157 -3645.92464332615
1.761503972 14566.8066 1.76150397219799 14566.8066026259
-0.515716008 -4500.555505 -0.515716007858462 -4500.55550469016
1.523988963 11689.05537 1.52398896332563 11689.0553716335
0.3250562 2688.061378 0.325056199669602 2688.06137840456
1.38309036 12069.96649 1.38309036033545 12069.9664928764
0.568975086 4364.061315 0.568975085551628 4364.06131549645
-0.161901787 -1338.85138 -0.161901787311353 -1338.85138018802
-0.215330792 -1245.698106 -0.215330791924144 -1245.69810607766
-0.245448015 -1679.546906 -0.245448014837058 -1679.54690558168
1.271759278 8789.277469 1.27175927816964 8789.27746873391
-0.412619295 -2387.020779 -0.412619295271766 -2387.0207788592
1.353470482 9261.501508 1.35347048181578 9261.50150792184
1.536694429 10620.27535 1.53669442876753 10620.2753547309
1.799522174 10410.31496 1.79952217367572 10410.3149557091
1.59448936 10910.7408 1.59448935954299 10910.7408001697
-0.312559978 -2160.138651 -0.312559977935728 -2160.13865112295
0 0 0. 0.
0 0 0. 0.
0 0 0. 0.
0 0 0. 0.
0.449684432 2601.444223 0.449684431586577 2601.44422334798
import datetime
from pyspark.sql.functions import *
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
import datetime
from scipy.stats import norm
from scipy.stats import skew as sk
from scipy.stats import kurtosis as kt
# 共通機能のパッケージimport
import d2s
# SQL
df_err = d2s.readData(
spark,
dbutils,
sfDatabase="STARTPACK_DEV",
sfSchema="PUBLIC",
query="""
SELECT
PLAN."FINAL_PRODUCT",
PLAN."EXECUTION_DATE",
PLAN."TARGET_DATE",
SUM(PLAN."QUANTITY") AS PLAN,
SUM(COALESCE(PERF."QUANTITY", 0)) AS PERF
FROM
"STARTPACK_PRODUCT_SALES_M_PLAN" PLAN
LEFT JOIN
"STARTPACK_PRODUCT_SALES_M_PERF" PERF
ON PERF."FINAL_PRODUCT" = PLAN."FINAL_PRODUCT"
AND PERF."TARGET_DATE" = PLAN."TARGET_DATE"
GROUP BY
PLAN.FINAL_PRODUCT,
PLAN.EXECUTION_DATE,
PLAN.TARGET_DATE
ORDER BY "FINAL_PRODUCT", "TARGET_DATE"
"""
)
# データ加工
# 「完成品」「region」「実行年月」「計画対象月」単位の計算処理①
df_err = df_err.withColumn("BEFORE_X",
floor(months_between(
date_format(
"TARGET_DATE", "yyyy-MM"),
date_format("EXECUTION_DATE",
"yyyy-MM")))
).withColumn("ERROR",
(df_err.PERF - df_err.PLAN)
).withColumn("ERROR_RATE",
(df_err.PERF -
df_err.PLAN) /
df_err.PLAN
)
df_err = df_err.withColumn("ERROR_AE",
abs(df_err.ERROR)
).withColumn("ERROR_RATE_APE",
abs(df_err.ERROR_RATE)
)
# 実行日時とAnaplan用のユニークキー付与
today = datetime.date.today()
df_err = df_err.withColumn("EXEC_DATE", lit(today)
).withColumn("UID",
concat_ws("_",
"FINAL_PRODUCT",
"TARGET_DATE",
"BEFORE_X")
)
# 「完成品」「region」「xヶ月前」単位の計算処理①
df_stat_1 = df_err.groupBy("FINAL_PRODUCT", "BEFORE_X").agg(
avg("ERROR").alias("ERROR_AVG"),
avg("ERROR_AE").alias("ERROR_MAE"),
avg("ERROR_RATE").alias("ERROR_RATE_AVG"),
avg("ERROR_RATE_APE").alias("ERROR_RATE_MAPE"),
stddev_samp("ERROR").alias("ERROR_STDDEV"),
stddev_samp("ERROR_RATE").alias("ERROR_RATE_STDDEV"),
expr("percentile(ERROR_RATE,0.5)").alias("ERROR_RATE_MEDIAN"),
count("ERROR_RATE").alias("COUNT")
)
# レコード数が1件の場合、標準偏差がNULLとなるため0に置き換え
df_singleCount = df_stat_1.filter("COUNT = 1")
df_multiCount = df_stat_1.filter("COUNT > 1")
df_singleCount = df_singleCount.fillna(0, subset=["ERROR_STDDEV"])
df_singleCount = df_singleCount.fillna(0, subset=["ERROR_RATE_STDDEV"])
df_stat_1 = df_singleCount.unionAll(df_multiCount)
# 変動係数算出
df_stat_1 = df_stat_1.withColumn(
"ERROR_RATE_CV",
df_stat_1["ERROR_RATE_STDDEV"] / df_stat_1["ERROR_RATE_AVG"])
# ERROR_RATE_AVGが0の場合、ERROR_RATE_CVがNULLとなるため0に置き換え
df_stat_1 = df_stat_1.fillna(0, subset=["ERROR_RATE_CV"])
# 実行日時とAnaplan用のユニークキー付与
df_stat_1 = df_stat_1.withColumn("EXEC_DATE", lit(today)
).withColumn("UID",
concat_ws(
"_",
"FINAL_PRODUCT",
"BEFORE_X")
)
# 「完成品」「region」「実行年月」「計画対象月」単位の計算処理②
df_tmp = df_stat_1.select(
"FINAL_PRODUCT", "BEFORE_X",
"ERROR_STDDEV", "ERROR_RATE_STDDEV")
ons = ["FINAL_PRODUCT", "BEFORE_X"]
df_err = df_err.join(df_tmp, on=ons, how="inner")
df_err = df_err.withColumn("ERROR_STANDARDIZE",
df_err.ERROR/df_err.ERROR_STDDEV
).withColumn("ERROR_RATE_STANDARDIZE",
df_err.ERROR /
df_err.ERROR_RATE_STDDEV
)
# ERROR_STDDEVが0の場合、ERROR_STANDARDIZEとERROR_RATE_STANDARDIZEがNULLとなるため0に置き換え
df_err = df_err.fillna(0, subset=["ERROR_STANDARDIZE"])
df_err = df_err.fillna(0, subset=["ERROR_RATE_STANDARDIZE"])
# 「完成品」「region」「xヶ月前」単位の計算処理②
# キーごとに集計した歪度と尖度を計算するヘルパー関数
def calculate_skewness(column):
return sk(column, bias=False)
def calculate_kurtosis(column):
return kt(column, bias=False)
# キーごとに集計
grouped_df = df_err.groupBy("FINAL_PRODUCT", "BEFORE_X").agg(
collect_list("ERROR_STANDARDIZE").alias("ERROR_STANDARDIZE_TMP"),
collect_list("ERROR_RATE_STANDARDIZE").alias(
"ERROR_RATE_STANDARDIZE_TMP"),
count("BEFORE_X").alias("COUNT_TMP")
)
# 歪度と尖度を計算して新しい列に追加
calculate_skewness_udf = udf(calculate_skewness)
calculate_kurtosis_udf = udf(calculate_kurtosis)
# 歪度は集計データ数が3未満の場合は計算できないため、0を入れる
df_stat_2 = grouped_df.withColumn("ERROR_SKEW", when(col(
"COUNT_TMP") < 3, 0).otherwise(
calculate_skewness_udf("ERROR_STANDARDIZE_TMP")))
df_stat_2 = df_stat_2.withColumn("ERROR_RATE_SKEW", when(col(
"COUNT_TMP") < 3, 0).otherwise(
calculate_skewness_udf("ERROR_RATE_STANDARDIZE_TMP")))
# 尖度は集計データ数が4未満の場合は計算できないため、0を入れる
df_stat_2 = df_stat_2.withColumn("ERROR_KURT", when(col(
"COUNT_TMP") < 4, 0).otherwise(
calculate_kurtosis_udf("ERROR_STANDARDIZE_TMP")))
df_stat_2 = df_stat_2.withColumn("ERROR_RATE_KURT", when(col(
"COUNT_TMP") < 4, 0).otherwise(
calculate_kurtosis_udf("ERROR_RATE_STANDARDIZE_TMP")))
# 「完成品」「region」「xヶ月前」単位の計算処理①②を結合
ons = ["FINAL_PRODUCT", "BEFORE_X"]
df_stat = df_stat_1.join(df_stat_2, on=ons, how="inner")
# 出力順並び替え
df_err = df_err.sort("FINAL_PRODUCT",
"EXECUTION_DATE", "TARGET_DATE")
df_err = df_err.select(
"FINAL_PRODUCT", "EXECUTION_DATE", "TARGET_DATE",
"BEFORE_X", "PLAN", "PERF", "ERROR", "ERROR_RATE",
"ERROR_AE", "ERROR_RATE_APE",
"ERROR_STANDARDIZE", "ERROR_RATE_STANDARDIZE",
"EXEC_DATE", "UID")
df_stat = df_stat.sort("FINAL_PRODUCT", "BEFORE_X")
df_stat = df_stat.select(
"FINAL_PRODUCT", "BEFORE_X",
"ERROR_RATE_MEDIAN", "ERROR_RATE_CV", "ERROR_AVG", "ERROR_MAE",
"ERROR_RATE_AVG", "ERROR_RATE_MAPE",
"ERROR_STDDEV", "ERROR_RATE_STDDEV",
"ERROR_SKEW", "ERROR_RATE_SKEW", "ERROR_KURT", "ERROR_RATE_KURT",
"EXEC_DATE", "UID")
display(df_err)
display(df_stat)
PythonException: 'TypeError: unsupported operand type(s) for ** or pow(): 'decimal.Decimal' and 'float'', from
import datetime
from pyspark.sql.functions import *
from pyspark.sql.functions import *
from pyspark.sql.types import DecimalType
import datetime
from scipy.stats import norm
decimal_type = DecimalType(38,10)
# 共通機能のパッケージimport
import d2s
# SQL
df_err = d2s.readData(
spark,
dbutils,
sfDatabase="STARTPACK_DEV",
sfSchema="PUBLIC",
query="""
SELECT
PLAN."FINAL_PRODUCT",
PLAN."EXECUTION_DATE",
PLAN."TARGET_DATE",
SUM(PLAN."QUANTITY") AS PLAN,
SUM(COALESCE(PERF."QUANTITY", 0)) AS PERF
FROM
"STARTPACK_PRODUCT_SALES_M_PLAN" PLAN
LEFT JOIN
"STARTPACK_PRODUCT_SALES_M_PERF" PERF
ON PERF."FINAL_PRODUCT" = PLAN."FINAL_PRODUCT"
AND PERF."TARGET_DATE" = PLAN."TARGET_DATE"
GROUP BY
PLAN.FINAL_PRODUCT,
PLAN.EXECUTION_DATE,
PLAN.TARGET_DATE
ORDER BY "FINAL_PRODUCT", "TARGET_DATE"
"""
)
# データ加工
# 「完成品」「region」「実行年月」「計画対象月」単位の計算処理①
df_err = df_err.withColumn("BEFORE_X",
floor(months_between(
date_format(
"TARGET_DATE", "yyyy-MM"),
date_format("EXECUTION_DATE",
"yyyy-MM")))
).withColumn("ERROR",
(df_err.PERF - df_err.PLAN)
).withColumn("ERROR_RATE",
(df_err.PERF -
df_err.PLAN) /
df_err.PLAN
)
df_err = df_err.withColumn("ERROR_AE",
abs(df_err.ERROR)
).withColumn("ERROR_RATE_APE",
abs(df_err.ERROR_RATE)
)
# 実行日時とAnaplan用のユニークキー付与
today = datetime.date.today()
df_err = df_err.withColumn("EXEC_DATE", lit(today)
).withColumn("UID",
concat_ws("_",
"FINAL_PRODUCT",
"TARGET_DATE",
"BEFORE_X")
)
# 「完成品」「region」「xヶ月前」単位の計算処理①
df_stat_1 = df_err.groupBy("FINAL_PRODUCT", "BEFORE_X").agg(
avg("ERROR").alias("ERROR_AVG"),
avg("ERROR_AE").alias("ERROR_MAE"),
avg("ERROR_RATE").alias("ERROR_RATE_AVG"),
avg("ERROR_RATE_APE").alias("ERROR_RATE_MAPE"),
stddev_samp("ERROR").alias("ERROR_STDDEV"),
stddev_samp("ERROR_RATE").alias("ERROR_RATE_STDDEV"),
expr("percentile(ERROR_RATE,0.5)").alias("ERROR_RATE_MEDIAN"),
count("ERROR_RATE").alias("COUNT")
)
# レコード数が1件の場合、標準偏差がNULLとなるため0に置き換え
df_singleCount = df_stat_1.filter("COUNT = 1")
df_multiCount = df_stat_1.filter("COUNT > 1")
df_singleCount = df_singleCount.fillna(0, subset=["ERROR_STDDEV"])
df_singleCount = df_singleCount.fillna(0, subset=["ERROR_RATE_STDDEV"])
df_stat_1 = df_singleCount.unionAll(df_multiCount)
# 変動係数算出
df_stat_1 = df_stat_1.withColumn(
"ERROR_RATE_CV",
df_stat_1["ERROR_RATE_STDDEV"] / df_stat_1["ERROR_RATE_AVG"])
# ERROR_RATE_AVGが0の場合、ERROR_RATE_CVがNULLとなるため0に置き換え
df_stat_1 = df_stat_1.fillna(0, subset=["ERROR_RATE_CV"])
# 実行日時とAnaplan用のユニークキー付与
df_stat_1 = df_stat_1.withColumn("EXEC_DATE", lit(today)
).withColumn("UID",
concat_ws(
"_",
"FINAL_PRODUCT",
"BEFORE_X")
)
# 「完成品」「region」「実行年月」「計画対象月」単位の計算処理②
df_tmp = df_stat_1.select(
"FINAL_PRODUCT", "BEFORE_X",
"ERROR_STDDEV", "ERROR_RATE_STDDEV")
ons = ["FINAL_PRODUCT", "BEFORE_X"]
df_err = df_err.join(df_tmp, on=ons, how="inner")
df_err = df_err.withColumn("ERROR_STANDARDIZE",
(df_err.ERROR/df_err.ERROR_STDDEV).cast(decimal_type)
)
df_err = df_err.withColumn("ERROR_RATE_STANDARDIZE",
(df_err.ERROR /
df_err.ERROR_RATE_STDDEV).cast(decimal_type)
)
# ERROR_STDDEVが0の場合、ERROR_STANDARDIZEとERROR_RATE_STANDARDIZEがNULLとなるため0に置き換え
df_err = df_err.fillna(0, subset=["ERROR_STANDARDIZE"])
df_err = df_err.fillna(0, subset=["ERROR_RATE_STANDARDIZE"])
# 「完成品」「region」「xヶ月前」単位の計算処理②
# キーごとに集計した歪度と尖度を計算するヘルパー関数
def calculate_skewness(column):
return sk(column, bias=False)
def calculate_kurtosis(column):
return kt(column, bias=False)
# キーごとに集計
grouped_df = df_err.groupBy("FINAL_PRODUCT", "BEFORE_X").agg(
collect_list("ERROR_STANDARDIZE").alias("ERROR_STANDARDIZE_TMP"),
collect_list("ERROR_RATE_STANDARDIZE").alias(
"ERROR_RATE_STANDARDIZE_TMP"),
count("BEFORE_X").alias("COUNT_TMP")
)
# 歪度と尖度を計算して新しい列に追加
calculate_skewness_udf = udf(calculate_skewness)
calculate_kurtosis_udf = udf(calculate_kurtosis)
# 歪度は集計データ数が3未満の場合は計算できないため、0を入れる
df_stat_2 = grouped_df.withColumn("ERROR_SKEW", when(col(
"COUNT_TMP") < 3, 0).otherwise(
calculate_skewness_udf("ERROR_STANDARDIZE_TMP")))
df_stat_2 = df_stat_2.withColumn("ERROR_RATE_SKEW", when(col(
"COUNT_TMP") < 3, 0).otherwise(
calculate_skewness_udf("ERROR_RATE_STANDARDIZE_TMP")))
# 尖度は集計データ数が4未満の場合は計算できないため、0を入れる
df_stat_2 = df_stat_2.withColumn("ERROR_KURT", when(col(
"COUNT_TMP") < 4, 0).otherwise(
calculate_kurtosis_udf("ERROR_STANDARDIZE_TMP")))
df_stat_2 = df_stat_2.withColumn("ERROR_RATE_KURT", when(col(
"COUNT_TMP") < 4, 0).otherwise(
calculate_kurtosis_udf("ERROR_RATE_STANDARDIZE_TMP")))
# 「完成品」「region」「xヶ月前」単位の計算処理①②を結合
ons = ["FINAL_PRODUCT", "BEFORE_X"]
df_stat = df_stat_1.join(df_stat_2, on=ons, how="inner")
# 出力順並び替え
df_err = df_err.sort("FINAL_PRODUCT",
"EXECUTION_DATE", "TARGET_DATE")
df_err = df_err.select(
"FINAL_PRODUCT", "EXECUTION_DATE", "TARGET_DATE",
"BEFORE_X", "PLAN", "PERF", "ERROR", "ERROR_RATE",
"ERROR_AE", "ERROR_RATE_APE",
"ERROR_STANDARDIZE", "ERROR_RATE_STANDARDIZE",
"EXEC_DATE", "UID")
df_stat = df_stat.sort("FINAL_PRODUCT", "BEFORE_X")
df_stat = df_stat.select(
"FINAL_PRODUCT", "BEFORE_X",
"ERROR_RATE_MEDIAN", "ERROR_RATE_CV", "ERROR_AVG", "ERROR_MAE",
"ERROR_RATE_AVG", "ERROR_RATE_MAPE",
"ERROR_STDDEV", "ERROR_RATE_STDDEV",
"ERROR_SKEW", "ERROR_RATE_SKEW", "ERROR_KURT", "ERROR_RATE_KURT",
"EXEC_DATE", "UID")
display(df_err)
display(df_stat)
**1. 問題の概要 データ分析プロジェクトにおいて、PySpark、Snowflake、およびExcelを使用してデータ処理と分析を行っています。データ処理の過程で、PySparkとSnowflakeでの自動データ型変換により、Excelでの計算結果と異なる値が発生しています。
型の推論:異なるデータ型間での演算を可能にするため。 精度の確保:データの精度損失を防ぐため、より広い数値範囲を持つdouble型に変換されます。 桁溢れの防止:大きな数値を適切に処理するため。
Snowflakeのデータ型処理 Snowflakeにデータを挿入する際、FLOAT、DOUBLE、DECIMALなどの数値型が利用されます。特に、精度が要求される場面ではDECIMAL型が選ばれることが一般的ですが、デフォルトの設定やデータ型の指定により、変換処理が発生する可能性があります。
Excelとの計算値の不一致 Excelでは数値計算に独自のアルゴリズムを使用しており、浮動小数点の精度が異なる場合があります。特に、Excelの精度は15桁までとなっており、それ以上の精度を要するデータでは不一致が生じます。
影響と推奨対策 データ型の標準化:PySpark、Snowflake、Excel間でデータ型と精度を統一することが重要です。 明示的なデータ型指定:データ処理スクリプトでデータ型を明示的に指定し、自動変換による影響を最小化します。 結果の検証と調整:最終的なデータ分析結果をExcelと比較検証し、必要に応じて調整を行います。
結論 データ処理における自動データ型変換は、ツール間で計算結果の不一致を引き起こす主な要因です。この問題を解決するためには、データの取り扱いと処理方法を標準化し、各ステージでのデータ精度を確保する必要があります。**
excel期待値
pyspark(DOUBLE型)
snowflake(FLOAT型)
-0.428196833699061
-0.428196833699061
-0.4281968337
-0.27375989559088854
-0.273759895590889
-0.2737598956
0.568975085551628
0.5689750855516278
0.5689750856
0.065179510006247
0.06517951000624675
0.06517951001
Excel は通常、倍精度浮動小数点数(64ビット、IEEE 754標準)を使用し、約15~16桁の十進数精度を提供します。表示された数値 -0.428196833699061 と 0.065179510006247 は、この精度範囲内で正確に示されています。 PySpark(DOUBLE型):
PySpark の DOUBLE 型も倍精度浮動小数点数を使用し、約15~17桁の精度を持っています。数値 -0.428196833699061 はそのままですが、0.06517951000624675 には若干の変動が見られます。これは浮動小数点数の計算で発生する誤差によるものですが、依然として高い精度を保持しています。 Snowflake(FLOAT型):
Snowflake の FLOAT 型(この場合、恐らく FLOAT8、倍精度を想定)でも、倍精度浮動小数点数を使用していますが、表示される桁数が -0.4281968337 と 0.06517951001 となっています。これはSnowflakeが内部的に持つ精度よりも表示精度を少し制限している可能性があります。ただし、Snowflake で FLOAT4(単精度)が使用されている場合、精度はさらに低くなります(約6~9桁)。
精度を最大限に保ちたい場合、PySparkではDecimalTypeを使用することをお勧めします。DecimalTypeは固定小数点数を扱い、必要な精度とスケール(小数点以下の桁数)を指定できます。これにより、浮動小数点数で発生する誤差を避けることができ、金融計算や正確な数値処理に適しています。
DecimalTypeを定義する際には、精度(総桁数)とスケールを指定することが可能です。例えば、精度が10でスケールが5の場合、最大で10桁の数字を扱い、そのうち小数点以下5桁までを保持できます。
0.06517951001
-1080.069325561