이 글은 LSTM을 이용한 주가 이상 탐지 방법을 간단히 요약한다.
분석에 사용하는 데이터는 S&P 500 지수의 일별 종가이다. 이 데이터는 https://www.kaggle.com/pdquant/sp500-daily-19862018 에서 내려받을 수 있다.
데이터를 우선 다음과 같이 다운로드한다.
# Pass the Drive file as a URL: gdown's `--id` flag was deprecated and later
# removed in newer releases, while the URL form works across versions.
!gdown "https://drive.google.com/uc?id=10vdMg_RazoIatwrT7azKFX4P02OebU76" --output spx.csv
소스코드는 다음과 같다.
# The script uses `np` and `pd` throughout but never imports them in the
# visible source, so bring them into scope before the first use.
import numpy as np
import pandas as pd

# Load the downloaded CSV, parsing the 'date' column and using it as the index.
df = pd.read_csv('spx.csv', parse_dates=['date'], index_col='date')

# Chronological split: the first 95% of rows train the model, the rest test it.
train_size = int(len(df) * 0.95)
test_size = len(df) - train_size

# Take explicit copies: the 'close' column of both frames is overwritten with
# scaled values later, and assigning into a bare slice of `df` triggers
# pandas' SettingWithCopyWarning (and may or may not write through to `df`).
train = df.iloc[:train_size].copy()
test = df.iloc[train_size:].copy()
print(train.shape, test.shape)
from sklearn.preprocessing import StandardScaler

# Fit the scaler on the training closes only, then apply that same fitted
# transform to both splits so the test set does not influence the statistics.
scaler = StandardScaler().fit(train[['close']])
train['close'] = scaler.transform(train[['close']])
test['close'] = scaler.transform(test[['close']])
def create_dataset(X, y, time_steps=1):
    """Slice a series into overlapping fixed-length windows with next-step targets.

    Window ``i`` holds rows ``i .. i + time_steps - 1`` of ``X``; its target is
    ``y`` at position ``i + time_steps`` (the row right after the window).

    Args:
        X: pandas object supporting ``.iloc`` slicing and ``.values``
            (a DataFrame gives windows of shape ``(time_steps, n_columns)``).
        y: pandas object supporting positional ``.iloc`` lookup; expected to
            be row-aligned with ``X``.
        time_steps: number of consecutive rows in each window.

    Returns:
        Tuple ``(Xs, ys)`` of NumPy arrays with ``len(X) - time_steps`` entries
        each. Both are empty when ``len(X) <= time_steps``.
    """
    n_windows = len(X) - time_steps
    Xs = [X.iloc[start:start + time_steps].values for start in range(n_windows)]
    ys = [y.iloc[start + time_steps] for start in range(n_windows)]
    return np.array(Xs), np.array(ys)
# Window length: every sample contains 30 consecutive scaled closes.
TIME_STEPS = 30

# create_dataset returns X arrays shaped [samples, time_steps, n_features].
X_train, y_train = create_dataset(train[['close']], train['close'], time_steps=TIME_STEPS)
X_test, y_test = create_dataset(test[['close']], test['close'], time_steps=TIME_STEPS)
print(X_train.shape)
# LSTM autoencoder: the first LSTM encodes each window into a 64-unit vector,
# RepeatVector copies that vector once per time step, and the second LSTM plus
# a per-step Dense layer decode it back to a sequence with the input's shape.
# NOTE(review): `keras` is not imported anywhere in the visible source —
# presumably `from tensorflow import keras`; confirm.
model = keras.Sequential([
    keras.layers.LSTM(
        units=64,
        input_shape=(X_train.shape[1], X_train.shape[2]),
    ),
    keras.layers.Dropout(rate=0.2),
    keras.layers.RepeatVector(n=X_train.shape[1]),
    keras.layers.LSTM(units=64, return_sequences=True),
    keras.layers.Dropout(rate=0.2),
    keras.layers.TimeDistributed(
        keras.layers.Dense(units=X_train.shape[2])
    ),
])
model.compile(optimizer='adam', loss='mae')
# Train the autoencoder to reconstruct its own input windows. The model
# outputs a full [time_steps, n_features] sequence and anomalies are scored
# by |prediction - input|, so the target must be X_train itself; the
# next-step scalar y_train has neither the right shape nor the right meaning.
history = model.fit(
    X_train, X_train,
    epochs=10,
    batch_size=32,
    # Keras takes the validation samples from the end of the arrays, i.e. the
    # most recent 10% of the training windows.
    validation_split=0.1,
    # Keep the windows in chronological order.
    shuffle=False,
)
# Per-window reconstruction error on the training set: mean absolute
# difference between the model output and the input, averaged over axis 1.
X_train_pred = model.predict(X_train)
train_mae_loss = np.abs(X_train_pred - X_train).mean(axis=1)

# Fixed cut-off on the error above which a window is flagged as anomalous.
THRESHOLD = 0.65
# Same error measure as for the training set, computed on the test windows.
X_test_pred = model.predict(X_test)
test_mae_loss = np.abs(X_test_pred - X_test).mean(axis=1)

# create_dataset yields len(test) - TIME_STEPS windows, so the scores are
# indexed by the test rows from position TIME_STEPS onward.
test_score_df = pd.DataFrame(index=test[TIME_STEPS:].index)
test_score_df['loss'] = test_mae_loss
test_score_df['threshold'] = THRESHOLD
test_score_df['anomaly'] = test_score_df['loss'] > test_score_df['threshold']
test_score_df['close'] = test[TIME_STEPS:]['close']

# Keep only the rows whose error exceeded the threshold.
anomalies = test_score_df[test_score_df['anomaly']]
레퍼런스
댓글 없음:
댓글 쓰기