2019년 11월 28일 목요일

LSTM 딥러닝 모델 기반 이상 데이터 탐지

이 글은 LSTM 기반 이상신호 탐지용 딥러닝 모델 개발 방법을 간략히 설명한다. 이상탐지는 장비 모니터링, 사기 탐지, 해킹 등 다양한 분야에서 응용될 수 있다. 이 글은 LSTM(Long Short Term Memory) 딥러닝 모델을 이용한 이상탐지 방법을 간략히 설명하고 구현하는 방법을 설명한다.

머리말
시계열에서 이상탐지는 예상되는 미래값이 실제 얻은 값과 편차를 가지게 될 때 이상으로 판단한다. 학습 단계는 다음과 같다.

1. 시계열 데이터의 과거값들을 이용해, 한단계 바로 뒤의 값들을 예측하도록 학습 데이터를 만든다.
2. 다변량 가우스 분포를 이용해 오차 벡터를 계산한다.
3. 만약, 예측된 값과 실제값의 오차가 다변량 가우스 분포의 끝 부분에 위치해 있다면, 일반적으로 예측될 수 있는 값의 범위를 넘어가므로, 이상이라 판단한다.


이상 패턴 탐지
이상 패턴을 탐지하는 기법은 통계 분야에서 오래전부터 발전해 온 것이다. 대표적인 방법은 다음과 같다. 
  • 정규분포 표준편차 기반 이상 탐지. 예) 표준편차의 3시그마 이상은 이상값이라 판단
  • 경계 기준 기반 이상 탐지. 예) B1 < value < B2
  • kNN, DBScan, RCF(Random Cut Forest) 클러스터링 기법. 예) 클러스터링 그룹 중 sample 수 N 개 이하면 이상값
  • 유클리드, 마할라노비스 거리 기법. 다변수량 상관관계에 따른 거리 계산 후 이상값 탐지 가능
정규분포 표준편차

여기서는 다변수 데이터셋에 대한 이상패턴 탐지 기법을 알아본다.

Mahalanobis 거리
분포내 예측과 오차에 대한 희소한 양을 표시하기 위해 마할라노비스 거리란 함수를 사용한다. 이 거리는 x와 y로 구성된 벡터의 상관성을 계산할 수 있다.. 수식은 다음과 같다. 상세 개념은 여기를 참고한다.

실제 코딩해 보기
간단한 예제를 코딩해 보도록 한다. 아래와 같은 패키지가 우선 설치되어 있어야 한다.
  • ReNom 2.6.2
  • matplotlib 2.2.2
  • pandas 0.23.1
  • numpy 1.14.5
  • scikit-learn 0.19.1
  • scipy 1.1.0
제대로 설치되어 있다면, 다음 코드를 실행해 본다.
%matplotlib inline
import matplotlib.pyplot as plt
import pandas as pd 
import numpy as np 
from sklearn.model_selection import train_test_split
from copy import deepcopy
from sklearn.preprocessing import StandardScaler

import renom as rm
from renom.optimizer import Adam
from renom.cuda import set_cuda_active
set_cuda_active(False)

데이터셋은 다은 링크에서 ECG dataset, qtdb/sel102 ECG dataset 을 사용한다.
http://www.cs.ucr.edu/~eamonn/discords/

ECG data를 정규화하기 위해, 시간을 0 - 5000 부분만 플롯팅해 그 구조를 확인해 본다.
df = pd.read_csv('data/qtdbsel102.txt', header=None, delimiter='\t')
ecg = df.iloc[:,2].values
ecg = ecg.reshape(len(ecg), -1)
print('length of ECG data : ', len(ecg))

# standardize
scaler = StandardScaler()
std_ecg = scaler.fit_transform(ecg)

plt.style.use('ggplot')
plt.figure(figsize=(15,5))
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(5000), std_ecg[:5000], color='b')
plt.ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
plt.fill_between(x, y1, y2, facecolor='g', alpha=.3)
plt.show()

이 데이터는 주기적이다. 대략 4250 시간 근처에 주기가 붕괴되어 변화가 있다.

1단계 - LSTM 학습
1단계의 목적은 LSTM으로 정상 데이터를 학습하는 것이다. 그러므로, 5000이후 데이터를 학습용 데이터로 사용한다.
normal_cycle = std_ecg[5000:]

plt.figure(figsize=(10,5))
plt.title("training data")
plt.xlabel('time')
plt.ylabel('ECG\'s value')
plt.plot(np.arange(5000,8000), normal_cycle[:3000], color='b')# stop plot at 8000 times for friendly visual
plt.show()


이제, 다음 코드로 d 길이만큼 서브시퀀스를 생성하고, l 차원의 라벨을 생성한다.
 create data of the "look_back" length from time-series, "ts"
# and the next "pred_length" values as labels
def create_subseq(ts, look_back, pred_length):
    sub_seq, next_values = [], []
    for i in range(len(ts)-look_back-pred_length):  
        sub_seq.append(ts[i:i+look_back])
        next_values.append(ts[i+look_back:i+look_back+pred_length].T[0])
    return sub_seq, next_values

이 예제에서는 d = 10, l = 3이다. 아래 코드를 실행해, sub_seq와 예측해야할 라벨이 있는 next_values 시퀀스를 만든 후, 훈련 및 검증 데이터로 분리한다.
look_back = 10
pred_length = 3

sub_seq, next_values = create_subseq(normal_cycle, look_back, pred_length)


X_train, X_test, y_train, y_test = train_test_split(
    sub_seq, next_values, test_size=0.2)

X_train = np.array(X_train)
X_test = np.array(X_test)
y_train = np.array(y_train)
y_test = np.array(y_test)


train_size = X_train.shape[0]
test_size = X_test.shape[0]
print('train size:{}, test size:{}'.format(train_size, test_size))

LSTM 모델을 정의한다.
# model definition
model = rm.Sequential([
    rm.Lstm(35),
    rm.Relu(),
    rm.Lstm(35),
    rm.Relu(),
    rm.Dense(pred_length)
    ])

LSTM에서 35개 벡터를 출력하고, RELU 함수를 거친 후, 다시 LSTM 35개 벡터 출력, RELU 함수 처리, 3개 벡터값을 예측하기 위해 Dense 레이어를 추가했다.

배치 크기와 최대 세대를 설정하고, 최적화 함수는 ADAM으로 한다.
# params
batch_size = 100
max_epoch = 2000
period = 10 # early stopping checking period
optimizer = Adam()

이제, 다음과 같이 학습시킨다. 참고로, 아래 코드에서 np.random.permutation()은 학습을 잘 시켜주기 위해, 순열을 섞어주는 함수이다(링크). 큰 순서는 다음과 같다.
1. model.train()에서 학습한다.
2. 모든 데이터에 대해, 바로 학습된 모델에 학습데이터를 입력하여, 출력된 z와 실제 라벨값인 batch_y와 차이를 MSE(Mean Square Error)로 계산하여, train_loss 값을 누적한다.
3. 모든 데이터에 대해, 학습된 모델을 이용해 예측 에러가 얼마인지 계산한다.
앞의 과정을 테스트 손실이 특정 조건이 될때까지 반복해 학습시킨다.
# Train Loop
epoch = 0
loss_prev = np.inf

learning_curve, test_curve = [], []

while(epoch < max_epoch):
    epoch += 1

    perm = np.random.permutation(train_size)
    train_loss = 0

    for i in range(train_size // batch_size):
        batch_x = X_train[perm[i*batch_size:(i+1)*batch_size]]
        batch_y = y_train[perm[i*batch_size:(i+1)*batch_size]]

        # Forward propagation
        l = 0
        z = 0
        with model.train():
            for t in range(look_back):
                z = model(batch_x[:,t])
                l = rm.mse(z, batch_y)
            model.truncate()
        l.grad().update(optimizer)
        train_loss += l.as_ndarray()

    train_loss /= (train_size // batch_size)
    learning_curve.append(train_loss)

    # test
    l = 0
    z = 0
    for t in range(look_back):
        z = model(X_test[:,t])
        l = rm.mse(z, y_test)
    model.truncate()
    test_loss = l.as_ndarray()
    test_curve.append(test_loss)

    # check early stopping
    if epoch % period == 0:
        print('epoch:{} train loss:{} test loss:{}'.format(epoch, train_loss, test_loss))
        if test_loss > loss_prev*0.99:
            print('Stop learning')
            break
        else:
            loss_prev = deepcopy(test_loss)

plt.figure(figsize=(10,5))
plt.plot(learning_curve, color='b', label='learning curve')
plt.plot(test_curve, color='orange', label='test curve')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(fontsize=20)
plt.show()

실행 결과는 다음과 같다.

2단계 - 가우스 분포 피팅
이상 탐지를 위해 가우스 분포 계산을 한다.
# computing errors
for t in range(look_back):
    pred = model(X_test[:,t])
model.truncate()
errors = y_test - pred

mean = sum(errors)/len(errors)

cov = 0
for e in errors:
    cov += np.dot((e-mean).reshape(len(e), 1), (e-mean).reshape(1, len(e)))
cov /= len(errors)

print('mean : ', mean)
print('cov : ', cov)

mean :  [-0.00471252  0.00561184  0.01125641]
cov :  [[0.00093565 0.00088413 0.00097755]
 [0.00088413 0.00208558 0.0025572 ]
 [0.00097755 0.0025572  0.00498106]]

3단계 - 이상탐지
이 예측 모델이 미지의 데이터에 대해서도 제대로 동작하는 지 검증한다. 우선, 서브 시퀀스와 라벨을 생성하고 에러 벡터를 계산한다.
# calculate Mahalanobis distance
def Mahala_distantce(x,mean,cov):
    d = np.dot(x-mean,np.linalg.inv(cov))
    d = np.dot(d, (x-mean).T)
    return d

# anomaly detection
sub_seq, next_values = create_subseq(std_ecg[:5000], look_back, pred_length)
sub_seq = np.array(sub_seq)
next_values = np.array(next_values)

for t in range(look_back):
    pred = model(sub_seq[:,t])
model.truncate()
errors = next_values - pred

다음으로 Mahalanobis 거리를 출력해 얼마나 이상한 값인지를 판단한다.
m_dist = [0]*look_back 
for e in errors:
    m_dist.append(Mahala_distantce(e,mean,cov))

fig, axes = plt.subplots(nrows=2, figsize=(15,10))

axes[0].plot(std_ecg[:5000],color='b',label='original data')
axes[0].set_xlabel('time')
axes[0].set_ylabel('ECG\'s value' )
axes[0].set_ylim(-3, 3)
x = np.arange(4200,4400)
y1 = [-3]*len(x)
y2 = [3]*len(x)
axes[0].fill_between(x, y1, y2, facecolor='g', alpha=.3)

axes[1].plot(m_dist, color='r',label='Mahalanobis Distance')
axes[1].set_xlabel('time')
axes[1].set_ylabel('Mahalanobis Distance')
axes[1].set_ylim(0, 1000)
y1 = [0]*len(x)
y2 = [1000]*len(x)
axes[1].fill_between(x, y1, y2, facecolor='g', alpha=.3)

plt.legend(fontsize=15)
plt.show()


앞의 그림을 보면 4250 근처에 Mahalanobis 거리가 가장 크다. 이를 통해, 주기적 데이터가 언제 붕괴가 일어나는 지를 확인할 수 있다.

이런 방식으로 어떤 데이터든지 Anomaly Detection이 가능하다.


레퍼런스

댓글 없음:

댓글 쓰기