Kinesis Data Streamsをpythonとgolangで使ってみる

スポンサーリンク

Amazon Kinesis Data Streams (KDS)とは

  • Amazon Kinesis Data Streams
  • Amazon Kinesis Data Streams (KDS)については、AWSが提供するデータストリーミングサービスです。
  • Amazon Kinesis Data Streamsを理解する上で必要不可欠な概念が3つあります。
    • Producer
    • Data Stream
    • Consumer

  • それぞれについて軽く説明していきたいと思います。

Producer

Producerとは、Kinesis Data Streamsに対して、PutRecordする側のプログラム、ソフトウェアの事です。
簡単に言うと、DataStreamに対してデータを送信する側の事です。
AWS公式のaws-kinesis-agentやfluentdのaws-fluent-plugin-kinesisプラグインやユーザがaws-sdkを利用して作成したアプリケーション等が当てはまります。

Data Stream

Data Streamは、Producerから送信されたデータイベントを一時的に保存して、データを取得する側のConsumerに対してデータを渡すAPIを提供します。
(あくまでデータを取得しに来るのはconsumerでkinesisは送信しない)
イメージとしては、Data Streamはメッセージキューのようなもので、先にPutされたデータからStreamに格納されます。
格納されたデータは、Scanを行って先頭からすべてのデータを取得することもできます。
また、各シャードのIteratorを利用して、ある位置のデータから末尾のデータなど範囲を指定してデータを取得する事もできます。
基本的には、Consumer側で、PutRecordされるタイミングをポーリングしておき、PutRecordが起きたら処理を行うか、
ある一定期間のデータをバッチ処理的な形で処理するみたいです。
Data Streamに保存されるデータの永続性はなく、デフォルトで24時間で7日まで延長することができます。(要課金)
Data Streamは、複数のシャードに分ける事ができ、データのパーティションキーによって処理されるシャードが決定されます。

Consumer

Consumerは、Kinesis Data Streamsに対して、GetRecordする側のプログラム、ソフトウェアの事です。
つまり、Data Streamに対してデータを参照して処理する側の事です。
例えば、Data StreamにPutされたデータを加工してDynamoDBにPutしたりするのがConsumerの役割です。
Data Streamのデータを可視化ツールが処理できるように加工してDBやS3等に保存するのが一般的なConsumerみたいです。
これには、Amazon Elasticsearch Serviceやaws-sdk,Kinesis Client Libraryを利用して作成したアプリケーション等が当てはまります。

Consumerには、AWSのサーバーレスでコードを実行できるLambdaが最適です。
理由としては、Data Streamに対してProducerからPutRecordされたのを検知して、自動的にLambdaを起動してConsumerのプログラムを実行してくれるためです。
(Lambdaのイベントハンドラーとしてkinesisを指定する事ができる)
普通のEC2等を利用してConsumerを実装する場合は、Data StreamをポーリングしてPutRecordを検知するという実装が必要になります。(それか、定期実行されるか)
(Java製のKinesis Client Libraryを利用すればすこし楽になる)

よくある組み合わせ

  • よく大量のログを集約して分析したい場合は以下の構成を取るケースが多いようです。
fluentd → Kinesis Data Stream(firehose) → Elastic Search → Kibana

今回やること

  • 今回は、json形式のデータを以下の流れで扱います。
Producer(Python3.6) → Kinesis Data Stream → Consumer(Golang)
  • Producerの実装にはPythonを利用
  • Consumerの実装にはGolangを利用
  • サンプルコードはこちら

手順

  1. Kinesis Data StreamでStreamの設定
  2. PythonでProducerの実装
  3. GolangでConsumerの実装

Lambdaとの連携はせずに,ローカル端末でProducerとConsumerを実装します。
(ターミナルでデータの入力と表示を行うため)

Kinesis Data StreamでData Streamを作成する

  • 事前に,awscliを設定しておいてください。
  • Streamの作成
$ aws kinesis create-stream --stream-name <StreamName> --shard-count 1

# 例
$ aws kinesis create-stream --stream-name Hoge --shard-count 1
  • たったこれだけでData Streamが作成できます!

Producerの作成(Python 3.6)

  • ターミナルから入力した値をPersonクラスに入れて、json形式でKinesis Data StreamにPutRecordします。

Consumerの作成(Go 1.10)

  • 1秒に1回Kinesis Data Streamに、GetRecordを行いデータがあれば、標準出力に表示します。
  • データがない場合は、1秒間waitしてから再度GetRecordを行います。

サンプルコードの実行

  • Consumerの実行
$ go run consumer.go
........................
  • Producerの実行
$ python producer.py
Please Enter Name:
  • Producer側で適当な名前を入力する
$ python producer.py
Please Enter Name: Alice
Success!! shardId-000000000000 49590252322500397384462374421513376095705325676485672962
  • Consumer側にデータが表示されるはす….
........................
Alice
  • Producer → Kinesis Data Stream → Consumer でデータが流れていることが確認できました!
  • Kinesisのコンソールを見てみてみます。
  • PutRecordされているみたいです!

まとめ

今回は、PythonとGolangでProducerとConsumerを実装して、Kinesis Data StreamにPutRecordとGetRecordしてみました。
サンプルコードを作成することで、それぞれの役割(Data Stream ,Producer, Consumer)が理解できました。
それぞれの役割が粗結合していることで、データを非同期で効率よく処理できるみたいですね。
また、PutRecordする際にKinesis Data Streamのキャパシティを上回るデータを送ったり、適切にシャーディングされていないとデータがロストされることもあるようなので、Kinesisの仕様を確かめて実装するのが良さそうです。
また、触る機会があったら投稿したいと思います!

タイトルとURLをコピーしました