마크베이스 사용방법


마크베이스 V5 와 Apache Kafka 연동

작성자
machbase
작성일
2019-03-05 18:11
조회
156

이 문서는, Apache Kafka 에서 생성되는 스트림 데이터를 마크베이스 V5 에 데이터를 넣기 위한 과정을 설명한다. 마크베이스는 폭증하는 실시간 데이터 저장과 조회를 목적으로 하기 때문에, 실시간 스트리밍 이벤트 데이터에 대한 수요 역시 존재한다. 대표적인 스트리밍 플랫폼인 Apache Kafka 를 통해 입력받을 수 있는 방법이 없는지 문의하는 고객 또한 많으므로, 이번 기회를 통해 단계별로 사용 방법을 설명해 보고자 한다.


아래 그림은 Apache Kafka 와 그 사이에 있는 Producer / Consumer 프로그램, 그리고 Consumer 가 수신받은 데이터를 입력받을 마크베이스 서버의 그림을 도식화한 것이다. 이 문서에서는, 마크베이스는 이미 설치되어 있다고 가정하고 Apache Kafka 의 설치를 간략하게 언급한 다음, 데모 환경을 위해 이벤트 데이터를 만들어 넣는 방법과 함께 C 언어로 된 Consumer 코드를 작성해 보도록 한다.



 


Apache Kafka 설치와 테스트


Apache Kafka 설치는 이 링크를 통해 진행할 수 있다. 단계별로 진행할 수 있으므로 쉽게 서비스 구동까지 따라할 수 있다.



  1. Kafka 를 다운로드 받아 압축을 푼다.

  2. 곧바로 서버를 구동한다. (여기서는 Apache Zookeeper 이다)


이후의 단계들은 마크베이스 데모를 위해, 이 문서에서 개별 설명하도록 한다.


토픽 (Topic) 생성


먼저, 토픽 (Topic) 을 생성한다. Kafka 에서의 토픽은, 하나의 이벤트 스트림을 의미한다.


tag 라고 이름 붙인 토픽을 아래와 같이 생성한다.
여기서 Zookeeper 서버는 localhost:2181 이라고 가정한다.


$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic tag

아래 명령을 통해 토픽 생성이 잘 되었는지 확인할 수 있다.


$ bin/kafka-topics.sh --list --zookeeper localhost:2181

메시지 생성


kafka-console-producer.sh 를 통해, 토픽에 이벤트 데이터 (=메시지) 를 넣어 볼 수 있다. 아무런 메시지나 입력해 보도록 하자.
여기서 Kafka Broker 서버는 localhost:9092 이라고 가정한다.


$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tag
> This is a message
> This is another message

메시지 수신


kafka-console-consumer.sh 를 통해 메시지 수신이 잘 이뤄지는지부터 확인해 보자.


$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tag --from-beginning

이렇게 입력하면, 곧바로 Producer Console 에서 입력한 내용 그대로 수신되는 것을 확인할 수 있다.


This is a message
This is another message

마크베이스와 연결


마크베이스 서버는 이미 설치되어 있고 구동 중이라고 가정한다. 마크베이스 서버의 접속 정보는 다음과 같이 기본값이라고 가정한다.



  • HOST : localhost

  • PORT : 5656


예제 프로젝트 준비


Consumer 프로그램 작성을 위해서는 librdkafka 라는 라이브러리가 필요하다.


librdkafka 는 Producer/Consumer 를 모두 지원하는 Apache Kafka protocol 을 구현한 C 라이브러리이다. 메시지 전송 안정성을 지향하면서도 고성능의 라이브러리이므로, Producer 에서는 초당 1백만건의 메시지를, Consumer 에서는 초당 3백만건의 메시지를 전달받을 수 있도록 설계되었다고 한다.


하지만 librdkafka 를 참고해서 샘플 코드를 직접 작성하려면 굉장히 힘들므로, 마크베이스 Github 에 있는 Kafka Example 프로젝트 를 내려받는다.


git checkout https://github.com/MACHBASE/kafka_example
cd kafka_example
make all

make 가 잘 된다면, kafka_to_machbase 바이너리가 생성된다.


마크베이스 : 테이블 준비


먼저, Consumer 에서 입력받을 테이블을 준비한다.


마크베이스 V5 부터는 태그 테이블 (Tagdata Table) 을 지원하므로, 여기에 입력해 보도록 한다. 
해당 쿼리 구문은, repository 에서 query/create.sql 에 위치해 있다.


CREATE TAGDATA TABLE tag (name VARCHAR(20) PRIMARY KEY, time DATETIME BASETIME, value DOUBLE SUMMARIZED);


태그 데이터 생성


repository 디렉토리에 있는 tag/gen.py 를 사용해서 tag.csv 를 만든다.


python gen.py ${COUNT} > /path/to/kafka/install/tag.csv


COUNT 값은, 생성하고자 하는 태그 데이터 개수를 입력한다. (예제에서는 10000 을 입력했다.)
tag.csv 는, Kafka 설치 디렉토리에 위치할 수 있도록 한다.


Consumer 프로그램 구동


Consumer 프로그램을 먼저 구동해야 한다. Repository 디렉토리에 위치한 다음, 다음과 같이 구동한다.


$ ./kafka_to_machbase tag


Producer : CSV 입력


Producer 는 이전 예제와 같이 kafka-console-producer.sh 를 사용한다.
이전 예제와는 달리, 실제 태그 데이터 (=센서 데이터) 가 입력된다고 여길 수 있는 데이터를 입력하는 것이다.


Kafka 설치 디렉토리에 위치한 다음, 아까 생성한 tag.csv 파일을 가지고 다음과 같이 입력한다.


$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tag < tag.csv


마크베이스 : 입력 확인


다음과 같이 접속해서, 확인해 보자.


SELECT *        FROM tag WHERE name = 'TEST^TAG001' LIMIT 10;
SELECT count(*) FROM tag WHERE name = 'TEST^TAG010';

Mach> SELECT * FROM tag WHERE name = 'TEST^TAG001' LIMIT 5;
NAME TIME VALUE
--------------------------------------------------------------------------------------
TEST^TAG001 2019-01-01 00:00:00 000:000:000 94.405
TEST^TAG001 2019-01-01 00:00:00 100:000:000 11.514
TEST^TAG001 2019-01-01 00:00:00 200:000:000 78.049
TEST^TAG001 2019-01-01 00:00:00 300:000:000 79.428
TEST^TAG001 2019-01-01 00:00:00 400:000:000 59.853
[10] row(s) selected.
Elapsed time: 0.002

Mach> SELECT count(*) FROM tag WHERE name = 'TEST^TAG010';
count(*)
-----------------------
100
[1] row(s) selected.

마치며


지금까지 Apache Kafka 에서 생성되는 이벤트 데이터를 Machbase 서버에 입력하는 과정을 단계별로 진행해 보았다. 실시간 데이터 유형 중 하나인 이벤트 데이터를 Pub/Sub 방식으로 제공할 수 있는 플랫폼인 Kafka 에서 초고속 입력을 가능한 방법을 소개하는 것은, 큰 확장성을 부여하는 것이라고 믿는다.


실제로 Kafka 가 스트림 데이터를 일정 기간동안 보관할 수 있고 다른 Consumer 를 통해 데이터 재분배를 할 수 있기 때문에 클러스터 안정성을 올려줄 수 있다면, Machbase 서버는 Kafka 데이터를 지연 없이 빠른 시간에 입력받아 영구적으로 보관할 수 있는 방법을 제공하기 때문에 서로 공존할 수 있는 그림을 충분히 그려볼 수 있을 것이다.

전체 24
번호 제목 작성자 작성일 추천 조회
24
마크베이스(Machbase)를 Grafana와 연동하기(심화편 1)
machbase | 2019.03.14 | 추천 1 | 조회 113
machbase 2019.03.14 1 113
23
마크베이스 V5 와 Apache Kafka 연동
machbase | 2019.03.05 | 추천 1 | 조회 156
machbase 2019.03.05 1 156
22
마크베이스 V5 : Tutorial 6 (RestAPI with curl)
machbase | 2018.09.18 | 추천 1 | 조회 439
machbase 2018.09.18 1 439
21
마크베이스(Machbase) Tag Analyzer 사용법
machbase | 2018.08.27 | 추천 2 | 조회 590
machbase 2018.08.27 2 590
20
마크베이스 5.0 : Tutorial 4 (실제 데이터를 통한 Tag Analyzer 기초 활용)
machbase | 2018.08.27 | 추천 1 | 조회 385
machbase 2018.08.27 1 385
19
마크베이스 5.0 : Tutorial 5 (Tag Table with RestAPI)
machbase | 2018.08.24 | 추천 0 | 조회 405
machbase 2018.08.24 0 405
18
Machbase 5.0 : 센서데이터 홍수의 유일한 해결책
machbase | 2018.08.21 | 추천 0 | 조회 772
machbase 2018.08.21 0 772
17
마크베이스 5.0 : Tutorial 3 (Real Time Stream Insert : Many Sensors)
machbase | 2018.08.20 | 추천 0 | 조회 543
machbase 2018.08.20 0 543
16
마크베이스 5.0 : Tutorial 2 (Batch Loading : Many Sensors)
machbase | 2018.08.17 | 추천 0 | 조회 434
machbase 2018.08.17 0 434
15
마크베이스 5.0 : Tutorial 1 (Quick Start)
machbase | 2018.08.17 | 추천 0 | 조회 817
machbase 2018.08.17 0 817