2024년 4월 8일 월요일

시계열 데이터 예측을 위한 트랜스포머 모델 개발하기

이 글은 시계열 트랜스포머 모델 개념 및 활용 방법을 간략히 정리한다.

트랜스포머 개념도

머리말
트랜스포머의 원리를 이해하고 있다면, 수치로 표현된 시계열 벡터도 라벨링된 시계열 벡터가 있다면, 이를 학습할 수 있다는 것을 이해할 것이다. 

이 글은 트랜스포머 모델을 이용해 시계열을 학습하는 방법을 실습한다. 트랜스포머의 세부적인 기술은 다음 링크 및 레퍼런스를 참고한다. 
개발 환경
이 글을 실습하기 위해서는 기본적으로 다음 개발환경이 준비되어야 한다. 
  • NVIDIA GPU driver
  • Ubuntu or Windows CUDA driver 
  • PyTorch
  • Pandas, matplotlib
본론
시계열 데이터에서 패턴을 학습하고, 다음 값을 예측하는 방법은 여러가지가 있다. 트랜스포머의 경우에도, 트랜스포머 분류 기법을 이용하는 방법, Temporal Fusion Transformer를 사용하는 방법 등이 있다.

여기에서는 기본 개념을 먼저 이해하기 위해, 트랜스포머 분류 기법을 이용해 학습한다. 학습될 데이터는 다음과 같다. 

이 데이터는 페이스북 주식 종가, 개장가 등을 다운로드받은 FB_raw.csv 엑셀파일을 그래프화한 것이다. 데이터셋 크기는 160,681레코드이다. 실제 데이터 구조는 다음과 같다. 

데이터를 미리 다운로드한다.
알고리즘 순서는 다음과 같다.
  1. 주가 시계열 데이터 로딩
  2. 학습 및 테스트 데이터 생성
  3. 벡터 임베딩 함수 정의
  4. 트랜스포머 모델 정의: 포지션 인코딩, 인코더, 주가 예측을 위한 linear full connection 정의. 타겟 마스크 정의
  5. 데이터 학습
  6. 테스트 데이터 예측 결과 확인 
코딩을 위해, 파이썬 파일을 생선한다. 그리고, 다음과 같이 패키지를 임포트한다. 
import torch, torch.nn as nn
import numpy as np
import pandas as pd
import os, time, math
import matplotlib.pyplot as plt
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True' # Intel Math Kernel Library use

학습될 데이터 형식을 설정한다. 학습 데이터는 10개, 예측 데이터는 1개이다. 
input_window = 10 # number of input steps
output_window = 1 # number of prediction steps, in this model its fixed to one
batch_size = 250
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 

페이스북 데이터를 읽고, 그래프형식으로 보여준다. 
# get this python's directory
module_path = os.path.dirname(__file__)
df = pd.read_csv(module_path + '/FB_raw.csv') # data path of facebook stock price (Apr 2019 - Nov 2020)
close = np.array(df['close'])
logreturn = np.diff(np.log(close)) # Transform closing price to log returns, instead of using min-max scaler
csum_logreturn = logreturn.cumsum() # Cumulative sum of log returns

fig, axs = plt.subplots(2, 1)
axs[0].plot(close, color='red')
axs[0].set_title('Closing Price')
axs[0].set_ylabel('Close Price')
axs[0].set_xlabel('Time Steps')

axs[1].plot(csum_logreturn, color='green')
axs[1].set_title('Cumulative Sum of Log Returns')
axs[1].set_xlabel('Time Steps')

fig.tight_layout()
plt.show()

벡터 임베딩 계산을 위해, 트랜스포머의 포지션 인코딩을 정의한다.  
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()       
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]        

트랜스포머 기반 시계열 학습 모델을 정의한다. 트랜스포머 정의 그대로 정의되며, decoder부분만 다르다. 이 부분은 트랜스포머 결과를 받아, 예측값 1개를 생성하는 full connection 레이어다. 
class transformer_seq(nn.Module):
    def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
        super(transformer_seq, self).__init__()
        self.model_type = 'Transformer'
        
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(feature_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)   
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)        
        self.decoder = nn.Linear(feature_size,1)
        self.init_weights()  # decoder FC층 가중치 초기화

    def init_weights(self):
        initrange = 0.1    
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self,src):
        if self.src_mask is None or self.src_mask.size(0) != len(src):  # 목표 마스크 생성
            device = src.device
            mask = self.generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src,self.src_mask)
        output = self.decoder(output)
        return output

    def generate_square_subsequent_mask(self, sz):  # 목표 마스크 생성 함수
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask        

학습될 데이터를 생성한다. 주가 데이터에서 input은 10개 수치, output은 1개 수치이다.
def create_inout_sequences(input_data, tw):
    inout_seq = []
    L = len(input_data)
    for i in range(L-tw):
        train_seq = input_data[i:i+tw]
        train_label = input_data[i+output_window:i+tw+output_window]
        inout_seq.append((train_seq ,train_label))
    return torch.FloatTensor(inout_seq)

학습될 데이터를 train, test dataset으로 분할하는 함수를 정의한다. 
def get_data(data, split):
    series = data
    
    split = round(split*len(series))
    train_data = series[:split]
    test_data = series[split:]

    train_data = train_data.cumsum()
    train_data = 2*train_data # 학습 데이터 값을 2배 증폭함으로써 학습 정확도를 높인다.
    test_data = test_data.cumsum()

    train_sequence = create_inout_sequences(train_data,input_window)
    train_sequence = train_sequence[:-output_window]

    test_data = create_inout_sequences(test_data,input_window)
    test_data = test_data[:-output_window]

    return train_sequence.to(device), test_data.to(device)

학습될 배치 데이터를 리턴하는 함수를 정의한다.
def get_batch(source, i, batch_size):
    seq_len = min(batch_size, len(source) - 1 - i)
    data = source[i:i+seq_len]    
    input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
    target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
    return input, target

학습 함수를 정의한다.
def train(train_data):
    model.train() # Turn on the evaluation mode
    total_loss = 0.
    start_time = time.time()

    for batch, i in enumerate(range(0, len(train_data) - 1, batch_size)):  # 배치크기만큼 루프
        data, targets = get_batch(train_data, i,batch_size)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
        optimizer.step()

        total_loss += loss.item()
        log_interval = int(len(train_data) / batch_size / 5)
        if batch % log_interval == 0 and batch > 0:  # 배치 루프 시 loss, 정확도 출력
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.10f} | {:5.2f} ms | loss {:5.7f}'.format(
                    epoch, batch, len(train_data) // batch_size, scheduler.get_lr()[0], elapsed * 1000 / log_interval,
                    cur_loss))
            total_loss = 0
            start_time = time.time()

평가 함수를 정의한다.
def evaluate(eval_model, data_source):
    eval_model.eval() # Turn on the evaluation mode
    total_loss = 0.
    eval_batch_size = 1000
    with torch.no_grad():
        for i in range(0, len(data_source) - 1, eval_batch_size):
            data, targets = get_batch(data_source, i, eval_batch_size)
            output = eval_model(data)            
            total_loss += len(data[0])* criterion(output, targets).cpu().item()
    return total_loss / len(data_source)

학습 모델 기반 데이터 예측 함수를 정의한다. 
def model_forecast(model, seqence):
    model.eval() 
    total_loss = 0.
    test_result = torch.Tensor(0)    
    truth = torch.Tensor(0)

    seq = np.pad(seqence, (0, 3), mode='constant', constant_values=(0, 0))
    seq = create_inout_sequences(seq, input_window)
    seq = seq[:-output_window].to(device)

    seq, _ = get_batch(seq, 0, 1)
    with torch.no_grad():
        for i in range(0, output_window):            
            output = model(seq[-output_window:])                        
            seq = torch.cat((seq, output[-1:]))

    seq = seq.cpu().view(-1).numpy()
    return seq

실 데이터를 이용한 데이터 예측 함수를 정의한다.
def forecast_seq(model, sequences):
    """Sequences data has to been windowed and passed through device"""
    start_timer = time.time()
    model.eval() 
    forecast_seq = torch.Tensor(0)    
    actual = torch.Tensor(0)
    with torch.no_grad():
        for i in tqdm(range(0, len(sequences) - 1)):
            data, target = get_batch(sequences, i, 1)
            output = model(data)            
            forecast_seq = torch.cat((forecast_seq, output[-1].view(-1).cpu()), 0)
            actual = torch.cat((actual, target[-1].view(-1).cpu()), 0)
    timed = time.time()-start_timer
    print(f"{timed} sec")

    return forecast_seq, actual

학습 데이터를 준비한다.
train_data, val_data = get_data(logreturn, 0.6) # 60% train, 40% test split

모델, Loss함수, 하이퍼파라메터, 스케쥴 등을 정의한다.
model = transformer_seq().to(device)
criterion = nn.MSELoss() # Loss function
lr = 0.00005 # learning rate
epochs =  500 # Number of epochs

optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(train_data)
    
    if(epoch % epochs == 0): # 에폭마다 모델 평가
        val_loss = evaluate(model, val_data)
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss: {:5.7f}'.format(epoch, (time.time() - epoch_start_time), val_loss))
        print('-' * 80)
    else:   
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s'.format(epoch, (time.time() - epoch_start_time)))
        print('-' * 80)

    scheduler.step() 

학습 후 모델 이용해 데이터 예측하고, 실제 데이터와 비교한다.
test_result, truth = forecast_seq(model, val_data)

plt.plot(truth, color='red', alpha=0.7)
plt.plot(test_result, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

테스트를 위해 랜덤값을 이용해 비교해 본다. 
r = np.random.randint(100000, 160000)
test_forecast = model_forecast(model, csum_logreturn[r: r+10]) # random 10 sequence length

print(f"forecast sequence: {test_forecast}")
print(f"Actual sequence: {csum_logreturn[r: r+11]}")
torch.save(model.state_dict(), "transformer_ts_20231211.pth")

학습 파일을 로딩한 후, 다른 시계열 데이터셋도 테스트해본다. 
model2 = TransAm() # rename as model2
model2.load_state_dict(torch.load("transformer_ts_20231211.pth"))
model2.to(device)

df2 = pd.read_csv(module_path + '/BA_raw.csv') # 보잉 주식 테스트
close2 = df2['close'].fillna(method = 'ffill')
close2 = np.array(close2)
logreturn2 = np.diff(np.log(close2))

train_data2, val_data2 = get_data(logreturn2, 0.6)
test2_eval = evaluate(model2, val_data2)
print(f'Test 2 loss: {test2_eval :.5f}')

test_result2, truth2 = forecast_seq(model2, val_data2)

plt.plot(truth2, color='red', alpha=0.7)
plt.plot(test_result2, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

df3 = pd.read_csv(module_path + '/JPM_raw.csv') # JPMorgan Chase & Co 주식 테스트
close3 = df3['close'].fillna(method = 'ffill')
close3 = np.array(close3)
logreturn3 = np.diff(np.log(close3))

train_data3, val_data3 = get_data(logreturn3, 0.6)
test3_eval = evaluate(model2, val_data3)
print(f'Test 3 loss: {test3_eval :.5f}')

test_result3, truth3 = forecast_seq(model2, val_data3)

plt.plot(truth3, color='red', alpha=0.7)
plt.plot(test_result3, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

결과는 다음과 같이, 잘 학습되어, 데이터셋을 예측하고 있는 것을 알 수 있다. 
학습 과정 화면
테스트 데이터셋 예측 결과(페이스북 주가)

전혀 다른 주식 종목도 예측해 보자. 다음은 보잉 주가이다. 
보잉 주가 예측
다음은 제이슨 모건 주식이다. 
제이슨 모건 주가 예측

결과적으로 학습 모델이 패턴을 잘 예측한다. 

레퍼런스

댓글 없음:

댓글 쓰기