pavulurig / stock-price-prediction-transformer

Tesla Stock Price Prediction Using Transformer
MIT License
9 stars 3 forks source link

I added a new feature, Volume, but got bad results #1

Open sounds opened 8 months ago

sounds commented 8 months ago

Hi, I tried to add a new feature, Volume, but got bad results. So I copied Close as a second feature, expecting to get the same result as with a single feature, but that failed too. Can you help me find the error in my code?

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LayerNormalization, MultiHeadAttention, Dropout, GlobalAveragePooling1D
from sklearn.metrics import mean_squared_error
import math
import matplotlib.pyplot as plt

# Load and prepare the dataset
file_path = '../data/TSLA.csv'  # Make sure to have your dataset ready
df = pd.read_csv(file_path)

# Experiment: overwrite Open with a copy of Close, so the second feature
# carries exactly the same values as the first.
df['Open'] = df['Close']

# Feature matrix: column 0 is Close, column 1 is the duplicated Close stored
# under 'Open'. An earlier variant of this script selected
# ['Close', 'Volume'] here instead.
feature_columns = ['Close', 'Open']
data = df[feature_columns].values

# Scale every column with a (0, 1) feature range.
# NOTE(review): the scaler is fit on all rows, including the rows that are
# later split off as test data -- confirm that this is intended.
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data)

print(data_scaled)

# def create_dataset(dataset, time_step=1):
#     dataX, dataY = [], []
#     for i in range(len(dataset) - time_step - 1):
#         a = dataset[i:(i + time_step), 0]
#         dataX.append(a)
#         dataY.append(dataset[i + time_step, 0])
#     return np.array(dataX), np.array(dataY)

def create_dataset(dataset, time_step=1):
    """Slice a 2-D array into sliding input windows and next-step targets.

    Args:
        dataset: 2-D array indexed as ``[row, feature]``; column 0 is the
            value to predict.
        time_step: number of consecutive rows in each input window.

    Returns:
        A ``(windows, targets)`` pair of NumPy arrays. ``windows[k]`` holds
        rows ``k`` to ``k + time_step - 1`` with every feature column, and
        ``targets[k]`` is column 0 of row ``k + time_step``. There are
        ``len(dataset) - time_step - 1`` samples (none if that is not
        positive).
    """
    starts = range(len(dataset) - time_step - 1)
    # Inputs keep all feature columns; the label is only column 0 of the
    # row that immediately follows each window.
    windows = [dataset[start:start + time_step, :] for start in starts]
    targets = [dataset[start + time_step, 0] for start in starts]
    return np.array(windows), np.array(targets)

# Parameters
time_step = 100  # number of past rows in each input window
training_size = int(len(data_scaled) * 0.67)  # the first 67% of rows are used for training
test_size = len(data_scaled) - training_size
train_data, test_data = data_scaled[:training_size, :], data_scaled[training_size:, :]

# Build (window, next Close) samples separately for each split.
X_train, y_train = create_dataset(train_data, time_step)
X_test, y_test = create_dataset(test_data, time_step)

# Reshape input for the model as (samples, time_step, features).
# The feature count is read from the scaled data instead of being hard-coded,
# so adding or removing a column in `data` needs no change here.
n_features = data_scaled.shape[1]
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], n_features)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], n_features)

# Transformer Block
def transformer_encoder(inputs, head_size, num_heads, ff_dim, dropout=0):
    """Apply one encoder block: self-attention, then a feed-forward stack.

    Layer normalization is applied before each of the two sub-layers, and
    each sub-layer's output is added back to its own input.

    Args:
        inputs: tensor to encode; its last dimension sets the width of the
            final Dense projection.
        head_size: passed to ``MultiHeadAttention`` as ``key_dim``.
        num_heads: number of attention heads.
        ff_dim: units in the hidden feed-forward Dense layer.
        dropout: rate used inside the attention layer and by both Dropout
            layers.

    Returns:
        The sum of the feed-forward output and the attention residual.
    """
    # Attention sub-layer: the normalized tensor attends to itself.
    normed_inputs = LayerNormalization(epsilon=1e-6)(inputs)
    attention = MultiHeadAttention(key_dim=head_size, num_heads=num_heads, dropout=dropout)
    attended = attention(normed_inputs, normed_inputs)
    attended = Dropout(dropout)(attended)
    res = attended + inputs

    # Feed-forward sub-layer: expand to ff_dim, then project back to the
    # input's last dimension so the residual addition lines up.
    normed_res = LayerNormalization(epsilon=1e-6)(res)
    hidden = Dense(ff_dim, activation="relu")(normed_res)
    hidden = Dropout(dropout)(hidden)
    projected = Dense(inputs.shape[-1])(hidden)
    return projected + res

# Model Definition
# Input is (window length, feature count), taken from the prepared windows.
window_len, feature_count = X_train.shape[1], X_train.shape[2]
inputs = Input(shape=(window_len, feature_count))
encoded = transformer_encoder(inputs, head_size=256, num_heads=4, ff_dim=4, dropout=0.1)

# NOTE(review): with data_format='channels_first' Keras documents the input
# as (batch, features, steps) and pools over the last axis. Given the input
# shape declared above, that last axis is the feature axis -- confirm that
# averaging across features (rather than across time) is intended.
pooled = GlobalAveragePooling1D(data_format='channels_first')(encoded)
dropped = Dropout(0.1)(pooled)
hidden = Dense(20, activation="relu")(dropped)
outputs = Dense(1, activation="linear")(hidden)

model = Model(inputs=inputs, outputs=outputs)
model.compile(optimizer="adam", loss="mean_squared_error")

# Model Summary
model.summary()

# Train the model; the test windows double as the validation set.
model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=50,
    batch_size=64,
    verbose=1,
)

# Make predictions (still in the scaler's units) for both splits.
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

# The scaler was fit on every feature column, so inverse_transform needs an
# array of that same width. The width is read from the scaled data instead of
# being hard-coded, so this keeps working when features are added or removed.
n_features = data_scaled.shape[1]


def _pad_to_scaler_width(close_scaled):
    """Return a zero array as wide as the scaler input with the values in column 0.

    Args:
        close_scaled: scaled Close values, shaped (n,) or (n, 1).

    Returns:
        Array of shape (n, n_features); column 0 holds ``close_scaled`` and
        the remaining columns are zero placeholders.
    """
    padded = np.zeros((len(close_scaled), n_features))
    padded[:, 0] = np.ravel(close_scaled)  # Close is the first column
    return padded


# Inverse-transform the predictions and keep only the Close column.
train_predict_expanded = _pad_to_scaler_width(train_predict)
train_predict_inversed = scaler.inverse_transform(train_predict_expanded)[:, 0]

test_predict_expanded = _pad_to_scaler_width(test_predict)
test_predict_inversed = scaler.inverse_transform(test_predict_expanded)[:, 0]

# Bring the targets back to the same units as the inverse-transformed
# predictions, using the same padding.
y_train_rescaled = scaler.inverse_transform(_pad_to_scaler_width(y_train))[:, 0]
y_test_rescaled = scaler.inverse_transform(_pad_to_scaler_width(y_test))[:, 0]

# Evaluate the model: RMSE between the rescaled targets and the
# inverse-transformed predictions, for each split.
train_mse = mean_squared_error(y_train_rescaled, train_predict_inversed)
test_mse = mean_squared_error(y_test_rescaled, test_predict_inversed)
train_rmse = math.sqrt(train_mse)
test_rmse = math.sqrt(test_mse)

print(f"Train RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")

# Plotting the results
# Each prediction array is written into a NaN-filled array shaped like the
# data, so it is drawn only over the rows it covers.
n_train_pred = len(train_predict_inversed)

# Train predictions start after the first input window.
trainPredictPlot = np.full_like(data_scaled, np.nan)
trainPredictPlot[time_step:time_step + n_train_pred, 0] = train_predict_inversed

# Test predictions are shifted past the train predictions and stop one row
# before the end of the data.
test_start = n_train_pred + (time_step * 2) + 1
test_stop = len(data_scaled) - 1
testPredictPlot = np.full_like(data_scaled, np.nan)
testPredictPlot[test_start:test_stop, 0] = test_predict_inversed

# Plot baseline and predictions (column 0, i.e. Close, only).
plt.figure(figsize=(12, 6))
actual_close = scaler.inverse_transform(data_scaled)[:, 0]
plt.plot(actual_close, label='Actual Stock Price')
plt.plot(trainPredictPlot[:, 0], label='Train Predict')
plt.plot(testPredictPlot[:, 0], label='Test Predict')
plt.title('Stock Price Prediction using Transformer')
plt.xlabel('Time')
plt.ylabel('Stock Price')
plt.legend()
plt.show()
sounds commented 8 months ago

It seems that you apply inverse_transform twice to train_predict.


# Make predictions
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

# Inverse transform predictions exactly once. Applying inverse_transform a
# second time would rescale values that are already rescaled.
# NOTE(review): this assumes the scaler was fit on a single column, as the
# reshape(-1, 1) calls in this snippet imply -- confirm against the caller.
train_predict = scaler.inverse_transform(train_predict)
test_predict = scaler.inverse_transform(test_predict)

# The targets are still in scaled units; inverse-transform them too so both
# sides of the error are on the same scale.
y_train_actual = scaler.inverse_transform(y_train.reshape(-1, 1))
y_test_actual = scaler.inverse_transform(y_test.reshape(-1, 1))

# Evaluate the model (Optional: Calculate RMSE or other metrics)
train_rmse = math.sqrt(mean_squared_error(y_train_actual, train_predict))
test_rmse = math.sqrt(mean_squared_error(y_test_actual, test_predict))

print(f"Train RMSE: {train_rmse}")
print(f"Test RMSE: {test_rmse}")