2024년 4월 26일 금요일

공간정보 GIS 기반 IoT 데이터 분석 스타일 데쉬보드 만들고 서비스해보기

이 글은 Django, Bootstrap을 사용해 GIS 기반 IoT 데이터 분석 스타일의 데쉬보드 개발방법을 간략히 정리하고, 개발 방법 후 서비스하는 방법을 보여준다. 이를 통해, 공간정보 기반 IoT 장비를 하나의 데쉬보드로 관리하고, 분석하는 것이 가능하다. 여기서 공간정보는 GIS, BIM, 3D Point cloud data 와 같이 공간상 좌표로 표현되는 모든 정보를 말한다. 상세한 구현 소스 코드는 본 글의 Github링크를 참고할 수 있다. 
IoT 데쉬보드 web app 서버 실행 (example)

이 글을 통해, 다음 내용을 이해할 수 있다. 
  • 부트스트랩 데쉬보드 UI 라이브러리 사용법
  • 장고의 데이터 모델과 웹 UI 간의 연계 방법
  • GIS 맵 가시화 및 이벤트 처리
  • 실시간 IoT 데이터에 대한 동적 UI 처리 방법
이 글의 독자는 장고, 부트스트랩, GIS, IoT 의 기본 개념은 알고 있다고 가정한다. 

요구사항 디자인
다음과 같은 목적의 웹앱 서비스를 가정한다. 
  • GIS 기반 센서 위치 관리
  • IoT 데이터셋 표현
  • IoT 장치 관리
  • IoT 장치 활성화 관리 KPI 표현
  • 계정 관리
  • 기타 메뉴 
이러한 정보를 보여줄 수 있는 데쉬보드 웹앱을 디자인한다. 이 예제는 데쉬보드 레이아웃을 가진 웹앱 프레임 개발 방법을 보여주는 것에 집중한다. 세부 비지니스 로직 및 데이터베이스 모델은 다른 페이지를 참고한다.
개발환경 준비
개발도구
개발에 필요한 도구는 다음으로 한다.
  • UI: bootstrap
  • 웹앱 프레임웍: DJango
  • GIS: leaflet, Cesium
  • 데이터소스: sqlite, spreadsheet, mongodb
구현된 상세 소스코드는 다음을 참고한다.
장고 기반 웹앱 프로젝트 생성
장고(Django)는 파이썬으로 작성된 고수준의 웹 프레임워크로, 웹 애플리케이션 개발을 빠르고 쉽게 할 수 있도록 도와준다. 장고는 "The web framework for perfectionists with deadlines"라는 슬로건을 가지고 있으며, 많은 기능을 내장하고 있어 개발자가 반복적인 작업을 줄이고 핵심 기능에 집중할 수 있도록 한다.

다음과 같이 장고 웹앱 프로젝트를 생성한다. 
python -m venv myenv
source myenv/bin/activate  
pip install django pandas
django-admin startproject iot_dashboard
cd iot_dashboard
python manage.py startapp dashboard

생성된 프로젝트 폴더 구조는 다음과 같다.

디자인 스타일 고려사항
부트스랩 레이아웃 표현
부트스트랩(Bootstrap)은 웹 개발에서 널리 사용되는 프론트엔드 프레임워크로 주로 HTML, CSS, JavaScript로 작성되어 있다. 트위터의 개발자들에 의해 처음 만들어졌으며, 웹 애플리케이션의 개발 속도를 높이고, 반응형 디자인을 쉽게 구현할 수 있도록 도와준다. 

부트스랩의 그리드 시스템은 12개 열로 디자인된다. 이는 유연성과 사용 편의성을 제공하기 위한 디자인 결정이다. 반응형 웹사이트를 구축하는 데 많이 사용된다. 

참고로, 12라는 숫자는 많은 약수(1, 2, 3, 4, 6, 12)를 갖고 있어 다양한 열의 조합으로 균등하게 나눌 수 있다. 이를 통해 분수나 번거로운 나머지 없이 다양한 레이아웃을 만들 수 있다.
  • 유연성: 12개의 열을 사용하면 다양한 화면 크기와 디바이스에 적합한 레이아웃을 쉽게 만들 수 있다. 각 요소가 차지하는 열의 수를 조정하여 대형 데스크톱 화면, 태블릿 및 스마트폰에서 잘 보이는 반응형 디자인을 만들 수 있다.
  • 이해하기 쉬움: 12개의 열을 기반으로 한 그리드 시스템은 디자이너와 개발자들에게 직관적이다. 그리드 내에서 요소들이 어떻게 동작할지 시각화하고 계산하기 쉽기 때문에 일관된 레이아웃을 생성하고 유지하기가 간단하다.
  • 디자인 관행: 12개의 열을 사용하는 그리드 시스템은 부트스트랩 이전부터 다양한 그래픽 디자인 및 레이아웃 소프트웨어에서 사용되어 왔다. 
데쉬보드 카드 스타일
데쉬보드에 컨텐츠를 담을 패널을 카드 스타일로 표현할 수 있다. 카드 내에 차트 뿐 아니라 지도 등 그래픽도 표시할 수 있다. 

보통, 일반적인 카드 스타일 구조는 다음과 같다. 
            <div class="row">
               <div class="col-lg-8">
                   <div class="card mb-3">
                       <div class="card-header">
                           <i class="fa fa-map"></i> Leaflet Map
                       </div>
                       <div class="card-body">
                           <div id="leafletMap" style="width:100%; height: 450px"></div>
                       </div>
                   </div>
               </div>
           </div>     

이 코드는 행 스타일 안에 가변 8개 컬럼을 차지(col-lg-8)하고, 중간 수준 마진(card mb-3)를 가지는 카드를 생성한다. 카드는 헤더(card_header)와 본체(card-body)를 가지며, 헤더는 아이콘(<i>) 스타일의 Font Awesome의 map 아이콘을 사용한다. body 내에는 디스플레이할 위젯을 표시할 <div>를 정의한다.

부트스랩은 이와 같은 style tag가 있어, 다양한 UI를 손쉽게 정의할 수 있다. 자세한 내용은 다음을 참고한다.
개발
주요 구현부분만 표현한다. 상세 구현 코드는 앞의 github 링크를 참고한다.

데이터소스 모델 연결 및 차트 표시
본 예시는 데쉬보드 앱 디자인 및 개발 과정을 보여주는 것이므로, 간단한 iot sample dataset을 다음과 같이 models.py에 개발해 놓는다. 
def IoT_model_from_file():
    save_sample_iot_dataset()
    df = pd.read_csv('iot_dataset_sample.csv')
    json_dict = df.to_dict('records')
    return json_dict

def save_sample_iot_dataset():
    # Create a DataFrame
    df = pd.DataFrame({
        'time': [datetime.now() - timedelta(days=i) for i in range(10)],
        'open': [randint(100, 200) for _ in range(10)],
        'high': [randint(200, 300) for _ in range(10)],
        'low': [randint(50, 100) for _ in range(10)],
        'close': [randint(100, 200) for _ in range(10)],
    })

    # Convert the 'time' column to a string in the format 'YYYY-MM-DD'
    df['time'] = df['time'].dt.strftime('%Y-%m-%d')

    # Save the DataFrame to a CSV file
    df.to_csv('iot_dataset_sample.csv', index=False)

IoT 센서 실시간 데이터 표시
특정 카드 내 차트에 실시간으로 데이터를 표현하기 위해서, 장고에서는 html script > view > model 업데이트 과정을 거쳐야 한다. 이 경우는 3초마다 센서 데이터를 화면에 업데이트한다고 가정한다. 이를 위해, 데이터가 준비되면 렌더링될 수 있도록 비동기 처리 요구 방식을 사용한다.

index.html의 script부분에 아래 코드를 추가한다. 
         setInterval(fetchColumnData, 3000); // 3초마다 업데이트
         function fetchColumnData() {
            var xhr = new XMLHttpRequest();
            xhr.open('GET', '/charts?chartType=column', true);
            xhr.onreadystatechange = function () {
               if (xhr.readyState == 4 && xhr.status == 200) {  // 비동기. 데이터 준비 시 호출
                     var columnData = JSON.parse(xhr.responseText);
                     columnChart.options.data[0].dataPoints = columnData;
                     columnChart.render();  // 값을 차트에 업데이트

                     var columnChart_ready = document.getElementById('columnChart_ready');
                     var columnChart_operation = document.getElementById('columnChart_operation');
                     var columnChart_shutdown = document.getElementById('columnChart_shutdown');
                     columnChart_ready.innerHTML = columnData[0].y;
                     columnChart_operation.innerHTML = columnData[1].y;
                     columnChart_shutdown.innerHTML = columnData[2].y;
               }
            };
            xhr.send();
         }

참고로, 이러한 방식은 많은 CPU 부하를 차지한다. 다른 대안으로, 다음처럼 animation loop를 사용하는 방식이 있다. 

         let lastTime = 0;
         const interval = 3000;
         function animationLoop(timestamp) {
               if (timestamp - lastTime >= interval) {
                  fetchColumnData();
                  lastTime = timestamp;
               }
               requestAnimationFrame(animationLoop);
         }
         requestAnimationFrame(animationLoop);         

이 또한 성능 문제가 있다면, 실시간 업데이트 기능이 필요한 사용자만 사용할 수 있도록 페이지를 분리하거나, 별도 UI 앱을 개발하자.

이외, Leaflet(리플렛), Cesium(세슘) 라이브러리를 이용해 2차원, 3차원 화면을 표시한다. 세귬은 미리 API 사용 토큰을 발급받아야 제대로 동작된다. 
Cesium.Ion.defaultAccessToken = 'your_access_token';

실행 결과
앞의 디자인 목적을 고려한 데쉬보드 실행 결과는 다음과 같다.

번들(buddle)로 컴파일된 자바스크립트 라이브러리 중 하나인 ifc.js의 viewer를 card-body의 id와 연결하여, 다음과 같이 3D 모델을 표현할 수 있다. Cesium도 동일한 방식으로 렌더링될 수 있다.


웹 서비스 배포 및 호스팅
호스팅 방법은 다양하다. 여기에선 최근 인기가 높아지고 있는 cloudtype을 사용해 배포한다.
cloud server setting 모습

배포에 성공하면, 다음과 같이 외부에서도 웹 서비스에 접속 실행될 것이다. 
실행 화면

스마트폰에서 접속하면, bootstrap layout style에 따라 패널이 잘 정렬되어 보여지는 것을 확인할 수 있다.
 
스마트 폰 실행 모습

이외에 유용한 배포 호스팅 서버로 python anywhere, amazon free tier 등이 있다. 아래 링크를 참고한다.
마무리
요즘에는 좋은 개발 도구와 라이브러리가 워낙 많아, 프론트앤드, 백앤드, 미들웨어 스택을 만들때 조합하기가 오히려 혼란스러운 점이 있다. 장고, 부트스랩 등을 이용하면 기본적인 프레임웍에서 이들을 조합할 수 있어, 개발이 편해진다. 

GIS, BIM, 3D Point cloud data 관련 유명한 라이브러리는 CDN 스토리지를 지원하고 있어, import해 사용하기 편하다. 다만, 이런 공간정보 라이브러리는 많은 리소스를 차지하므로, 성능 최적화를 고려해야 사용할 필요가 있다. 

추신. 2005년에 개발된 장고는 저널리즘을 전공한 Adrian Holovaty의 주도로 Joscho Stephan과 함께 개발되었다. 그들은 올바른 저널리즘과 기타 연주를 사랑한다.  
Django 개발자 Joscho Stephan, Adrian Holovaty (patrus)

레퍼런스

2024년 4월 21일 일요일

LLM 모델 통합과 다양한 데이터소스를 지원하는 LangChain 아키텍처 및 동작 메커니즘 분석하기

이 글은 해리슨 체이스가 개발한 오픈소스인 랭체인(LangChain) 아키텍처와 동작 방법을 분석한다. 현재 개발자들 사이에 대중적으로 사용되고 있는 랭체인은 LLM 모델 통합과 다양한 데이터소스를 지원하여, LLM모델의 활용성을 극대화한다. 이 글을 통해, LLM 서비스 개발에 필요한 랭체인의 아키텍처와 동작 원리를 이해할 수 있다.  


LLM 용어와 상세 개념은 다음 링크를 참고한다. 

소개
Langchain(랭체인)은 LLM에 원하는 결과를 얻을 수 있도록, 다양한 프롬프트 입력 및 구조화된 출력, RAG, 튜닝과 같은 기능을 제공하는 라이브러리다. 랭체인 설치는 다음과 같다(참고). 
pip install langchain

다음 장에서는 랭체인의 기본 개념인 프롬프트, 메모리, 에이전트를 간단히 설명하고, 실습을 하도록한다.

작동 개념
목표
랭체인은 LLM이 다양한 작업에 사용할 수 있는 방법을 제공한다. 예를 들어, LLM은 분류, QA, 차트 해석, 그림 생성 등 다양한 곳에 사용될 수 있다. 랭체인은 이를 체계적으로 연결해 실행할 수 있다.
랭체인 작업 중 일부(Pinecone)

프롬프트
프롬프트는 보통 LLM에 대한 명령, 컨텍스트로 구성된다. 컨텍스트는 외부 정보를 의미한다. 그리고, LLM에 질문한다. 이 3개 구조는 다음 그림과 같다.
프롬프트 템플릿 구조(명령, 컨텍스트, 질문과 답변. Pinecone)

명령들: 모델에게 무엇을 하라고 지시함. 어떤 외부 정보가 있고, 이를 이용해 어떻게 출력을 생성하는 지 지시함.

컨텍스트: 외부 정보이며, 프롬프트에 의해 수동 입력, 벡터데이터베이스 검색 혹은 API 등 Tool 에 의해 삽입됨. 

질의: 사용자에 의해 입력되는 질문. 

출력 지시자: 만약 생성되는 텍스트의 첫번째에 나와야 할 것을 지시함. 예를 들어, 파이썬 코드는 반듯이 import 문장으로 시작되어야 함.  

랭체인의 프롬프트 템플릿은 동적으로 사용자 입력에 의해 생성될 수 있다. 
template = """Answer the question based on the context below. If the
question cannot be answered using the information provided answer
with "I don't know".

Context: Large Language Models (LLMs) are the latest models used in NLP.
Their superior performance over smaller models has made them incredibly
useful for developers building NLP enabled applications. These models
can be accessed via Hugging Face's `transformers` library, via OpenAI
using the `openai` library, and via Cohere using the `cohere` library.

Question: {query}

Answer: """

prompt_template = PromptTemplate(
    input_variables=["query"],
    template=template
)

메모리
대화형 메모리는 채팅과 같은 방식으로 질의에 응답할 수 있도록 한다. 이를 통해, 일관된 대화가 가능하다. 
대화형 메모리 역할(좌: 메모리 있음. 우: 메모리 없음)

메모리는 ConversationChain을 통해 적용된다. 
conversation = ConversationChain(llm=llm, memory=ConversationBufferMemory())

이 체인에 내장된 conversation.prompt.template 프롬프트 템플릿을 출력하면, {history}가 포함된 것을 알 수 있다. history는 지금까지 LLM과 주고 받은 질답의 리스트로 입력될 것이다. 

history와 같은 메모리는 LLM에 컨텍스트와 함께 입력되므로, LLM의 시퀀스 토큰 최대 크기만큼만 기억이 가능하다(보통 4k). 다음 함수를 이용해 얼마의 토큰이 사용되었는지 알 수 있다.
from langchain.callbacks import get_openai_callback
def count_tokens(chain, query):
    with get_openai_callback() as cb:
        result = chain.run(query)
        print(f'Spent a total of {cb.total_tokens} tokens')
    return result

count_tokens(
    conversation_buf, 
    "My interest here is to explore the potential of integrating Large Language Models with external knowledge"
)

메모리를 사용하면, 과도한 토큰 사용량으로 인한 비용증가가 발생한다. 그러므로, 이를 요약해 사용하는 ConverationSummaryMemory로 요약한 것을 LLM에 전달할 수도 있다.
conversation = ConversationChain(
llm=llm,
memory=ConversationSummaryMemory(llm=llm)
)

이 경우, 프롬프트 템플릿에는 {summary} 변수가 포함되고, 메모리 요약이 LLM호출시 입력된다. 그럼에도 불구하고, 모든 대화를 요약한다는 것은 비효율적이다. ConversationSummaryBufferMemory 를 이용하면, k개 만큼 대화만 요약해 메모리로 사용한다.

지식 베이스 저장과 검색
LLM에서 학습되지 않은 지식은 외부에서 공급되어야 한다. 이를 위해 벡터데이터베이스에 미리 지식을 저장해 놓거나, Tool을 사용해 지식을 얻을 수 있다. 너무 큰 지식을 LLM에 전달하면 시퀀스 최대 토큰 크기를 넘어가므로, 지식 문서는 RecursiveCharacterTextSplitter 등을 이용해 청크란 작은 단위로 분해한다. 

질문에 대한 지식 검색을 위해, 임베딩 벡터를 이용한다. 이를 이용해 질문에 가장 가까운 의미를 가진 임베딩 벡터를 얻을 수 있다. 임베딩 벡터는 지식을 가리키고 있어, 이를 LLM에 입력해 원하는 답을 얻는다.
텍스트 임베딩 예시(link)

이 지식을 프롬프트 템플릿에 포함해 LLM에 질의하기 위해 RetrievalQA를 사용할 수 있다.
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)

이 경우, 환각이 포함된 답변을 얻을 수 있어, RetrievalQAWithSourcesChain를 이용해 참조 소스를 포함해 답을 얻을 수 있다.

에이전트
ChatGPT 4버전까지는 문맥에 맞는 결과를 생성하지 못할 때가 있었다. 예를 들어, 3.14159 * 3.14159를 계산한다던지, 같은 용어를 사용하지만 새로운 개념일 때가 그렇다. 문맥을 고려하지 않고 확률적으로 그럴듯한 답을 생성하는 것을 환각현상이라 하는 데, 이를 에이전트 기술을 이용해 제어할 수 있다.
에이전트 개념

에이전트는 LLM을 위한 도구(tool)이다. 에이전트를 사용하면, 파이썬 코드를 작성하고 실행할 수 있고, SQL 질의도 가능하다. 

에이전트는 다음 같은 종류가 있다. 
  • zero-shot-react-description: 에이전트와 단일 상호작용을 하며 메모리는 없음
  • conversational-react-description: 이전 제로샷 리액트와 동일하지만 대화 메모리를 지원. 이 프롬프트는 {chat_history}가 포함됨.
  • react-docstore: 랭체인의 docstore를 사용해 정보 검색 및 조회 수행.
  • self-ask-with-search: 다단계 질문을 하여, 그 결과를 모아 추론 수행.
LCEL 언어
앞에서 설명한 바와 같이 랭체인은 모델 입출력, 데이터 검색, 에이전트 지원, 체인, 컨텍스트 메모리 기능을 제공한다. 이를 효과적으로 연결해 호출하도록 랭체인은 LCEL(LangChain Expression Language)를 지원한다. 
LCEL를 이용해 각 구성요소를 유기적으로 연결시킬 수 있다. LCEL은 유닉스 파이프라인 개념을 차용했다. 다음은 LCEL 예시를 보여준다. 
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.schema import BaseOutputParser

# LCEL 예시
chain = ChatPromptTemplate() | ChatOpenAI() | CustomOutputParser()

이와 더불어, 목적에 맞는 다양한 프롬프트 템플릿, 구조화된 출력을 제공한다.
from langchain.output_parsers.json import SimpleJsonOutputParser

json_prompt = PromptTemplate.from_template(
    "Return a JSON object with `birthdate` and `birthplace` key that answers the following question: {question}"
)
json_parser = SimpleJsonOutputParser() # JSON 파서

# 프롬프트, 모델, 파서 체인 생성
json_chain = json_prompt | model | json_parser  # 유닉스 파이프라인 개념 차용함.

result_list = list(json_chain.stream({"question": "When and where was Elon Musk born?"}))
print(result_list)

마치 유닉스 파이프라인과 같은 이 언어는 Runnables 클래스에서 파생받아 각 단계를 연결할 수 있도록 한다.

다음은 그 예시를 보여준다. 
from langchain_core.runnables import RunnableLambda

def add_five(x):
    return x + 5

def multiply_by_two(x):
    return x * 2

# wrap the functions with RunnableLambda
add_five = RunnableLambda(add_five)
multiply_by_two = RunnableLambda(multiply_by_two)

chain = add_five | multiply_by_two
chain.invoke(3)

LCEL 언어 동작 구조
이 중에 핵심적인 것만 분석해 본다. 우선, LCEL의 동작 방식을 위해 어떤 디자인패턴을 구현하였는 지 확인한다. 이 부분은 runnables 패키지가 담당한다. 이 언어는 유닉스의 파이프라인 처리를 다음과 같이 흉내낸다. 
   z = a | b | c
   z.stream('abc')

이를 위해, 파이썬 문법을 적극 사용하고 있다. 우선 '|' 연산자를 오버로딩(overloading)하기 위해, 파이썬 Runnable 클래스를 정의해 __or__ 연산자를 구현한다. 이 연산자는 self object와 right object 두 객체를 입력받아 list를 만든 후 리턴하는 역할을 한다. 앞의 예시에서 보면, 'a | b'를 실행가능한 객체 리스트로  만들어 리턴한다. 결론적으로 a, b, c객체를 리스트로 만들고, 이 리스트를 z에 할당한다.
z의 stream()이 호출되면, a, b, c를 각각 invoke() 하여 실행하는 방식이다. 그러므로, 다음 그림과 같이, LCEL를 지원하는 객체는 Runnable을 파생받아야 하며, stream, invoke와 같은 주요 함수를 구현해야 한다. 
각 주요 클래스 함수의 동작방식을 좀 더 자세히 살펴본다. LCEL 파이프라인을 구현하는 __or__()함수는 다음과 같이 입력된 객체를 Sequence 형식의 steps 리스트로 담아두는 역할을 한다. 
class Runnable():
    ...
    def __or__(self, other: Union[Runnable[Any, Other], Callable[[Any], Other], ...]):
        return RunnableSequence(self, coerce_to_runnable(other))

    def stream(self, input: Input, config, **kwargs: Optional[Any]) -> Iterator[Output]:
        yield self.invoke(input, config, **kwargs)   # steam 호출 시 invoke 함수 호출

    def invoke(self, input: Input, config: Optional[RunnableConfig] = None):
        callback_manager = get_callback_manager_for_config(config)
        run_manager = callback_manager.on_chain_start(dumpd(self), input)
        for i, step in enumerate(self.steps):
            input = step.invoke(input)  # steps 리스트의 객체 invoke() 호출 후 input 갱신

invoke()가 호출되면, 담아둔 steps 리스트의 객체를 각각 invoke()한다. 이런 방식으로 파이프라인을 구현하고 있다.

프롬프트 템플릿 처리 방식
JSON과 같이 구조화된 출력을 생성할 때는 ...Template 이름을 가진 클래스가 create_template_from_message_type() 함수를 구현한다. 그 과정을 확인해 보자.

def create_template_from_message_type(message_type, template, template_format):
    if message_type in ("human", "user"):  # human, user 일 경우, human template 생성
        message =HumanTemplate.from_template(template, template_format)
    elif message_type in ("ai", "assistant"): # ai 일 경우, AI template 생성
        message = AIMessagePromptTemplate.from_template(template, template_format)
    elif message_type == "system":
        message = SystemMessagePromptTemplate.from_template(...)
    elif message_type == "placeholder":   # 변수 형태로 출력할 경우, placeholder 생성
        if isinstance(template, str):
            var_name = template[1:-1]
            message = MessagesPlaceholder(variable_name=var_name, optional=True)
        elif len(template) == 2 and isinstance(template[1], bool):
            var_name = var_name_wrapped[1:-1]
            message = MessagesPlaceholder(variable_name=var_name, optional=is_optional)
 
프롬프트 템플릿에 따라 적절한 템플릿 객체를 생성해 처리한다. invoke() 했을 때, input에 대한 각각 적합한 프롬프트를 출력한다. 

변수가 담긴 출력을 위해서는 메시지에서 변수를 예측해야 한다. 이 부분은 from_messages_and_schema()가 담당한다. 
def from_messages_and_schema(messages, schema):
        # 자동적으로 메시지로 부터 변수를 추정함
        input_vars: Set[str] = set()
        partial_vars: Dict[str, Any] = {}
        for _message in _messages:
            if isinstance(_message, MessagesPlaceholder) and _message.optional:
                partial_vars[_message.variable_name] = []
            elif isinstance(
                _message, (BaseChatPromptTemplate, BaseMessagePromptTemplate)
            ):
                input_vars.update(_message.input_variables)  # 변수값 업데이트

출력 텍스트에서 구조화된 출력을 위해, 변수와 변수 유형을 검증해야 한다. 이 기능은 PyDantic을 이용해 처리한다. 

앞서 정의된 클래스는 LLM에 입력할 프롬프트를 작성하고, 출력된 결과를 파싱하는 것에 초점을 맞추고 있다.

LLM 모델 호출과 임베딩 데이터베이스 처리
LLM 처리 부분은 llm.py, llms.py 소스에 구현되어 있다. Chain은 전문가가 질문에 답변하며 정보를 업데이트하는 연속된 질의를 담당한다. BaseLLM은 LLM 이 호출될 수 있도록 일반화한 클래스이다. invoke()가 호출되면 프롬프트, 입력과 함께 _agenerate()이 호출된다. 이 함수는 실제 설치된 라마와 같은 LLM혹은 OpenAI의 ChatGPT API를 호출해 출력을 생성하는 역할을 한다. 
class LLMChain(Chain):
    async def _agenerate(self, prompts: List[str], ..., **kwargs: Any) -> LLMResult:

class BaseLLM():
    async def _agenerate(self, prompts: List[str], ..., **kwargs: Any) -> LLMResult:
        return await run_in_executor(None, self._generate, prompts, stop, ..., **kwargs)

LangChain은 임베딩 벡터 데이터베이스를 지원함으로써 LlamaIndex와 함께 RAG를 처리한다. 이를 통해, 환각 현상을 줄여주고, 전문적인 응답이 가능하도록 한다. 이 부분은 VectorStore 기본 클래스에서 담당한다. LangChain은 Chroma 등 다양한 데이터베이스와 임베딩 벡터 간 유사도 계산을 지원한다. 벡터 유사도 검색을 통해, 질문의 배경이 되는 데이터를 미리 LLM에게 전달해준다. 


다음은 임베딩 벡터 데이터베이스의 유사도 검색 함수 구현 일부를 보여준다.
def similarity_search_with_score_by_vector(self, embedding: List[float], k: int = 4, filter: Optional[dict] = None) -> List[Tuple[Document, float]]:
        filter_condition = ""
        if filter is not None:  # 필터의 key, value, conditions을 얻어 SQL 문을 정의함
            conditions = [
                f"metadata->>{key!r} = {value!r}" for key, value in filter.items()
            ]
            filter_condition = f"WHERE {' AND '.join(conditions)}"

        # 쿼리문 생성
        sql_query = f"""
            SELECT *, l2_distance(embedding, :embedding) as distance
            FROM {self.collection_name}
            {filter_condition}
            ORDER BY embedding <-> :embedding
            LIMIT :k
        """

        # 임베딩 검색 파라메터 설정
        params = {"embedding": embedding, "k": k}

        # 쿼리 질의 및 fetch 후 결과 획득
        with self.engine.connect() as conn:
            results: Sequence[Row] = conn.execute(text(sql_query), params).fetchall()

        # 출력 형태로 반환
        documents_with_scores = [
            (
                Document(
                    page_content=result.document,
                    metadata=result.metadata,
                ),
                result.distance if self.embedding_function is not None else None,
            )
            for result in results
        ]
        return documents_with_scores

임베딩 벡터 간 유사도 검색은 cosine 함수, kNN 등 다양한 것이 사용될 수 있다. 

이외에도 에이전트는 HuggingGPT 등을 이용해 멀티모달 도구를 지원하게 할 수도 있다.

좀 더 다양한 에이전트 예시는 다음 링크를 참고한다.

LLamaIndex 구조 분석
LLamaIndex(라마 인덱스)는 LLM 클라우드에서 원하는 LLM을 검색하여, 모델을 다운로드받고, 사용하기 전까지 필요한 단계를 자동화한 라이브러리이다. 사용자 데이터 소스를 LLM에 연결하는 방법을 손쉽게 할 수 있도록 한다. 

LLamaIndex는 RAG 애플리케이션에 대한 벡터 임베딩을 로드, 준비하는 데 도움이 되는 데이터 커넥터, 인덱스 및 쿼리 엔진과 같은 여러 도구를 제공한다. LLamaIndex는 다음 그림과 같이 RAG에 대한 지식 베이스를 생성하고, 이를 LLM에게 전해주는 기능을 구현하고 있다.
LLM과 지식베이스 관계

예를 들어, LLamaIndex는 사용자 데이터인 pdf documents를 load, parse, index, query하는 API를 제공한다. 여기서 index는 임베딩 벡터로 데이터 변환을 지원함을 의미한다. 
LlamaIndex와 RAG 처리 방식

다음은 langchain과 함께 사용한 llama_index의 동작 방식을 보여준다. 

from langchain import Chain
from llama_index import LLaMAIndex

class ProcessingChain(Chain):  # LLM 체인 생성
    def __init__(self):
        super().__init__()

    def _call(self, inputs):
        return {"output": f"Processed: {inputs['input']}"}  # 이해를 위해, 간단한 LLM 출력 가정

processing_chain = ProcessingChain()  # LLM 체인 생성

index = LLaMAIndex()  # 라마 인덱스 생성
# 문서 정의. 예를 들어, PDF, Image등 임베딩 처리될 수 있는 것이면 어떤 데이터도 가능
documents = [
    {"id": 1, "content": "LangChain is a framework for building applications with large language models."},
    {"id": 2, "content": "LLaMAIndex is used for indexing and querying text data."}

# 문서들 인덱싱 처리
for doc in documents:
    processed_content = processing_chain.run({"input": doc["content"]})["output"]
    index.add_document(doc["id"], processed_content)  # 인덱스로 추가

query = "What is LangChain?"  # 질의함
results = index.query(query)     # 결과 리턴

print("Query results:")  # 결과 출력
for result in results:
    print(result)

에이전트 개발 실습
계산기 에이전트를 개발해 보기 위해 간단한 실습 코드를 준비하였다. 

에이전트 개발을 위해 먼저 에이전트가 사용할 도구들을 만든다.
from langchain import OpenAI

llm = OpenAI(
    openai_api_key="OPENAI_API_KEY",
    temperature=0,
    model_name="text-davinci-003"
)

from langchain.chains import LLMMathChain
from langchain.agents import Tool

llm_math = LLMMathChain(llm=llm)

# initialize the math tool
math_tool = Tool(
    name='Calculator',
    func=llm_math.run,
    description='Useful for when you need to answer questions about math.'
)
# when giving tools to LLM, we must pass as list of tools
tools = [math_tool]

이제 에이전트를 앞의 도구를 이용해 생성한다. 여기서 zero-shot 의 의미는 에이전트가 현재 동작에 대해서만 작동되며 메모리는 사용하지 않음을 의미한다. react 프레임웍을 사용해 도구 설명 만을 기반으로 사용할 도구를 결정하게 된다. 
from langchain.agents import initialize_agent

zero_shot_agent = initialize_agent(
    tools=tools,
    llm=llm,
    verbose=True,
    max_iterations=3
)

이제 다음과 같이 제대로 에이전트가 작업을 수행하는 지 테스트해보자. 
zero_shot_agent("what is (4.5*2.1)^2.2?")
zero_shot_agent("if Mary has four apples and Giorgio brings two and a half apple boxes (apple box contains eight apples), how many apples do we have?")

제대로 실행되겠지만, 다음과 같은 질문을 하면 실패할 것이다. 
zero_shot_agent("what is the capital of Norway?")

우리는 단 하나의 계산기 도구만 있으므로, 랭체인에 이미 만들어 놓은, 언어를 해석해 논리적으로 추론하는 도구를 추가해 넣는다. 
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

prompt = PromptTemplate(
    input_variables=["query"],
    template="{query}"
)

llm_chain = LLMChain(llm=llm, prompt=prompt)

# initialize the LLM tool
llm_tool = Tool(
    name='Language Model',
    func=llm_chain.run,
    description='use this tool for general purpose queries and logic'
)

tools.append(llm_tool)

# reinitialize the agent
zero_shot_agent = initialize_agent(
    agent="zero-shot-react-description",
    tools=tools,
    llm=llm,
    verbose=True,
    max_iterations=3
)

이런 방식으로 에이전트를 확장해 나갈 수 있다.

사용자가 도구를 직접 만들 수도 있다. 다음은 원 둘래를 계산하는 도구 정의를 보여준다. 
from langchain.tools import BaseTool
from math import pi
from typing import Union

class CircumferenceTool(BaseTool):
      name = "Circumference calculator"
      description = "use this tool when you need to calculate a circumference using the radius of a circle"

    def _run(self, radius: Union[int, float]):
        return float(radius)*2.0*pi

    def _arun(self, radius: int):
        raise NotImplementedError("This tool does not support async")

다음과 같이 삼각형의 빗변을 계산하기 위해 여러 파라메터를 입력받은 도구도 만들 수 있다. 
from typing import Optional
from math import sqrt, cos, sin

desc = (
    "use this tool when you need to calculate the length of a hypotenuse"
    "given one or two sides of a triangle and/or an angle (in degrees). "
    "To use the tool, you must provide at least two of the following parameters "
    "['adjacent_side', 'opposite_side', 'angle']."
)

class PythagorasTool(BaseTool):
    name = "Hypotenuse calculator"
    description = desc
    
    def _run(
        self,
        adjacent_side: Optional[Union[int, float]] = None,
        opposite_side: Optional[Union[int, float]] = None,
        angle: Optional[Union[int, float]] = None
    ):
        # check for the values we have been given
        if adjacent_side and opposite_side:
            return sqrt(float(adjacent_side)**2 + float(opposite_side)**2)
        elif adjacent_side and angle:
            return adjacent_side / cos(float(angle))
        elif opposite_side and angle:
            return opposite_side / sin(float(angle))
        else:
            return "Could not calculate the hypotenuse of the triangle. Need two or more of `adjacent_side`, `opposite_side`, or `angle`."
    
    def _arun(self, query: str):
        raise NotImplementedError("This tool does not support async")

tools = [PythagorasTool()]

랭체인 구조 분석
langchain 구조를 분석하기 위해, github clone 후 UML로 모델링해 본다. 주요 패키지는 다음과 같다. 

cli는 langchain의 command line interface, core는 langchain의 핵심 구현 코드가 정의된다. 이 부분은 다음 패키지로 구성된다. 

참고로, 이 패키지들은 다음 그림의 일부이다. 
Langchain v.0.2.0 패키지

마무리
LangChain을 개발한 해리슨 체이스(Harrison Chase)은 머신러닝 프로그래머로 일하며, LLM을 사용할 때 불편한 점을 재빨리 개선한 사람이다. 그는 2022년 말 랭체인(LangChain)을 공동 설립했다. 그는 누구보다도 파이썬 언어를 잘 알고 있었던 것 같다. 앞서 분석한 대로, 파이썬의 일반화 구문을 적극 활용해 누구나 GPT-4와 같은 대규모 언어 모델로 구동되는 앱을 단 20줄의 코드로 개발할 수 있는 LangChain을 개발했다. 2023년 3월 1000만 달러의 시드 투자를 주도했고, 불과 몇 주 뒤 2000만 달러 규모의 투자를 진행하였다. 이때 이미 회사는 93k 팔로워를 보유하고 있었다. 대중 속의 그는 앤드류 응과 같은 AI 석학과 잘 소통하는 모습을 보인다. 

이렇게 빠르게 성장한 LangChain에 대한 첫 번째 커밋은 프롬프트 템플릿을 위한 Python의 가벼운 래퍼 클래스로써 시작된 것이다. 그는 2022년 초에 Notion 채팅 구현 시 LLM만으로는 쉽지 않다는 것을 발견했다. 이 문제를 가만두지 않고, LangChain 라이브러리를 재빨리 개선했다. 

현재 LangChain은 LLM과 더불어 전문적인 채팅, 에이전트 서비스, 정보 생성 서비스 개발에 필수적인 기술이 되었다. 

레퍼런스


2024년 4월 11일 목요일

도커 이미지 빌드 시 문제 해결 솔류션 정리

이 글은 도커 이미지 빌드 시 문제 해결 솔류션을 정리한 글이다. 


도커 컨테이너 이미지는 개발, 운영 환경을 독립적으로 실행할 수 있는 가상화를 지원한다. 다만, 도커 이미지 빌드 시 설치되는 수많은 패키지에 의존되므로, 여러 에러가 발생된다. 대표적인 에러는 다음과 같다. 
  1. 'DEBIAN_FRONTEND=noninteractive' not working inside shell script with apt-get
  2. How can I set the default timezone
  3. Docker build failed with Hash Sum mismatch error
  4. Docker build failed with error "Hash Sum mismatch"
  5. throws an error saying "Some index files failed to download. They have been ignored or old ones used instead."
  6. Error while loading conda entry point: conda-libmamba-solver (libarchive.so.19: cannot open shared object file: No such file or directory)
  7. OpenGL, 3D graphics problems in docker
  8. Windows Volume mount
이외에도 매우 다양한데, 사실, 설치되는 패키지들이 많은 도커 이미지는 그만큼 원인이 많다. 스택 오버플로우 관련 댓글을 보면 알겠지만, 누구는 이런 문제로 몇 일이 날라갔다 등의 성토글을 볼 수 있다. 그만큼 원인이 다양하게 조합될 수 있다.

앞의 에러 솔류션만 정리해 본다. 

1번은 도커 빌드 시 [yes/no] 프롬프트 입력 상황에서 발생한다. 도커 빌드 중에는 키보드 입력이 안된다. 그러므로, 다음과 같이 인터렉티브가 없다고 설정하면 해결된다.
ARG DEBIAN_FRONTEND=noninteractive

2번은 타임존 문제로 이는 다음과 같이 설정한다. 타임존이 제대로 설정되어 있지 않으면, 패키지 설치에 실패할 수 있다. 
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ENV TZ=Asia/Seoul

3, 4번 해쉬섬 에러 문제는 보통 우분투나 리눅스의 apt-get update 시 발생하는 데, stack overflow에 검색해 보아도 관련 솔류션을 시도해도 잘 처리 안되는 경우가 발생한다. 

이 경우는 다음과 같은 솔류션을 apt-get update 전에 dockerfile에 정의하라고 되어 있는 답변이 많다. 

RUN rm -rf /var/lib/apt/lists/*

RUN echo "Acquire::http::Pipeline-Depth 0;" > /etc/apt/apt.conf.d/99custom && \
    echo "Acquire::http::No-Cache true;" >> /etc/apt/apt.conf.d/99custom && \
    echo "Acquire::BrokenProxy    true;" >> /etc/apt/apt.conf.d/99custom

RUN echo "Acquire::Check-Valid-Until \"false\";\nAcquire::Check-Date \"false\";" | cat > /etc/apt/apt.conf.d/10no--check-valid-until

이렇게 해도 해결안되면, 기본이 되는 이미지를 우분투 과거 버전(예. 20.04), 혹은 미리 패키지 설치된 버전으로 설정하고 다시 시도한다. 
FROM ubuntu:20.04
FROM continuumio/miniconda3

이 문제는 무언가 해당 패키지를 배포하는 서버에서 문제가 있거나, 도커 빌드 컴퓨터의 시간이 안맞는 등의 문제로 예상된다. 

5번은 도커 이미지 빌드 시 동작되는 방식에 원인이 있다. 도커는 빌드 과정을 재활용하기 위해 레이어, 캐쉬 등을 사용하는 데, 가끔 이 부분이 꼬이는 경우가 있다. 이 경우, 다음과 같이 빌드해 본다 .
docker build --no-cache -t <docker_name> .

안되면, 현재 도커 이미지들을 모두 삭제하거나, 다음과 같이 clean, purge를 한다.
docker image rm -f
docker system prune -a -f

6번은 미리 설치된 아나콘다 이미지를 사용하라고 권장한다. 다음은 그 예이다.
FROM continuumio/miniconda3

7번은 호스트 서버가 3D GPU가 지원되지 않으면 아직 뚜렷한 해결방법은 없다. 도커 이미지는 본질적으로 3D GUI용으로 사용되도록 개발된 것이 아니다. 3D GUI로 이미지를 생성, 저장하는 등의 기능은 기본적으로 도커에서는 처리되지 않는다. 
이를 우회적으로 지원하는 몇 가지 방법이 있다. VirtualGL(ref), Nvidia GPU docker 등이 그러하다. 하지만, 도커가 실행되는 호스트 서버에 설치된 그래픽카드에 따라 이런 옵션이 동작되지 않을 수도 있다. 

8번은 윈도우에서 호스트 폴더와 도커 내 폴더를 마운트할 때 발생하는 문제이다. 황당하게도, 우분투에는 문제 없는 볼륨 마운트가 윈도우에는 별도 설정을 해줘야 한다. 다음과 같이 도커 설정 메뉴에서 Resource 메뉴의 File sharing 경로를 추가해야 -v 마운트 옵션이 동작된다. 

아울러, 윈도우에서는 경로 설정이 우분투와 다르므로, 아래와 같이 절대 경로 지정해야 한다.
docker run -v c:/input_data:/app/input -it <docker_image_name> python app_program.py --input ./input/data.json 


레퍼런스

2024년 4월 8일 월요일

시계열 데이터 예측을 위한 트랜스포머 모델 개발하기

이 글은 시계열 트랜스포머 모델 개념 및 활용 방법을 간략히 정리한다. 이 글에서는 시계열 데이터셋을 학습 및 예측하기 해 트랜스포머와 TFT(Temporal Fusion Transformers) 방식을 사용다. 

머리말
트랜스포머의 원리를 이해하고 있다면, 수치로 표현된 시계열 벡터도 라벨링된 시계열 벡터가 있다면, 이를 학습할 수 있다는 것을 이해할 것이다. 

이 글은 트랜스포머 모델을 이용해 시계열을 학습하는 방법을 실습한다. 트랜스포머의 세부적인 기술은 다음 링크 및 레퍼런스를 참고한다. 
개발 환경
이 글을 실습하기 위해서는 기본적으로 다음 개발환경이 준비되어야 한다. 
  • NVIDIA GPU driver
  • Ubuntu or Windows CUDA driver 
  • PyTorch
  • Pandas, matplotlib
트랜스포머 기반 데이터 예측
시계열 데이터에서 패턴을 학습하고, 다음 값을 예측하는 방법은 여러가지가 있다. 트랜스포머의 경우에도, 트랜스포머 분류 기법을 이용하는 방법, Temporal Fusion Transformer를 사용하는 방법 등이 있다.

여기에서는 기본 개념을 먼저 이해하기 위해, 트랜스포머 분류 기법을 이용해 학습한다. 


학습될 데이터는 다음과 같다. 

이 데이터는 페이스북 주식 종가, 개장가 등을 다운로드받은 FB_raw.csv 엑셀파일을 그래프화한 것이다. 데이터셋 크기는 160,681레코드이다. 실제 데이터 구조는 다음과 같다. 

데이터를 미리 다운로드한다.
알고리즘 순서는 다음과 같다.
  1. 주가 시계열 데이터 로딩
  2. 학습 및 테스트 데이터 생성
  3. 벡터 임베딩 함수 정의
  4. 트랜스포머 모델 정의: 포지션 인코딩, 인코더, 주가 예측을 위한 linear full connection 정의. 타겟 마스크 정의
  5. 데이터 학습
  6. 테스트 데이터 예측 결과 확인 
코딩을 위해, 파이썬 파일을 생선한다. 그리고, 다음과 같이 패키지를 임포트한다. 
import torch, torch.nn as nn
import numpy as np
import pandas as pd
import os, time, math
import matplotlib.pyplot as plt
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True' # Intel Math Kernel Library use

학습될 데이터 형식을 설정한다. 학습 데이터는 10개, 예측 데이터는 1개이다. 
input_window = 10 # number of input steps
output_window = 1 # number of prediction steps, in this model its fixed to one
batch_size = 250
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") 

페이스북 데이터를 읽고, 그래프형식으로 보여준다. 
# get this python's directory
module_path = os.path.dirname(__file__)
df = pd.read_csv(module_path + '/FB_raw.csv') # data path of facebook stock price (Apr 2019 - Nov 2020)
close = np.array(df['close'])
logreturn = np.diff(np.log(close)) # Transform closing price to log returns, instead of using min-max scaler
csum_logreturn = logreturn.cumsum() # Cumulative sum of log returns

fig, axs = plt.subplots(2, 1)
axs[0].plot(close, color='red')
axs[0].set_title('Closing Price')
axs[0].set_ylabel('Close Price')
axs[0].set_xlabel('Time Steps')

axs[1].plot(csum_logreturn, color='green')
axs[1].set_title('Cumulative Sum of Log Returns')
axs[1].set_xlabel('Time Steps')

fig.tight_layout()
plt.show()

벡터 임베딩 계산을 위해, 트랜스포머의 포지션 인코딩을 정의한다.  
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()       
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:x.size(0), :]        

트랜스포머 기반 시계열 학습 모델을 정의한다. 트랜스포머 정의 그대로 정의되며, decoder부분만 다르다. 이 부분은 트랜스포머 결과를 받아, 예측값 1개를 생성하는 full connection 레이어다. 
class transformer_seq(nn.Module):
    def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
        super(transformer_seq, self).__init__()
        self.model_type = 'Transformer'
        
        self.src_mask = None
        self.pos_encoder = PositionalEncoding(feature_size)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)   
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)        
        self.decoder = nn.Linear(feature_size,1)
        self.init_weights()  # decoder FC층 가중치 초기화

    def init_weights(self):
        initrange = 0.1    
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self,src):
        if self.src_mask is None or self.src_mask.size(0) != len(src):  # 목표 마스크 생성
            device = src.device
            mask = self.generate_square_subsequent_mask(len(src)).to(device)
            self.src_mask = mask

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, self.src_mask)
        output = self.decoder(output)
        return output

    def generate_square_subsequent_mask(self, sz):  # 목표 마스크 생성 함수
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask        

학습될 데이터를 생성한다. 주가 데이터에서 input은 10개 수치, output은 1개 수치이다.
def create_inout_sequences(input_data, tw):
    inout_seq = []
    L = len(input_data)
    for i in range(L-tw):
        train_seq = input_data[i:i+tw]
        train_label = input_data[i+output_window:i+tw+output_window]
        inout_seq.append((train_seq ,train_label))
    return torch.FloatTensor(inout_seq)

학습될 데이터를 train, test dataset으로 분할하는 함수를 정의한다. 
def get_data(data, split):
    series = data
    
    split = round(split*len(series))
    train_data = series[:split]
    test_data = series[split:]

    train_data = train_data.cumsum()
    train_data = 2*train_data # 학습 데이터 값을 2배 증폭함으로써 학습 정확도를 높인다.
    test_data = test_data.cumsum()

    train_sequence = create_inout_sequences(train_data,input_window)
    train_sequence = train_sequence[:-output_window]

    test_data = create_inout_sequences(test_data,input_window)
    test_data = test_data[:-output_window]

    return train_sequence.to(device), test_data.to(device)

학습될 배치 데이터를 리턴하는 함수를 정의한다.
def get_batch(source, i, batch_size):
    seq_len = min(batch_size, len(source) - 1 - i)
    data = source[i:i+seq_len]    
    input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
    target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
    return input, target

학습 함수를 정의한다.
def train(train_data):
    model.train() # Turn on the evaluation mode
    total_loss = 0.
    start_time = time.time()

    for batch, i in enumerate(range(0, len(train_data) - 1, batch_size)):  # 배치크기만큼 루프
        data, targets = get_batch(train_data, i,batch_size)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, targets)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
        optimizer.step()

        total_loss += loss.item()
        log_interval = int(len(train_data) / batch_size / 5)
        if batch % log_interval == 0 and batch > 0:  # 배치 루프 시 loss, 정확도 출력
            cur_loss = total_loss / log_interval
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.10f} | {:5.2f} ms | loss {:5.7f}'.format(
                    epoch, batch, len(train_data) // batch_size, scheduler.get_lr()[0], elapsed * 1000 / log_interval,
                    cur_loss))
            total_loss = 0
            start_time = time.time()

평가 함수를 정의한다.
def evaluate(eval_model, data_source):
    eval_model.eval() # Turn on the evaluation mode
    total_loss = 0.
    eval_batch_size = 1000
    with torch.no_grad():
        for i in range(0, len(data_source) - 1, eval_batch_size):
            data, targets = get_batch(data_source, i, eval_batch_size)
            output = eval_model(data)            
            total_loss += len(data[0])* criterion(output, targets).cpu().item()
    return total_loss / len(data_source)

학습 모델 기반 데이터 예측 함수를 정의한다. 
def model_forecast(model, seqence):
    model.eval() 
    total_loss = 0.
    test_result = torch.Tensor(0)    
    truth = torch.Tensor(0)

    seq = np.pad(seqence, (0, 3), mode='constant', constant_values=(0, 0))
    seq = create_inout_sequences(seq, input_window)
    seq = seq[:-output_window].to(device)

    seq, _ = get_batch(seq, 0, 1)
    with torch.no_grad():
        for i in range(0, output_window):            
            output = model(seq[-output_window:])                        
            seq = torch.cat((seq, output[-1:]))

    seq = seq.cpu().view(-1).numpy()
    return seq

실 데이터를 이용한 데이터 예측 함수를 정의한다.
def forecast_seq(model, sequences):
    """Sequences data has to been windowed and passed through device"""
    start_timer = time.time()
    model.eval() 
    forecast_seq = torch.Tensor(0)    
    actual = torch.Tensor(0)
    with torch.no_grad():
        for i in tqdm(range(0, len(sequences) - 1)):
            data, target = get_batch(sequences, i, 1)
            output = model(data)            
            forecast_seq = torch.cat((forecast_seq, output[-1].view(-1).cpu()), 0)
            actual = torch.cat((actual, target[-1].view(-1).cpu()), 0)
    timed = time.time()-start_timer
    print(f"{timed} sec")

    return forecast_seq, actual

학습 데이터를 준비한다.
train_data, val_data = get_data(logreturn, 0.6) # 60% train, 40% test split

모델, Loss함수, 하이퍼파라메터, 스케쥴 등을 정의한다.
model = transformer_seq().to(device)
criterion = nn.MSELoss() # Loss function
lr = 0.00005 # learning rate
epochs =  500 # Number of epochs

optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)

for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train(train_data)
    
    if(epoch % epochs == 0): # 에폭마다 모델 평가
        val_loss = evaluate(model, val_data)
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s | valid loss: {:5.7f}'.format(epoch, (time.time() - epoch_start_time), val_loss))
        print('-' * 80)
    else:   
        print('-' * 80)
        print('| end of epoch {:3d} | time: {:5.2f}s'.format(epoch, (time.time() - epoch_start_time)))
        print('-' * 80)

    scheduler.step() 

학습 후 모델 이용해 데이터 예측하고, 실제 데이터와 비교한다.
test_result, truth = forecast_seq(model, val_data)

plt.plot(truth, color='red', alpha=0.7)
plt.plot(test_result, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

테스트를 위해 랜덤값을 이용해 비교해 본다. 
r = np.random.randint(100000, 160000)
test_forecast = model_forecast(model, csum_logreturn[r: r+10]) # random 10 sequence length

print(f"forecast sequence: {test_forecast}")
print(f"Actual sequence: {csum_logreturn[r: r+11]}")
torch.save(model.state_dict(), "transformer_ts_20231211.pth")

학습 파일을 로딩한 후, 다른 시계열 데이터셋도 테스트해본다. 
model2 = TransAm() # rename as model2
model2.load_state_dict(torch.load("transformer_ts_20231211.pth"))
model2.to(device)

df2 = pd.read_csv(module_path + '/BA_raw.csv') # 보잉 주식 테스트
close2 = df2['close'].fillna(method = 'ffill')
close2 = np.array(close2)
logreturn2 = np.diff(np.log(close2))

train_data2, val_data2 = get_data(logreturn2, 0.6)
test2_eval = evaluate(model2, val_data2)
print(f'Test 2 loss: {test2_eval :.5f}')

test_result2, truth2 = forecast_seq(model2, val_data2)

plt.plot(truth2, color='red', alpha=0.7)
plt.plot(test_result2, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

df3 = pd.read_csv(module_path + '/JPM_raw.csv') # JPMorgan Chase & Co 주식 테스트
close3 = df3['close'].fillna(method = 'ffill')
close3 = np.array(close3)
logreturn3 = np.diff(np.log(close3))

train_data3, val_data3 = get_data(logreturn3, 0.6)
test3_eval = evaluate(model2, val_data3)
print(f'Test 3 loss: {test3_eval :.5f}')

test_result3, truth3 = forecast_seq(model2, val_data3)

plt.plot(truth3, color='red', alpha=0.7)
plt.plot(test_result3, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()

결과는 다음과 같이, 잘 학습되어, 데이터셋을 예측하고 있는 것을 알 수 있다. 
학습 과정 화면
테스트 데이터셋 예측 결과(페이스북 주가)

전혀 다른 주식 종목도 예측해 보자. 다음은 보잉 주가이다. 
보잉 주가 예측
다음은 제이슨 모건 주식이다. 
제이슨 모건 주가 예측

결과적으로 학습 모델이 패턴을 잘 예측한다. 

TFT 기반 데이터 예측
이번에는 TFT 기반으로 데이터를 예측한다. TFT는 다음을 입력으로 사용하여 미래를 예측한다.
  • 길이 k의 윈도우 내에 있는 과거 목표 값 y
  • 알 수 없는 입력 z와 알려진 입력 x로 구성된 시간 종속 입력 

TFT의 특징은 다음과 같다.
  • 시계열이 보유할 수 있는 장기 패턴을 식별. 가장 관련성이 높은 패턴의 우선 순위를 지정하는 시간적 다중 헤드 어텐션 블록을 지원. 각 어텐션 헤드는 다른 시간적 패턴에 초점을 맞출 수 있음
  • LSTM sequence-to-sequence 인코더/디코더는 더 짧은 패턴을 요약함. LSTM 블록은 주변 값과 시간 단계의 관계를 식별하기 위한 것임
  • 중요하지 않고 사용되지 않는 입력을 제거하기 위한 게이트 잔여 네트워크 블록이 사용됨(GRN)
  • 과적합을 방지하기 위해 노드를 무작위로 삭제함
Temporal Fusion Transformer 개념도

우선 훈련에 사용될 UCI의 전력 부하 데이터세트를 다운로드 한다. 이 데이터는 15분 빈도로 370명 사용자의 전력 소비량(kW)가 기록되어 있다(2011-2014년). 이를 전처리하여, 목표변수(라벨링값)를 시간별로 집계하고, 새로운 특징(월, 일, 시간)을 생성한 후, 학습할 데이터셋을 추출하여 TFT 모델을 훈련한다. 

데이터는 다음과 같이 얻는다.
wget https://archive.ics.uci.edu/ml/machine-learning-databases/00321/LD2011_2014.txt.zip

압축해서 풀어보면, 다음과 같이 전력량이 시간, 사용자 별로 표시된 것을 알 수 있다.

이제 다음 코드를 입력한다.
import warnings, pickle
import numpy as np, pandas as pd, matplotlib.pyplot as plt, seaborn as sns
import torch, lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping, LearningRateMonitor
from lightning.pytorch.loggers import TensorBoardLogger
from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet
from pytorch_forecasting.data import GroupNormalizer
from pytorch_forecasting.metrics import MAE, SMAPE, PoissonLoss, QuantileLoss
from pytorch_forecasting import TimeSeriesDataSet

data = pd.read_csv('LD2011_2014.txt', index_col=0, sep=';', decimal=',')
data.index = pd.to_datetime(data.index)
data.sort_index(inplace=True)
print(data.head(5))

# 다섯명의 사용자에 대한 전력 소모량을 모아 학습 데이터셋을 만든다
data = data.resample('1h').mean().replace(0., np.nan)
earliest_time = data.index.min()
df=data[['MT_002', 'MT_004', 'MT_005', 'MT_006', 'MT_008' ]]
print(df.head(5))
df_list = []

for label in df:
    ts = df[label]

    start_date = min(ts.fillna(method='ffill').dropna().index)
    end_date = max(ts.fillna(method='bfill').dropna().index)

    active_range = (ts.index >= start_date) & (ts.index <= end_date)
    ts = ts[active_range].fillna(0.)

    tmp = pd.DataFrame({'power_usage': ts})
    date = tmp.index

    tmp['hours_from_start'] = (date - earliest_time).seconds / 60 / 60 + (date - earliest_time).days * 24
    tmp['hours_from_start'] = tmp['hours_from_start'].astype('int')
  
    tmp['days_from_start'] = (date - earliest_time).days
    tmp['date'] = date
    tmp['consumer_id'] = label
    tmp['hour'] = date.hour
    tmp['day'] = date.day
    tmp['day_of_week'] = date.dayofweek
    tmp['month'] = date.month

    #stack all time series vertically
    df_list.append(tmp)

time_df = pd.concat(df_list).reset_index(drop=True)

# match results in the original paper
time_df = time_df[(time_df['days_from_start'] >= 1096)
                & (time_df['days_from_start'] < 1346)].copy()

# 학습할 모델은 1주일(7 * 24) 데이터셋을 사용해 다음 24시간의 전력 사용량을 예측한다. 
# Hyperparameters
# batch size=64
# number heads=4, hidden sizes=160, lr=0.001, gr_clip=0.1
max_prediction_length = 24
max_encoder_length = 7*24
training_cutoff = time_df["hours_from_start"].max() - max_prediction_length

training = TimeSeriesDataSet(
    time_df[lambda x: x.hours_from_start <= training_cutoff],
    time_idx="hours_from_start",
    target="power_usage",
    group_ids=["consumer_id"],
    min_encoder_length=max_encoder_length // 2, 
    max_encoder_length=max_encoder_length,
    min_prediction_length=1,
    max_prediction_length=max_prediction_length,
    static_categoricals=["consumer_id"],
    time_varying_known_reals=["hours_from_start","day","day_of_week", "month", 'hour'],
    time_varying_unknown_reals=['power_usage'],
    target_normalizer=GroupNormalizer(
        groups=["consumer_id"], transformation="softplus"
    ),  # we normalize by group
    add_relative_time_idx=True,
    add_target_scales=True,
    add_encoder_length=True,
)

validation = TimeSeriesDataSet.from_dataset(training, time_df, predict=True, stop_randomization=True)

# create dataloaders for  our model
batch_size = 64 
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)

# 기준 모델을 만듬. 
actuals = torch.cat([y for x, (y, weight) in iter(val_dataloader)]).to("cuda")
baseline_predictions = Baseline().predict(val_dataloader)
print((actuals - baseline_predictions).abs().mean().item())

# PyTorch Lightning 라이브러리에서 Trainer 를 사용해 TFT를 훈련함.
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=5, verbose=True, mode="min")
lr_logger = LearningRateMonitor()  
logger = TensorBoardLogger("lightning_logs")  

trainer = pl.Trainer(
    max_epochs=45,
    accelerator='gpu', 
    devices=1,
    enable_model_summary=True,
    gradient_clip_val=0.1,
    callbacks=[lr_logger, early_stop_callback],
    logger=logger)

tft = TemporalFusionTransformer.from_dataset(
    training,
    learning_rate=0.001,
    hidden_size=160,
    attention_head_size=4,
    dropout=0.1,
    hidden_continuous_size=160,
    output_size=7,  # there are 7 quantiles by default: [0.02, 0.1, 0.25, 0.5, 0.75, 0.9, 0.98]
    loss=QuantileLoss(),
    log_interval=10, 
    reduce_on_plateau_patience=4)

trainer.fit(
    tft,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader)

# 학습 모델을 로딩하고, 예측 그래프 출력함.
best_model_path = trainer.checkpoint_callback.best_model_path
print(best_model_path)
best_tft = TemporalFusionTransformer.load_from_checkpoint(best_model_path)

actuals = torch.cat([y[0] for x, y in iter(val_dataloader)]).to('cuda')
predictions = best_tft.predict(val_dataloader)

#average p50 loss overall
print((actuals - predictions).abs().mean().item())
#average p50 loss per time series
print((actuals - predictions).abs().mean(axis=1))

raw_predictions = best_tft.predict(val_dataloader, mode="raw", return_x=True)
print(raw_predictions._fields)
#('output', 'x', 'index', 'decoder_lengths', 'y')

print('\n')
print(raw_predictions.output._fields)
print('\n')
print(raw_predictions.output.prediction.shape)

for idx in range(5):  # plot all 5 consumers
    fig, ax = plt.subplots(figsize=(10, 4))
    best_tft.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=QuantileLoss(),ax=ax)

fig, ax = plt.subplots(figsize=(10, 5))

실행 결과는 다음과 같다. 근사하게 학습된 것을 확인할 수 있다.

훈련 범위 밖의 데이터도 예측해 본다. 
    raw_prediction= best_tft.predict(
        training.filter(lambda x: (x.consumer_id == "MT_004") & (x.time_idx_first_prediction == 26512)),
        mode="raw",
        return_x=True,
    )
    best_tft.plot_prediction(raw_prediction.x, raw_prediction.output, idx=0, ax=ax)


TFT의 학습 모델 예측에 대한 어텐션 지표 등을 확인해 본다. 
    #encoder data is the last lookback window: we get the last 1 week (168 datapoints) for all 5 consumers = 840 total datapoints
    encoder_data = time_df[lambda x: x.hours_from_start > x.hours_from_start.max() - max_encoder_length]
    last_data = time_df[lambda x: x.hours_from_start == x.hours_from_start.max()]

    decoder_data = pd.concat(
        [last_data.assign(date=lambda x: x.date + pd.offsets.Hour(i)) for i in range(1, max_prediction_length + 1)],
        ignore_index=True,
    )

    #fix the new columns
    decoder_data["hours_from_start"] = (decoder_data["date"] - earliest_time).dt.seconds / 60 / 60 + (decoder_data["date"] - earliest_time).dt.days * 24
    decoder_data['hours_from_start'] = decoder_data['hours_from_start'].astype('int')
    decoder_data["hours_from_start"] += encoder_data["hours_from_start"].max() + 1 - decoder_data["hours_from_start"].min()

    decoder_data["month"] = decoder_data["date"].dt.month.astype(np.int64)
    decoder_data["hour"] = decoder_data["date"].dt.hour.astype(np.int64)
    decoder_data["day"] = decoder_data["date"].dt.day.astype(np.int64)
    decoder_data["day_of_week"] = decoder_data["date"].dt.dayofweek.astype(np.int64)

    new_prediction_data = pd.concat([encoder_data, decoder_data], ignore_index=True)

    fig, ax = plt.subplots(figsize=(10, 5))

    #create out-of-sample predictions for MT_002
    new_prediction_data=new_prediction_data.query(" consumer_id == 'MT_002'")
    new_raw_predictions = best_tft.predict(new_prediction_data, mode="raw", return_x=True)
    best_tft.plot_prediction(new_raw_predictions.x, new_raw_predictions.output, idx=0, show_future_observed=False, ax=ax);

    raw_predictions= best_tft.predict(val_dataloader, mode="raw", return_x=True)
    interpretation = best_tft.interpret_output(raw_predictions.output, reduction="sum")
    best_tft.plot_interpretation(interpretation)

    #Analysis on the training set
    predictions = best_tft.predict(train_dataloader, return_x=True)
    predictions_vs_actuals = best_tft.calculate_prediction_actual_by_variable(predictions.x, predictions.output)
    best_tft.plot_prediction_actual_by_variable(predictions_vs_actuals)

다음과 같이 TFT의 처리 성능과 관련된 지표를 확인할 수 있다. 


마무리
앞에 예제들은 충분한 데이터를 통해 좋은 조건에서 학습한 것이다. 일반적인 연구 데이터나 현장 데이터를 시계열 데이터 딥러닝 모델에 학습할 때는 다음 사항을 고려해야 한다.
  • 누락, 노이즈 등 데이터 제거 필요
  • 적은 양의 데이터는 증강 필요
  • 데이터 양에 따른 적절한 배치 사이즈 설정
  • 데이터 양에 따른 적적한 파라메터 설정
  • 데이터 정규화 고려
  • 데이터 공변성 문제 고려

레퍼런스