Open njs03332 opened 1 year ago
# 다섯개의 은닉층과 출력층으로 구성된 회귀용 MLP 모델
# 맨 위의 은닉층에 보조 출력을 가지고, 이 보조 출력에 연결된 손실을 이하 '재구성 손실'이라 함
class ReconstructingRegressor(tf.keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
# 생성자가 다섯 개의 은닉층과 하나의 출력층으로 구성된 심층 신경망 생성
self.hidden = [tf.keras.layers.Dense(30, activation='selu',
kernel_initializer='lecun_normal')
for _ in range(5)]
self.out = tf.keras.layers.Dense(output_dim)
# 완전 연결 층을 하나 더 추가하여 모델의 입력을 재구성하는 데 사용
# 완전 연결 층의 유닛 개수 = 입력 개수
def build(self, batch_input_shape):
n_inputs = batch_input_shape[-1]
self.reconstruct = tf.keras.layers.Dense(n_inputs)
# 현재는 이슈 .. https://github.com/tensorflow/tensorflow/issues/46858 로 인해 코드 돌릴 때 주석 처리 필요
# super().build(batch_input_shape)
def call(self, inputs):
Z = inputs
# 1. 입력이 다섯 개의 은닉층에 모두 통과
for layer in self.hidden:
Z = layer(Z)
# 2. 그 다음 결괏값을 재구성 층에 전달하여 재구성을 만들기
reconstruction = self.reconstruct(Z)
# 3. 재구성 손실(재구성과 입력 사이의 평균 제곱 오차)을 계산
recon_loss = tf.reduce_mean(tf.square(reconstruction - inputs))
# 4. add_loss() 메서드를 사용해 모델의 손실 리스트에 추가
self.add_loss(0.05 * recon_loss)
# 5. 마지막에서 은닉층의 출력을 출력층에 전달하여 얻은 출력값을 반환
return self.out(Z)
class ReconstructingRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
...
self.reconstruction_mean = keras.metrics.Mean(name="reconstruction_error")
def build(self, batch_input_shape):
...
def call(self, inputs, training=None):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
reconstruction = self.reconstruct(Z)
self.recon_loss = 0.05 * tf.reduce_mean(tf.square(reconstruction - inputs))
if training:
result = self.reconstruction_mean(recon_loss)
self.add_metric(result)
return self.out(Z)
# 모델 동작 재정의
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x)
loss = self.compiled_loss(y, y_pred, regularization_losses=[self.recon_loss])
gradients = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
return {m.name: m.result() for m in self.metrics}
# 1. 간단한 모델 작성
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
keras.layers.Dense(30, activation="elu", kernel_initializer="he_normal",
kernel_regularizer=l2_reg),
keras.layers.Dense(1, kernel_regularizer=l2_reg)
])
# 2. 훈련 세트에서 샘플 배치를 랜덤하게 추출하는 함수
def random_batch(X, y, batch_size=32):
idx = np.random.randint(len(X), size=batch_size)
return X[idx], y[idx]
# 3. 현재 스텝 수, 전체 스텝 횟수, 에포크 시작부터 평균 손실 등 훈련 상태 출력하는 함수
def print_status_bar(iteration, total, loss, metrics=None):
metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
for m in [loss] + (metrics or [])])
end = "" if iteration < total else "\n"
print("\r{}/{} - ".format(iteration, total) + metrics, end=end)
# 4. prograss bar & print
def progress_bar(iteration, total, size=30):
running = iteration < total
c = ">" if running else "="
p = (size - 1) * iteration // total
fmt = "{{:-{}d}}/{{}} [{{}}]".format(len(str(total)))
params = [iteration, total, "=" * p + c + "." * (size - p - 1)]
return fmt.format(*params)
def print_status_bar(iteration, total, loss, metrics=None, size=30):
metrics = " - ".join(["{}: {:.4f}".format(m.name, m.result())
for m in [loss] + (metrics or [])])
end = "" if iteration < total else "\n"
print("\r{} - {}".format(progress_bar(iteration, total), metrics), end=end)
# 5. 하이퍼파라미터 정의
n_epochs = 5
batch_size = 32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(learning_rate=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
# 6. 사용자 정의 훈련 반복을 위한 준비 완료
# - 두 개의 반복문 중첩 epochs, batch
# - 훈련 세트에서 배치는 랜덤하게 샘플링
# - tf.GradientTape() 블럭에서 배치 하나를 위한 예측을 만들고 손실을 계산
# - 평균 손실과 지표를 업데이트하고 상태 막대를 출력
# - 매 에포크 끝에서 상태 막대를 다시 출력하여 완료를 나타내고 줄바꿈
# - 마지막으로 평균 손실과 지표값을 초기화
for epoch in range(1, n_epochs + 1):
print("Epoch {}/{}".format(epoch, n_epochs))
for step in range(1, n_steps + 1):
X_batch, y_batch = random_batch(X_train_scaled, y_train)
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
# 모델에 가중치 제한 추가할 때
for variable in model.variables:
if variable.constraint is not None:
variable.assign(variable.constraint(variable))
mean_loss(loss)
for metric in metrics:
metric(y_batch, y_pred)
print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
for metric in [mean_loss] + metrics:
metric.reset_states()
# 간단한 함수 예
def f(w1, w2):
return 3 * w1**2 + 2 * w1 * w2
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
z = f(w1, w2)
gradients = tape.gradient(z,[w1,w2])
```python
>>> gradients
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
tf.GradientTape()
블록 안에 최소한만 담아야 함gradient()
를 한번이라도 호출하는 순간 자동으로 테이프가 지워지기에 2번 이상 호출하면 에러가 발생
tf.GradientTape(persistent=True)
로 선언tf.Constant
를 이용한다면 None
이 반환됨
tape.watch()
를 써서 강제할 수 있음c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
tape.watch(c1)
tape.watch(c2)
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2])
jacobian()
메서드 호출tf.stop_gradient()
함수 사용
def f(w1, w2):
return 3 * w1 ** 2 + tf.stop_gradient(2 * w1 * w2)
with tf.GradientTape() as tape: z = f(w1, w2)
tape.gradient(z, [w1, w2])
- 그레이디언트 계산시 이따금 수치적인 이슈가 발생할 수 있음
- 예를 들어 큰 입력에 대해 `my_softplus()` 함수의 그레이디언트를 계산하면 NaN이 반환됨
- 부동소수점 정밀도 오류로 인해 자동 미분이 무한 나누기 무한을 계산하게 되기 때문
- 수치적으로 안전한 softplus의 도함수 1/(1+1/exp(x))를 해석적으로 구할 수 있음 -> custom gradient로 안전한 함수를 사용하도록 구현 가능
```python
@tf.custom_gradient
def my_better_softplus(z):
exp = tf.exp(z)
def my_softplus_gradients(grad):
# 연쇄 법칙에 따라 전달된 그레이디언트와 my_softplus() 함수의 그레이디언트를 곱함
return grad / (1 + 1 / exp)
return tf.math.log(exp + 1), my_softplus_gradients
assign roles -c 1 2 3 -s 0317
0 | 1 | 2 | |
---|---|---|---|
member | 한단비 | 김유리 | 주선미 |
chapter | 1 | 2 | 3 |