이 글은 Django 기반 데이터 분석 데쉬보드 개발방법을 간략히 정리한 것이다.
2024년 4월 26일 금요일
2024년 4월 21일 일요일
랭체인과 임베딩 벡터 데이터베이스 아키텍처 및 알고리즘 분석
이 글은 랭체인과 임베딩 벡터 데이터베이스를 분석한 것이다. 이를 통해, LLM 서비스 개발에 필요한 라이브러리 및 데이터베이스 구조를 이해하고, 다양한 분야에 응용할 수 있다.
이 글은 다음 라이브러리 및 플랫폼 아키텍처 및 알고리즘을 분석한다.
- 랭체인
- 벡터 데이터베이스
- OLLAMA
- LLAMA2.c
- 오픈소스 기반 LLM의 민주화, LLAMA-2 논문 분석 및 기술 요약하기
- 머신러닝 딥러닝 신경망 개념, 종류 및 개발
- 어텐션 기반 트랜스포머 딥러닝 모델 이해, 활용 사례 및 파치토치를 통한 간단한 사용방법 소개
- 트랜스포머 디코더 핵심 코드 구현을 통한 동작 메커니즘 이해하기
- 오픈소스 기반 LLM의 민주화, LLAMA-2 논문 분석 및 기술 요약
- 생성AI 멀티모달 모델 개발의 시작. OpenAI의 CLIP모델 이해, 코드 분석, 개발, 사용하기
- Computer vision deep learning: computer vision based on deep learning lecture materials, github
- GPU 없는 로컬에서 서비스 가능한 경량 소형 LLM, LLAMA2.c 빌드, 실행, 학습 및 코드 분석하기
- 대중화된 멀티모달 생성AI 모델, Stable Diffusion 아키텍처 분석과 동작 원리 이해
머리말
본론
마무리
레퍼런스
- 오픈소스 기반 LLM의 민주화, LLAMA-2 논문 분석 및 기술 요약하기
- 머신러닝 딥러닝 신경망 개념, 종류 및 개발
- 어텐션 기반 트랜스포머 딥러닝 모델 이해, 활용 사례 및 파치토치를 통한 간단한 사용방법 소개
- 트랜스포머 디코더 핵심 코드 구현을 통한 동작 메커니즘 이해하기
- 오픈소스 기반 LLM의 민주화, LLAMA-2 논문 분석 및 기술 요약
- 생성AI 멀티모달 모델 개발의 시작. OpenAI의 CLIP모델 이해, 코드 분석, 개발, 사용하기
- Computer vision deep learning: computer vision based on deep learning lecture materials, github
- GPU 없는 로컬에서 서비스 가능한 경량 소형 LLM, LLAMA2.c 빌드, 실행, 학습 및 코드 분석하기
- 대중화된 멀티모달 생성AI 모델, Stable Diffusion 아키텍처 분석과 동작 원리 이해
2024년 4월 11일 목요일
도커 이미지 빌드 시 문제 해결 솔류션 정리
이 글은 도커 이미지 빌드 시 문제 해결 솔류션을 정리한 글이다.
도커 컨테이너 이미지는 개발, 운영 환경을 독립적으로 실행할 수 있는 가상화를 지원한다. 다만, 도커 이미지 빌드 시 설치되는 수많은 패키지에 의존되므로, 여러 에러가 발생된다. 대표적인 에러는 다음과 같다.
- 'DEBIAN_FRONTEND=noninteractive' not working inside shell script with apt-get
- How can I set the default timezone
- Docker build failed with Hash Sum mismatch error
- Docker build failed with error "Hash Sum mismatch"
- throws an error saying "Some index files failed to download. They have been ignored or old ones used instead."
- Error while loading conda entry point: conda-libmamba-solver (libarchive.so.19: cannot open shared object file: No such file or directory)
- OpenGL, 3D graphics problems in docker
- Windows Volume mount
이외에도 매우 다양한데, 사실, 설치되는 패키지들이 많은 도커 이미지는 그만큼 원인이 많다. 스택 오버플로우 관련 댓글을 보면 알겠지만, 누구는 이런 문제로 몇 일이 날라갔다 등의 성토글을 볼 수 있다. 그만큼 원인이 다양하게 조합될 수 있다.
앞의 에러 솔류션만 정리해 본다.
ARG DEBIAN_FRONTEND=noninteractive
2번은 타임존 문제로 이는 다음과 같이 설정한다. 타임존이 제대로 설정되어 있지 않으면, 패키지 설치에 실패할 수 있다.
ENV LANG=C.UTF-8 LC_ALL=C.UTF-8
ENV TZ=Asia/Seoul
3, 4번 해쉬섬 에러 문제는 보통 우분투나 리눅스의 apt-get update 시 발생하는 데, stack overflow에 검색해 보아도 관련 솔류션을 시도해도 잘 처리 안되는 경우가 발생한다.
이 경우는 다음과 같은 솔류션을 apt-get update 전에 dockerfile에 정의하라고 되어 있는 답변이 많다.
RUN rm -rf /var/lib/apt/lists/*
RUN echo "Acquire::http::Pipeline-Depth 0;" > /etc/apt/apt.conf.d/99custom && \
echo "Acquire::http::No-Cache true;" >> /etc/apt/apt.conf.d/99custom && \
echo "Acquire::BrokenProxy true;" >> /etc/apt/apt.conf.d/99custom
RUN echo "Acquire::Check-Valid-Until \"false\";\nAcquire::Check-Date \"false\";" | cat > /etc/apt/apt.conf.d/10no--check-valid-until
이렇게 해도 해결안되면, 기본이 되는 이미지를 우분투 과거 버전(예. 20.04), 혹은 미리 패키지 설치된 버전으로 설정하고 다시 시도한다.
FROM ubuntu:20.04
FROM continuumio/miniconda3
이 문제는 무언가 해당 패키지를 배포하는 서버에서 문제가 있거나, 도커 빌드 컴퓨터의 시간이 안맞는 등의 문제로 예상된다.
5번은 도커 이미지 빌드 시 동작되는 방식에 원인이 있다. 도커는 빌드 과정을 재활용하기 위해 레이어, 캐쉬 등을 사용하는 데, 가끔 이 부분이 꼬이는 경우가 있다. 이 경우, 다음과 같이 빌드해 본다 .
docker build --no-cache -t <docker_name> .
안되면, 현재 도커 이미지들을 모두 삭제하거나, 다음과 같이 clean, purge를 한다.
docker image rm -f
docker system prune -a -f
6번은 미리 설치된 아나콘다 이미지를 사용하라고 권장한다. 다음은 그 예이다.
FROM continuumio/miniconda3
7번은 호스트 서버가 3D GPU가 지원되지 않으면 아직 뚜렷한 해결방법은 없다. 도커 이미지는 본질적으로 3D GUI용으로 사용되도록 개발된 것이 아니다. 3D GUI로 이미지를 생성, 저장하는 등의 기능은 기본적으로 도커에서는 처리되지 않는다.
이를 우회적으로 지원하는 몇 가지 방법이 있다. VirtualGL(ref), Nvidia GPU docker 등이 그러하다. 하지만, 도커가 실행되는 호스트 서버에 설치된 그래픽카드에 따라 이런 옵션이 동작되지 않을 수도 있다.
8번은 윈도우에서 호스트 폴더와 도커 내 폴더를 마운트할 때 발생하는 문제이다. 황당하게도, 우분투에는 문제 없는 볼륨 마운트가 윈도우에는 별도 설정을 해줘야 한다. 다음과 같이 도커 설정 메뉴에서 Resource 메뉴의 File sharing 경로를 추가해야 -v 마운트 옵션이 동작된다.
아울러, 윈도우에서는 경로 설정이 우분투와 다르므로, 아래와 같이 절대 경로 지정해야 한다.
docker run -v c:/input_data:/app/input -it <docker_image_name> python app_program.py --input ./input/data.json
2024년 4월 8일 월요일
시계열 데이터 예측을 위한 트랜스포머 모델 개발하기
이 글은 시계열 트랜스포머 모델 개념 및 활용 방법을 간략히 정리한다.
트랜스포머 개념도
머리말
트랜스포머의 원리를 이해하고 있다면, 수치로 표현된 시계열 벡터도 라벨링된 시계열 벡터가 있다면, 이를 학습할 수 있다는 것을 이해할 것이다.
이 글은 트랜스포머 모델을 이용해 시계열을 학습하는 방법을 실습한다. 트랜스포머의 세부적인 기술은 다음 링크 및 레퍼런스를 참고한다.
- 머신러닝 딥러닝 신경망 개념, 종류 및 개발
- 어텐션 기반 트랜스포머 딥러닝 모델 이해, 활용 사례 및 파치토치를 통한 간단한 사용방법 소개
- 트랜스포머 디코더 핵심 코드 구현을 통한 동작 메커니즘 이해하기
- 오픈소스 기반 LLM의 민주화, LLAMA-2 논문 분석 및 기술 요약
- 생성AI 멀티모달 모델 개발의 시작. OpenAI의 CLIP모델 이해, 코드 분석, 개발, 사용하기
- Computer vision deep learning: computer vision based on deep learning lecture materials, github
개발 환경
이 글을 실습하기 위해서는 기본적으로 다음 개발환경이 준비되어야 한다.
- NVIDIA GPU driver
- Ubuntu or Windows CUDA driver
- PyTorch
- Pandas, matplotlib
본론
시계열 데이터에서 패턴을 학습하고, 다음 값을 예측하는 방법은 여러가지가 있다. 트랜스포머의 경우에도, 트랜스포머 분류 기법을 이용하는 방법, Temporal Fusion Transformer를 사용하는 방법 등이 있다.
여기에서는 기본 개념을 먼저 이해하기 위해, 트랜스포머 분류 기법을 이용해 학습한다. 학습될 데이터는 다음과 같다.
이 데이터는 페이스북 주식 종가, 개장가 등을 다운로드받은 FB_raw.csv 엑셀파일을 그래프화한 것이다. 데이터셋 크기는 160,681레코드이다. 실제 데이터 구조는 다음과 같다.
데이터를 미리 다운로드한다.
알고리즘 순서는 다음과 같다.
- 주가 시계열 데이터 로딩
- 학습 및 테스트 데이터 생성
- 벡터 임베딩 함수 정의
- 트랜스포머 모델 정의: 포지션 인코딩, 인코더, 주가 예측을 위한 linear full connection 정의. 타겟 마스크 정의
- 데이터 학습
- 테스트 데이터 예측 결과 확인
import torch, torch.nn as nn
import numpy as np
import pandas as pd
import os, time, math
import matplotlib.pyplot as plt
from tqdm import tqdm
os.environ['KMP_DUPLICATE_LIB_OK']='True' # Intel Math Kernel Library use
학습될 데이터 형식을 설정한다. 학습 데이터는 10개, 예측 데이터는 1개이다.
input_window = 10 # number of input steps
output_window = 1 # number of prediction steps, in this model its fixed to one
batch_size = 250
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
페이스북 데이터를 읽고, 그래프형식으로 보여준다.
# get this python's directory
module_path = os.path.dirname(__file__)
df = pd.read_csv(module_path + '/FB_raw.csv') # data path of facebook stock price (Apr 2019 - Nov 2020)
close = np.array(df['close'])
logreturn = np.diff(np.log(close)) # Transform closing price to log returns, instead of using min-max scaler
csum_logreturn = logreturn.cumsum() # Cumulative sum of log returns
fig, axs = plt.subplots(2, 1)
axs[0].plot(close, color='red')
axs[0].set_title('Closing Price')
axs[0].set_ylabel('Close Price')
axs[0].set_xlabel('Time Steps')
axs[1].plot(csum_logreturn, color='green')
axs[1].set_title('Cumulative Sum of Log Returns')
axs[1].set_xlabel('Time Steps')
fig.tight_layout()
plt.show()
벡터 임베딩 계산을 위해, 트랜스포머의 포지션 인코딩을 정의한다.
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:x.size(0), :]
트랜스포머 기반 시계열 학습 모델을 정의한다. 트랜스포머 정의 그대로 정의되며, decoder부분만 다르다. 이 부분은 트랜스포머 결과를 받아, 예측값 1개를 생성하는 full connection 레이어다.
class transformer_seq(nn.Module):
def __init__(self, feature_size=250, num_layers=1, dropout=0.1):
super(transformer_seq, self).__init__()
self.model_type = 'Transformer'
self.src_mask = None
self.pos_encoder = PositionalEncoding(feature_size)
self.encoder_layer = nn.TransformerEncoderLayer(d_model=feature_size, nhead=10, dropout=dropout)
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
self.decoder = nn.Linear(feature_size,1)
self.init_weights() # decoder FC층 가중치 초기화
def init_weights(self):
initrange = 0.1
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-initrange, initrange)
def forward(self,src):
if self.src_mask is None or self.src_mask.size(0) != len(src): # 목표 마스크 생성
device = src.device
mask = self.generate_square_subsequent_mask(len(src)).to(device)
self.src_mask = mask
src = self.pos_encoder(src)
output = self.transformer_encoder(src,self.src_mask)
output = self.decoder(output)
return output
def generate_square_subsequent_mask(self, sz): # 목표 마스크 생성 함수
mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
return mask
학습될 데이터를 생성한다. 주가 데이터에서 input은 10개 수치, output은 1개 수치이다.
def create_inout_sequences(input_data, tw):
inout_seq = []
L = len(input_data)
for i in range(L-tw):
train_seq = input_data[i:i+tw]
train_label = input_data[i+output_window:i+tw+output_window]
inout_seq.append((train_seq ,train_label))
return torch.FloatTensor(inout_seq)
학습될 데이터를 train, test dataset으로 분할하는 함수를 정의한다.
def get_data(data, split):
series = data
split = round(split*len(series))
train_data = series[:split]
test_data = series[split:]
train_data = train_data.cumsum()
train_data = 2*train_data # 학습 데이터 값을 2배 증폭함으로써 학습 정확도를 높인다.
test_data = test_data.cumsum()
train_sequence = create_inout_sequences(train_data,input_window)
train_sequence = train_sequence[:-output_window]
test_data = create_inout_sequences(test_data,input_window)
test_data = test_data[:-output_window]
return train_sequence.to(device), test_data.to(device)
학습될 배치 데이터를 리턴하는 함수를 정의한다.
def get_batch(source, i, batch_size):
seq_len = min(batch_size, len(source) - 1 - i)
data = source[i:i+seq_len]
input = torch.stack(torch.stack([item[0] for item in data]).chunk(input_window, 1))
target = torch.stack(torch.stack([item[1] for item in data]).chunk(input_window, 1))
return input, target
학습 함수를 정의한다.
def train(train_data):
model.train() # Turn on the evaluation mode
total_loss = 0.
start_time = time.time()
for batch, i in enumerate(range(0, len(train_data) - 1, batch_size)): # 배치크기만큼 루프
data, targets = get_batch(train_data, i,batch_size)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, targets)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.7)
optimizer.step()
total_loss += loss.item()
log_interval = int(len(train_data) / batch_size / 5)
if batch % log_interval == 0 and batch > 0: # 배치 루프 시 loss, 정확도 출력
cur_loss = total_loss / log_interval
elapsed = time.time() - start_time
print('| epoch {:3d} | {:5d}/{:5d} batches | lr {:02.10f} | {:5.2f} ms | loss {:5.7f}'.format(
epoch, batch, len(train_data) // batch_size, scheduler.get_lr()[0], elapsed * 1000 / log_interval,
cur_loss))
total_loss = 0
start_time = time.time()
평가 함수를 정의한다.
def evaluate(eval_model, data_source):
eval_model.eval() # Turn on the evaluation mode
total_loss = 0.
eval_batch_size = 1000
with torch.no_grad():
for i in range(0, len(data_source) - 1, eval_batch_size):
data, targets = get_batch(data_source, i, eval_batch_size)
output = eval_model(data)
total_loss += len(data[0])* criterion(output, targets).cpu().item()
return total_loss / len(data_source)
학습 모델 기반 데이터 예측 함수를 정의한다.
def model_forecast(model, seqence):
model.eval()
total_loss = 0.
test_result = torch.Tensor(0)
truth = torch.Tensor(0)
seq = np.pad(seqence, (0, 3), mode='constant', constant_values=(0, 0))
seq = create_inout_sequences(seq, input_window)
seq = seq[:-output_window].to(device)
seq, _ = get_batch(seq, 0, 1)
with torch.no_grad():
for i in range(0, output_window):
output = model(seq[-output_window:])
seq = torch.cat((seq, output[-1:]))
seq = seq.cpu().view(-1).numpy()
return seq
실 데이터를 이용한 데이터 예측 함수를 정의한다.
def forecast_seq(model, sequences):
"""Sequences data has to been windowed and passed through device"""
start_timer = time.time()
model.eval()
forecast_seq = torch.Tensor(0)
actual = torch.Tensor(0)
with torch.no_grad():
for i in tqdm(range(0, len(sequences) - 1)):
data, target = get_batch(sequences, i, 1)
output = model(data)
forecast_seq = torch.cat((forecast_seq, output[-1].view(-1).cpu()), 0)
actual = torch.cat((actual, target[-1].view(-1).cpu()), 0)
timed = time.time()-start_timer
print(f"{timed} sec")
return forecast_seq, actual
학습 데이터를 준비한다.
train_data, val_data = get_data(logreturn, 0.6) # 60% train, 40% test split
모델, Loss함수, 하이퍼파라메터, 스케쥴 등을 정의한다.
model = transformer_seq().to(device)
criterion = nn.MSELoss() # Loss function
lr = 0.00005 # learning rate
epochs = 500 # Number of epochs
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 1.0, gamma=0.95)
for epoch in range(1, epochs + 1):
epoch_start_time = time.time()
train(train_data)
if(epoch % epochs == 0): # 에폭마다 모델 평가
val_loss = evaluate(model, val_data)
print('-' * 80)
print('| end of epoch {:3d} | time: {:5.2f}s | valid loss: {:5.7f}'.format(epoch, (time.time() - epoch_start_time), val_loss))
print('-' * 80)
else:
print('-' * 80)
print('| end of epoch {:3d} | time: {:5.2f}s'.format(epoch, (time.time() - epoch_start_time)))
print('-' * 80)
scheduler.step()
학습 후 모델 이용해 데이터 예측하고, 실제 데이터와 비교한다.
test_result, truth = forecast_seq(model, val_data)
plt.plot(truth, color='red', alpha=0.7)
plt.plot(test_result, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()
테스트를 위해 랜덤값을 이용해 비교해 본다.
r = np.random.randint(100000, 160000)
test_forecast = model_forecast(model, csum_logreturn[r: r+10]) # random 10 sequence length
print(f"forecast sequence: {test_forecast}")
print(f"Actual sequence: {csum_logreturn[r: r+11]}")
torch.save(model.state_dict(), "transformer_ts_20231211.pth")
학습 파일을 로딩한 후, 다른 시계열 데이터셋도 테스트해본다.
model2 = TransAm() # rename as model2
model2.load_state_dict(torch.load("transformer_ts_20231211.pth"))
model2.to(device)
df2 = pd.read_csv(module_path + '/BA_raw.csv') # 보잉 주식 테스트
close2 = df2['close'].fillna(method = 'ffill')
close2 = np.array(close2)
logreturn2 = np.diff(np.log(close2))
train_data2, val_data2 = get_data(logreturn2, 0.6)
test2_eval = evaluate(model2, val_data2)
print(f'Test 2 loss: {test2_eval :.5f}')
test_result2, truth2 = forecast_seq(model2, val_data2)
plt.plot(truth2, color='red', alpha=0.7)
plt.plot(test_result2, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()
df3 = pd.read_csv(module_path + '/JPM_raw.csv') # JPMorgan Chase & Co 주식 테스트
close3 = df3['close'].fillna(method = 'ffill')
close3 = np.array(close3)
logreturn3 = np.diff(np.log(close3))
train_data3, val_data3 = get_data(logreturn3, 0.6)
test3_eval = evaluate(model2, val_data3)
print(f'Test 3 loss: {test3_eval :.5f}')
test_result3, truth3 = forecast_seq(model2, val_data3)
plt.plot(truth3, color='red', alpha=0.7)
plt.plot(test_result3, color='blue', linewidth=0.7)
plt.title('Actual vs Forecast')
plt.legend(['Actual', 'Forecast'])
plt.xlabel('Time Steps')
plt.show()
레퍼런스
- Transformers from Scratch: Part 2 | by Paula Ceccon Ribeiro | Generative AI
- Language Modeling with nn.Transformer and torchtext — PyTorch Tutorials 2.2.0+cu121 documentation
- Time Series Transformer (huggingface.co)
- How to Apply Transformers to Time Series Models | by Intel | Intel Tech | Medium
- Probabilistic Time Series Forecasting with 🤗 Transformers (huggingface.co)
- Time Series Forecasting with a Basic Transformer Model in PyTorch | by Kaan Aslan | Medium
- Illustrated Differences between MLP and Transformers for Tensor Reshaping in Deep Learning | by Patrick Langechuan Liu | Towards Data Science
- Training and fine-tuning — transformers 3.3.0 documentation (huggingface.co)
- Introduction to Text Classification Using Transformers | by Jaz Allibhai | Medium
- transformers-tutorials/transformers_multiclass_classification.ipynb at master · abhimishra91/transformers-tutorials (github.com)
- A detailed guide to PyTorch’s nn.Transformer() module. | by Daniel Melchor | Towards Data Science
- Text Classification using Transformer Encoder in PyTorch (debuggercafe.com)
- Understanding Temporal Fusion Transformer | by Mouna Labiadh | DataNess.AI | Medium
- Demand forecasting with the Temporal Fusion Transformer — pytorch-forecasting documentation
- How to add padding mask to nn.TransformerEncoder module? - nlp - PyTorch Forums
- Padding and Target Mask
- Learning PyTorch - Language Model with nn.Transformer and TorchText (Part 1) - Ryan Ong
- Japanese-English Machine Translation Model with Transformer & PyTorch
- AI Talk with Andrew Ng
2024년 4월 1일 월요일
FastAPI, Uvicorn, Websocket 기반 Open API 서버 간단히 개발해 보기
이 글은 FastAPI, Uvicorn, Websocket 기반 Open API 서버 간단히 개발하는 방법을 정리한다. FastAPI를 이용하면, 매우 쉽게 Open API 서버를 개발할 수 있다.
FastAPI는 비동기 API 서버를 지원하며, uvicorn과 같은 ASGI 서버를 사용하여 실행할 수 있다. 이를 통해 빠른 성능과 비동기 처리를 구현할 수 있다. 자동 대화형 API 문서도 제공되어 개발자가 API를 쉽게 이해하고 사용할 수 있다.
FastAPI는 다음 웹 어플리케이션 프레임웍인 Flask, Django와 함께 활용하면 좋다.
패키지 설치는 다음과 같다.
pip install fastapi aiohttp uvicorn
서버 개발
크게 2개 유형의 API를 개발한다. 하나는 오래 걸리는 계산 함수, 다른 하나는 대용량 데이터 파일 전달이다. 대용량 파일은 청크로 분할해 전달한다. server.py 코드는 다음과 같다.
import json, time, logging
from fastapi import FastAPI, BackgroundTasks, WebSocket
from fastapi.middleware.cors import CORSMiddleware
# Set up logging to debug
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# uvicorn open_api_server:app --reload --port 8001 --ws-max-size 16777216 # https://www.uvicorn.org
app = FastAPI()
# CORS middleware. considering security
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
def calculate_dataset(length):
logger.debug('Calculation started...')
time.sleep(30) # 30 seconds
# TODO call your calculation functions
data = 3.14
logger.debug(f'End')
return data
@app.post("/v1/calc/dataset")
async def calculate_dataset(background_tasks: BackgroundTasks, length: str):
logger.debug('Calculation started...')
results = calculate_dataset(length)
return {"results": results}
@app.websocket("/ws/dataset") # don't remove /ws prefix to use websocket
async def websocket_endpoint(websocket: WebSocket):
logger.debug('Trying to connect...')
await websocket.accept()
logger.debug('Connection accepted.')
with open('output_xml.json') as json_file:
data = json.load(json_file)
data = json.dumps(data)
logger.debug('Message length: ' + str(len(data)))
CHUNK_SIZE = 64 * 1024 # 64KB
for i in range(0, len(data), CHUNK_SIZE):
chunk = data[i:i+CHUNK_SIZE]
print(f'chunk: {i}')
await websocket.send_text(chunk)
logger.debug('JSON data sent.')
다음과 같이 실행한다.
uvicorn open_api_server:app --reload --port 8001
클라이언트 개발
대용량 데이터와 결과를 클라이언트에서 처리할 때는 타임아웃 설정과 청크 다운로드 루프를 구현해야 한다. client.py를 코딩한다.
import json, traceback, asyncio, websockets, aiohttp # import httpx
from aiohttp import ClientSession, ClientTimeout
async def call_calc_dataset():
params = {"length": "10"}
t = ClientTimeout(total=60*2) # 2 minutes
async with aiohttp.ClientSession(timeout=t) as session:
async with session.post('http://localhost:8001/v1/calc/dataset', params=params) as resp:
results = await resp.text()
print(results)
async def connect():
async with websockets.connect('ws://localhost:8001/ws/dataset') as websocket:
CHUNK_SIZE = 64 * 1024 # 64KB
full_data = None
received_count = 0
try:
while True:
chunk = await asyncio.wait_for(websocket.recv(), timeout=5) # adjust timeout value considering internet speed
if full_data is None:
full_data = chunk
else:
full_data += chunk
received_count += len(chunk)
print('Received data: ', received_count)
except asyncio.TimeoutError:
print("Timeout error: The server didn't respond within 5 seconds")
except Exception as e:
print(e)
pass
print('Received data: ', received_count)
data = json.loads(full_data)
try:
os.remove('big_xml.json')
except:
pass
with open('big_xml.json', 'w') as json_file:
json.dump(data, json_file)
print('JSON data saved to file.')
def main():
try:
loop = asyncio.get_event_loop()
loop.run_until_complete(call_calc_dataset())
loop.run_until_complete(connect())
except Exception as e:
print(traceback.format_exc())
if __name__ == '__main__':
main()
다음과 같이 실행한다.
python client.py
서버와 클라이언트에서 실행 결과는 다음과 같다. 다음과 같으면 정상 동작되는 것이다.
마무리
FastAPI는 Sebastián Ramírez라는 개발자에 의해 처음 개발되었다. 그는 현재 독일 베를린에 살고 있고, 많은 오픈소스 기여활동으로 유명하다.
피드 구독하기:
글 (Atom)