2020년 3월 19일 목요일

딥러닝 기반 이미지 시멘틱 세그먼테이션

이 글은 딥러닝 기반 이미지 세그먼테이션 관련 내용을 간단히 소개한다.

개별 이미지에 대한 딥러닝 기법은 정확도가 많이 높아져, 사람이 인식하는 수준을 넘었으나, 전체 그림에서 객체의 시멘틱 정보를 인식하는 딥러닝 기법은 현재 정확도 수준이 그에 미치지 못하고 있다.

시멘틱 기법은 데이터 종류에 따라, 2차원 이미지, 3차원 포인트 클라우드로 구분할 수 있다. 

포인트 클라우드 시멘틱 세그먼테이션의 최신 기법은 다음과 같다.

이미지 기반 시멘틱 세그먼테이션의 최신 기법은 다음과 같다.
Semantic segmentation mAP, FPS
레퍼런스



2020년 3월 16일 월요일

빅데이터를 고려한 카프카 기반 Raspberry Pi IoT 시스템 개발

이 글은 빅데이터를 고려한 카프카 기반 Raspberry Pi(RPi) IoT 시스템을 개발하는 방법을 간단히 다루어 본다. 카프카는 대용량 메시지 처리, IoT 센서 데이터, 빅데이터 처리 등에 MongoDB와 함께 사용되는 경우가 많다. 참고로, 카프카 개발팀은 스타트업 Confluent 사를 설립했는 데, 시리즈 D에서 1억 2500만달러를 투자받았고, 자산가치는 25억달러로 평가받고 있다(참고). 현재, 카카오, 네이버 라인 등 많은 회사에서 카프카를 메시지 미들웨어 핵심 엔진으로 사용하고 있다.
이 내용은 다음 목적에 유용하다.
  • 저렴한 IoT 데이터 분산처리 시스템 개발
  • 라즈베리파이 기반 IoT 클러스터 구축
라즈베리파이는 임베디드 보드라 메모리 제약이 심하다. 이런 이유로 기대한 만큼 카프카 성능과 안전성이 좋지 않을 수도 있다. 만약, 라즈베리파이를 단지 IoT 데이터 취득 목적으로만 사용하고, 별도 서버(IBM 호환 PC, 엔비디아 보드 등)에 카프카와 주키퍼를 서비스한다면 파이썬의 카프카 라이브러리 함수인 KafkaProducer, KafkaConsumer를 사용하면 된다. 이 내용은 아래 부록이나 링크를 참고한다. 
준비
하드웨어를 다음과 같이 준비한다.
  • Raspberry Pi 3 Model B 이상
  • 32 GB 이상 Micro SD 메모리 
  • EDiMAX 등 WiFi USB 동글
  • 라즈베리파이 전원 아답터
  • DHT11 온습도 및 초음파 거리센서 HC-SR04

소프트웨어를 다음과 같이 준비한다.
  • RPi NOOBS(New Out Of the Box Software) 다운로드 후 압축 풀고, 파일들을 Micro SD에 복사함
  • RPi 에 Micro SD를 넣고, 키보드, 마우스, 전원을 연결함
  • RPi 가 켜지면 WiFi 설정 후 운영체제 설치 명령을 따라 Raspbian OS를 설치(Lite 버전은 설치하지 말것) 

다음과 같이 hostname을 local IP 주소와 함께 설정한다.
vim /etc/hostname # and set a name like raspberry-8.
vim /etc/hosts # Replace raspberry with your new name.

아파치 주키퍼와 카프카를 다운로드 한 후 RPi 에 복사해 넣는다.

주키퍼 및 카프카 서버 실행
설정
라즈베리파이 메모리 부족 에러 방지를 위해 압축해제한 카프카 폴더 내 bin의 kafka-server-start.sh 에 서버 실행 전 아래 내용을 추가한다.
export JMX_PORT=${JMX_PORT:-9999}
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

카프카 LEADER_NOT_AVAILABLE 에러를 방지하기 위해 /config/server.properties에 아래를 추가한다.
advertised.listeners = PLAINTEXT://localhost:9092
delete.topic.enable=true

서버 실행
주피커와 카프카 서버를 터미널에서 차례대로 실행한다.
kafka\bin\zookeeper-server-start.sh ../config/zookeeper.properties
kafka\bin\kafka-server-start.sh ../config/server.properties

그리고, producer와 consumer를 터미널에서 실행한다.
kafka\bin\kafka-console-producer.sh --broker-list localhost:9092 --topic IoT
kafka\bin\kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic IoT

producer에서 키보드로 메시지 입력하면, 다음과 같이 consumer에 보일 것이다. 그럼 성공한 것이다.
카프카 서버 실행 화면

좀더 자세한 내용은 아래를 참고한다.
센서 설치 및 개발
이 글에서 개발할 센서는 온습도, 거리 센서이다. 센서 회로 연결 시 아래 RPi 보드 핀 레이아웃을 참고한다. 빅데이터 분산 처리 메시지 스트리밍 개발의 목적이므로, 간단한 회로 작성을 위해 저항 등은 연결하지 않았다. 다음 예시의 회로 연결은 실습용이니 참고하길 바란다.
RPi 핀 번호

개발 환경 설치
우선 아래와 같이 파이선 라이브러리를 설치한다.
sudo apt-get update
sudo apt-get install python-pip
sudo python -m pip install --upgrade pip setuptools wheel
sudo pip install kafka-python
sudo pip install Adafruit_DHT

데이터 분석, 비전을 개발한다면, 다음 패키지도 함께 설치한다(설치가 안된다면, 해당 패키지 github에서 설치방법을 확인한 후 수동으로 설치해야 한다).
sudo pip install scikit-learn numpy
sudo pip install opencv-python imageio Pandas Pillow

아두이노 개발하려면 아래를 설치한다.
sudo apt-get install arduino

온습도센서 설치 및 코딩
DHT11을 라즈베리파이와 다음과 같이 연결한다.
DHT11 Vcc - RPi 5V
DHT11 GND - RPi GND
DHT11 Data - RPi GPIO No 2
DHT11 온습도 센서 핀 배치

다음과 같이 코딩한다. 카프카로 IoT 주제로 데이터를 생성한다.
from kafka import KafkaProducer
from json import dumps
import RPi.GPIO as GPIO                    
import Adafruit_DHT as ada
import time               

producer = kafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))

dht = ada.DHT11
pin = 2    # GPIO No 2

print("start")
while True:
  h, t = ada.read_retry(dht. pin)
  data = {'temp': [h, t]}
  if h is not None and t is not None:
    producer.send("safety", value = data)
    print("T={0}, H={1}".format(t, h))
  else:
    print("none")
  time.sleep(0.5)  

다음과 같이 실행되면 카프카로 센서 메시지 발행이 성공한 것이다.
RPi기반 카프카 분산 메시지 스트리밍 실행 결과

네트웍 IP 주소가 있다면, 다른 컴퓨터에서 연결한 카프카 CONSUMER에서 라즈베리파이에서 발생한 센서 데이터 메시지를 스트리밍받아 볼 수 있다.

라즈베리파이 작업관리자를 확인해 보면, 카프카 실행상태에서 리소스 사용은 CPU 5-20%, 메모리 662MB/874MB이였으며, 다음과 같았다.

거리센서 설치 및 코딩
거리센서 HC-SR04를 아래와 같이 연결한다. 카프카 safety 주제(토픽)으로 data를 생성해본다.
회로 연결

RPi의 GPIO 라이브러리를 이용해 아래와 같이 dist.py파일을 코딩한다.
from kafka import KafkaProducer
from json import dumps
import RPi.GPIO as GPIO                    
import time                       
  
producer = kafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda x: dumps(x).encode('utf-8'))
       
GPIO.setmode(GPIO.BCM)     #Set GPIO pin numbering 

trig = 23                                  
echo = 24                                  

GPIO.setup(trig,GPIO.OUT)                 
GPIO.setup(echo,GPIO.IN)                   

while True:
  GPIO.output(trig, False)                 
  time.sleep(1)                            

  GPIO.output(trig, True)                  
  time.sleep(0.00001)                      
  GPIO.output(trig, False)       
     
  while GPIO.input(echo)==0:          
    start = time.time()          
  while GPIO.input(echo)==1:            
    end = time.time()               

  travelTime = end - start 
  distance = travelTime * 17150        
  distance = round(distance, 2)            
  
  data = {'dist': distance}
  producer.send("safety", value = data)
     print "D = ", distance,"cm"

그리고, dist.py를 실행한다.
python dist.py

정상 실행되면 아래와 같은 데이터 출력을 확인할 수 있다. 같은 방식으로 카프카 CONSUMER에서 데이터를 확인할 수 있다.

부록: 별도 카프카 서버 구축, 라즈베리파이 IoT 센서 데이터 스트리밍 및 성능 테스트
카프카 환경 설정
무선이나 유선 네트워크가 가능한 환경에서, 별도 컴퓨터에 카프카를 앞의 내용을 참고해 설치한다. 시스템 구조는 다음과 같다.
카프카를 설치한 컴퓨터에, 앞서 설명한 server.properties 파일의 아래 내용을 해당 컴퓨터 IP주소에 맞게 수정한다.
listeners=PLAINTEXT://IP주소:9092
advertised.listeners=PLAINTEXT://IP주소:9092

주키퍼와 카프카 서버를 각각 터미널에서 실행한다.
c:\kafka\bin\windows\zookeeper-server-start.bat ../../config/zookeeper.properties
c:\kafka\bin\windows\kafka-server-start.bat ../../config/server.properties

센서 데이터 획득 및 빅데이터 생성 코딩
라즈베리파이에서 앞에 설명한 초음파 거리센서 예제를 참고해, 파이썬으로 다음과 같이 코딩한다. 카프카 서버 IP주소는 앞의 IP주소와 동일해야 한다.
from kafka import KafkaProducer
from json import dumps
import RPi.GPIO as gpio
import time

producer = KafkaProducer(bootstrap_servers=['카프카 컴퓨터 서버 IP주소:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))

gpio.setmode(gpio.BCM)
trig = 23
echo = 24

gpio.setup(trig, gpio.OUT)
gpio.setup(echo, gpio.IN)

index = 0
while True:
  gpio.output(trig, 0)      # 0.01초 마다 센서 데이터 생성 및 전송
  time.sleep(0.01)
  gpio.output(trig, 1)
  time.sleep(0.00001)
  gpio.output(trig, 0)

  while gpio.input(echo) == 0:
    start = time.time()
  while gpio.input(echo) == 1:
    end = time.time()

  travelTime = end - start
  distance = travelTime * 17150 # / 58.0
  index = index + 1
  data = {'dist': [index, distance]}
  producer.send("IoT", value = data)
  print index, 'D = ', round(distance, 2), 'cm'

코딩 후 이 파이썬 프로그램을 실행한다. 

카프카 서버에서 consumer를 실행해 네트워크를 통해 데이터가 제대로 전달되는 지를 확인한다. 다음과 같이 카프카 서버와 라즈베리파에서 실행되면 성공한 것이다.
카프카 서버 실행 모습
라즈베리파이 센서 데이터 획득 및 카프카 서버에 센서 데이터 스트림 네트워크 전송 모습

카프카 데이터 스트리밍 성능 테스트
카프카 메시지 스트리밍 성능 테스트를 위해 센서 데이터 전송을 초당 100개로 설정해 다음과 같이 확인해 보았다. 
라즈베리파이 센서 값

인터넷 네트워크 회선에 큰 문제가 없다면, 카프카 서버에서 실시간 수준으로 데이터 스트리밍 처리하는 것을 확인할 수 있다. 데이터 패킷 크기가 50 bytes 인 경우, 초당 5K가 처리되는 수준이다. 좀 더 부하를 주기위해 초당 1000개 토픽을 생성해 전송하였고, 이때도 큰 무리 없이 실시간 수준으로 스트리밍처리되었다. 이 경우, 초당 50K가 처리되는 수준이다.

좀 더 명확한 성능 측정을 위해서는 다음 링크의 성능 분석 매트릭스 및 데쉬보드 도구를 설치해 확인할 수 있다.
카프카 데이터 패킷 구조에 대해 궁금하다면 다음 링크를 참고한다.
부록: 카프카, 스파크, NoSQL 빅데이터 처리 아키텍처
카프카 서버에 전송된 데이터는 스트리밍되어 다른 어플리케이션이나 컴퓨터에서 고속으로 데이터를 전송받을 수 있다. 이 데이터를 Mongo DB같은 NoSQL DB에 저장하고, 스파크 등으로 빅데이터 분석을 수행할 수 있다. 이에 대한 내용은 아래를 참고한다.
다음은 이를 고려한 빅데이터 처리 아키텍처 예시이다.
Big data process architecture framework(Data Science: Third International Conference of Pioneering Computer, Springer, 2017)


부록: RPi 카메라 설치
라즈베리파이 카메라를 RPi와 연결하고, 다음과 같이 카메라 설정한다.
sudo raspi-config

UI메뉴에서 5번 setting interfaces, 1번 Setting camera 를 선택해 카메라를 활성화한다. 이제 다음 명령을 입력한다.
raspivid -o video.h264 -t 10000 -w 640 -h 480 -p 0,0,640,480

그럼 다음과 같이 카메라 화면이 나타난다.

파이썬 OPENCV를 이용해 객체 감지 후 해당 메시지를 앞에서 설명한 것과 같은 방식으로 카프카 메시지로 스트리밍 서비스할 수 있다.

부록: 라즈베리파이 기반 카프카 클러스터 서버 생성
주키퍼 설정
다음과 같이 주키퍼 클러스터 설정용 conf/zoo.cfg을 수정한다.
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/zookeeper # Don't put under /tmp, it will be deleted.
clientPort=2181
server.1=192.168.0.18:2888:3888
server.2=192.168.0.15:2888:3888
server.3=192.168.0.16:2888:3888

각 RPi의 /var/zookeeper 아래 myid 파일에 설정 파일을 복사한다.
그리고 zookeeper 루트 폴더아래 ./bin/zkServer.sh start 를 입력해 주키퍼 서비스를 시작한다. 주키퍼가 적절히 실행된다면, 다음처럼 프롬프트에 보일 것이다.
pi@raspberrypi-16:~/apache/zookeeper-3.4.11 $ ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/pi/apache/zookeeper-3.4.11/bin/../conf/zoo.cfg
Mode: follower # (or leader)

카프카 설정
주키퍼 클러스터가 실행 되고 있으면, 이제 아파치 카프카를 RPi에 배포한다.

다음 같이, config/server.properties 파일을 수정한다.
broker.id=1 # 1/2/3 for each card
port=9092
host.name=192.168.0.16 # IP address
zookeeper.connect=192.168.0.18:2181,192.168.0.15:2181,192.168.0.16:2181

bin/kafka-server-start.sh 를 갱신한다.
export JMX_PORT=${JMX_PORT:-9999}
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" # Otherwise, JVM would complain not able to allocate the specified memory.

bin/kafka-run-class.sh 를 갱신한다.
KAFKA_JVM_PERFORMANCE_OPTS="-client -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true" # change -server to -client

카프카를 시작한다.
./bin/kafka-server-start.sh config/server.properties &

카프카가 적절히 시작되고 있다면, 다음과 같이 다른 콘솔 터미널에서 producer 를 실행해 본다.
./bin/kafka-console-producer.sh — broker-list 192.168.0.15:9092,192.168.0.16:9092,192.168.0.18:9092 — topic v-topic # create a topic
./bin/kafka-console-producer.sh --broker-list 192.168.0.15:9092,192.168.0.16:9092,192.168.0.18:9092 --topic v-topic # kick off a producer

카프카 consumer를 실행한다.
./bin/kafka-console-consumer.sh — zookeeper 192.168.0.16 — topic v-topic from-beginning

레퍼런스
P.S. 최근 천억 규모의 연구과제가 발주되었다. 사실 1억도 매우 큰 돈이다. 국가 연구과제가 이렇게 크면 누수가 많아지고, 인하우스 기술 개발보단 아웃소싱만 많아진다. 몇천만원이라도 10년을 꾸준하게 연구하고 싶은 사람이나 팀에게 지원해 주고 그 연구에만 몰입할 수 있는 환경을 준다면, 선진국 보유 핵심기술이라도 충분히 개발할 수 있다고 생각한다. 대형 설비가 필요해 큰 펀딩이 있어야 하는 경우를 제외한다면, 이런 펀딩은 핵심기술은 겨녕 아웃소싱 연구행정관리만 하다 아까운 시간 낭비할 수 있다. 홍보하기는 좋겠지만, 지속적이지 않고, 전문성은 더욱 떨어질 수 있다. 간만 보다 끝날 수 있다고 생각한다.

2020년 3월 8일 일요일

깃허브(github) 개념 및 사용법 정리

이 글은 협업을 통한 개발 시 많이 사용되는 깃허브(github) 명령어 기록 겸 사용법 간단히 정리한다. github는 2018년 6월 마이크로소프트에 8조원(75억 달러)으로 인수된 개발자 협업 및 형상관리(configuration management) 플랫폼이다. 톰, 크리스, 피제이 개발자 3명이 2007년부터 개발을 시작해 전세계에 서비스되었다(상세).

깃허브 가입 및 설치
깃허브는 명령행 인터페이스 프로그램과 깃허브 웹사이트를 통해 형상 관리를 지원한다.

깃허브 사용을 위해 다음 링크 클릭해 가입하고, 깃허브 프로그램을 설치한다.
깃허브 형상관리 개념
형상관리는 소프트웨어나 문서의 변경사항을 체계적으로 추적 관리하는 것이다. 깃은 형상관리 플랫폼이다. 다음 그림은 새 소스파일을 생성해 깃(git) 저장소에 저장하고 협업하는 과정을 보여준다. git은 크게 4개 git 관리 파일 DB를 가진다. git은 각 DB들을 갱신하며 파일 형상을 관리한다.
  • workspace: 컴퓨터 로컬 작업 폴더
  • staging: 컴퓨터 로컬 폴더 내 파일 인덱스 DB
  • local repository: 컴퓨터 로컬 폴더 내 파일 저장소
  • remote repository: 협업용 원격 깃 저장소
git process(Diego C Martin)

깃 프로세스는 다음과 같다. 이 과정에서 파일이 추가, 변경, 삭제된 이력과 내용은 모두 깃허브에 기록된다. 이를 통해, 파일 내용을 특정 버전으로 되돌리거나(roll back), 다른 사람이 만든 파일과 병합(merge)하는 형상관리가 가능해진다. 앞의 그림은 이 과정을 잘 보여준다.
  1. 로컬 폴더 생성: 프로젝트 파일이 저장될 폴더를 생성한다.
  2. 깃 저장소 생성: 온라인 협업 작업을 위한 원격 작업할 깃 저장소를 생성한다.
  3. 로컬과 깃 저장소 연동: 원격 협업 작업을 위해 깃 저장소 연동 정보를 생성한다. 
  4. 파일 생성 및 깃 저장소 추가(add): 파일 작업 후 해당 파일을 로컬 폴더 인덱스에 추가한다.
  5. 파일 커밋(commit)/푸시(push): 작업한 파일들을 커밋해 깃 저장소에 추가하고, 실제 파일 내용을 푸시하여 깃 저장소에 저장한다.
  6. 로컬 폴더 파일 갱신(PULL): 다른 사람들이 푸시한 깃 저장소 최신 버전 파일들을 로컬 폴더에 다운로드 받아 동기화한다.
저장소 생성, 파일 형상관리
깃허브 로긴 후 개발 프로젝트 폴더에 해당하는 저장소(repository)를 생성한다.
  • Repository 생성. README 생성 옵션은 체크하지 않음
  • 로컬에 개발 폴더 생성
  • 로컬 개발 폴더 내에 source.py 파일 생성 및 내용 입력
  • git init 로 로컬 폴더 git 설정
  • git add source.py 로 파일 인덱스를 추가함
  • git commit -m "source.py" 로 파일 정보를 github에 저장
  • git status 로 상태 확인
  • 저장소 주소를 다음과 같이 이름과 연결. 주소와 이름은 적절히 수정할 것.
git remote add remoteName https://github.com/username/repositoryName
  • git push remoteName master 로 현재 커밋 파일을 저장소에 저장
git push 시 아이디 및 암호 입력 화면
git push 화면
다시 source.py를 수정하하면, 다음 과정을 다시 거쳐야 한다.
  • git add source.py
  • git commit -m "source.py"
git 저장소가 달라졌으면, 로컬을 동기화하기 위해 다시 git pull origin master 수행해야 한다.

브랜치 생성 및 사용
브랜치는 일정 기간 동안만 유지되는 개발 시 사용한다. 브랜치 사용법은 다음과 같다.
  • git branch 로 브랜치 목록 보기
  • git branch sub1 브랜치 만들기
  • git checkout sub1 로컬 저장소의 브랜치 전환
  • git branch
  • git add source.py
  • git commit -m "source.py"
  • git push remoteName sub1
  • git checkout sub1
  • git pull
브랜치 병합은 다음과 같다. 
  • git checkout master 로컬 저장소 브랜치 전환
  • git merge sub1 병합
  • git push remoteName master 저장
브랜치 삭제는 다음과 같다. 
  • git branch -d sub1 
  • git branch
Rollback 하기
현재 파일을 수정하다, 깃 저장소에 최신 파일로 되돌리고 싶을 때 다음 명령어를 사용한다.
  • git reset --hard {commit번호} : 특정 커밋으로 되돌리고, 이후 버전은 히스토리에서 삭제됨
  • git reverse {commit번호}: 특정 커밋으로 되돌리고, 이후 버전 이력은 히스토리에 남아 있음
  • git checkout . : 로컬 작업 폴더에서 수정한 모든 파일을 git add 이전의 현재 버전으로 되돌림
깃허브 에러 대처 방법
git 진행 시 에러는 다음과 같이 해결한다.
  • Updates were rejected because the remote contains work that you do not have locally: 저장소 생성 시 README를 체크하지 말거나, git pull 로 해결. 예) git pull origin master
  • Updates were rejected because the tip of your current branch is behind: git pull 할 때 --allow-unrelated-histories 옵션을 추가.
git 에러 화면 예시

레퍼런스

OpenCV 기반 형상 유사도 비교 방법

OpenCV는 컴퓨터 비전에 필요한 알고리즘 대부분이 잘 구현되어 있는 오픈소스 라이브러리이다. 이 글은 OpenCV를 이용해 Hu-Moment 기반 형상(모양) 유사도를 비교하는 방법을 간단히 설명한다.

형상 유사도와 모멘트 정의
예를 들어, 서로 다른 모양의 두개의 형상이 있다고 가정해 보자. 이 두개 형상의 유사한 정도는 어떻게 비교할 수 있을까? 이 글은 모양의 모멘트(moment)를 이용해 회전, 이동, 스케일에 독립적인 특성을 계산해 형상을 비교하는 방법을 소개한다.

모양의 모멘트는 모양을 이루는 각 픽셀의 강도에 대한 가중 평균이다. 예를 들어, 흑백 컬러를 가진 모양 I를 가정한다. x, y지점의 픽셀의 강도는 I(x, y)로 표현한다. I(x, y)는 0 혹은 1을 가질것이다. 이때 모엔트는 다음과 같다. 
M은 모든 픽셀 강도의 합이다. 다른말로, M은 강도에만 기반한 값이다. M은 모양의 회전에 불변인 값이 된다(RI. rotation invariant). 다음 그림을 보면, 두개의 A에 대한 M은 서로 같음을 알 수 있다. 하지만, M(C)는 다를것이다. M은 픽셀 강도의 합이므로 형상의 질량이라 볼 수 있다.
다음은 raw moment를 정의한다. 여기서, i, j는 0, 1, 2와 같은 정수값이된다. 이 정수값은 모양 질량의 중심을 계산할 때 사용한다.


다음 x̄, ȳ 는 모양 질량의 중심이라 정의한다.

이제 중심 모멘트(central moment)를 정의해보자. 이는 raw moment와 매우 유사하다. 이는 x, y 위치에서 질량 중심 x̄, ȳ 를 뺀 수치를 사용한다. 이렇게하면 이동에 불편이 된다(TI. translation invariant). 


여기에 다음 수식을 적용해 스케일에 불변인 정규화된 중심 모멘트(normalized central moments)를 얻는다(SI. scale invariant).
이제 유사도를 비교할 Hu Moment를 정의한다. Hu Moment는 7개 식으로 구성된다. 첫 6개 모멘트는 이동, 스케일, 회전 및 반사에 불편인 값을 제공한다. 마지막 7번째 모멘트는 반사에 따라 변경되는 부호이다. 

OpenCV 이용한 Hu Moments 및 유사도 계산
OpenCV는 Hu moment 계산함수를 다음과 같이 제공한다. 
이 함수는 이미지의 Hu Moment를 계산한 후, 유사도값을 리턴하는 matchShapes 함수에 사용된다. Hu moments는 log 변환을 통해 좀 더 계산하기 좋은 값으로 변환된다.

이 H값들을 이용해 유사도 비교를 위한 거리 함수 D(A, B)를 정의한다. A, B는 형상이다. OpenCV는 아래와 같이 3개의 거리 함수를 제공한다.
  • CONTOURS_MATCH_I1 
  • CONTOURS_MATCH_I2
  • CONTOURS_MATCH_I3
다음은 이를 이용한 이미지 형상 유사도 비교 파이썬 코드이다. fileList 배열에 가지고 있는 이미지 파일명으로 수정하면, 각 이미지 파일의 유사도 비교 결과를 확인할 수 있다.
import cv2
import numpy
print('OpenCV = ', cv2.__version__)

fileList = ["image1.jpg", "image2.jpg", "image1.jpg"]

imgList = []
for file in fileList:
img = cv2.imread(file, cv2.IMREAD_GRAYSCALE) 
imgList.append(img)

contours = []
for im in imgList:
ret, imgBin = cv2.threshold(im, 127, 255,0)
conts, hierarchy = cv2.findContours(imgBin, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
contour = conts[0]
contours.append(contour)

print("Shape Distances Between \n-------------------------")
index = 0
for contour in contours:
m = cv2.matchShapes(contours[0], contour, 1, 0)
print("{0} and {1} : {2}".format(fileList[0], fileList[index], m))
index = index + 1
cv2.waitKey()

입력 이미지는 다음과 같다. 

계산 결과는 다음과 같다. 십자모양과 같은 이미지는 유사도가 0이고, 기울어진 십자모양도 0.01로 거의 유사하다. 나머지는 차이가 있는 것을 알 수 있다.

2020년 3월 2일 월요일

아두이노 센서 데이터의 빅데이터 처리 개발 방법

이 글은 아두이노 센서 데이터의 빅데이터 처리 개발 방법을 간단히 소개한다.

시스템 아키텍처(Gabriel, 2017, hackster.io)
제플린 노트북 빅데이터 K-means 분석 결과(Gabriel, 2017, hackster.io)
온습도 분포 분석 결과(Gabriel, 2017, hackster.io)

이 사례는 아두이노 우노, ESP8266, DHT11 온습도 센서를 사용해 데이터를 취득하여, 카프카(kafka) 서버로 발행하고, Hive 데이터 저장소에 저장된 후, Zeppelin에서 스파크(Spark) 코드를 실행해 빅데이터 분석을 수행한다.
사용된 온습도 센서 회로

분석된 빅데이터 결과는 MongoDB, Cassandra DB등을 사용해 저장한다.

레퍼런스
아두이노나 NodeMCU 센서 데이터를 MongoDB에 저장하는 예시는 다음을 참고한다.

아파치 Kafka, MongoDB, 파이썬 기반 빅데이터 메시지 스트리밍 서버 개발

이 글은 분산 메시지 스트리밍 플랫폼 아파치 Kafka(카프카) 설치 후 사용하는 방법을 간단히 설명한다. 카프카는 대용량 메시지 처리, IoT 센서 데이터, 빅데이터 처리 등에 MongoDB와 함께 사용되는 경우가 많다. 참고로, 카프카 개발팀은 스타트업 Confluent 사를 설립했는 데, 시리즈 D에서 1억 2500만달러를 투자받았고, 자산가치는 25억달러로 평가받고 있다(참고). 현재, 카카오, 네이버 라인 등 많은 회사에서 카프카를 메시지 미들웨어 핵심 엔진으로 사용하고 있다.

분산 메시지 스크리밍 플랫폼 카프카 
카프카는 대용량 메시지 발행-구독 서버로 사용된다. 아파치 소프트웨어재단이 2011년 오픈소스로 공개한 카프카는 비즈니스 구인구직 소셜 네트워크 서비스인 링크드인(LinkedIn) 수석 엔지니어 제이 크렙스(Jay Kreps)가 개발했다.

많은 웹 서비스가 다음 그림과 같이 메시지 발행(producer)-구독(consumer) 패턴을 가지는 데 카프카는 이런 메시지를 큐(queue) 자료구조로 관리하며, 빠른 속도로 메시지 데이터를 저장한 후 사용할 수 있는 기능을 제공한다.
카프카 발행-구독 서버(Apache Kafka documentation)

토픽(topic. 주제)은 다음과 같은 토픽별 파티션(partition)에 저장된다.
카프카 주제(토픽) 생성-소비 및 파티션 구조(https://www.cloudera.com/documentation/kafka/1-2-x/topics/kafka.html)

토픽이 만들어지면, 데이터는 파티션에 다음 같은 큐(queue) 형식으로 기록된다. 이를 생성된 순서대로 각 소비자가 읽을 수 있다.
카프카 메시지큐(www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client)

다음은 카프카 파티션에 저장되는 메시지 큐 자료 구조를 좀 더 명확히 보여준 것이다. 그림에서 offset은 소비자(consumer)가 현재 읽고 있는 메시지를 참조할 때 사용되는 참조번호가 된다. 소비자가 토픽의 메시지를 읽을 때 마다, current position 은 하나씩 증가하고, 얻어온 데이터는 청크 저장소(Chunk store)에 다음 그림과 같이 저장된다.

카프카 구조는 간단하나, 카프카 개발팀은 데이터 스트리밍을 고속으로 처리하기 위해, 이런 자료 구조를 메모리 상에서 관리하며, 적당할 때 하드디스크로 고속 저장(swap)하고 불러오는 과정을 효율적으로 개발했다.

고가용성 분산 코디네이션 지원 ZooKeeper
카프카는 고가용성을 위해 분산 코디네이션 기능을 지원하는 아파치 ZooKeeper(주키퍼) 와 함께 사용된다. 주키퍼는 쓰기 동작 시 분산된 카프카 클라리언트들 간 동기화 처리를 해준다.
주키퍼 쓰기 요청 시 동작(Zookeeper: A Wait-Free Coordination Kernel)

주키퍼 클라이언트들은 ZNode 단위로 관리되어 데이터 동기화 처리를 수행한다.
ZooKeeper znodes(Zookeeper: A Wait-Free Coordination Kernel)

빅데이터 처리를 위한 카프카와 주키퍼
각종 센서 및 다양한 말단에서 수집되는 대용량 데이터를 적절히 분산 저장 및 관리하기 위해 앞서 설명한 카프카가 사용되는 경우는 많다. 카프카는 분산 처리를 위해 다음 그림처럼 주키퍼를 함께 사용한다.

카프카의 효율적인 디스크 쓰기 알고리즘으로 인해 다음 그림과 같이 메시지 처리 성능은 매우 좋다고 알려져 있으며, 네이버 LINE 등에서 초당 4 GB 메시지 처리를 지원하는 핵심인 컴포넌트로 사용되고 있다(참고). 

윈도우에서 카프카 설치하기
다음과 같은 순서로 설치한다.
윈도우 버전은 bin/windows 폴더 내에 실행 배치파일이 있다. 이제 다음과 같이 각 콘솔에서 주키퍼 서버를 실행한다. 실행 시 설치 경로는 적절히 수정한다.
c:\kafka\bin\windows\zookeeper-server-start.bat ../../config/zookeeper.properties

다음과 같이 카프카 서버를 실행한다. 
c:\kafka\bin\windows\kafka-server-start.bat ../../config/server.properties

이제 카프카 서버에 producer를 이용하여 토픽 메시지를 발행한다. 다음을 실행하고, 메시지를 입력해 본다.
c:\kafka\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test20200316

다음과 같이 메시지 consumer를 실행하면, 앞서 입력된 메시지가 구독되는 것을 확인할 수 있을 것이다. 
c:\kafka\bin\windows>kafka-console-consumer.bat

MongoDB, 카프카 활용 파이썬 코딩 
MongoDB를 여기서 다운로드 받아 설치한다. 그리고, 다음과 같이 pip 로 패키지를 설치한다.
pip install kafka-python
pip install pymongo

kafka_server.py 파일을 아래와 같이 코딩한다.
from time import sleep
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))

for e in range(1000):
    data = {'number' : e}
    producer.send('numtest', value=data)
    sleep(5)

kafka_consumer.py 파일을 아래와 같이 코딩한다. auto_off_reset에 earliest 로 설정해 커밋된 최신 오프셋에서 메시지 기록을 읽기를 시작한다. 만약, latest로 설정하면 로그 끝에서 읽기 시작할 것이다. enable_auto_commit을 true로 하면 소비가자 매 간격마다 읽기 오프셋을 커밋하게 된다. group_id는 소비자가 속한 그룹을 정의한 것이다. value_deserializer는 데이터를 json 형식으로 변환한다.
from kafka import KafkaConsumer
from pymongo import MongoClient
from json import loads

consumer = KafkaConsumer(
    'numtest',
     bootstrap_servers=['localhost:9092'],
     auto_offset_reset='earliest',
     enable_auto_commit=True,
     group_id='my-group',
     value_deserializer=lambda x: loads(x.decode('utf-8')))

client = MongoClient('localhost:27017')
collection = client.numtest.numtest
     
for message in consumer:
    message = message.value
    collection.insert_one(message)
    print('{} added to {}'.format(message, collection))
   
콘솔창에서 다음과 같이 각각 실행한다.
python kafka_server.py
python kafka_consumer.py

그럼 다음처럼 5초마다 'numtest'토픽이름으로 데이터가 생성되어, consumer에게 전달되는 것을 확인할 수 있다.
카프카 생성 메시지 pub-sub 

MongoDB를 실행해 database를 연결하면, 다음과 같이 kafka에서 생성된 데이터가 저장된 것을 확인할 수 있다.
몽고DB의 카프카 생성 데이터 뷰

이런 방식으로 IoT 센서, 메신저, SNS 메시지 등 대용량으로 생성되는 빅데이터를 카프카에서 모아, NoSQL DB로 저장하거나, 스파크(spark)로 분석해 NoSQL DB로 저장하는 등의 작업을 쉽게 처리할 수 있다.

기타 명령 
  • 토픽 생성 명령: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic <topic name>
  • 토픽 목록 확인: kafka-topics.bat --list --zookeeper localhost:2181
마무리 
카프카는 IoT 장치, node-red 와 같은 데이터 토픽을 발생하는 데이터 스트리밍 파이프라인을 구성하는 데 중요한 역할을 할 수 있다. 카프카는 빅데이터 분석을 지원하는 스파크(spark)와도 연동되어 사용되기도 한다. 카프카는 고가용성과 대용량 데이터 분산처리가 필요한 경우에 효과적이다. 아울러, 파이썬, nodejs 등을 이용해 카프카, 스파크 등을 사용할 수 있어 편리하다.

참고: 주키퍼 설정 수정
압축 해제한 카프카 폴더의 config 폴더에서 다음과 같이 해당 파일을 수정한다. 
# zookeeper.properties
# The directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

# server.properties
############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

레퍼런스