Open RidhwanAmin opened 2 years ago
Hi, you can create a Deep ESN by setting _numlayers, e.g.:
model = ESN(input_size=10, hidden_size=1000, num_layers=3, output_size=2)
Thank you for your comment. But I encounter new problem where I do not know where to put the washout into the encoder decoder model. The code is beased on lstm encoder-decoder for lstm. `import numpy as np import random import torch import torch.nn as nn from torch import optim
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size, num_layers = 5):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.washout = washout
self.ESN = ESN(input_size = input_size, output_size = input_size, hidden_size = hidden_size, num_layers = num_layers)
def forward(self, x):
flat = x.view(x.shape[0], x.shape[1], self.input_size)
out, h = self.lstm(flat)
return out, h
class Decoder(nn.Module):
def __init__(self, input_size, hidden_size, output_size = 1, num_layers = 5):
super(Decoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.output_size = output_size
self.washout = washout
self.ESN = ESN(input_size = input_size, hidden_size = hidden_size,output_size=output_size, num_layers = num_layers)
self.linear = nn.Linear(hidden_size, output_size)
def forward(self, x, h):
out, h = self.lstm(x.unsqueeze(0), h)
y = self.linear(out.squeeze(0))
return y, h
class EncoderDecoder(nn.Module):
def __init__(self, hidden_size, input_size = 1, output_size = 1):
super(EncoderDecoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.encoder = Encoder(input_size = input_size, hidden_size = hidden_size)
self.decoder = Decoder(input_size = input_size, hidden_size = hidden_size, output_size = output_size)
def train_model(
self, train,washout, target, epochs, target_len, method = 'recursive',
tfr = 0.5, lr = 0.01, dynamic_tf = False
):
losses = np.full(epochs, np.nan)
optimizer = optim.Adam(self.parameters(), lr = lr)
criterion = nn.MSELoss()
for e in range(epochs):
predicted = torch.zeros(target_len, train.shape[1], train.shape[2])
optimizer.zero_grad()
_, enc_h = self.encoder(train, washout)
dec_in = train[-1, :, :]
dec_h = enc_h
if method == 'recursive':
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = dec_out
if method == 'teacher_forcing':
# use teacher forcing
if random.random() < tfr:
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = target[t, :, :]
# predict recursively
else:
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = dec_out
if method == 'mixed_teacher_forcing':
# predict using mixed teacher forcing
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
# predict with teacher forcing
if random.random() < tfr:
dec_in = target[t, :, :]
# predict recursively
else:
dec_in = dec_out
loss = criterion(predicted, target)
loss.backward()
optimizer.step()
losses[e] = loss.item()
if e % 10 == 0:
print(f'Epoch {e}/{epochs}: {round(loss.item(), 4)}')
# dynamic teacher forcing
if dynamic_tf and tfr > 0:
tfr = tfr - 0.02
return losses
def predict(self, x, target_len):
y = torch.zeros(target_len, x.shape[1], x.shape[2])
_, enc_h = self.encoder(x)
dec_in = x[-1, :, :]
dec_h = enc_h
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
y[t] = dec_out
dec_in = dec_out
return y
`
You should pass washout as argument when you call the ESN forward method.
Thanks again for the comment. Here I changed the previous code. ` import numpy as np import random import torch import torch.nn as nn from torch import optim
class Encoder(nn.Module):
def __init__(self, input_size, hidden_size, num_layers = 5):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.ESN = ESN(input_size = input_size, output_size = input_size, hidden_size = hidden_size, num_layers = num_layers)
def forward(self, x, washout = [100]):
flat = x.view(x.shape[0], x.shape[1], self.input_size)
out, h = self.ESN(flat, washout)
return out, h
class Decoder(nn.Module):
def __init__(self, input_size, hidden_size, output_size = 1, num_layers = 5):
super(Decoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.output_size = output_size
self.ESN = ESN(input_size = input_size, hidden_size = hidden_size,output_size=output_size, num_layers = num_layers)
self.linear = nn.Linear(hidden_size, output_size)
def forward(self, x, h, washout = [100]):
out, h = self.ESN(x.unsqueeze(0), h, washout)
y = self.linear(out.squeeze(0))
return y, h
class EncoderDecoder(nn.Module):
def __init__(self, hidden_size, input_size = 1, output_size = 1):
super(EncoderDecoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.encoder = Encoder(input_size = input_size, hidden_size = hidden_size)
self.decoder = Decoder(input_size = input_size, hidden_size = hidden_size, output_size = output_size)
def train_model(
self, train, target, epochs, target_len, method = 'recursive',
tfr = 0.5, lr = 0.01, dynamic_tf = False
):
losses = np.full(epochs, np.nan)
optimizer = optim.Adam(self.parameters(), lr = lr)
criterion = nn.MSELoss()
for e in range(epochs):
predicted = torch.zeros(target_len, train.shape[1], train.shape[2])
optimizer.zero_grad()
_, enc_h = self.encoder(train)
dec_in = train[-1, :, :]
dec_h = enc_h
if method == 'recursive':
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = dec_out
if method == 'teacher_forcing':
# use teacher forcing
if random.random() < tfr:
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = target[t, :, :]
# predict recursively
else:
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = dec_out
if method == 'mixed_teacher_forcing':
# predict using mixed teacher forcing
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
# predict with teacher forcing
if random.random() < tfr:
dec_in = target[t, :, :]
# predict recursively
else:
dec_in = dec_out
loss = criterion(predicted, target)
loss.backward()
optimizer.step()
losses[e] = loss.item()
if e % 10 == 0:
print(f'Epoch {e}/{epochs}: {round(loss.item(), 4)}')
# dynamic teacher forcing
if dynamic_tf and tfr > 0:
tfr = tfr - 0.02
return losses
def predict(self, x, target_len):
y = torch.zeros(target_len, x.shape[1], x.shape[2])
_, enc_h = self.encoder(x)
dec_in = x[-1, :, :]
dec_h = enc_h
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
y[t] = dec_out
dec_in = dec_out
return y
But when I tried to train the model using:
model = EncoderDecoder(hidden_size = hidden_size) model.train() model.train_model(x_train, y_train, epochs, ts_target_len, method = 'mixed_teacher_forcing', tfr = .05, lr = .005)
I encountered error like this :
45 for b in range(tensor.size(1)):
---> 46 if washout[b] > 0:
47 tmp = tensor[washout[b]:seq_lengths[b], b].clone()
48 tensor[:seq_lengths[b] - washout[b], b] = tmp
Have you got any idea why the error occurred ?
I have some update on this issue. I have succeeded to use PyTorch-esn on encoder decoder for time series prediction. But the problem is that the accuracy is too low and I wonder what the reason behind this as I have done echo state network using Tensorflow but the accuracy is not bad as using pytorch. Here is my code :
`import numpy as np import random import torch import torch.nn as nn from torch import optim
class Encoder(nn.Module):
#def __init__(self, input_size, hidden_size, num_layers = 1):
def __init__(self, input_size, hidden_size, num_layers=2,
nonlinearity='tanh', batch_first=False, leaking_rate=1,
spectral_radius=0.9, w_ih_scale=1, density=1,):
super(Encoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
if nonlinearity == 'tanh':
mode = 'RES_TANH'
elif nonlinearity == 'relu':
mode = 'RES_RELU'
elif nonlinearity == 'id':
mode = 'RES_ID'
else:
raise ValueError("Unknown nonlinearity '{}'".format(nonlinearity))
self.batch_first = batch_first
self.leaking_rate = leaking_rate
self.spectral_radius = spectral_radius
if type(w_ih_scale) != torch.Tensor:
self.w_ih_scale = torch.ones(input_size + 1)
self.w_ih_scale *= w_ih_scale
else:
self.w_ih_scale = w_ih_scale
self.density = density
self.lstm = Reservoir(mode, input_size, hidden_size, num_layers,
leaking_rate, spectral_radius,
self.w_ih_scale, density,
batch_first=batch_first)
#self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers)
#self.lstm = ESN(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers, output_size= input_size, readout_training='gd')
def forward(self, x):
#washout_rate = 0.05
#washout_list = [int(washout_rate * x.size(0))] * x.size(1)
#flat = x.view(x.shape[0], x.shape[1], self.input_size)
#out, h = self.lstm(flat, washout_list)
flat = x.view(x.shape[0], x.shape[1], self.input_size)
output, hidden = self.lstm(flat)
return output, hidden
class Decoder(nn.Module):
def __init__(self, input_size, hidden_size, output_size = 1, num_layers = 2):
super(Decoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.output_size = output_size
#self.lstm = nn.LSTM(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers)
self.lstm = ESN(input_size = input_size, hidden_size = hidden_size, num_layers = num_layers, output_size= output_size)
#self.linear = nn.Linear(input_size, output_size)
def forward(self, x, h):
washout_rate = 0.1
washout_list = [int(washout_rate * x.unsqueeze(0).size(0))] * x.unsqueeze(0).size(1)
out, h = self.lstm(x.unsqueeze(0), washout_list, h)
#out_new = out.view(out.size(0), -1)
#y = self.linear(out.squeeze(0))
#y = self.linear(out_new)
#y = self.linear(out)
return out.squeeze(0), h
class EncoderDecoder(nn.Module):
def __init__(self, hidden_size, input_size = 1, output_size = 1):
super(EncoderDecoder, self).__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.encoder = Encoder(input_size = input_size, hidden_size = hidden_size)
self.decoder = Decoder(input_size = input_size, hidden_size = hidden_size, output_size = output_size)
def train_model(
self, train, target, epochs, target_len, method = 'recursive',
tfr = 0.5, lr = 0.01, dynamic_tf = False
):
losses = np.full(epochs, np.nan)
optimizer = optim.Adam(self.parameters(), lr = lr)
criterion = nn.MSELoss()
for e in range(epochs):
predicted = torch.zeros(target_len, train.shape[1], train.shape[2])
optimizer.zero_grad()
_, enc_h = self.encoder(train)
dec_in = train[-1, :, :]
dec_h = enc_h
if method == 'recursive':
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = dec_out
if method == 'teacher_forcing':
# use teacher forcing
if random.random() < tfr:
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = target[t, :, :]
# predict recursively
else:
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
dec_in = dec_out
if method == 'mixed_teacher_forcing':
# predict using mixed teacher forcing
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
predicted[t] = dec_out
# predict with teacher forcing
if random.random() < tfr:
dec_in = target[t, :, :]
# predict recursively
else:
dec_in = dec_out
loss = criterion(predicted, target)
loss.backward()
optimizer.step()
losses[e] = loss.item()
if e % 10 == 0:
print(f'Epoch {e}/{epochs}: {round(loss.item(), 4)}')
# dynamic teacher forcing
if dynamic_tf and tfr > 0:
tfr = tfr - 0.02
return losses
def forward(self, x, target_len):
y = torch.zeros(target_len, x.shape[1], x.shape[2])
_, enc_h = self.encoder(x)
dec_in = x[-1, :, :]
dec_h = enc_h
for t in range(target_len):
dec_out, dec_h = self.decoder(dec_in, dec_h)
y[t] = dec_out
dec_in = dec_out
return y`
I attached the whole code of encode decoder for time series forecasting : https://colab.research.google.com/drive/1RL1L1b-5Fi7P9p-EOpzQ-w7mZRPK4nKm?usp=sharing
How this can be implemented towards sequence to sequence time series using ESN of more than 1 layer (Deep ESN)?