1
/
5

MQTTでPublishしてAWS IoTからのKinesis Streamsへ【エンジニアブログより】

技術3課の森です。
春の陽気が来たかと思うと雪が降ったりと難しい季節。
新社会人や入学の季節ということで、引っ越しがピークになってきたのではないでしょうか。
引っ越しと言えば、データを送り出すということで、今回は、MQTTでAWS IoTにPublishしたデータをKinesis Streamsに流してみることをしてみました。


アジェンダ

今回はこのようなものを作っていきます。

  1. AWS IoTの作成
  2. Amazon Kinesis Streamsの作成
  3. IAMロールの作成
  4. MQTTクライアントの作成

MQTTクライアントからQoS1でデータを送信し、AWS IoTで受けた後、アクションでKinesis Streamsへ流すような感じです。


環境構築

必要なAWSリソースの作成を行います。赤字にしたところを作っていきます。

  1. AWS IoTの作成
  2. Amazon Kinesis Streamsの作成
  3. IAMロールの作成
  4. MQTTクライアントの作成

AWS IoT - Thingの作成

1.まずはマネージメントコンソールから「IoT Core」を選択し、「管理」をクリックします。

2.「モノ」の「作成」ボタンをクリックします。


3.「単一のモノを作成する」ボタンをクリックします。


4.「名前」を入力して、モノのタイプを作成する必要があるので、「タイプの作成」ボタンをクリックします。


5.「名前」を入力して、「モノのタイプの作成」ボタンをクリックします。(今回モノのタイプは特に設定しません。)


6.「モノの作成」画面で、「モノのタイプ」が先程作成したものになっていることを確認します。


7.それ以外の項目には何も入れずに「次へ」をクリックします。


8.AWS IoTと通信するための証明書の作成をします。今回はさっと作るため、「1-Click 証明書作成(推奨)」の「証明書の作成」ボタンをクリックします。


9.後で、プログラムでも利用する証明書のダウンロードを行います。「このモノの証明書」「パブリックキー」「プライベートキー」「AWS IoTのルートCA」をダウンロードし、「有効化」ボタンをクリックした後、「ポリシーのアタッチ」ボタンをクリックします。


10.作成した証明書をポリシーにアタッチします。今回は既に作成されている「PubSubToAnyTopic」を利用します。「PubSubToAnyTopic」を選択し、「モノの登録」ボタンをクリックします。


11.「モノ」に作成した「DeviceForBlog」が作成されていることを確認します。


12.ルールの作成を行います。左側のフレームの「ACT」をクリックし、右側の「作成」ボタンをクリックします。


13.ルールの「名前」を入力します。


14.プログラム的にデータは作成していますが、今回は作成するデータを全て表示するようにするため、「属性」に情報を入力し、「トピックフィルター」にも情報を入力します。「条件」は特に指定なしで全てを表示します。


15.AWS IoTで受けた後にデータを流すことを今回は行うため、アクション指定が必要になります。「アクションの追加」ボタンをクリックします。

16.Kinesis Streamsを利用するため、「Amazon Kinesisストリームにメッセージを送信する」を選択し、「アクションの設定」ボタンをクリックします。


17.現時点では、Kinesis Streamsを作成していないので、「新しいリソースを作成する」ボタンをクリックします。


18.新しいタブが表示され、Amazon Kinesisの画面が表示されます。その中で、「Kinesis ストリームの作成」画面に遷移していますので、情報を入力していきます。
「Kinesis ストリームの名前」を入力し、今回は複数のシャードを利用したいため、「シャード数」を「2」にして、「Kinesis ストリームの作成」ボタンをクリックします。



19.作成したKinesis Streamsが作成されていることを確認します。

20.AWS IoTのアクションの設定画面に戻り、ストリーム名の更新ボタンをクリックした後、先程作成した「ストリーム名」を選択し、「パーティションキー」(今回は、ユニークになるように「newuuid()」関数を利用しています)を入力し、Kinesisストリームに設定するロールを新たに作成するために、「新しいロールの作成」をクリックします。


21.「IAMロール名」を入力するフィールドが出てくるので、名前を入力して、「新しいロールの作成」ボタンをクリックします。


22.先程入力した「IAMロール名」を選択し、「アクションの追加」ボタンをクリックします。


23.作成したKinesis Streamsになっていることを確認し、「ルールの作成」ボタンをクリックします。


24.作成したルールが表示されていることを確認します。


MQTTクライアント作成

MQTTクライアントを作成していきます。Python3.6を使ってコードの作成をしていきます。といっても、既に作成したものです。 mqtt-pub_01.pyをサンプルとして記載します。mqtt-pub_01.pyとmqtt-pub_02.pyの違いはGZファイルの名前だけです。 異なるテキストを用意し、GZIPで圧縮して下さい。

#!/usr/bin/python
# -*- coding: utf-8 -*-

#mqtt-pub_01.py
import paho.mqtt.client as mqtt
import ssl
import time
import json
import base64
from datetime import datetime

# settings
deviceplace = 'device_00'
roomname = 'room_00'

# AWS IoT settings
## マネージメントコンソール→AWS IoT→設定→カスタムエンドポイント にあるエンドポイント名をコピー
host = '<<endpoint>>.iot.ap-northeast-1.amazonaws.com'  # AWS IoT Endpoint
## ポート番号は以下、固定
port = 8883  # port
## AWS IoTで作成した証明書
cacert = './cert/rootCA.pem'  # root ca
clientCert = './cert/<<id>>-certificate.pem.crt'  # certificate
clientKey = './cert/<<id>>-private.pem.key'  # private key
## AWS IoTで利用するトピック名
topic = 'deviceforblog/%s' % roomname  # topic

counter = 0

def on_connect(client, userdata, flags, respons_code):
    print('Connected')

# AWS IoTと接続する関数
def sensing():
    while True:
        data = {}
        data['bindata'] = get_bindata()
        data['bindate'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
        publish(data)
        time.sleep(1)

# バイナリデータの読み込み
def get_bindata():
    #create the bin data
    ##送信するバイナリデータを利用
    ##mqtt-pub_01.pyの場合はtestdata01.gz
    ##mqtt-pub_02.pyの場合はtestdata02.gz
    bindata = open('testdata01.gz', 'rb').read()
    #base64 encoding
    bindata_base64 = base64.b64encode(bindata).decode('utf-8')
    #return bin data
    return bindata_base64

# データをPublish
def publish(data):
    data['place'] = deviceplace
    global counter
    counter = counter + 1
    data['counter'] = counter
    print("put record:" + str(counter))
    #第一引数: AWS IoT トピック
    #第二引数: JSONデータ
    #第三引数: QoS(今回はQoS1、QoS0の場合は省略可)
    client.publish(topic, json.dumps(data, ensure_ascii=False), 1)  # publish


#以下、メイン
if __name__ == '__main__':
    client = mqtt.Client(protocol=mqtt.MQTTv311)

    # certifications
    client.tls_set(cacert,
        certfile=clientCert,
        keyfile=clientKey,
        tls_version=ssl.PROTOCOL_TLSv1_2)
    client.tls_insecure_set(True)

    # callback
    client.on_connect = on_connect

    # port, keepalive
    client.connect(host, port=port, keepalive=60)

    client.loop_start()

    sensing()

フォルダ構成

AWSIoT2Kinesis/
├── cert
│   ├── cd74dc100b-certificate.pem.crt
│   ├── cd74dc100b-private.pem.key
│   ├── cd74dc100b-public.pem.key
│   └── rootCA.pem
├── testdata01.gz
├── testdata02.gz
├── mqtt-pub_01.py
├── mqtt-pub_02.py
└── requirements.txt

プログラムの実行

pythonコマンドを利用して、実行します。 今回は、2つプログラムを実行しますので、ターミナルを2つ立ち上げて実行して下さい。 1つ目のターミナルでは、「python mqtt-pub_01.py」を実行すると以下のようなメッセージが出力されます。

$ python mqtt-pub_01.py
Connected
put record:1
put record:2

2つ目のターミナルでも、「python mqtt-pub_02.py」を実行すると以下のようなメッセージが出力されます。

$ python mqtt-pub_02.py
Connected
put record:1
put record:2


シャードに入った情報を数える

実行した結果を確認していきます。一定の時間(今回は約20分程度)が経過した時、ターミナルで実行しているプログラムをCTRL+Cで止めて、そのときに出力されている「put record:n」の「n」をメモしておきます。今回、mqtt-pub_01.pyは1,300、mqtt-pub_02.pyは1,250と仮定します。

$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-for-blog | jq -r .ShardIterator) > 00.txt
$ aws kinesis get-records --shard-iterator $(aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name kinesis-for-blog | jq -r .ShardIterator) > 01.txt

これで、シャードに入ってるデータ情報を確認できます。結果はこのような感じになっています。

{
    "Records": [
        {
            "Data": "XXXXXXXXXXRhIjoiSDRzSUNHMzhzVm9BQTJGaVl5NTBlSFFBXXXXXXXXXXVZaTRBSG9EbHR3MEFBQUE9IXXXXXXXXXXX0ZSI6IjIwMTgvMDMvMjUgMTA6Mzk6NTkiLCJwbGFXXXXXXXXXX21fMDAiLCJjb3VudGVyIjoxfQ==", 
            "PartitionKey": "deviceforblog/room_00/99999999-9999-9999-9999-99999999999", 
            "ApproximateArrivalTimestamp": 1521942003.734, 
            "SequenceNumber": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
        }, 
        ....
    ], 
    "NextShardIterator": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", 
    "MillisBehindLatest": 9305000
}

レコードの中身で「ApproximateArrivalTimestamp」を使って件数を取得してみます。

$ grep ApproximateArrivalTimestamp 00.txt | wc -l
1278
$grep ApproximateArrivalTimestamp 01.txt | wc -l
1272

今回の結果

実行した結果、このような結果が得られました。

  • 2つのシャードに分散されていた
  • 送った側のメッセージとKinesis Streamsで受け取ったデータ数(2つのシャード内を確認)がだいたい同じ


次回予告

この後、試してみる内容としてはこのようなことを考えています。

  • Kinesis Streamsで受け取ったデータをAWS Lambdaで取って見てみる
  • AWS IoTのルールのアクションで権限のないKinesis Streamsへアクセスした場合の動きを見てみる
株式会社サーバーワークス's job postings
1 Likes
1 Likes

Weekly ranking

Show other rankings