English

SINETStream ユーザガイド

Python API

1. 使用例
2. Python API クラス一覧
 2.1 MessageReader クラス
 2.2 AsyncMessageReader クラス
 2.3 MessageWriter クラス
 2.4 AsyncMessageWriter クラス
 2.5 Message クラス
 2.6 Metrics クラス
 2.7 例外一覧
3. メッセージングシステム固有のパラメータ
 3.1 Apache Kafka
 3.2 MQTT (Eclipse Paho)
 3.3 S3
4. チートシートの表示方法

1. 使用例

はじめに簡単な使用例を示す。

この例では、異なるメッセージングシステムをバックエンドとする二つのサービス service-1service-2 を利用する。 service-1 のバックエンドは Apache Kafka で、4台のブローカー kafka-1kafka-4 で構成される。 service-2 のバックエンドは MQTT で、1台のブローカー 192.168.2.105 で構成される。

設定ファイル作成

設定ファイルは、クライアントがブローカーに接続するための設定が記述されたファイルである。 詳細は 設定ファイル を参照すること。

この例では、以下の内容の設定ファイル .sinetstream_config.yml をクライアントマシンのカレントディレクトリに作成する。

service-1:
  type: kafka
  brokers:
    - kafka-1:9092
    - kafka-2:9092
    - kafka-3:9092
    - kafka-4:9092
service-2:
  type: mqtt
  brokers: 192.168.2.105:1883
  username_pw_set:
    username: user01
    password: pass01

メッセージ送信

サービス名 service-1 に対応するメッセージングシステムのトピック topic-1 に対してメッセージを送信する例を示す。

from sinetstream import MessageWriter

writer = MessageWriter(service='service-1', topic='topic-1')
with writer as f:
    f.publish(b'Hello! This is the 1st message.')
    f.publish(b'Hello! This is the 2nd message.')

はじめに、サービス名とトピック名を指定して MessageWriter オブジェクトのインスタンスを作成する。 このインスタンスを with 文で開き、ブロック内で publish() メソッドを呼び出すことで、メッセージをブローカーに送信する。

MessageWriter オブジェクトは、with ブロックに入ると自動的にメッセージングシステムに接続され、 with ブロックを抜けると自動的にメッセージングシステムとの接続がクローズされる。

デフォルトでは、publish() の引数はバイト列である。 バイト列以外のオブジェクトを渡すには、MessageWriterクラス のコンストラクタで value_type または value_serializer を指定する。

メッセージ受信

サービス名 service-1 に対応するメッセージングシステムのトピック topic-1 からメッセージを受信する例を示す。

from sinetstream import MessageReader

reader = MessageReader(service='service-1', topic='topic-001')
with reader as f:
    for msg in f:
        print(msg.value)

はじめに、サービス名とトピック名を指定して MessageReader オブジェクトのインスタンスを作成する。 このインスタンスを with 文で開き、ブロック内でターゲット f に対しイテレータを回し、イテレータの value プロパティを参照することで、メッセージをブローカーから受信する。

MessageReader オブジェクトは、with ブロックに入ると自動的にメッセージングシステムに接続され、 with ブロックを抜けると自動的にメッセージングシステムとの接続がクローズされる。

デフォルトでは、メッセージ受信処理はタイムアウトせず、for 文は無限ループとなる。 for ループから抜けるには、MessageReaderクラス のコンストラクタで receive_timeout_ms を指定するか、シグナル処理を行う必要がある。

2. Python API クラス一覧

2.1 MessageReader クラス

MessageReader()

MessageReaderクラスのコンストラクタ。

MessageReader(
    service=None,
    topics=None,
    config=None,
    **kwargs)
パラメータ

kwargs に記述されたパラメータは、バックエンドのメッセージングシステムのコンストラクタにそのまま渡される。 詳細は メッセージングシステム固有のパラメータ を参照。

service 以外の引数は、設定ファイルにデフォルト値を記述することができる。 設定ファイルとコンストラクタの引数の両方に同じパラメータの値を指定した場合は、コンストラクタの引数に指定した値が優先する。

制限事項: Kafka の consistencyEXACTLY_ONCE を指定しても AT_LEAST_ONCE にダウングレードする。

例外

プロパティ

設定ファイルもしくはコンストラクタで指定したパラメータのうち、プロパティとして値を参照することが出来るものを以下に示す。

MessageReader.open()

メッセージングシステムのブローカーに接続する。 通常は明示的に呼び出すことはなく MessageReaderをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

戻り値

メッセージングシステムとの接続状態を保持しているハンドラ。

例外

MessageReader.close()

メッセージングシステムのブローカーとの通信を切断する。 通常は明示的に呼び出すことはなく MessageReaderをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

MessageReader.__iter__()

メッセージングシステムから取得したメッセージのイテレータを返す。

例外

このメソッドが返したイテレータに対して next() を呼び出した場合に以下の例外が発生することがある。

メッセージングシステムによっては認可されていない操作をおこなっても上記の例外が発生しないことがある。 MQTT(Mosquitto)がこれに該当し、認可されていない操作を行っても例外が発生しない。 これは認可されていない操作を行った場合もブローカー側がクライアント側にエラーを返さないためである。

2.2 AsyncMessageReader クラス

AsyncMessageReader()

AsyncMessageReaderクラスのコンストラクタ。

AsyncMessageReader(
    service,
    topics=None,
    config=None,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_type="byte_array",
    value_deserializer=None,
    **kwargs)
パラメータ

kwargs に記述されたパラメータは、バックエンドのメッセージングシステムのコンストラクタにそのまま渡される。 詳細は メッセージングシステム固有のパラメータ を参照。

service 以外の引数は、設定ファイルにデフォルト値を記述することができる。 設定ファイルとコンストラクタの引数の両方に同じパラメータの値を指定した場合は、コンストラクタの引数に指定した値が優先する。

制限事項: Kafka の consistencyEXACTLY_ONCE を指定しても AT_LEAST_ONCE にダウングレードする。

例外

プロパティ

設定ファイルもしくはコンストラクタで指定したパラメータのうち、プロパティとして値を参照することが出来るものを以下に示す。

AsyncMessageReader.open()

メッセージングシステムのブローカーに接続する。

戻り値

メッセージングシステムとの接続状態を保持しているハンドラ。

例外

AsyncMessageReader.close()

メッセージングシステムのブローカーとの通信を切断する。 通常は明示的に呼び出すことはなく AsyncMessageReaderをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

プロパティ: AsyncMessageReader.on_message

メッセージを受信した際に呼び出されるコールバック関数を設定する。

2.3 MessageWriter クラス

MessageWriter()

MessageWriter(
    service,
    topic,
    config=None,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_type="byte_array",
    value_serializer=None,
    **kwargs)

MessageWriterクラスのコンストラクタ。

パラメータ

kwargs に記述されたパラメータは、バックエンドのメッセージングシステムのコンストラクタにそのまま渡される。 詳細は メッセージングシステム固有のパラメータ を参照。

service 以外の引数は、設定ファイルにデフォルト値を記述することができる。 設定ファイルとコンストラクタの引数の両方に同じパラメータの値を指定した場合はコンストラクタの引数に指定した値が優先する。

制限事項: Kafka の consistencyEXACTLY_ONCE を指定しても AT_LEAST_ONCE にダウングレードする。

例外

プロパティ

設定ファイルもしくはコンストラクタで指定したパラメータのうち、プロパティとして値を参照することが出来るものを以下に示す。

MessageWriter.open()

メッセージングシステムのブローカーに接続する。 通常は明示的に呼び出すことはなく MessageWriterをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

戻り値

メッセージングシステムとの接続状態を保持しているハンドラ。

例外

MessageWriter.close()

メッセージングシステムのブローカーとの通信を切断する。 通常は明示的に呼び出すことはなく MessageWriterをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

MessageWriter.publish(message)

メッセージをメッセージングシステムのブローカーに送信する。 messageMessageWriterのパラメータvalue_typeあるいはvalue_serializerによってシリアライズされたうえでブローカーに送信される。

例外

メッセージングシステムによっては認可されていない操作をおこなってもAuthorizationErrorの例外が発生しないことがある。 以下のケースが該当する。

  1. MQTT(Mosquitto)の場合
    • 認可されていない操作を行った場合もブローカー側がクライアント側にエラーを返さない。そのため例外が発生しない。
  2. KafkaでConsistencyAT_MOST_ONCEを指定した場合
    • ブローカーの応答を待たずにクライアント側のメッセージの送信処理が完了する。そのため、ブローカー側の認可エラーを検知できず、例外が発生しない。

2.4 AsyncMessageWriter クラス

AsyncMessageWriter()

AsyncMessageWriter(
    service,
    topic,
    config=None,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_type="byte_array",
    value_serializer=None,
    **kwargs)

AsyncMessageWriterクラスのコンストラクタ。

パラメータ

kwargs に記述されたパラメータは、バックエンドのメッセージングシステムのコンストラクタにそのまま渡される。 詳細は メッセージングシステム固有のパラメータ を参照。

service 以外の引数は、設定ファイルにデフォルト値を記述することができる。 設定ファイルとコンストラクタの引数の両方に同じパラメータの値を指定した場合はコンストラクタの引数に指定した値が優先する。

制限事項: Kafka の consistencyEXACTLY_ONCE を指定しても AT_LEAST_ONCE にダウングレードする。

例外

プロパティ

設定ファイルもしくはコンストラクタで指定したパラメータのうち、プロパティとして値を参照することが出来るものを以下に示す。

AsyncMessageWriter.open()

メッセージングシステムのブローカーに接続する。 通常は明示的に呼び出すことはなく AsyncMessageWriterをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

戻り値

メッセージングシステムとの接続状態を保持しているハンドラ。

例外

AsyncMessageWriter.close()

メッセージングシステムのブローカーとの通信を切断する。 通常は明示的に呼び出すことはなく AsyncMessageWriterをwith文で用いた場合に、暗黙的に呼び出されることを想定している。

AsyncMessageWriter.publish(message)

メッセージをメッセージングシステムのブローカーに送信する。 messageAsyncMessageWriterのパラメータvalue_typeあるいはvalue_serializerによってシリアライズされたうえでブローカーに送信される。

publish(message)は非同期処理であり promisePromiseオブジェクトを返す。 Promiseオブジェクトのメソッド .then(), .catch()を用いることで、 送信結果(成功、失敗)に応じた処理を設定することができる。使用例を以下に示す。

with AsyncMessageWriter('service-1') as writer:
    writer.publish("message 1").then(lambda _: print("success")).catch(lambda _: print("failure"))
例外

メッセージングシステムによっては認可されていない操作をおこなってもAuthorizationErrorの例外が発生しないことがある。 以下のケースが該当する。

  1. MQTT(Mosquitto)の場合
    • 認可されていない操作を行った場合もブローカー側がクライアント側にエラーを返さない。そのため例外が発生しない。
  2. KafkaでConsistencyAT_MOST_ONCEを指定した場合
    • ブローカーの応答を待たずにクライアント側のメッセージの送信処理が完了する。そのため、ブローカー側の認可エラーを検知できず、例外が発生しない。

2.5 Message クラス

メッセージングシステムのメッセージオブジェクトのラッパークラス。

プロパティ

全て読み取りアクセスのみ。

2.6 Metrics クラス

メトリクス情報のクラス。 Reader/Writerオブジェクトに対してmetricsプロパティを参照すると得られる。 Reader/Writerオブジェクトをclose()したあとはcloseしたときのメトリクス情報が得られる(ただしrawはNone)。

Reader/Writerオブジェクトに対してreset_metrics()メソッドを呼び出すとReader/Writerの統計情報がリセットされる。 引数 reset_raw にTrueを指定した場合に限り、 SINETStreamの統計情報だけでなくメッセージングシステム固有の統計情報もリセットされる(可能であれば)。

Eclipse Paho(SINETStreamのMQTTプラグインで使用しているMQTTクライアントライブラリ)は統計情報を提供してない。 Kafkaにはメッセージングシステム固有の統計情報があるがリセット機能はない。

統計情報はSINETStreamメインライブラリとメッセージングシステムプラグインの境界で測定した値が使われる。 したがって、SINETStreamの暗号化機能が有効の場合は暗号化されたメッセージが測定される。 統計情報の更新タイミングはWriterではメッセージングシステムプラグインにデータ渡す直前(メッセージングシステムが実際に送信したかは関知しない)、 Readerではメッセージングシステムプラグインからデータを受け取った直後である。 圧縮に関する統計統計情報は例外で圧縮処理の前後で測定される。

  <writer>                      <reader>
  Application                   Application
    ↓                            ↑
  value_serializer              value_deserializer
    ↓                            ↑                ←msg_uncompressed_bytes_total
  compressor                    decompressor
    ↓                            ↑                ←msg_compressed_bytes_total
  Avro serializer               Avro deserializer
    ↓                            ↑
  encrypt                       decrypt
- - ↓  - - - - - - - - - - - - - ↑ - - - - - - - -←メトリクス測定境界
  messaging system → broker → messaging system

プロパティ

使用例

受信したメッセージ数・バイト数を表示する。

from sinetstream import MessageReader

reader = MessageReader('service-1', 'topic-001')
# (1)
with reader as f:
    for msg in f:
        pass
    m = reader.metrics  # (1)からの累積の統計情報が得られる
    print(f'COUNT: {m.msg_count_total}')
    print(f'BYTES: {m.msg_bytes_total}')

10メッセージごとに受信レートを表示する。

from sinetstream import MessageReader

reader = MessageReader('service-1', 'topic-001')
with reader as f:
    count = 0
    for msg in f:
        count += 1
        if (count == 10):
            count = 0
            m = reader.metrics
            reader.reset_metrics()
            print(f'COUNT/s: {m.msg_count_rate}')
            print(f'BYTES/s: {m.msg_bytes_rate}')

2.7 例外一覧

例外 発生元メソッド 理由
NoServiceError MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter() 指定したサービス名が設定ファイルで定義されていない。
UnsupportedServiceTypeError MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter() サービスの定義で指定されているサービスタイプをサポートしていない。または対応するプラグインがインストールされていない。
NoConfigError MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter() 設定ファイルがない。
InvalidArgumentError MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter(), MessageReader.open(), MessageWriter.open(), MessageWriter.publish(), AsyncMessageReader().open(), AsyncMessageWriter().open() 引数が間違っている。
ConnectionError MessageReader.open(), MessageWriter.open(), MessageWriter.publish(), AsyncMessageReader().open(), AsyncMessageWriter().open() ブローカーとの接続に問題がある。
AlreadyConnectedError MessageReader.open(), MessageWriter.open(), AsyncMessageReader().open(), AsyncMessageWriter().open() すでにブローカと接続している。
InvalidMessageError MessageWriter.publish(), MessageReader.__iter__().__next__() メッセージのフォーマットが間違っている。
AuthorizationError MessageWriter.publish(), MessageReader.__iter__().__next__() 権限のない操作を行った。

3. メッセージングシステム固有のパラメータ

kwargs を用いて、バックエンドのメッセージングシステム固有のパラメータを透過的に指定できる。 実際にどのようなパラメータを渡せるかはバックエンドによって異なる。 kwargs に指定されたパラメータの妥当性チェックは行われない。

3.1 Apache Kafka

基本的に kafka-pythonKafkaConsumerKafkaProducer の コンストラクタ引数をパラメータとして指定できる。 KafkaConsumer のみ、または KafkaProducer のみで意味を持つパラメータについては、 それぞれ MessageReader, MessageWriter の対応するクラスのみに影響を与える。

Kafka固有のパラメータ

3.2 MQTT (Eclipse Paho)

基本的に paho.mqtt.client.Client の コンストラクタと設定関数 (XXX_set) などの引数に指定できるパラメータを指定できる。

MQTT固有のパラメータ

3.3 S3

S3固有のパラメータ

4. チートシートの表示方法

SINETStreamをインストール後 python3 -m sinetstream を実行するとチートシートが表示される。

$ python3 -m sinetstream
==================================================
Default parameters:
MessageReader(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topics=TOPICS,                   # The topic to receive.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_deserializer=None          # If not specified, use default deserializer according to valueType.
)
MessageWriter(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topic=TOPIC,                     # The topic to send.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_serializer=None            # If not specified, use default serializer according to valueType.
)
AsyncMessageReader(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topics=TOPICS,                   # The topic to receive.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_deserializer=None          # If not specified, use default deserializer according to valueType.
)
AsyncMessageWriter(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topic=TOPIC,                     # The topic to send.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_serializer=None            # If not specified, use default serializer according to valueType.
)