チュートリアル - STEP2
1. 概要
このチュートリアルで実行するコンポーネントを以下の図に示します。

「はじめに」で示した各コンポーネントの役割を、以下に再掲します。
WriterはSINETStreamを用いてメッセージをBrokerに送信しますReaderはSINETStreamを用いてBrokerからメッセージを受信しますBrokerはWriterとReaderの間でメッセージの集約、配送などを行い、エンドポイント間の分離を行います
STEP1のチュートリアルでは、Writer、Reader、Broker を同一のマシンで実行しましたが、
このチュートリアルでは、Writer、Reader、Broker を別々のマシンで実行します。
前提条件
- チュートリアルを実行するマシン(
Writer,Reader,Broker)に Docker Engine がインストールされていること- Docker Engine のインストールについては「Docker Engine インストール手順へのリンク」などを参照してください。
Brokerとなるマシンで TCP/1883, TCP/9092 のポートが利用可能なこと- ブローカーがTCPの待ち受けポートとして利用します
Writer,ReaderのマシンからBrokerのマシンの TCP/1883, TCP/9092 にアクセス可能なこと- ファイアウォールなどで通信がブロックされないように設定してください
実行例について
このチュートリアルではWriter, Reader, Broker が別のマシンになり、それぞれにホスト環境、コンテナ環境があるので6つの環境での実行例がでてきます。
実行例におけるホスト名、ユーザ名などを以下の表のように定めます。
| 役割 | 環境 | ホスト名 | ユーザ名 | IPアドレス |
|---|---|---|---|---|
| Broker | ホスト環境 | host-broker | user00 | 192.168.1.XXX |
| Reader | ホスト環境 | host-reader | user00 | - |
| Writer | ホスト環境 | host-writer | user00 | - |
| Broker | コンテナ環境 | broker | user01 | - |
| Reader | コンテナ環境 | reader | user01 | - |
| Writer | コンテナ環境 | writer | user01 | - |
ホスト環境の値については、実際にチュートリアルを実行する環境に合わせて適宜読み替えてください。
実行例を示す際は、コマンドプロンプトにホスト名、ユーザ名を示すことで、どの環境で実行しているのかを区別できるように表記しています。
たとえば、Writerのコンテナ環境での実行例は以下のように表記します。
[user01@writer]$ ls
プロンプトの[] の中の@の前の部分がユーザ名を、後の部分がホスト名を表しています。
Writer のホスト環境の場合は以下のようになります。
[user00@host-writer]$ ls
2. 実行環境を準備する
Broker, Reader, Writer のそれぞれの環境を順に準備します。
それぞれのマシンのターミナルを開き操作を行ってください。
2.1. Brokerを準備する
2.1.1. バックエンドシステムを準備する
SINETStreamが利用するバックエンドのメッセージングシステム(Kafka, MQTT)をdockerコンテナで実行します。
Brokerのホスト環境で以下のコマンドを実行してください。
[user00@host-broker]$ docker run -d --name broker --hostname broker \
-p 1883:1883 -p 9092:9092 harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8
コンテナが正常に起動したことを確認するために、状態を表示させます。
[user00@host-broker]$ docker ps -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
xxxxxxxxxxxx sinetstream/tutorial:1.8 "/usr/local/bin/supe…" About a minute ago Up About a minute 0.0.0.0:1883->1883/tcp, 0.0.0.0:9092->9092/tcp broker
STATUS が Up と表示されていれば、コンテナが正常に起動しています。
起動したコンテナでは、SINETStreamが利用するメッセージングシステム Kafka, MQTT(Mosquitto) のブローカーが実行されています。 コンテナで実行しているプロセスを確認してみると以下のようになります。
[user00@host-broker]$ docker exec -t broker ps ax
PID TTY STAT TIME COMMAND
1 ? Ss 0:00 /usr/bin/python3 /usr/local/bin/supervisord -n -c /et
9 ? Sl 0:05 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPaus
10 ? S 0:00 /usr/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf
12 ? Sl 0:01 java -Xmx512M -Xms512M -server -XX:+UseG1GC -XX:MaxGC
822 pts/0 Rs+ 0:00 ps ax
上記の実行例に表示された、それぞれのプロセスの役割を以下に示します。
- PID 1
- いわゆる
initプロセス - コンテナ内で実行するサービスの管理を行う
- いわゆる
- PID 9
- Kafkaブローカー
- PID 10
- MQTT(Mosquitto)ブローカー
- PID 12
- ZooKeeper
- Kafkaが設定、構成情報などを保存するために利用しているサービス
- PID 822
- プロセスリストを表示するために実行した ps コマンド
2.2. Readerを準備する
Readerを実行する環境をコンテナで用意します。
2.2.1. Readerの実行環境となるコンテナを起動する
Readerのホスト環境で以下のコマンドを実行してください。
[user00@host-reader]$ docker run -d --name reader --hostname reader -e ENABLE_BROKER=false \
--add-host=broker:192.168.1.xxx harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8
192.168.1.XXXには実際に使用する環境のBrokerのIPアドレスを指定してください。
コンテナが正常に起動したことを確認するために、状態を表示させます。
[user00@host-reader]$ docker ps -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
xxxxxxxxxxxx sinetstream/tutorial:1.8 "/usr/local/bin/supe…" About a minute ago Up About a minute 1883/tcp, 9092/tcp reader
STATUS が Up と表示されていれば、コンテナが正常に起動しています。
ここで実行したコンテナイメージはBrokerと同じものですが、コンテナ起動時に-e ENABLE_BROKER=falseを指定することで、ブローカーが実行されないようになっています。
コンテナ内のプロセスの一覧を表示してそのことを確認してみます。
[user00@host-reader]$ docker exec -t reader ps ax
PID TTY STAT TIME COMMAND
1 ? Ss 0:00 /usr/bin/python3 /usr/local/bin/supervisord -n -c /et
30 pts/0 Rs+ 0:00 ps ax
Brokerコンテナでプロセス一覧を確認した時の結果と異なり、Kafkaブローカー、MQTTブローカー、ZooKeeperが実行されていないことがわかります。
Reader用のコンテナを起動する際に指定した --add-host はReaderコンテナの /etc/hosts に、Broker の IPアドレスを登録するためのものです。
Kafkaブローカーを利用するためにはサーバアドレスの名前解決が必要となるため、このパラメータの指定を追加しています。
Readerコンテナの /etc/hosts を表示して Broker のIPアドレスが登録されていることを確認します。
[user00@host-reader]$ docker exec -t reader cat /etc/hosts
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.1.XXX broker
172.17.0.3 reader
192.168.1.XXXにはコンテナ起動時に指定したBrokerのIPアドレスが表示されます。また reader のIPアドレスは実行環境によって異なる値が表示されます。
2.2.2. SINETStreamをインストールする
SINETStreamのPython3ライブラリをコンテナ環境にインストールします。
まずReaderのホスト環境からコンテナ環境にはいります。
[user00@host-reader]$ docker exec -it -u user01 reader bash
次にコンテナ環境で SINETStream のライブラリをインストールします。 以下のコマンドを実行してください。
[user01@reader]$ pip3 install --user sinetstream-kafka sinetstream-mqtt
Collecting sinetstream-kafka
(中略)
Successfully installed avro-python3-1.10.0 kafka-python-2.0.2 paho-mqtt-1.5.1 promise-2.3 pycryptodome-3.9.9 pyyaml-3.13 sinetstream-1.4.0 sinetstream-kafka-1.4.0 sinetstream-mqtt-1.4.0 six-1.15.0
最後に Successfully installed ...と表示されていれば、ライブラリのインストールに成功しています。
確認のためインストールされている Python3 ライブラリの一覧を表示してみます。
[user01@reader]$ pip3 list
Package Version
----------------- --------
avro-python3 1.10.0
kafka-python 2.0.2
paho-mqtt 1.5.1
pip 20.2.4
promise 2.3
pycryptodome 3.9.9
PyYAML 3.13
setuptools 50.3.2
sinetstream 1.4.0
sinetstream-kafka 1.4.0
sinetstream-mqtt 1.4.0
six 1.15.0
supervisor 4.2.1
SINETStream以外のライブラリの
Version表示については上記と異なる場合があります。
2.2.3. Readerのプログラムと設定ファイルを準備する
手順を以下に示します。
Reader用のディレクトリを作成する- SINETStream の設定ファイルを準備する
Readerのプログラムを準備する
この節ではコンテナ環境にてコマンドの実行を行います。
2.2.2でコンテナ環境の操作を行っていた状態が継続されていることを想定しています。
ディレクトリを作成し、そのディレクトリに移動します。
[user01@reader]$ mkdir -p ~/sinetstream/reader
[user01@reader]$ cd ~/sinetstream/reader
SINETStreamの設定ファイルを準備します。 このチュートリアルのための設定ファイルを GitHub に用意していますので、それを取得します。
[user01@reader]$ ss_url=https://raw.githubusercontent.com/nii-gakunin-cloud/sinetstream/main
[user01@reader]$ curl -O ${ss_url}/docs/tutorial/.sinetstream_config.yml
SINETStreamのPython3 APIを用いて作成されたReaderのサンプルプログラムをGitHubから取得します。
取得したプログラムには実行権限を付与します。
[user01@reader]$ curl -O ${ss_url}/python/sample/text/consumer.py
[user01@reader]$ chmod a+x consumer.py
ここまでの手順が正しく行われたことを確認します。 ディレクトリとファイルが以下の実行例と同じになっていることを確認してください。
[user01@reader]$ pwd
/home/user01/sinetstream/reader
[user01@reader]$ ls -a
. .. .sinetstream_config.yml consumer.py
2.3. Writerを準備する
Writerを実行する環境をコンテナで用意します。
2.3.1. Writerの実行環境となるコンテナを起動する
Writerのホスト環境で以下のコマンドを実行してください。
[user00@host-writer]$ docker run -d --name writer --hostname writer -e ENABLE_BROKER=false \
--add-host=broker:192.168.1.xxx harbor.vcloud.nii.ac.jp/sinetstream/tutorial:1.8
192.168.1.XXXには実際に使用する環境のBrokerのIPアドレスを指定してください。
コンテナが正常に起動したことを確認するために、状態を表示させます。
[user00@host-writer]$ docker ps -l
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
xxxxxxxxxxxx sinetstream/tutorial:1.8 "/usr/local/bin/supe…" About a minute ago Up About a minute 1883/tcp, 9092/tcp writer
STATUS が Up と表示されていれば、コンテナが正常に起動しています。
Readerコンテナの場合と同様、起動時に -e ENABLE_BROKER=falseを指定したので、コンテナ内ではブローカーが実行されません。
そのことを確認します。
[user00@host-writer]$ docker exec -t writer ps ax
PID TTY STAT TIME COMMAND
1 ? Ss 0:00 /usr/bin/python3 /usr/local/bin/supervisord -n -c /et
31 pts/0 Rs+ 0:00 ps ax
また--add-host の指定により Writerコンテナの /etc/hosts に Broker のIPアドレスが登録されていることを確認します。
[user00@host-writer]$ docker exec -t writer cat /etc/hosts
127.0.0.1 localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters
192.168.1.XXX broker
172.17.0.4 writer
192.168.1.XXXにはコンテナ起動時に指定したBrokerのIPアドレスが表示されます。また writer のIPアドレスは実行環境によって異なる値が表示されます。
2.3.2. SINETStreamをインストールする
SINETStreamのPython3ライブラリをコンテナ環境にインストールします。
まずWriterのホスト環境からコンテナ環境にはいります。
[user00@host-writer]$ docker exec -it -u user01 writer bash
次にコンテナ環境で SINETStream のライブラリをインストールします。 以下のコマンドを実行してください。
[user01@writer]$ pip3 install --user sinetstream-kafka sinetstream-mqtt
Collecting sinetstream-kafka
(中略)
Successfully installed avro-python3-1.10.0 kafka-python-2.0.2 paho-mqtt-1.5.1 promise-2.3 pycryptodome-3.9.9 pyyaml-3.13 sinetstream-1.4.0 sinetstream-kafka-1.4.0 sinetstream-mqtt-1.4.0 six-1.15.0
最後に Successfully installed ...と表示されていれば、ライブラリのインストールに成功しています。
確認のためインストールされている Python3 ライブラリの一覧を表示してみます。
[user01@writer]$ pip3 list
Package Version
----------------- --------
avro-python3 1.10.0
kafka-python 2.0.2
paho-mqtt 1.5.1
pip 20.2.4
promise 2.3
pycryptodome 3.9.9
PyYAML 3.13
setuptools 50.3.2
sinetstream 1.4.0
sinetstream-kafka 1.4.0
sinetstream-mqtt 1.4.0
six 1.15.0
supervisor 4.2.1
SINETStream以外のライブラリの
Version表示については上記と異なる場合があります。
2.3.3. Writerのプログラムと設定ファイルを準備する
手順を以下に示します。
Writer用のディレクトリを作成する- SINETStream の設定ファイルを準備する
Writerのプログラムを準備する
この節ではコンテナ環境にてコマンドの実行を行います。
2.3.2でコンテナ環境の操作を行っていた状態が継続されていることを想定しています。
ディレクトリを作成し、そのディレクトリに移動します。
[user01@writer]$ mkdir -p ~/sinetstream/writer
[user01@writer]$ cd ~/sinetstream/writer
SINETStreamの設定ファイルを準備します。 このチュートリアルのための設定ファイルを GitHub に用意していますので、それを取得します。
[user01@writer]$ ss_url=https://raw.githubusercontent.com/nii-gakunin-cloud/sinetstream/main
[user01@writer]$ curl -O ${ss_url}/docs/tutorial/.sinetstream_config.yml
SINETStreamのPython3 APIを用いて作成されたWriterのサンプルプログラムをGitHubから取得します。
取得したプログラムには実行権限を付与します。
[user01@writer]$ curl -O ${ss_url}/python/sample/text/producer.py
[user01@writer]$ chmod a+x producer.py
ここまでの手順が正しく行われたことを確認します。 ディレクトリとファイルが以下の実行例と同じになっていることを確認してください。
[user01@writer]$ pwd
/home/user01/sinetstream/writer
[user01@writer]$ ls -a
. .. .sinetstream_config.yml producer.py
3. Reader, Writerを実行する
ReaderとWriterを実行して SINETStream を利用したメッセージの送受信が行えることを確認します。
SINETStreamで利用可能なメッセージングシステムは Kafka や MQTT(Mosquitto) などがあります。 ここでは、まず Kafkaブローカーとメッセージの送受信が行えることを確認します。 その後、設定変更のみでプログラムを変更することなくMQTTブローカーともメッセージの送受信が行えることを確認します。
3.1. Kafkaブローカーとの間でメッセージの送受信を行う
ここからは、ReaderとWriterのプログラムを同時に実行します。
実行するためのターミナルをそれぞれのマシンで開いてください。
Readerの実行
Readerのターミナルにて、ホスト環境からコンテナ環境に入ります。
以下のコマンドを実行してください。
既にコンテナ環境に入っている場合は実行する必要はありません。
[user00@host-reader]$ docker exec -it -u user01 reader bash
Reader用のディレクトリに移動してください。
[user01@reader]$ cd ~/sinetstream/reader
Readerのプログラムを実行します。
引数に指定しているservice-tutorial-kafkaはKafkaブローカーを指定するサービス名です。
[user01@reader]$ ./consumer.py -s service-tutorial-kafka
Press ctrl-c to exit the program.
: service=service-tutorial-kafka
コマンドラインで指定したサービス名が : の後に表示されます。
Writerの実行
Writerのターミナルにて、ホスト環境からコンテナ環境に入ります。
以下のコマンドを実行してください。
既にコンテナ環境に入っている場合は実行する必要はありません。
[user00@host-writer]$ docker exec -it -u user01 writer bash
Writer用のディレクトリに移動してください。
[user01@writer]$ cd ~/sinetstream/writer
Writerのプログラムを実行します。
引数に指定しているservice-tutorial-kafkaはKafkaブローカーを指定するサービス名です。
[user01@writer]$ ./producer.py -s service-tutorial-kafka
Press ctrl-c to exit the program.
: service=service-tutorial-kafka
コマンドラインで指定したサービス名が : の後に表示されます。
メッセージの送受信
Writerのターミナルにて、メッセージとなるテキストを入力し最後に改行を打ち込んでください。
改行までに入力された文字列がメッセージとして Kafka ブローカーに送信されます。
Readerのプログラムは Kafkaブローカーに送られたメッセージを受信してターミナルに表示します。
先ほど Writer で送信したメッセージが表示されていることを確認してください。
メッセージがブローカーによって配送されていることを確認する
Writerから送信したメッセージが Broker によってReaderに配送されていることを確認するために、ブローカーを一時的に停止してみます。
Brokerのホスト環境で以下のコマンドを実行しBrokerコンテナを停止させてください。
[user00@host-broker]$ docker stop broker
Brokerコンテナが停止した状態で Writer のターミナルから実行中のサンプルプログラム producer.pyでメッセージの送信を行ってください。
Kafkaブローカーが停止しているため、Reader側でメッセージの受信ができずWriter側で入力した文字列が表示されないことが確認できます。
確認が済んだらBrokerのホスト環境で以下のコマンドを実行してBrokerコンテナを再開させてください。
[user00@host-broker]$ docker start broker
Reader, Writer の停止
Reader と Writer のサンプルプログラムを停止します。
サンプルプログラムを実行しているそれぞれのターミナルで ctrl-c を打ち込んでください。
3.2. MQTTブローカー(Mosquitto)との間でメッセージの送受信を行う
Kafkaブローカーと同じ操作を行い、MQTTブローカーを利用した場合もメッセージの送受信が行えることを確認します。
先ほどはプログラムの引数に Kafka ブローカーを指定するサービス名として service-tutorial-kafkaを指定しました。
ここでは代わりに MQTTブローカーを指定するためのサービス名 service-tutorial-mqttを指定します。
Readerの実行
Readerのターミナルにて、以下のコマンドを実行してください。
[user01@reader]$ ./consumer.py -s service-tutorial-mqtt
Press ctrl-c to exit the program.
: service=service-tutorial-mqtt
Readerのターミナルでは、コンテナ環境のReader用ディレクトリにいることを想定しています。
Writerの実行
Writerのターミナルにて、以下のコマンドを実行してください。
[user01@writer]$ ./producer.py -s service-tutorial-mqtt
Press ctrl-c to exit the program.
: service=service-tutorial-mqtt
Writerのターミナルでは、コンテナ環境のWriter用ディレクトリにいることを想定しています。
メッセージの送受信
Kafkaブローカーの場合と同様の操作を行い、MQTTブローカーを用いた場合もメッセージの送受信が行えることを確認します。
Writerのターミナルから文字列を入力して改行を打ち込むとReaderのターミナルに入力した文字列に対応するメッセージが表示されることを確認してください。
Reader, Writer の停止
メッセージの送受信が行えたことを確認したら Reader と Writer のサンプルプログラムを停止します。
それぞれのターミナルで ctrl-c を打ち込んでください。
3.3. コンテナの停止、削除
最後にこのチュートリアルで使用したコンテナの停止、削除を行います。
コンテナの操作はホスト環境で実行します。
コンテナ環境からホスト環境に戻る場合は exit を実行してください。
例えば Readerのターミナルでコンテナ環境からホスト環境に戻る場合、以下のようになります。
[user01@reader]$ exit
exit
[user00@host-reader]$
ホスト環境に戻ったら、それぞれのマシンで以下のコマンドを実行してください。
Broker
[user00@host-broker]$ docker stop broker
[user00@host-broker]$ docker rm broker
Reader
[user00@host-reader]$ docker stop reader
[user00@host-reader]$ docker rm reader
Writer
[user00@host-writer]$ docker stop writer
[user00@host-writer]$ docker rm writer
4. SINETStreamについて
チュートリアルで使用した SINETStream の設定ファイルとAPIについて簡単な説明を行います。
この章の記述は STEP1 のものと全く同じ内容になっています。
4.1. 設定ファイル
Reader, Writer で使用した SINETStream の設定ファイル .sinetstream_config.yml の内容を以下に示します。
service-tutorial-kafka:
type: kafka
brokers: "broker:9092"
topic: topic-tutorial-kafka
value_type: text
service-tutorial-mqtt:
type: mqtt
brokers: "broker:1883"
topic: topic-tutorial-mqtt
value_type: text
設定ファイルは階層化したキー、バリューをYAMLの形式で記述しています。
トップレベルのキーには SINETStreamでサービス名と呼んでいる名前を記しています。
上記の例では service-tutorial-kafka、 service-tutorial-mqtt がこれにあたります。
サービス名はブローカーとの接続に関する種々のパラメータをまとめて扱うためのラベル名になります。
Reader, Writerのサンプルプログラムを実行する際に指定したサービス名は、設定ファイルに記述した、この値に対応しています。
サービス名の子要素にブローカーとの接続に関する具体的なパラメータを記述します。
サービス名service-tutorial-kafka に対応するパラメータは以下の部分になります。
type: kafka
brokers: "broker:9092"
topic: topic-tutorial-kafka
value_type: text
各パラメータに関する簡単な説明を以下に記します。
- type
- メッセージングシステムの種別を指定します
kafka,mqttなどが指定できます。
- brokers
- ブローカーのアドレスを指定します
- アドレスの書式はホスト名とポート番号を
:で繋げたものとします
- topic
- メッセージを送受信する対象となるトピックを指定します
- value_type
- メッセージの種別を指定します
- 指定できる値は以下のいずれかになります
- text
- byte_array
- チュートリアルで用いた
textはメッセージが文字列型であることの指定になります
設定ファイルに関する詳細についてはユーザガイドを参照してください。
4.2. SINETStream API
Reader
Readerのサンプルプログラムconsumer.py で SINETStream API を使用している箇所を以下に示します。
with MessageReader(service) as reader:
for message in reader:
print(f"topic={message.topic} value='{message.value}'")
サンプルプログラムconsumer.py 全体のコードはGitHubで確認できます。
はじめにメッセージを受信するための MessageReader のオブジェクトを作成します。
その際、引数としてサービス名を指定します。
MessageReaderは通常Pythonのwith文で実行します。
これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。
with文が返した値 reader はイテラブルなオブジェクトになっています。
for文などによりreaderから順次取得した値が、ブローカーから受信したメッセージとなります。
Writer
Writerのサンプルプログラムproducer.py で SINETStream API を使用している箇所を以下に示します。
with MessageWriter(service) as writer:
while True:
message = get_message()
writer.publish(message)
サンプルプログラムproducer.py 全体のコードはGitHubで確認できます。
メッセージを送信するための MessageWriter のオブジェクトを作成します。
その際、引数としてサービス名を指定します。
MessageWriterは通常Pythonのwith文で実行します。
これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。
with文が返した値 writer に対して .publish(message)を呼び出すことでメッセージをブローカーに送信することができます。
SINETStreamのPython APIに関する詳細についてはユーザガイドを参照してください。
4.3. SINETStream API(非同期API)
SINETStream v1.4 では非同期APIを利用することができます。 ここまで記してきたのと同様の処理を非同期APIで実行するサンプルプログラムをGithubに用意しています。
Reader(非同期API)
非同期APIを用いたReaderのサンプルプログラムconsumer.pyでSINETStream APIを使用している箇所を以下に示します。
reader = AsyncMessageReader(service)
reader.on_message = show_message
reader.open()
はじめにメッセージを受信するための AsyncMessageReader のオブジェクトを作成します。
その際、引数としてサービス名を指定します。
次に.on_messageプロパティにメッセージを受信した際に呼び出すコールバック関数を指定します。
コールバック関数は引数から受信したメッセージを受け取ることができます。
サンプルプログラムでは以下のようなコールバック関数を用いています。
def show_message(message):
ts = datetime.fromtimestamp(message.timestamp)
print(f"[{ts}] topic={message.topic} value='{message.value}'")
最後にreader.open()を呼び出すことでブローカーに接続されます。
Writer(非同期API)
非同期APIを用いたWriterのサンプルプログラムproducer.pyでSINETStream APIを使用している箇所を以下に示します。
with AsyncMessageWriter(service) as writer:
while True:
message = get_message()
writer.publish(message)
メッセージを送信するための AsyncMessageWriter のオブジェクトを作成します。
その際、引数としてサービス名を指定します。
AsyncMessageWriterは通常Pythonのwith文で実行します。
これにより、ブローカーとの接続、切断が with文のブロックの境界で実行されます。
with文が返した値 writer に対して .publish(message)を呼び出すことでメッセージをブローカーに送信することができます。
同期APIでは.publish()が送信処理の完了までブロックしますが、非同期APIの場合はブロックすることなく返ります。
また非同期APIの.publish()は処理結果が確定した後の処理を指定するための
Promiseオブジェクトを返します。