레이블이 빅데이터인 게시물을 표시합니다. 모든 게시물 표시
레이블이 빅데이터인 게시물을 표시합니다. 모든 게시물 표시

2024년 10월 20일 일요일

가성비 있는 웹 서비스 호스팅 Fly.IO 사용하기

이 글은 가성비 있는 웹 서비스 호스팅이 가능한 Fly.IO 사용방법을 정리한다. 이 글은 다음 글의 후속편으로, DNA, 호스팅 등 개념은 이 글을 참고하길 바란다.

머리말
FLY.IO는 Kurt Mackey가 공동 설립한 회사로 개발되었다. 그는 이전에 Compose.io라는 데이터베이스 호스팅 플랫폼을 공동 창업했으며, 이를 IBM에 매각한 경험이 있다. FLY.IO는 주로 개발자 친화적인 글로벌 애플리케이션 호스팅 서비스를 제공하기 위해 설립되었다.

FLY.IO의 장단점은 다음과 같다.

장점
1. 글로벌 배포: 애플리케이션을 여러 지역에 쉽게 배포할 수 있어 지리적으로 분산된 사용자에게 낮은 지연 시간을 제공한다.
2. 간단한 사용성: 간단한 명령어로 애플리케이션을 배포할 수 있으며, 설정도 비교적 쉬운 편이다.
3. 자동 확장: 필요에 따라 인프라를 자동으로 확장할 수 있어, 트래픽이 급증할 때도 유연하게 대응 가능하다.
4. 애플리케이션 근접 배치: 사용자 근처에 애플리케이션을 배치할 수 있어 성능 향상과 지연 시간 감소에 효과적이다.
5. 무료 계층 제공: 제한적이지만 무료로 사용할 수 있는 계층을 제공해, 작은 프로젝트나 테스트에 유용하다. 평소에는 배포 앱이 비활성화되었다가 URL 접속하면 자동 활성화되는 Pay As You Go Plan (기존 Hobby Plan) 을 지원한다.

단점
1. 제한된 문서화: 다른 클라우드 서비스에 비해 문서화가 부족한 편이며, 일부 기능에 대한 정보가 불충분할 수 있다.
2. 복잡한 설정: 고급 기능을 사용할 때 설정이 복잡해질 수 있으며, 초보자에게는 어려울 수 있다.
3. 비교적 작은 생태계: AWS나 Google Cloud와 비교하면 생태계가 작아, 지원되는 서비스나 도구가 제한적이다.
4. 무료 계층 한계: 무료 계층의 자원이 제한적이므로, 트래픽이 많은 애플리케이션에는 적합하지 않다.
5. 서비스 안정성: 일부 사용자들은 특정 상황에서 예기치 않은 중단을 경험할 수 있다고 보고했다.

환경 설치
FLY.IO 사용법을 다음과 같다.

1. 회원가입 및 설치
FLY.IO 웹사이트(https://fly.io)에 접속하여 회원가입을 한다.

2. 파워셀에서 설치 스크립트 실행
pwsh -Command "iwr https://fly.io/install.ps1 -useb | iex"

3. 터미널에서 로그인 실행
fly auth login

로그인 결과는 다음과 같다.

4. Github 로그인함

소스코드에서 데모앱 빌드, 배포 및 실행
FLY.IO에서 제공하는 데모앱을 간단히 실행해 보자. 
다음 예제를 터미널에서 실행한다.
cd hello-fly
코드 예시

앱을 배포하고 실행한다.
fly launch --now

그 결과, 도커를 자동 빌드한다. 다음은 이미지 스크립트이다. 
FROM node:16.19.0-slim

WORKDIR /usr/src/app
COPY package*.json ./

RUN npm install
COPY . .

EXPOSE 8080
CMD [ "npm", "start" ]

도커 이미지는 자동으로 FLY.IO 클라우드에 업로드되고, 설정파일대로 웹 인터페이스를 연결한다.
도커 이미지 생성 화면
Fly.IO의 웹 앱 배포된 모습

여기서 배포된 링크를 클릭한다.

결과는 다음과 같다. 웹 앱이 정상 서비스된 것을 확인할 수 있다.

이미 개발된 도커 이미지에서 데모앱 빌드, 배포 및 실행
Fly Launch를 사용해 도커 이미지로 앱을 배포할 수 있다. 미리 빌드된 Docker이미지를 이용해 데모앱을 만들고 배포해 본다. 

도커가 설치되었다는 가정 하에 다음 명령을 실행한다.
fly launch --image flyio/hellofly:latest
fly launch

이 결과 fly.toml 설정파일이 생성된다. 참고로, 웹 인터페이스는 다음과 같은 fly.toml 구성파일을 통해 설정된다. 
app = 'fly-io-delicate-surf-7133'
primary_region = 'nrt'

[http_service]
  internal_port = 8080
  force_https = true
  auto_stop_machines = 'stop'
  auto_start_machines = true
  min_machines_running = 0
  processes = ['app']

[[vm]]
  memory = '1gb'
  cpu_kind = 'shared'
  cpus = 1

앱을 배포한 후 상태를 확인한다.
앱 링크를 클릭해 오픈하거나 다음 명령을 이용해 방문한다.
fly apps open /fred

그럼, 다음 웹 페이지를 확인할 수 있다.

이외, 변경사항 배포는 fly deploy, 데모 앱 삭제는 fly apps destory 명령을 사용할 수 있다.
변경사항 배포 예

index 파일을 다음과 같이 수정한 후 fly deploy 명령을 실행한다.
배포에 성공하면, 다음과 같이 같은 DNS에서 변경된 서비스를 확인할 수 있다.

원격 터미널 접속
Fly.IO는 SSH 터미널 접속을 제공한다. 다음과 같이, SSH 키를 생성한다.
ssh-keygen -t rsa -b 4096 -C "your email@gmail.com"

그럼. id_rsa 암호키가 생성될 것이다.

원격 터미널에 접속한다. 도커 이미지의 리눅스가 접속된 것을 확인할 수 있다.
fly ssh console

결론
Kurt Mackey는 소프트웨어 엔지니어이자 창업가로, 클라우드 인프라와 개발자 도구 분야에서 주목받는 인물이다. 그는 여러 기술 회사에서 경력을 쌓아왔고, Compose.io 등 두 개의 성공적인 스타트업을 공동 창업했다. 2015년, IBM이 Compose.io를 인수하면서 Mackey는 IBM에서 기술 리더십 역할을 맡았다. 

Mackey는 오랜 시간 소프트웨어 엔지니어로 활동해왔으며, 주로 인프라 서비스와 클라우드 컴퓨팅에 대한 깊은 전문성을 보유하고 있다. 그의 작업 철학은 개발자가 인프라 관리에 신경 쓰지 않고도 효율적으로 작업할 수 있도록 도와주는 도구와 플랫폼을 제공하는 데 중점을 두고 있다.

Kurt Mackey는 기술 커뮤니티 내에서 활발하게 활동하며, 개발자 도구의 발전과 클라우드 기반 서비스의 미래에 대해 꾸준히 발언하고 있다. 그의 비전은 애플리케이션이 물리적 서버나 데이터 센터에 묶이지 않고, 사용자 근처에서 자동으로 최적화되고 배포될 수 있는 환경을 조성하는 것이다.

부록: Gradio web app의 Fly.toml 설정 예시
# fly.toml app configuration file generated for bim-data-quality-checker-solitary-wave-2365 on 2025-01-28T22:16:55+09:00
#
# See https://fly.io/docs/reference/configuration/ for information about how to use this file.
#
app = 'bim-data-quality-checker-solitary-wave-2365'
primary_region = 'nrt'

[build]
  # Use the Dockerfile in the current directory
  dockerfile = "./Dockerfile"

[env]
  PYTHONUNBUFFERED = "1" # Ensure logs are shown in 

[deploy]
  release_command = "echo Deploying Gradio app on Fly.io!"
  command = ["python", "./src/app.py"]

[[vm]]
  memory = '1gb'
  cpu_kind = 'shared'
  cpus = 1
  
[[services]]
  internal_port = 7860 # Default Gradio app port
  protocol = "tcp"

  [[services.ports]]
    handlers = ["http"]
    port = 80 # HTTP port

  [[services.ports]]
    handlers = ["tls", "http"]
    port = 443 # HTTPS port

  [[services.tcp_checks]]
    interval = "15s"
    timeout = "2s"
    grace_period = "5s"
    restart_limit = 0

부록: Gradio web app의 Dockerfile 예시
# https://www.gradio.app/main/guides/deploying-gradio-with-docker
# Use a base image with Python
FROM python:3.10-slim

# Set the working directory in the container
WORKDIR /app

# Copy the project files to the container
COPY ./src /app/src
COPY requirements.txt /app

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Expose the port your app will run on (e.g., 7860 for Gradio)
EXPOS
E 7860
ENV GRADIO_SERVER_NAME="0.0.0.0"

# Set the command to run your app when the container starts
CMD ["python", "./src/app.py"]

레퍼런스

2021년 4월 6일 화요일

MongoDB 시계열 데이터 분석 질의, 데이터 분석 가시화 및 자동 요약 트리거 개발

이 글은 시계열 데이터 분석 시 필요한 데이터 쿼리, 데이터 분석 시각화, 시계열 데이터 자동 요약 트리거 개발 방법을 다룬다. 


몽고디비의 쿼리는 SQL형식과는 다른 도큐먼트, 컬렉션에 대한 JSON 형식과 유사한 질의 언어를 사용한다. 다음과 같은 스키마로 데이터가 저장되어 있다고 가정한다.

{_id : ObjectId, sensor : String, area : String, date : String, value : Int32}

만약, sensor 중 "light"센서의 value 값이 150보다 작은 레코드만 얻도록 질의하려면 아래와 같이 필터를 만들어 실행하면 된다.

{sensor: 'light', value: {$lt: 150}}

쿼리 연산자는 아래와 같이 매우 다양하다. 

상세한 내용은 Query and Projection Operators를 참고한다. 

쿼리를 해서 원하는 데이터 레코드를 얻으면, Export data 기능을 이용해 아래와 같이 엑셀 파일 등을 저장할 수 있다.

만약, 데이터 분석을 통한 차트 시각화가 필요하다면, 아래 MongoDB chart를 사용하면 된다. 참고로, 여기서 제공하는 서버는 AWS 등에서 운영되며, 제한된 기능에서 무료로 MongoDB 인스턴스를 제공하기 때문에 부담없이 사용이 가능하다.

몽고디비 서버 인스턴스가 생성되면 다음과 같은 데쉬보드를 확인할 수 있다.

기존에 사용했던 데이터베이스에서 엑셀 저장 기능을 이용해 데이터를 MongoDB charts 서버로 옮길 수 있다. 

데이터를 입력한 후에는 다음과 같이 데이터를 분석해 차트 그래프로 가시화할 수 있다.




2020년 3월 16일 월요일

빅데이터를 고려한 카프카 기반 Raspberry Pi IoT 시스템 개발

이 글은 빅데이터를 고려한 카프카 기반 Raspberry Pi(RPi) IoT 시스템을 개발하는 방법을 간단히 다루어 본다. 카프카는 대용량 메시지 처리, IoT 센서 데이터, 빅데이터 처리 등에 MongoDB와 함께 사용되는 경우가 많다. 참고로, 카프카 개발팀은 스타트업 Confluent 사를 설립했는 데, 시리즈 D에서 1억 2500만달러를 투자받았고, 자산가치는 25억달러로 평가받고 있다(참고). 현재, 카카오, 네이버 라인 등 많은 회사에서 카프카를 메시지 미들웨어 핵심 엔진으로 사용하고 있다.
이 내용은 다음 목적에 유용하다.
  • 저렴한 IoT 데이터 분산처리 시스템 개발
  • 라즈베리파이 기반 IoT 클러스터 구축
라즈베리파이는 임베디드 보드라 메모리 제약이 심하다. 이런 이유로 기대한 만큼 카프카 성능과 안전성이 좋지 않을 수도 있다. 만약, 라즈베리파이를 단지 IoT 데이터 취득 목적으로만 사용하고, 별도 서버(IBM 호환 PC, 엔비디아 보드 등)에 카프카와 주키퍼를 서비스한다면 파이썬의 카프카 라이브러리 함수인 KafkaProducer, KafkaConsumer를 사용하면 된다. 이 내용은 아래 부록이나 링크를 참고한다. 
준비
하드웨어를 다음과 같이 준비한다.
  • Raspberry Pi 3 Model B 이상
  • 32 GB 이상 Micro SD 메모리 
  • EDiMAX 등 WiFi USB 동글
  • 라즈베리파이 전원 아답터
  • DHT11 온습도 및 초음파 거리센서 HC-SR04

소프트웨어를 다음과 같이 준비한다.
  • RPi NOOBS(New Out Of the Box Software) 다운로드 후 압축 풀고, 파일들을 Micro SD에 복사함
  • RPi 에 Micro SD를 넣고, 키보드, 마우스, 전원을 연결함
  • RPi 가 켜지면 WiFi 설정 후 운영체제 설치 명령을 따라 Raspbian OS를 설치(Lite 버전은 설치하지 말것) 

다음과 같이 hostname을 local IP 주소와 함께 설정한다.
vim /etc/hostname # and set a name like raspberry-8.
vim /etc/hosts # Replace raspberry with your new name.

아파치 주키퍼와 카프카를 다운로드 한 후 RPi 에 복사해 넣는다.

주키퍼 및 카프카 서버 실행
설정
라즈베리파이 메모리 부족 에러 방지를 위해 압축해제한 카프카 폴더 내 bin의 kafka-server-start.sh 에 서버 실행 전 아래 내용을 추가한다.
export JMX_PORT=${JMX_PORT:-9999}
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

카프카 LEADER_NOT_AVAILABLE 에러를 방지하기 위해 /config/server.properties에 아래를 추가한다.
advertised.listeners = PLAINTEXT://localhost:9092
delete.topic.enable=true

서버 실행
주피커와 카프카 서버를 터미널에서 차례대로 실행한다.
kafka\bin\zookeeper-server-start.sh ../config/zookeeper.properties
kafka\bin\kafka-server-start.sh ../config/server.properties

그리고, producer와 consumer를 터미널에서 실행한다.
kafka\bin\kafka-console-producer.sh --broker-list localhost:9092 --topic IoT
kafka\bin\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IoT

producer에서 키보드로 메시지 입력하면, 다음과 같이 consumer에 보일 것이다. 그럼 성공한 것이다.
카프카 서버 실행 화면

좀더 자세한 내용은 아래를 참고한다.
센서 설치 및 개발
이 글에서 개발할 센서는 온습도, 거리 센서이다. 센서 회로 연결 시 아래 RPi 보드 핀 레이아웃을 참고한다. 빅데이터 분산 처리 메시지 스트리밍 개발의 목적이므로, 간단한 회로 작성을 위해 저항 등은 연결하지 않았다. 다음 예시의 회로 연결은 실습용이니 참고하길 바란다.
RPi 핀 번호

개발 환경 설치
우선 아래와 같이 파이선 라이브러리를 설치한다.
sudo apt-get update
sudo apt-get install python-pip
sudo python -m pip install --upgrade pip setuptools wheel
sudo pip install kafka-python
sudo pip install Adafruit_DHT

데이터 분석, 비전을 개발한다면, 다음 패키지도 함께 설치한다(설치가 안된다면, 해당 패키지 github에서 설치방법을 확인한 후 수동으로 설치해야 한다).
sudo pip install scikit-learn numpy
sudo pip install opencv-python imageio Pandas Pillow

아두이노 개발하려면 아래를 설치한다.
sudo apt-get install arduino

온습도센서 설치 및 코딩
DHT11을 라즈베리파이와 다음과 같이 연결한다.
DHT11 Vcc - RPi 5V
DHT11 GND - RPi GND
DHT11 Data - RPi GPIO No 2
DHT11 온습도 센서 핀 배치

다음과 같이 코딩한다. 카프카로 IoT 주제로 데이터를 생성한다.
from kafka import KafkaProducer
from json import dumps
import RPi.GPIO as GPIO                    
import Adafruit_DHT as ada
import time               

producer = kafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))

dht = ada.DHT11
pin = 2    # GPIO No 2

print("start")
while True:
  h, t = ada.read_retry(dht. pin)
  data = {'temp': [h, t]}
  if h is not None and t is not None:
    producer.send("safety", value = data)
    print("T={0}, H={1}".format(t, h))
  else:
    print("none")
  time.sleep(0.5)  

다음과 같이 실행되면 카프카로 센서 메시지 발행이 성공한 것이다.
RPi기반 카프카 분산 메시지 스트리밍 실행 결과

네트웍 IP 주소가 있다면, 다른 컴퓨터에서 연결한 카프카 CONSUMER에서 라즈베리파이에서 발생한 센서 데이터 메시지를 스트리밍받아 볼 수 있다.

라즈베리파이 작업관리자를 확인해 보면, 카프카 실행상태에서 리소스 사용은 CPU 5-20%, 메모리 662MB/874MB이였으며, 다음과 같았다.

거리센서 설치 및 코딩
거리센서 HC-SR04를 아래와 같이 연결한다. 카프카 safety 주제(토픽)으로 data를 생성해본다.
회로 연결

RPi의 GPIO 라이브러리를 이용해 아래와 같이 dist.py파일을 코딩한다.
from kafka import KafkaProducer
from json import dumps
import RPi.GPIO as GPIO                    
import time                       
  
producer = kafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
       
GPIO.setmode(GPIO.BCM)     #Set GPIO pin numbering 

trig = 23                                  
echo = 24                                  

GPIO.setup(trig,GPIO.OUT)                 
GPIO.setup(echo,GPIO.IN)                   

while True:
  GPIO.output(trig, False)                 
  time.sleep(1)                            

  GPIO.output(trig, True)                  
  time.sleep(0.00001)                      
  GPIO.output(trig, False)       
     
  while GPIO.input(echo)==0:          
    start = time.time()          
  while GPIO.input(echo)==1:            
    end = time.time()               

  travelTime = end - start 
  distance = travelTime * 17150        
  distance = round(distance, 2)            
  
  data = {'dist': distance}
  producer.send("safety", value = data)
     print "D = ", distance,"cm"

그리고, dist.py를 실행한다.
python dist.py

정상 실행되면 아래와 같은 데이터 출력을 확인할 수 있다. 같은 방식으로 카프카 CONSUMER에서 데이터를 확인할 수 있다.

부록: 별도 카프카 서버 구축, 라즈베리파이 IoT 센서 데이터 스트리밍 및 성능 테스트
카프카 환경 설정
무선이나 유선 네트워크가 가능한 환경에서, 별도 컴퓨터에 카프카를 앞의 내용을 참고해 설치한다. 시스템 구조는 다음과 같다.
카프카를 설치한 컴퓨터에, 앞서 설명한 server.properties 파일의 아래 내용을 해당 컴퓨터 IP주소에 맞게 수정한다.
listeners=PLAINTEXT://IP주소:9092
advertised.listeners=PLAINTEXT://IP주소:9092

주키퍼와 카프카 서버를 각각 터미널에서 실행한다.
c:\kafka\bin\windows\zookeeper-server-start.bat ../../config/zookeeper.properties
c:\kafka\bin\windows\kafka-server-start.bat ../../config/server.properties

센서 데이터 획득 및 빅데이터 생성 코딩
라즈베리파이에서 앞에 설명한 초음파 거리센서 예제를 참고해, 파이썬으로 다음과 같이 코딩한다. 카프카 서버 IP주소는 앞의 IP주소와 동일해야 한다.
from kafka import KafkaProducer
from json import dumps
import RPi.GPIO as gpio
import time

producer = KafkaProducer(bootstrap_servers=['카프카 컴퓨터 서버 IP주소:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))

gpio.setmode(gpio.BCM)
trig = 23
echo = 24

gpio.setup(trig, gpio.OUT)
gpio.setup(echo, gpio.IN)

index = 0
while True:
  gpio.output(trig, 0)      # 0.01초 마다 센서 데이터 생성 및 전송
  time.sleep(0.01)
  gpio.output(trig, 1)
  time.sleep(0.00001)
  gpio.output(trig, 0)

  while gpio.input(echo) == 0:
    start = time.time()
  while gpio.input(echo) == 1:
    end = time.time()

  travelTime = end - start
  distance = travelTime * 17150 # / 58.0
  index = index + 1
  data = {'dist': [index, distance]}
  producer.send("IoT", value = data)
  print index, 'D = ', round(distance, 2), 'cm'

코딩 후 이 파이썬 프로그램을 실행한다. 

카프카 서버에서 consumer를 실행해 네트워크를 통해 데이터가 제대로 전달되는 지를 확인한다. 다음과 같이 카프카 서버와 라즈베리파에서 실행되면 성공한 것이다.
카프카 서버 실행 모습
라즈베리파이 센서 데이터 획득 및 카프카 서버에 센서 데이터 스트림 네트워크 전송 모습

카프카 데이터 스트리밍 성능 테스트
카프카 메시지 스트리밍 성능 테스트를 위해 센서 데이터 전송을 초당 100개로 설정해 다음과 같이 확인해 보았다. 
라즈베리파이 센서 값

인터넷 네트워크 회선에 큰 문제가 없다면, 카프카 서버에서 실시간 수준으로 데이터 스트리밍 처리하는 것을 확인할 수 있다. 데이터 패킷 크기가 50 bytes 인 경우, 초당 5K가 처리되는 수준이다. 좀 더 부하를 주기위해 초당 1000개 토픽을 생성해 전송하였고, 이때도 큰 무리 없이 실시간 수준으로 스트리밍처리되었다. 이 경우, 초당 50K가 처리되는 수준이다.

좀 더 명확한 성능 측정을 위해서는 다음 링크의 성능 분석 매트릭스 및 데쉬보드 도구를 설치해 확인할 수 있다.
카프카 데이터 패킷 구조에 대해 궁금하다면 다음 링크를 참고한다.
부록: 카프카, 스파크, NoSQL 빅데이터 처리 아키텍처
카프카 서버에 전송된 데이터는 스트리밍되어 다른 어플리케이션이나 컴퓨터에서 고속으로 데이터를 전송받을 수 있다. 이 데이터를 Mongo DB같은 NoSQL DB에 저장하고, 스파크 등으로 빅데이터 분석을 수행할 수 있다. 이에 대한 내용은 아래를 참고한다.
다음은 이를 고려한 빅데이터 처리 아키텍처 예시이다.
Big data process architecture framework(Data Science: Third International Conference of Pioneering Computer, Springer, 2017)


부록: RPi 카메라 설치
라즈베리파이 카메라를 RPi와 연결하고, 다음과 같이 카메라 설정한다.
sudo raspi-config

UI메뉴에서 5번 setting interfaces, 1번 Setting camera 를 선택해 카메라를 활성화한다. 이제 다음 명령을 입력한다.
raspivid -o video.h264 -t 10000 -w 640 -h 480 -p 0,0,640,480

그럼 다음과 같이 카메라 화면이 나타난다.

파이썬 OPENCV를 이용해 객체 감지 후 해당 메시지를 앞에서 설명한 것과 같은 방식으로 카프카 메시지로 스트리밍 서비스할 수 있다.

부록: 라즈베리파이 기반 카프카 클러스터 서버 생성
주키퍼 설정
다음과 같이 주키퍼 클러스터 설정용 conf/zoo.cfg을 수정한다.
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/zookeeper # Don't put under /tmp, it will be deleted.
clientPort=2181
server.1=192.168.0.18:2888:3888
server.2=192.168.0.15:2888:3888
server.3=192.168.0.16:2888:3888

각 RPi의 /var/zookeeper 아래 myid 파일에 설정 파일을 복사한다.
그리고 zookeeper 루트 폴더아래 ./bin/zkServer.sh start 를 입력해 주키퍼 서비스를 시작한다. 주키퍼가 적절히 실행된다면, 다음처럼 프롬프트에 보일 것이다.
pi@raspberrypi-16:~/apache/zookeeper-3.4.11 $ ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/pi/apache/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: follower # (or leader)

카프카 설정
주키퍼 클러스터가 실행 되고 있으면, 이제 아파치 카프카를 RPi에 배포한다.

다음 같이, config/server.properties 파일을 수정한다.
broker.id=1 # 1/2/3 for each card
port=9092
host.name=192.168.0.16 # IP address
zookeeper.connect=192.168.0.18:2181,192.168.0.15:2181,192.168.0.16:2181

bin/kafka-server-start.sh 를 갱신한다.
export JMX_PORT=${JMX_PORT:-9999}
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" # Otherwise, JVM would complain not able to allocate the specified memory.

bin/kafka-run-class.sh 를 갱신한다.
KAFKA_JVM_PERFORMANCE_OPTS="-client -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" # change -server to -client

카프카를 시작한다.
./bin/kafka-server-start.sh config/server.properties &

카프카가 적절히 시작되고 있다면, 다음과 같이 다른 콘솔 터미널에서 producer 를 실행해 본다.
./bin/kafka-console-producer.sh — broker-list 192.168.0.15:9092,192.168.0.16:9092,192.168.0.18:9092 — topic v-topic # create a topic
./bin/kafka-console-producer.sh --broker-list 192.168.0.15:9092,192.168.0.16:9092,192.168.0.18:9092 --topic v-topic # kick off a producer

카프카 consumer를 실행한다.
./bin/kafka-console-consumer.sh — zookeeper 192.168.0.16 — topic v-topic from-beginning

레퍼런스
P.S. 최근 천억 규모의 연구과제가 발주되었다. 사실 1억도 매우 큰 돈이다. 국가 연구과제가 이렇게 크면 누수가 많아지고, 인하우스 기술 개발보단 아웃소싱만 많아진다. 몇천만원이라도 10년을 꾸준하게 연구하고 싶은 사람이나 팀에게 지원해 주고 그 연구에만 몰입할 수 있는 환경을 준다면, 선진국 보유 핵심기술이라도 충분히 개발할 수 있다고 생각한다. 대형 설비가 필요해 큰 펀딩이 있어야 하는 경우를 제외한다면, 이런 펀딩은 핵심기술은 겨녕 아웃소싱 연구행정관리만 하다 아까운 시간 낭비할 수 있다. 홍보하기는 좋겠지만, 지속적이지 않고, 전문성은 더욱 떨어질 수 있다. 간만 보다 끝날 수 있다고 생각한다.

2020년 3월 2일 월요일

아파치 Kafka, MongoDB, 파이썬 기반 빅데이터 메시지 스트리밍 서버 개발

이 글은 분산 메시지 스트리밍 플랫폼 아파치 Kafka(카프카) 설치 후 사용하는 방법을 간단히 설명한다. 카프카는 대용량 메시지 처리, IoT 센서 데이터, 빅데이터 처리 등에 MongoDB와 함께 사용되는 경우가 많다. 참고로, 카프카 개발팀은 스타트업 Confluent 사를 설립했는 데, 시리즈 D에서 1억 2500만달러를 투자받았고, 자산가치는 25억달러로 평가받고 있다(참고). 현재, 카카오, 네이버 라인 등 많은 회사에서 카프카를 메시지 미들웨어 핵심 엔진으로 사용하고 있다.

분산 메시지 스크리밍 플랫폼 카프카 
카프카는 대용량 메시지 발행-구독 서버로 사용된다. 아파치 소프트웨어재단이 2011년 오픈소스로 공개한 카프카는 비즈니스 구인구직 소셜 네트워크 서비스인 링크드인(LinkedIn) 수석 엔지니어 제이 크렙스(Jay Kreps)가 개발했다.

많은 웹 서비스가 다음 그림과 같이 메시지 발행(producer)-구독(consumer) 패턴을 가지는 데 카프카는 이런 메시지를 큐(queue) 자료구조로 관리하며, 빠른 속도로 메시지 데이터를 저장한 후 사용할 수 있는 기능을 제공한다.
카프카 발행-구독 서버(Apache Kafka documentation)

토픽(topic. 주제)은 다음과 같은 토픽별 파티션(partition)에 저장된다.
카프카 주제(토픽) 생성-소비 및 파티션 구조(https://www.cloudera.com/documentation/kafka/1-2-x/topics/kafka.html)

토픽이 만들어지면, 데이터는 파티션에 다음 같은 큐(queue) 형식으로 기록된다. 이를 생성된 순서대로 각 소비자가 읽을 수 있다.
카프카 메시지큐(www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client)

다음은 카프카 파티션에 저장되는 메시지 큐 자료 구조를 좀 더 명확히 보여준 것이다. 그림에서 offset은 소비자(consumer)가 현재 읽고 있는 메시지를 참조할 때 사용되는 참조번호가 된다. 소비자가 토픽의 메시지를 읽을 때 마다, current position 은 하나씩 증가하고, 얻어온 데이터는 청크 저장소(Chunk store)에 다음 그림과 같이 저장된다.

카프카 구조는 간단하나, 카프카 개발팀은 데이터 스트리밍을 고속으로 처리하기 위해, 이런 자료 구조를 메모리 상에서 관리하며, 적당할 때 하드디스크로 고속 저장(swap)하고 불러오는 과정을 효율적으로 개발했다.

고가용성 분산 코디네이션 지원 ZooKeeper
카프카는 고가용성을 위해 분산 코디네이션 기능을 지원하는 아파치 ZooKeeper(주키퍼) 와 함께 사용된다. 주키퍼는 쓰기 동작 시 분산된 카프카 클라리언트들 간 동기화 처리를 해준다.
주키퍼 쓰기 요청 시 동작(Zookeeper: A Wait-Free Coordination Kernel)

주키퍼 클라이언트들은 ZNode 단위로 관리되어 데이터 동기화 처리를 수행한다.
ZooKeeper znodes(Zookeeper: A Wait-Free Coordination Kernel)

빅데이터 처리를 위한 카프카와 주키퍼
각종 센서 및 다양한 말단에서 수집되는 대용량 데이터를 적절히 분산 저장 및 관리하기 위해 앞서 설명한 카프카가 사용되는 경우는 많다. 카프카는 분산 처리를 위해 다음 그림처럼 주키퍼를 함께 사용한다.

카프카의 효율적인 디스크 쓰기 알고리즘으로 인해 다음 그림과 같이 메시지 처리 성능은 매우 좋다고 알려져 있으며, 네이버 LINE 등에서 초당 4 GB 메시지 처리를 지원하는 핵심인 컴포넌트로 사용되고 있다(참고). 

윈도우에서 카프카 설치하기
다음과 같은 순서로 설치한다.
윈도우 버전은 bin/windows 폴더 내에 실행 배치파일이 있다. 이제 다음과 같이 각 콘솔에서 주키퍼 서버를 실행한다. 실행 시 설치 경로는 적절히 수정한다.
c:\kafka\bin\windows\zookeeper-server-start.bat ../../config/zookeeper.properties

다음과 같이 카프카 서버를 실행한다. 
c:\kafka\bin\windows\kafka-server-start.bat ../../config/server.properties

이제 카프카 서버에 producer를 이용하여 토픽 메시지를 발행한다. 다음을 실행하고, 메시지를 입력해 본다.
c:\kafka\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20200316

다음과 같이 메시지 consumer를 실행하면, 앞서 입력된 메시지가 구독되는 것을 확인할 수 있을 것이다. 
c:\kafka\bin\windows>kafka-console-consumer.bat

MongoDB, 카프카 활용 파이썬 코딩 
MongoDB를 여기서 다운로드 받아 설치한다. 그리고, 다음과 같이 pip 로 패키지를 설치한다.
pip install kafka-python
pip install pymongo

kafka_server.py 파일을 아래와 같이 코딩한다.
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number' : e}
    producer.send('numtest', value=data)
    sleep(5)

kafka_consumer.py 파일을 아래와 같이 코딩한다. auto_off_reset에 earliest 로 설정해 커밋된 최신 오프셋에서 메시지 기록을 읽기를 시작한다. 만약, latest로 설정하면 로그 끝에서 읽기 시작할 것이다. enable_auto_commit을 true로 하면 소비가자 매 간격마다 읽기 오프셋을 커밋하게 된다. group_id는 소비자가 속한 그룹을 정의한 것이다. value_deserializer는 데이터를 json 형식으로 변환한다.
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads

consumer = KafkaConsumer(
    'numtest',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))

client = MongoClient('localhost:27017')
collection = client.numtest.numtest
     
for message in consumer:
    message = message.value
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))
   
콘솔창에서 다음과 같이 각각 실행한다.
python kafka_server.py
python kafka_consumer.py

그럼 다음처럼 5초마다 'numtest'토픽이름으로 데이터가 생성되어, consumer에게 전달되는 것을 확인할 수 있다.
카프카 생성 메시지 pub-sub 

MongoDB를 실행해 database를 연결하면, 다음과 같이 kafka에서 생성된 데이터가 저장된 것을 확인할 수 있다.
몽고DB의 카프카 생성 데이터 뷰

이런 방식으로 IoT 센서, 메신저, SNS 메시지 등 대용량으로 생성되는 빅데이터를 카프카에서 모아, NoSQL DB로 저장하거나, 스파크(spark)로 분석해 NoSQL DB로 저장하는 등의 작업을 쉽게 처리할 수 있다.

기타 명령 
  • 토픽 생성 명령: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic name>
  • 토픽 목록 확인: kafka-topics.bat --list --zookeeper localhost:2181
마무리 
카프카는 IoT 장치, node-red 와 같은 데이터 토픽을 발생하는 데이터 스트리밍 파이프라인을 구성하는 데 중요한 역할을 할 수 있다. 카프카는 빅데이터 분석을 지원하는 스파크(spark)와도 연동되어 사용되기도 한다. 카프카는 고가용성과 대용량 데이터 분산처리가 필요한 경우에 효과적이다. 아울러, 파이썬, nodejs 등을 이용해 카프카, 스파크 등을 사용할 수 있어 편리하다.

참고: 주키퍼 설정 수정
압축 해제한 카프카 폴더의 config 폴더에서 다음과 같이 해당 파일을 수정한다. 
# zookeeper.properties
# The directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

# server.properties
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

레퍼런스