2024년 4월 26일 금요일

Django, Bootstrap, GIS 지도 기반 IoT 데이터 분석 데쉬보드 만들어보기

이 글은 Django, Bootstrap 기반 데이터 분석 데쉬보드 개발방법을 간략히 정리한 것이다.

IoT 데쉬보드 web app 서버 실행 (example)

요구사항 디자인
다음과 같은 목적의 웹앱 서비스를 가정한다. 
  • GIS 기반 센서 위치 관리
  • IoT 데이터셋 표현
  • IoT 장치 관리
  • IoT 장치 활성화 관리 KPI 표현
  • 계정 관리
  • 기타 메뉴 
이러한 정보를 보여줄 수 있는 데쉬보드 웹앱을 디자인한다. 이 예제는 데쉬보드 레이아웃을 가진 웹앱 프레임 개발 방법을 보여주는 것에 집중한다. 세부 비지니스 로직 및 데이터베이스 모델은 다른 페이지를 참고한다.
개발환경 준비
개발도구
개발에 필요한 도구는 다음으로 한다.
  • UI: bootstrap
  • 웹앱 프레임웍: DJango
  • GIS: leaflet, Cesium
  • 데이터소스: sqlite, spreadsheet, mongodb
  • 호스팅
상세 소스코드는 다음을 참고한다.
웹앱 프로젝트 생성
다음과 같이 장고 웹앱 프로젝트를 생성한다. 
python -m venv myenv
source myenv/bin/activate  
pip install django pandas
django-admin startproject iot_dashboard
cd iot_dashboard
python manage.py startapp dashboard

생성된 프로젝트 폴더 구조는 다음과 같다.

디자인 스타일 고려사항
부트스랩 레이아웃 표현
부트스트랩의 그리드 시스템은 12개 열로 디자인된다. 이는 유연성과 사용 편의성을 제공하기 위한 디자인 결정이다. 반응형 웹사이트를 구축하는 데 많이 사용된다. 

참고로, 12라는 숫자는 많은 약수(1, 2, 3, 4, 6, 12)를 갖고 있어 다양한 열의 조합으로 균등하게 나눌 수 있다. 이를 통해 분수나 번거로운 나머지 없이 다양한 레이아웃을 만들 수 있다.
  • 유연성: 12개의 열을 사용하면 다양한 화면 크기와 디바이스에 적합한 레이아웃을 쉽게 만들 수 있다. 각 요소가 차지하는 열의 수를 조정하여 대형 데스크톱 화면, 태블릿 및 스마트폰에서 잘 보이는 반응형 디자인을 만들 수 있다.
  • 이해하기 쉬움: 12개의 열을 기반으로 한 그리드 시스템은 디자이너와 개발자들에게 직관적이다. 그리드 내에서 요소들이 어떻게 동작할지 시각화하고 계산하기 쉽기 때문에 일관된 레이아웃을 생성하고 유지하기가 간단하다.
  • 디자인 관행: 12개의 열을 사용하는 그리드 시스템은 부트스트랩 이전부터 다양한 그래픽 디자인 및 레이아웃 소프트웨어에서 사용되어 왔다. 
데쉬보드 카드 스타일
데쉬보드에 컨텐츠를 담을 패널을 카드 스타일로 표현할 수 있다. 카드 내에 차트 뿐 아니라 지도 등 그래픽도 표시할 수 있다. 

보통, 일반적인 카드 스타일 구조는 다음과 같다. 
            <div class="row">
               <div class="col-lg-8">
                   <div class="card mb-3">
                       <div class="card-header">
                           <i class="fa fa-map"></i> Leaflet Map
                       </div>
                       <div class="card-body">
                           <div id="leafletMap" style="width:100%; height: 450px"></div>
                       </div>
                   </div>
               </div>
           </div>     

이 코드는 행 스타일 안에 가변 8개 컬럼을 차지(col-lg-8)하고, 중간 수준 마진(card mb-3)를 가지는 카드를 생성한다. 카드는 헤더(card_header)와 본체(card-body)를 가지며, 헤더는 아이콘(<i>) 스타일의 Font Awesome의 map 아이콘을 사용한다. body 내에는 디스플레이할 위젯을 표시할 <div>를 정의한다.

부트스랩은 이와 같은 style tag가 있어, 다양한 UI를 손쉽게 정의할 수 있다. 자세한 내용은 다음을 참고한다.
개발
주요 구현부분만 표현한다. 상세 구현 코드는 앞의 github 링크를 참고한다.

데이터소스 모델 연결 및 차트 표시
본 예시는 데쉬보드 앱 디자인 및 개발 과정을 보여주는 것이므로, 간단한 iot sample dataset을 다음과 같이 models.py에 개발해 놓는다. 
def IoT_model_from_file():
    save_sample_iot_dataset()
    df = pd.read_csv('iot_dataset_sample.csv')
    json_dict = df.to_dict('records')
    return json_dict

def save_sample_iot_dataset():
    # Create a DataFrame
    df = pd.DataFrame({
        'time': [datetime.now() - timedelta(days=i) for i in range(10)],
        'open': [randint(100, 200) for _ in range(10)],
        'high': [randint(200, 300) for _ in range(10)],
        'low': [randint(50, 100) for _ in range(10)],
        'close': [randint(100, 200) for _ in range(10)],
    })

    # Convert the 'time' column to a string in the format 'YYYY-MM-DD'
    df['time'] = df['time'].dt.strftime('%Y-%m-%d')

    # Save the DataFrame to a CSV file
    df.to_csv('iot_dataset_sample.csv', index=False)

IoT 센서 실시간 데이터 표시
특정 카드 내 차트에 실시간으로 데이터를 표현하기 위해서, 장고에서는 html script > view > model 업데이트 과정을 거쳐야 한다. 이 경우는 3초마다 센서 데이터를 화면에 업데이트한다고 가정한다. 이를 위해, 데이터가 준비되면 렌더링될 수 있도록 비동기 처리 요구 방식을 사용한다.

index.html의 script부분에 아래 코드를 추가한다. 
         setInterval(fetchColumnData, 3000); // 3초마다 업데이트
         function fetchColumnData() {
            var xhr = new XMLHttpRequest();
            xhr.open('GET', '/charts?chartType=column', true);
            xhr.onreadystatechange = function () {
               if (xhr.readyState == 4 && xhr.status == 200) {  // 비동기. 데이터 준비 시 호출
                     var columnData = JSON.parse(xhr.responseText);
                     columnChart.options.data[0].dataPoints = columnData;
                     columnChart.render();  // 값을 차트에 업데이트

                     var columnChart_ready = document.getElementById('columnChart_ready');
                     var columnChart_operation = document.getElementById('columnChart_operation');
                     var columnChart_shutdown = document.getElementById('columnChart_shutdown');
                     columnChart_ready.innerHTML = columnData[0].y;
                     columnChart_operation.innerHTML = columnData[1].y;
                     columnChart_shutdown.innerHTML = columnData[2].y;
               }
            };
            xhr.send();
         }

참고로, 이러한 방식은 많은 CPU 부하를 차지하므로, 혹시 이런 방식이 필요하다면, 실시간 업데이트 기능이 필요한 사용자만 사용할 수 있도록 페이지를 분리하거나, 별도 UI 앱을 개발하자.

이외, 리플렛, 세슘 라이브러리를 이용해 2차원, 3차원 화면을 표시한다. 세귬은 미리 API 사용 토큰을 발급받아야 제대로 동작된다. 
Cesium.Ion.defaultAccessToken = 'your_access_token';

실행 결과
앞의 디자인 목적을 고려한 데쉬보드 실행 결과는 다음과 같다.


웹 서비스 배포 및 호스팅
호스팅 방법은 다양하다. 여기에선 최근 인기가 높아지고 있는 cloudtype을 사용해 배포한다.
cloud server setting 모습

배포에 성공하면, 다음과 같이 외부에서도 웹 서비스에 접속 실행될 것이다. 
실행 화면

스마트폰에서 접속하면, bootstrap layout style에 따라 패널이 잘 정렬되어 보여지는 것을 확인할 수 있다.
 
스마트 폰 실행 모습

이외에 유용한 배포 호스팅 서버로 python anywhere, amazon free tier 등이 있다. 아래 링크를 참고한다.
마무리
장고, 부트스랩 등을 이용하면, 손쉽게 이와 같은 데쉬보드를 개발할 수 있다.

레퍼런스

2024년 4월 21일 일요일

랭체인과 임베딩 벡터 데이터베이스 아키텍처 및 알고리즘 분석

이 글은 랭체인과 임베딩 벡터 데이터베이스를 분석한 것이다. 이를 통해, LLM 서비스 개발에 필요한 라이브러리 및 데이터베이스 구조를 이해하고, 다양한 분야에 응용할 수 있다. 
이 글은 다음 라이브러리 및 플랫폼 아키텍처 및 알고리즘을 분석한다. 
  • 랭체인
  • 벡터 데이터베이스
  • OLLAMA
  • LLAMA2.c

이 글에 관련된 용어와 상세 개념은 다음 링크를 참고한다. 

Chroma(크로마)는 AI 지원 오픈소스 벡터 베이터베이스이다. RAG 처리할 때 필수적으로 사용되는 데이터베이스 중 하나이다. 크로마를 이용해 LLM 기반 다양한 앱(지식 서비스 등)을 개발할 수 있다. 
크로마는 임베딩 벡터를 메타데이터와 함께 저장하고, 질의를 통해 해당 임베딩 도큐먼트를 검색할 수 있다. 크로마는 서버로써 동작될 수 있다(데모). 

크로마는 해커 기질과 철학이 섞여 있는 Jeff Huber, Anton Troynikov가 공동 개발하였다. 이들은 이전에 3D computer vision, 로보틱스 분야에 일했던 경험이 있다. 현재, 크로마는 1,800만달러를 펀딩받았고, 다음 라운드를 준비하고 있다. 
공동 개발자 Jeff Huber, Anton Troynikov

크로마 설치는 다음과 같다. 
pip install chromadb

벡터 데이터베이스에 저장되는 단위는 다음과 같다. 
collection = client.create_collectoin(name='test', embedding_function=emb_fn)

collection.add(
    embeddings=[
        [1.1, 2.3, 3.2],
        [4.5, 6.9, 4.4],
        [1.1, 2.3, 3.2]
    ],
    metadatas=[
        {"uri": "img1.png", "style": "style1"},
        {"uri": "img2.png", "style": "style2"},
        {"uri": "img3.png", "style": "style1"}
    ],
    documents=["doc1", "doc2", "doc3"],
    ids=["id1", "id2", "id3"],
)

보는 것 같이, 벡터 좌표계에 위치할 임베딩 벡터, 벡터에 매달아 놓을 메타데이터와 도큐먼트, ID를 하나의 컬랙션 단위로 저장한다. 이를 통해, 벡터 간 유사도, 거리 등을 계산해, 원하는 도큐먼트, 메타데이터 등을 얻을 수 있다. 이때 임베딩 벡터는 미리 학습된 임베딩 모델을 사용할 수 있다. 

질의해 원하는 벡터를 얻으려면, 벡터 공간에서 거리계산이 필수적이다. 이때 사용하는 함수는 다음과 같다. 

컬렉션에 벡터 추가와 질의는 다음과 같다. 
collection.add(
    documents=["doc1", "doc2", "doc3", ...],
    embeddings=[[1.1, 2.3, 3.2], [4.5, 6.9, 4.4], [1.1, 2.3, 3.2], ...],
    metadatas=[{"chapter": "3", "verse": "16"}, {"chapter": "3", "verse": "5"}, {"chapter": "29", "verse": "11"}, ...],
    ids=["id1", "id2", "id3", ...]
)

collection.query(
    query_texts=["doc10", "thus spake zarathustra", ...],
    n_results=10,
    where={"metadata_field": "is_equal_to_this"},
    where_document={"$contains":"search_string"}
)

여기서, where의 metadata_field를 이용해 다음과 같은 조건 비교 연산이 가능하다.
  $eq, $ne, $gt, $gte, $lt, $lte
 
그리고, 논리 연산자인 $and, $or를 지원한다.

크로마는 향후, 워크플로우, 가시화, 질의 계획, 분석 기능을 준비하고 있다. 

Langchain 구조 분석
Langchain(랭체인)은 LLM에 원하는 결과를 얻을 수 있도록, RAG, 튜닝과 같은 기능을 제공하는 라이브러리다. 랭체인 설치는 다음과 같다(참고). 
pip install langchain

랭체인은 모델 입출력, 데이터 검색, 에이전트 지원, 체인, 컨텍스트 메모리 기능을 제공하며, LCEL(LangChain Expression Language)를 이용해 각 구성요소를 유기적으로 연결시킬 수 있다. 다음은 LCEL 예시를 보여준다. 

from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import BaseOutputParser

# LCEL Example chain
chain = ChatPromptTemplate() | ChatOpenAI() | CustomOutputParser()

이와 더불어, 목적에 맞는 다양한 프롬프트 템플릿, 구조화된 출력을 제공한다.
from langchain.output_parsers.json import SimpleJsonOutputParser

# Create a JSON prompt
json_prompt = PromptTemplate.from_template(
    "Return a JSON object with `birthdate` and `birthplace` key that answers the following question: {question}"
)

# Initialize the JSON parser
json_parser = SimpleJsonOutputParser()

# Create a chain with the prompt, model, and parser
json_chain = json_prompt | model | json_parser

# Stream through the results
result_list = list(json_chain.stream({"question": "When and where was Elon Musk born?"}))

# The result is a list of JSON-like dictionaries
print(result_list)



LLamaIndex 구조 분석
LLamaIndex는 LLM 클라우드에서 원하는 LLM을 검색하여, 모델을 다운로드받고, 사용하기 전까지 필요한 단계를 자동화한 라이브러리이다. 

레퍼런스
부록: RAG 기반 SQL 코딩 에이전트 개발


2024년 4월 11일 목요일

도커 이미지 빌드 시 문제 해결 솔류션 정리

이 글은 도커 이미지 빌드 시 문제 해결 솔류션을 정리한 글이다. 


도커 컨테이너 이미지는 개발, 운영 환경을 독립적으로 실행할 수 있는 가상화를 지원한다. 다만, 도커 이미지 빌드 시 설치되는 수많은 패키지에 의존되므로, 여러 에러가 발생된다. 대표적인 에러는 다음과 같다. 
  1. 'DEBIAN_FRONTEND=noninteractive' not working inside shell script with apt-get
  2. How can I set the default timezone
  3. Docker build failed with Hash Sum mismatch error
  4. Docker build failed with error "Hash Sum mismatch"
  5. throws an error saying "Some index files failed to download. They have been ignored or old ones used instead."
  6. Error while loading conda entry point: conda-libmamba-solver (libarchive.so.19: cannot open shared object file: No such file or directory)
  7. OpenGL, 3D graphics problems in docker
  8. Windows Volume mount
이외에도 매우 다양한데, 사실, 설치되는 패키지들이 많은 도커 이미지는 그만큼 원인이 많다. 스택 오버플로우 관련 댓글을 보면 알겠지만, 누구는 이런 문제로 몇 일이 날라갔다 등의 성토글을 볼 수 있다. 그만큼 원인이 다양하게 조합될 수 있다.

앞의 에러 솔류션만 정리해 본다. 

1번은 도커 빌드 시 [yes/no] 프롬프트 입력 상황에서 발생한다. 도커 빌드 중에는 키보드 입력이 안된다. 그러므로, 다음과 같이 인터렉티브가 없다고 설정하면 해결된다.
ARG DEBIAN_FRONTEND=noninteractive

2번은 타임존 문제로 이는 다음과 같이 설정한다. 타임존이 제대로 설정되어 있지 않으면, 패키지 설치에 실패할 수 있다. 
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ENV TZ=Asia/Seoul

3, 4번 해쉬섬 에러 문제는 보통 우분투나 리눅스의 apt-get update 시 발생하는 데, stack overflow에 검색해 보아도 관련 솔류션을 시도해도 잘 처리 안되는 경우가 발생한다. 

이 경우는 다음과 같은 솔류션을 apt-get update 전에 dockerfile에 정의하라고 되어 있는 답변이 많다. 

RUN rm -rf /var/lib/apt/lists/*

RUN echo "Acquire::http::Pipeline-Depth 0;" > /etc/apt/apt.conf.d/99custom && \
    echo "Acquire::http::No-Cache true;" >> /etc/apt/apt.conf.d/99custom && \
    echo "Acquire::BrokenProxy    true;" >> /etc/apt/apt.conf.d/99custom

RUN echo "Acquire::Check-Valid-Until \"false\";\nAcquire::Check-Date \"false\";" | cat > /etc/apt/apt.conf.d/10no--check-valid-until

이렇게 해도 해결안되면, 기본이 되는 이미지를 우분투 과거 버전(예. 20.04), 혹은 미리 패키지 설치된 버전으로 설정하고 다시 시도한다. 
FROM ubuntu:20.04
FROM continuumio/miniconda3

이 문제는 무언가 해당 패키지를 배포하는 서버에서 문제가 있거나, 도커 빌드 컴퓨터의 시간이 안맞는 등의 문제로 예상된다. 

5번은 도커 이미지 빌드 시 동작되는 방식에 원인이 있다. 도커는 빌드 과정을 재활용하기 위해 레이어, 캐쉬 등을 사용하는 데, 가끔 이 부분이 꼬이는 경우가 있다. 이 경우, 다음과 같이 빌드해 본다 .
docker build --no-cache -t <docker_name> .

안되면, 현재 도커 이미지들을 모두 삭제하거나, 다음과 같이 clean, purge를 한다.
docker image rm -f
docker system prune -a -f

6번은 미리 설치된 아나콘다 이미지를 사용하라고 권장한다. 다음은 그 예이다.
FROM continuumio/miniconda3

7번은 호스트 서버가 3D GPU가 지원되지 않으면 아직 뚜렷한 해결방법은 없다. 도커 이미지는 본질적으로 3D GUI용으로 사용되도록 개발된 것이 아니다. 3D GUI로 이미지를 생성, 저장하는 등의 기능은 기본적으로 도커에서는 처리되지 않는다. 
이를 우회적으로 지원하는 몇 가지 방법이 있다. VirtualGL(ref), Nvidia GPU docker 등이 그러하다. 하지만, 도커가 실행되는 호스트 서버에 설치된 그래픽카드에 따라 이런 옵션이 동작되지 않을 수도 있다. 

8번은 윈도우에서 호스트 폴더와 도커 내 폴더를 마운트할 때 발생하는 문제이다. 황당하게도, 우분투에는 문제 없는 볼륨 마운트가 윈도우에는 별도 설정을 해줘야 한다. 다음과 같이 도커 설정 메뉴에서 Resource 메뉴의 File sharing 경로를 추가해야 -v 마운트 옵션이 동작된다. 

아울러, 윈도우에서는 경로 설정이 우분투와 다르므로, 아래와 같이 절대 경로 지정해야 한다.
docker run -v c:/input_data:/app/input -it <docker_image_name> python app_program.py --input ./input/data.json 


레퍼런스

2024년 4월 8일 월요일

시계열 데이터 예측을 위한 트랜스포머 모델 개발하기

이 글은 시계열 트랜스포머 모델 개념 및 활용 방법을 간략히 정리한다.

트랜스포머 개념도

머리말
트랜스포머의 원리를 이해하고 있다면, 수치로 표현된 시계열 벡터도 라벨링된 시계열 벡터가 있다면, 이를 학습할 수 있다는 것을 이해할 것이다. 

이 글은 트랜스포머 모델을 이용해 시계열을 학습하는 방법을 실습한다. 트랜스포머의 세부적인 기술은 다음 링크 및 레퍼런스를 참고한다. 
개발 환경
이 글을 실습하기 위해서는 기본적으로 다음 개발환경이 준비되어야 한다. 
  • NVIDIA GPU driver
  • Ubuntu or Windows CUDA driver 
  • PyTorch
  • Pandas, matplotlib
본론
시계열 데이터에서 패턴을 학습하고, 다음 값을 예측하는 방법은 여러가지가 있다. 트랜스포머의 경우에도, 트랜스포머 분류 기법을 이용하는 방법, Temporal Fusion Transformer를 사용하는 방법 등이 있다.

여기에서는 기본 개념을 먼저 이해하기 위해, 트랜스포머 분류 기법을 이용해 학습한다. 학습될 데이터는 다음과 같다. 

이 데이터는 페이스북 주식 종가, 개장가 등을 다운로드받은 FB_raw.csv 엑셀파일을 그래프화한 것이다. 데이터셋 크기는 160,681레코드이다. 실제 데이터 구조는 다음과 같다. 

데이터를 미리 다운로드한다.
알고리즘 순서는 다음과 같다.
  1. 주가 시계열 데이터 로딩
  2. 학습 및 테스트 데이터 생성
  3. 벡터 임베딩 함수 정의
  4. 트랜스포머 모델 정의: 포지션 인코딩, 인코더, 주가 예측을 위한 linear full connection 정의. 타겟 마스크 정의
  5. 데이터 학습
  6. 테스트 데이터 예측 결과 확인 
코딩을 위해, 파이썬 파일을 생선한다. 그리고, 다음과 같이 패키지를 임포트한다. 
import torch, torch.nn as nn
import numpy as np
import pandas as pd
import os, time, math
import matplotlib.pyplot as plt
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True' # Intel Math Kernel Library use

학습될 데이터 형식을 설정한다. 학습 데이터는 10개, 예측 데이터는 1개이다. 
input_window = 10 # number of input steps
output_window = 1 # number of prediction steps, in this model its fixed to one
batch_size = 250
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 

페이스북 데이터를 읽고, 그래프형식으로 보여준다. 
# get this python's directory
module_path = os.path.dirname(__file__)
df = pd.read_csv(module_path + '/FB_raw.csv') # data path of facebook stock price (Apr 2019 - Nov 2020)
close = np.array(df['close'])
logreturn = np.diff(np.log(close)) # Transform closing price to log returns, instead of using min-max scaler
csum_logreturn = logreturn.cumsum() # Cumulative sum of log returns

fig, axs = plt.subplots(2, 1)
axs[0].plot(close, color='red')
axs[0].set_title('Closing Price')
axs[0].set_ylabel('Close Price')
axs[0].set_xlabel('Time Steps')

axs[1].plot(csum_logreturn, color='green')
axs[1].set_title('Cumulative Sum of Log Returns')
axs[1].set_xlabel('Time Steps')

fig.tight_layout()
plt.show()

벡터 임베딩 계산을 위해, 트랜스포머의 포지션 인코딩을 정의한다.  
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()       
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]        

트랜스포머 기반 시계열 학습 모델을 정의한다. 트랜스포머 정의 그대로 정의되며, decoder부분만 다르다. 이 부분은 트랜스포머 결과를 받아, 예측값 1개를 생성하는 full connection 레이어다. 
class transformer_seq(nn.Module):
    def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
        super(transformer_seq, self).__init__()
        self.model_type = 'Transformer'
        
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(feature_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)   
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)        
        self.decoder = nn.Linear(feature_size,1)
        self.init_weights()  # decoder FC층 가중치 초기화

    def init_weights(self):
        initrange = 0.1    
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self,src):
        if self.src_mask is None or self.src_mask.size(0) != len(src):  # 목표 마스크 생성
            device = src.device
            mask = self.generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src,self.src_mask)
        output = self.decoder(output)
        return output

    def generate_square_subsequent_mask(self, sz):  # 목표 마스크 생성 함수
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask        

학습될 데이터를 생성한다. 주가 데이터에서 input은 10개 수치, output은 1개 수치이다.
def create_inout_sequences(input_data, tw):
    inout_seq = []
    L = len(input_data)
    for i in range(L-tw):
        train_seq = input_data[i:i+tw]
        train_label = input_data[i+output_window:i+tw+output_window]
        inout_seq.append((train_seq ,train_label))
    return torch.FloatTensor(inout_seq)

학습될 데이터를 train, test dataset으로 분할하는 함수를 정의한다. 
def get_data(data, split):
    series = data
    
    split = round(split*len(series))
    train_data = series[:split]
    test_data = series[split:]

    train_data = train_data.cumsum()
    train_data = 2*train_data # 학습 데이터 값을 2배 증폭함으로써 학습 정확도를 높인다.
    test_data = test_data.cumsum()

    train_sequence = create_inout_sequences(train_data,input_window)
    train_sequence = train_sequence[:-output_window]

    test_data = create_inout_sequences(test_data,input_window)
    test_data = test_data[:-output_window]

    return train_sequence.to(device), test_data.to(device)

학습될 배치 데이터를 리턴하는 함수를 정의한다.
def get_batch(source, i, batch_size):
    seq_len = min(batch_size, len(source) - 1 - i)
    data = source[i:i+seq_len]    
    input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
    target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
    return input, target

학습 함수를 정의한다.
def train(train_data):
    model.train() # Turn on the evaluation mode
    total_loss = 0.
    start_time = time.time()

    for batch, i in enumerate(range(0, len(train_data) - 1, batch_size)):  # 배치크기만큼 루프
        data, targets = get_batch(train_data, i,batch_size)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
        optimizer.step()

        total_loss += loss.item()
        log_interval = int(len(train_data) / batch_size / 5)
        if batch % log_interval == 0 and batch > 0:  # 배치 루프 시 loss, 정확도 출력
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.10f} | {:5.2f} ms | loss {:5.7f}'.format(
                    epoch, batch, len(train_data) // batch_size, scheduler.get_lr()[0], elapsed * 1000 / log_interval,
                    cur_loss))
            total_loss = 0
            start_time = time.time()

평가 함수를 정의한다.
def evaluate(eval_model, data_source):
    eval_model.eval() # Turn on the evaluation mode
    total_loss = 0.
    eval_batch_size = 1000
    with torch.no_grad():
        for i in range(0, len(data_source) - 1, eval_batch_size):
            data, targets = get_batch(data_source, i, eval_batch_size)
            output = eval_model(data)            
            total_loss += len(data[0])* criterion(output, targets).cpu().item()
    return total_loss / len(data_source)

학습 모델 기반 데이터 예측 함수를 정의한다. 
def model_forecast(model, seqence):
    model.eval() 
    total_loss = 0.
    test_result = torch.Tensor(0)    
    truth = torch.Tensor(0)

    seq = np.pad(seqence, (0, 3), mode='constant', constant_values=(0, 0))
    seq = create_inout_sequences(seq, input_window)
    seq = seq[:-output_window].to(device)

    seq, _ = get_batch(seq, 0, 1)
    with torch.no_grad():
        for i in range(0, output_window):            
            output = model(seq[-output_window:])                        
            seq = torch.cat((seq, output[-1:]))

    seq = seq.cpu().view(-1).numpy()
    return seq

실 데이터를 이용한 데이터 예측 함수를 정의한다.
def forecast_seq(model, sequences):
    """Sequences data has to been windowed and passed through device"""
    start_timer = time.time()
    model.eval() 
    forecast_seq = torch.Tensor(0)    
    actual = torch.Tensor(0)
    with torch.no_grad():
        for i in tqdm(range(0, len(sequences) - 1)):
            data, target = get_batch(sequences, i, 1)
            output = model(data)            
            forecast_seq = torch.cat((forecast_seq, output[-1].view(-1).cpu()), 0)
            actual = torch.cat((actual, target[-1].view(-1).cpu()), 0)
    timed = time.time()-start_timer
    print(f"{timed} sec")

    return forecast_seq, actual

학습 데이터를 준비한다.
train_data, val_data = get_data(logreturn, 0.6) # 60% train, 40% test split

모델, Loss함수, 하이퍼파라메터, 스케쥴 등을 정의한다.
model = transformer_seq().to(device)
criterion = nn.MSELoss() # Loss function
lr = 0.00005 # learning rate
epochs =  500 # Number of epochs

optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(train_data)
    
    if(epoch % epochs == 0): # 에폭마다 모델 평가
        val_loss = evaluate(model, val_data)
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss: {:5.7f}'.format(epoch, (time.time() - epoch_start_time), val_loss))
        print('-' * 80)
    else:   
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s'.format(epoch, (time.time() - epoch_start_time)))
        print('-' * 80)

    scheduler.step() 

학습 후 모델 이용해 데이터 예측하고, 실제 데이터와 비교한다.
test_result, truth = forecast_seq(model, val_data)

plt.plot(truth, color='red', alpha=0.7)
plt.plot(test_result, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

테스트를 위해 랜덤값을 이용해 비교해 본다. 
r = np.random.randint(100000, 160000)
test_forecast = model_forecast(model, csum_logreturn[r: r+10]) # random 10 sequence length

print(f"forecast sequence: {test_forecast}")
print(f"Actual sequence: {csum_logreturn[r: r+11]}")
torch.save(model.state_dict(), "transformer_ts_20231211.pth")

학습 파일을 로딩한 후, 다른 시계열 데이터셋도 테스트해본다. 
model2 = TransAm() # rename as model2
model2.load_state_dict(torch.load("transformer_ts_20231211.pth"))
model2.to(device)

df2 = pd.read_csv(module_path + '/BA_raw.csv') # 보잉 주식 테스트
close2 = df2['close'].fillna(method = 'ffill')
close2 = np.array(close2)
logreturn2 = np.diff(np.log(close2))

train_data2, val_data2 = get_data(logreturn2, 0.6)
test2_eval = evaluate(model2, val_data2)
print(f'Test 2 loss: {test2_eval :.5f}')

test_result2, truth2 = forecast_seq(model2, val_data2)

plt.plot(truth2, color='red', alpha=0.7)
plt.plot(test_result2, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

df3 = pd.read_csv(module_path + '/JPM_raw.csv') # JPMorgan Chase & Co 주식 테스트
close3 = df3['close'].fillna(method = 'ffill')
close3 = np.array(close3)
logreturn3 = np.diff(np.log(close3))

train_data3, val_data3 = get_data(logreturn3, 0.6)
test3_eval = evaluate(model2, val_data3)
print(f'Test 3 loss: {test3_eval :.5f}')

test_result3, truth3 = forecast_seq(model2, val_data3)

plt.plot(truth3, color='red', alpha=0.7)
plt.plot(test_result3, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

결과는 다음과 같이, 잘 학습되어, 데이터셋을 예측하고 있는 것을 알 수 있다. 
학습 과정 화면
테스트 데이터셋 예측 결과(페이스북 주가)

전혀 다른 주식 종목도 예측해 보자. 다음은 보잉 주가이다. 
보잉 주가 예측
다음은 제이슨 모건 주식이다. 
제이슨 모건 주가 예측

결과적으로 학습 모델이 패턴을 잘 예측한다. 

레퍼런스