BIG_PY
home
Introduce
home

7장 시계열 분석 3, 4

시계열 분석

7.3 순환 신경망(RNN)

순환 신경망(RNN)
RNN은 시간적으로 연속성이 있는 데이터를 처리하려고 고안된 인공 신경망
RNN의 ‘Recurrent(반복되는)’는 이전 은닉층이 현재 은닉층의 입력이 되면서 반복되는 순환구조를 가진다는 의미임
RNN이 기존 네트워크와 다른 점은 ‘기억’을 갖는다는 것
기억은 현재까지 입력 데이터를 요약한 정보이며 새로운 입력이 네트워크로 들어올 때마다 기억은 조금씩 수정되며, 결국 최종적으로 남겨진 기억은 모든 입력 전체를 요약한 정보가 됨
→ 첫 번째 입력(x1)이 들어오면 첫 번째 기억(h1)이 만들어지고, 두 번째 입력(x2)이 들어오면 기존 기억(h1)과 새로운 입력을 참고하여 새 기억(h2)을 만듦
즉, RNN은 외부 입력과 자신의 이전 상태를 입력 받아 현재 상태를 갱신함
RNN 유형
1.
일대일 : 순환이 없기 때문에 RNN이라고 말하기 어려우며, 순방향 네트워크가 대표적 사례
2.
일대다 : 입력이 하나이고, 출력이 다수인 구조로 이미지를 입력해서 이미지에 대한 설명을 문장으로 출력하는 이미지 캡션이 대표적 사례
3.
다대일 : 입력이 다수이고 출력이 하나인 구조로 문장을 입력해서 긍정/부정을 출력하는 감성 분석기에서 사용됨
다대일 구조에 층을 쌓아 올리면 다음과 같은 적층된 구조를 가질 수 있음
4.
다대다 : 입력과 출력이 다수인 구조로, 언어를 번역하는 자동 번역기 등이 대표적 사례
5.
동기화 다대다 : 입력과 출력이 다수인 구조로 문장에서 다음에 나올 단어를 예측하는 언어 모델, 즉 프레임 수준의 비디오 분류가 대표적 사례
RNN 계층과 셀
RNN은 내장된 계층뿐만 아니라 셀 레벨의 API(운영체제가 제공하는 함수의 집합체)도 제공함
RNN 계층이 입력된 배치 순서대로 모두 처리하는 것과 다르게 RNN 셀은 오직 하나의 단계만 처리함
RNN 셀은 RNN 계층의 for loop 구문을 갖는 구조라고 할 수 있음
RNN 계층은 셀을 래핑하여 동일한 셀을 여러 단계에 적용함
셀은 실제 계산에 사용되는 RNN 계층의 구성 요소로, 단일 입력과 과거 상태를 가져와서 새로운 상태를 생성함
셀 유형
-nn.RNNCell : SimpleRNN 층에 대응되는 RNN 셀
-nn.GRUCell : GRU 계층에 대응되는 GRU 셀
-nn.LSTMCell : LSTM 계층에 대응되는 LSTM 셀
RNN 계층과 셀을 분리해서 설명하는 이유?
→ 둘을 분리해야 구현이 가능하기 때문
RNN의 활용 분야는 대표적으로 ‘자연어 처리’가 있음
연속적인 단어들의 나열인 자연어 처리는 음성 인식, 단어의 의미 판단 및 대화 등에 대한 처리가 가능함
이외에도 손글씨, 센서 데이터 등 시계열 데이터 처리에 활용됨

7.4 RNN 구조

RNN은 은닉층 노드들이 연결되어 이전 단계 정보를 은닉층 노드에 저장할 수 있도록 구성한 신경망
xt-1에서 ht-1을 얻고 다음 단계에서 ht-1과 xt를 사용하여 과거 정보와 현재 정보를 모두 반영함
또한, ht와 xt+1의 정보를 이용하여 과거와 현재 정보를 반복해서 반영하는데, 이러한 구조를 요약한 것이 아래의 그림과 같음
RNN에서는 입력층, 은닉층, 출력층 외에 가중치 세 개를 가짐
RNN의 가중치는 Wxh, Whh, Why로 분류됨
Wxh는 입력층에서 은닉층으로 전달되는 가중치
Whh는 t 시점의 은닉층에서 t+1 시점의 은닉층으로 전달되는 가중치
Why는 은닉층에서 출력층으로 전달되는 가중치
가중치 Wxh, Whh, Why는 모든 시점에 동일하다는 것에 주의할 필요가 있음
t 단계에서의 RNN 계산
1.
은닉층 계산을 위해 xt, ht-1이 필요함
RNN에서 은닉층은 일반적으로 하이퍼볼릭 탄젠트 활성화 함수를 사용함
2.
출력층은 심층 신경망과 계산 방법이 동일함
즉, 소프트맥스 함수를 적용함
3.
RNN의 오차는 심층 신경망에서 전방향 학습과 달리 각 단계(t)마다 오차를 측정함
즉, 단계마다 실제 값과 예측 값으로 오차(평균 제곱 오차)를 이용하여 측정함
4.
RNN에서 역전파는 BPTT를 이용하여 모든 단계마다 처음부터 끝까지 역전파함
오차는 각 단계(t)마다 오차를 측정하고 이전 단계로 전달되는데, 이것을 BPTT(오류 역전파)라고 함
오차를 이용하여 Wxh, Whh, Why 및 바이어스를 업데이트하는데 BPTT는 오차가 멀리 전파될 때 계산량이 많아지고 전파되는 양이 점차 적어지는 문제점(기울기 소멸 문제)이 발생함
기울기 소멸 문제를 보완하기 위해 오차를 몇 단계까지만 전파시키는 생략된-BPTT를 사용할 수도 있고, 근본적으로 LSTM 및 GRU를 많이 사용함
IMDB 데이터셋을 사용하여 RNN 계층과 셀 구현
RNN 셀 구현
# 라이브러리 호출 import torch import torchtext import numpy as np import torch.nn as nn import torch.nn.functional as F import time
Python
복사
# 데이터 전처리 start=time.time() TEXT = torchtext.legacy.data.Field(lower=True, fix_length=200, batch_first=False) LABEL = torchtext.legacy.data.Field(sequential=False) # 레이블에 대한 전처리
Python
복사
# 데이터셋 준비 from torchtext.legacy import datasets train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
Python
복사
데이터셋을 분리하면 훈련 데이터셋 2만 5000개, 테스트 데이터셋이 2만 5000개가 됨
# 훈련 데이터셋 내용 확인 print(vars(train_data.examples[0])) # 데이터셋의 내용을 보고자 할 때는 examples 사용
Python
복사
# 데이터셋 전처리 적용 import string for example in train_data.examples: text = [x.lower() for x in vars(example)['text']] # 소문자로 변경 text = [x.replace("<br","") for x in text] # "<br"을 ""(공백)으로 변경 text = [''.join(c for c in s if c not in string.punctuation) for s in text] # 구두점 제거 text = [s for s in text if s] # 공백 제거 vars(example)['text'] = text for example in test_data.examples: text = [x.lower() for x in vars(example)['text']] text = [x.replace("<br","") for x in text] text = [''.join(c for c in s if c not in string.punctuation) for s in text] text = [s for s in text if s] vars(example)['text'] = text
Python
복사
# 훈련과 검증 데이터셋 분리 import random train_data, valid_data = train_data.split(random_state = random.seed(0), split_ratio=0.8)
Python
복사
# 데이터셋 개수 확인 print(f'Number of training examples: {len(train_data)}') print(f'Number of validation examples: {len(valid_data)}') print(f'Number of testing examples: {len(test_data)}') Number of training examples: 20000 Number of validation examples: 5000 Number of testing examples: 25000
Python
복사
훈련 데이터셋 2만 5000개를 훈련과 검증 용도로 분리했기 때문에 훈련 데이터셋은 2만 개, 검증 데이터셋은 5000개, 테스트 데이터셋은 2만 5000개가 됨
단어 집합은 IMDB 데이터셋에 포함된 단어들을 이용하여 하나의 딕셔너리와 같은 집합을 만드는 것
단어들의 중복은 제거된 상태에서 진행함
# 단어 집합 만들기 TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None) LABEL.build_vocab(train_data) print(f"Unique tokens in TEXT vocabulary: {len(TEXT.vocab)}") print(f"Unique tokens in LABEL vocabulary: {len(LABEL.vocab)}") Unique tokens in TEXT vocabulary: 10002 Unique tokens in LABEL vocabulary: 3
Python
복사
TEXT는 10002개, LABEL은 세 개의 단어로 구성됨
LABEL은 긍정과 부정 두 개의 값만 있어야 할 것 같은 세 개가 있다고 출력됨
→ 현재 단어 집합의 단어와 그것에 부여된 고유 정수(인덱스)를 확인
# 테스트 데이터셋의 단어 집합 확인 print(LABEL.vocab.stoi)
Python
복사
확인 결과 긍정, 부정 외에 <unk>가 있음
<unk>는 사전에 없는 단어를 의미함 따라서 예제에서 사용하는 것은 pos, neg가 될 것임
# 데이터셋 메모리로 가져오기 BATCH_SIZE = 64 device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') embeding_dim = 100 # 각 단어를 100차원으로 조정(임베딩 계층을 통과한 후 각 벡터의 크기) hidden_size = 300 # 은닉층의 유닛 개수를 300개로 지정함 train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits( (train_data, valid_data, test_data), batch_size = BATCH_SIZE, device = device) # BucketIterator는 데이터로와 쓰임새가 같고 # 배치 크기 단위로 값을 차례대로 꺼내어 메모리로 가져오고 싶을 때 사용함
Python
복사
임베딩 처리를 진행함
# 워드 임베딩 처리 및 RNN 셀 정의 class RNNCell_Encoder(nn.Module): def __init__(self, input_dim, hidden_size): # RNN 셀 구현을 위한 구문 super(RNNCell_Encoder, self).__init__() self.rnn = nn.RNNCell(input_dim, hidden_size) # inputs는 입력 시퀀스로(시퀀스 길이, 배치, 임베딩)의 형태를 가짐 def forward(self, inputs): bz = inputs.shape[1] # 배치를 가져옴 ht = torch.zeros((bz, hidden_size)).to(device) # 배치와 은닉층 뉴런의 크기를 0으로 초기화 for word in inputs: ht = self.rnn(word, ht) # 재귀적으로 발생하는 상태 값을 처리하기 위한 구문 return ht class Net(nn.Module): def __init__(self): super(Net, self).__init__() self.em = nn.Embedding(len(TEXT.vocab.stoi), embeding_dim)# 임베팅 처리를 위한 구문 self.rnn = RNNCell_Encoder(embeding_dim, hidden_size) self.fc1 = nn.Linear(hidden_size, 256) self.fc2 = nn.Linear(256, 3) def forward(self, x): x = self.em(x) x = self.rnn(x) x = F.relu(self.fc1(x)) x = self.fc2(x) return x
Python
복사
옵티마이저와 손실 함수 정의
# 옵티마이저와 손실 함수 정의 model = Net() # model이라는 이름으로 모델을 객체화 model.to(device) loss_fn = nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
Python
복사
모델 학습을 위한 함수 생성
# 모델 학습을 위한 함수 정의 def training(epoch, model, trainloader, validloader): correct = 0 total = 0 running_loss = 0 model.train() for b in trainloader: x, y = b.text, b.label # trainloader에서 text와 label을 꺼내 옴 x, y = x.to(device), y.to(device) # CPU 사용할 수 있도록 장치 지정 y_pred = model(x) loss = loss_fn(y_pred, y) # CrossEntropyLoss 손실 함수를 이용하여 오차 계산 optimizer.zero_grad() loss.backward() optimizer.step() with torch.no_grad(): y_pred = torch.argmax(y_pred, dim=1) correct += (y_pred == y).sum().item() total += y.size(0) running_loss += loss.item() # 누적된 오차를 전체 데이터셋으로 나누어서 에포크 단계마다 오차를 구함 epoch_loss = running_loss / len(trainloader.dataset) epoch_acc = correct / total valid_correct = 0 valid_total = 0 valid_running_loss = 0 model.eval() with torch.no_grad(): for b in validloader: x, y = b.text, b.label x, y = x.to(device), y.to(device) y_pred = model(x) loss = loss_fn(y_pred, y) y_pred = torch.argmax(y_pred, dim=1) valid_correct += (y_pred == y).sum().item() valid_total += y.size(0) valid_running_loss += loss.item() epoch_valid_loss = valid_running_loss / len(validloader.dataset) epoch_valid_acc = valid_correct / valid_total print('epoch: ', epoch, 'loss: ', round(epoch_loss, 3), 'accuracy:', round(epoch_acc, 3), 'valid_loss: ', round(epoch_valid_loss, 3), 'valid_accuracy:', round(epoch_valid_acc, 3) ) # 훈련이 진행될 때 에포크마다 정확도와 오차를 출력 return epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc
Python
복사
zero_grad()의 의미가 뭔지, 왜 사용하는지 알아보자
보통 딥러닝에서는 미니 배치+루프 조합을 사용해서 parameter들을 업데이트하는데,
한 루프에서 업데이트를 위해 loss.backward()를 호출하면 각 파라미터들의 .grad 값에 변화도가 저장이 된다.
이후 다음 루프에서 zero_grad()를 하지 않고 역전파를 시키면 이전 루프에서 .grad에 저장된 값이 다음 루프의 업데이트에도 간섭을 해서 원하는 방향으로 학습이 안된다고 한다.
따라서 루프가 한번 돌고 나서 역전파를 하기 전에 반드시 zero_grad()로 .grad 값들을 0으로 초기화시킨 후 학습을 진행해야 한다.
훈련과 검증 데이터셋을 이용한 모델 학습 진행함
# 모델 학습 epochs = 5 train_loss = [] train_acc = [] valid_loss = [] valid_acc = [] for epoch in range(epochs): epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc = training(epoch, model, train_iterator, valid_iterator) train_loss.append(epoch_loss) # 훈련 데이터셋을 모델에 적용했을 때의 오차 train_acc.append(epoch_acc) # 훈련 데이터셋을 모델에 적용했을 때의 정확도 valid_loss.append(epoch_valid_loss) # 검증 데이터셋을 모델에 적용했을 때의 오차 valid_acc.append(epoch_valid_acc) # 검증 데이터셋을 모델에 적용했을 때의 정확도 end = time.time() print(end-start) epoch: 0 loss: 0.011 accuracy: 0.495 valid_loss: 0.011 valid_accuracy: 0.507 epoch: 1 loss: 0.011 accuracy: 0.503 valid_loss: 0.011 valid_accuracy: 0.495 epoch: 2 loss: 0.011 accuracy: 0.508 valid_loss: 0.011 valid_accuracy: 0.494 epoch: 3 loss: 0.011 accuracy: 0.513 valid_loss: 0.011 valid_accuracy: 0.499 epoch: 4 loss: 0.011 accuracy: 0.521 valid_loss: 0.011 valid_accuracy: 0.5 212.57630896568298
Python
복사
에포크를 5로 지정해서 정확도가 낮지만 학습과 검증 데이터셋에 대한 오차가 유사하므로 과적합을 발생하지 않고 있음
테스트 데이터셋에 대해서는 어떤 결과가 나타날지 알아보자
# 모델 예측 함수 정의 def evaluate(epoch, model, testloader): test_correct = 0 test_total = 0 test_running_loss = 0 model.eval() with torch.no_grad(): for b in testloader: x, y = b.text, b.label x, y = x.to(device), y.to(device) y_pred = model(x) loss = loss_fn(y_pred, y) y_pred = torch.argmax(y_pred, dim=1) test_correct += (y_pred == y).sum().item() test_total += y.size(0) test_running_loss += loss.item() epoch_test_loss = test_running_loss / len(testloader.dataset) epoch_test_acc = test_correct / test_total print('epoch: ', epoch, 'test_loss: ', round(epoch_test_loss, 3), 'test_accuracy:', round(epoch_test_acc, 3) ) return epoch_test_loss, epoch_test_acc
Python
복사
# 모델 예측 결과 확인 epochs = 5 test_loss = [] test_acc = [] for epoch in range(epochs): epoch_test_loss, epoch_test_acc = evaluate(epoch, model, test_iterator) test_loss.append(epoch_test_loss) test_acc.append(epoch_test_acc) end = time.time() print(end-start) epoch: 0 test_loss: 0.011 test_accuracy: 0.5 epoch: 1 test_loss: 0.011 test_accuracy: 0.5 epoch: 2 test_loss: 0.011 test_accuracy: 0.5 epoch: 3 test_loss: 0.011 test_accuracy: 0.5 epoch: 4 test_loss: 0.011 test_accuracy: 0.5 353.44589710235596
Python
복사
테스트 데이터셋도 검증 데이터셋의 평가 결과와 크게 다르지 않음
RNN 계층 구현
# 라이브러리 호출 import torch import torchtext import numpy as np import torch.nn as nn import torch.nn.functional as F import time
Python
복사
# 데이터셋 내려받기 및 전처리 start=time.time() TEXT = torchtext.legacy.data.Field(sequential = True, batch_first = True, lower = True) LABEL = torchtext.legacy.data.Field(sequential = False, batch_first = True) from torchtext.legacy import datasets train_data, test_data = datasets.IMDB.splits(TEXT, LABEL) # IMDB 사용 train_data, valid_data = train_data.split(split_ratio = 0.8) TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None) LABEL.build_vocab(train_data) BATCH_SIZE = 100 device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
Python
복사
BucketIterator()를 이용하여 훈련, 검증, 테스트 데이터셋으로 분리
# 데이터셋 분리 train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits( (train_data, valid_data, test_data), batch_size = BATCH_SIZE, device = device)
Python
복사
# 변수 값 지정 vocab_size = len(TEXT.vocab) # 텍스트 길이 지정 n_classes = 2 # pos(긍정), neg(부정) 지정
Python
복사
# RNN 계층 네트워크 class BasicRNN(nn.Module): def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, dropout_p = 0.2): super(BasicRNN, self).__init__() self.n_layers = n_layers # RNN 계층에 대한 개수 self.embed = nn.Embedding(n_vocab, embed_dim) # 워드 임베딩 적용 self.hidden_dim = hidden_dim self.dropout = nn.Dropout(dropout_p) # 드롭아웃 적용 # RNN 계층에 대한 구문 self.rnn = nn.RNN(embed_dim, self.hidden_dim, num_layers = self.n_layers, batch_first = True) self.out = nn.Linear(self.hidden_dim, n_classes) def forward(self, x): x = self.embed(x) # 문자를 숫자/벡터로 변환 h_0 = self._init_state(batch_size = x.size(0)) # 최초 은닉 상태의 값을 0으로 초기화 x, _ = self.rnn(x, h_0) # RNN 계층을 의미하며, 파라미터로 입력과 이전 은닉 상태의 값을 받음 h_t = x[:, -1, :] # 모든 네트워크를 거쳐서 가장 마지막에 나온 단어의 임베딩 값 self.dropout(h_t) logit = torch.sigmoid(self.out(h_t)) return logit def _init_state(self, batch_size = 1): weight = next(self.parameters()).data # 모델의 파라미터 값을 가져와서 weigh 변수에 저장 return weight.new(self.n_layers, batch_size, self.hidden_dim).zero_() # 크기가 (계층의 개수, 배치 크기, 은닉층의 뉴런/유닛 개수)인 은닉 상태를 생성하여 0으로 초기화한 후 반환
Python
복사
모델에서 사용할 손실 함수와 옵티마이저 설정
# 손실 함수와 옵티마이저 설정 model = BasicRNN(n_layers = 1, hidden_dim = 256, n_vocab = vocab_size, embed_dim = 128, n_classes = n_classes, dropout_p = 0.5) model.to(device) loss_fn = nn.CrossEntropyLoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
Python
복사
훈련 데이터셋을 이용하여 모델을 학습시키기 위한 함수
# 모델 학습 함수 def train(model, optimizer, train_iter): model.train() for b, batch in enumerate(train_iter): x, y = batch.text.to(device), batch.label.to(device) y.data.sub_(1) # 뺄셈에 대한 함수로 _은 inplace 연산을 의미함 # IMDB 레이블의 경우 긍정은 2, 부정은 1의 값을 가짐 optimizer.zero_grad() logit = model(x) loss = F.cross_entropy(logit, y) loss.backward() optimizer.step() if b % 50 == 0: # 훈련 데이터셋의 개수를 50으로 나누어서 나머지가 0이면 출력 print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(e, b * len(x), len(train_iter.dataset), 100. * b / len(train_iter), loss.item()))
Python
복사
검증과 테스트 데이터셋을 모델에 적용하여 결과를 예측하기 위한 함수
# 모델 평가 함수 def evaluate(model, val_iter): model.eval() corrects, total, total_loss = 0, 0, 0 for batch in val_iter: x, y = batch.text.to(device), batch.label.to(device) y.data.sub_(1) logit = model(x) loss = F.cross_entropy(logit, y, reduction = "sum") total += y.size(0) total_loss += loss.item() corrects += (logit.max(1)[1].view(y.size()).data == y.data).sum() # 모델의 정확도 avg_loss = total_loss / len(val_iter.dataset) avg_accuracy = corrects / total return avg_loss, avg_accuracy
Python
복사
훈련 데이터셋을 이용하여 모델을 학습시키고 검증 데이터셋을 이용하여 성능(정확도)를 확인함
# 모델 학습 및 평가 BATCH_SIZE = 100 LR = 0.001 EPOCHS = 5 for e in range(1, EPOCHS + 1): train(model, optimizer, train_iterator) val_loss, val_accuracy = evaluate(model, valid_iterator) print("[EPOCH: %d], Validation Loss: %5.2f | Validation Accuracy: %5.2f" % (e, val_loss, val_accuracy))
Python
복사
→ 검증 데이터셋을 모델에 적용한 결과 50%의 정확도를 보이고 있음
높은 예측력이라고 볼 수 없음
테스트 데이터셋을 모델에 적용하여 성능 확인
# 테스트 데이터셋을 이용한 모델 예측 test_loss, test_acc = evaluate(model,test_iterator) print("Test Loss: %5.2f | Test Accuracy: %5.2f" % (test_loss, test_acc)) Test Loss: 0.69 | Test Accuracy: 0.54
Python
복사
→ 테스트 데이터셋에 대해 56%의 정확도를 보이고 있음
여전히 모델 성능이 좋다고 할 수 없고 이런 경우 다른 모델로 변경하여 테스트를 진행해 보아야 함