SINETStream User Guide

Python API

1. Example
2. Summary of Python API classes
 2.1 MessageReader Class
 2.2 AsyncMessageReader Class
 2.3 MessageWriter Class
 2.4 AsyncMessageWriter Class
 2.5 Message Class
 2.6 Metrics Class
 2.7 Summary of exception
3. Messaging system-specific parameters
 3.1 Apache Kafka
 3.2 MQTT (Eclipse Paho)
 3.3 S3
4. How to show a cheat sheet

1. Example

First, a simple example is shown.

This example uses two services, service-1 and service-2, each backed by a different messaging system. The backend of service-1 is Apache Kafka with four brokers, kafka-1 through kafka-4. The backend of service-2 is MQTT with a single broker, 192.168.2.105.

Creating a configuration file

The configuration file contains settings for the clients to connect to the broker. Please refer to the Configuration file for details.

In this example, we create the following configuration file .sinetstream_config.yml in the current directory on the client machine.

service-1:
  type: kafka
  brokers:
    - kafka-1:9092
    - kafka-2:9092
    - kafka-3:9092
    - kafka-4:9092
service-2:
  type: mqtt
  brokers: 192.168.2.105:1883
  username_pw_set:
    username: user01
    password: pass01

Sending Messages

The following code sends two messages to the topic topic-1 of the messaging system associated with the service service-1.

from sinetstream import MessageWriter

writer = MessageWriter('service-1', 'topic-1')
with writer as f:
    f.publish(b'Hello! This is the 1st message.')
    f.publish(b'Hello! This is the 2nd message.')

First, create a MessageWriter instance by specifying the service name and the topic name. Open the instance with a with statement and send messages to the broker by calling the publish() method inside the block.

The MessageWriter object automatically connects to the messaging system when entering the with block, and it automatically closes the connection when exiting the with block.

By default, the argument of publish() is a byte sequence. To transfer an object other than a byte sequence, specify value_type or value_serializer in the constructor of the MessageWriter Class.
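As a minimal sketch of the value_serializer option, the following passes a custom serializer to the constructor. The helper names to_json_bytes and send_reading are hypothetical, and the sketch assumes that value_serializer accepts a callable that takes the object and returns bytes.

```python
import json


def to_json_bytes(obj):
    """Hypothetical serializer: encode a JSON-compatible object as UTF-8 bytes."""
    return json.dumps(obj, sort_keys=True).encode("utf-8")


def send_reading(reading, service="service-1", topic="topic-1"):
    """Publish one object; assumes value_serializer takes a callable returning bytes."""
    # Imported inside the function so the serializer can be tried without a broker.
    from sinetstream import MessageWriter

    writer = MessageWriter(service, topic, value_serializer=to_json_bytes)
    with writer as f:
        f.publish(reading)
```

With a reachable broker, send_reading({"sensor": "s1", "temperature": 21.5}) would publish the JSON-encoded bytes to topic-1. The receiving side would need a matching value_deserializer.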

Receiving Messages

The following code receives messages from the topic topic-1 of the messaging system associated with the service service-1.

from sinetstream import MessageReader

reader = MessageReader('service-1', 'topic-1')
with reader as f:
    for msg in f:
        print(msg.value)

First, create a MessageReader instance by specifying the service name and the topic name. Open the instance with a with statement, iterate over f inside the block, and read each message received from the broker through the value property of the object the iterator yields.

The MessageReader object automatically connects to the messaging system when entering the with block, and it automatically closes the connection when exiting the with block.

By default, the reader never times out, so the for statement loops forever. To exit the for loop, specify the receive_timeout_ms parameter in the constructor of the MessageReader class, or handle a signal.
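The timeout approach might look like the following sketch. The function names collect_values and read_until_idle are hypothetical, and the 3000 ms value is arbitrary. It assumes the iterator simply stops once no message has arrived within receive_timeout_ms.

```python
def collect_values(reader):
    """Open a reader, gather every message value, and return them as a list.

    Works with any context manager whose handle yields objects that have a
    value attribute; the loop ends only when the iterator itself stops.
    """
    values = []
    with reader as f:
        for msg in f:
            values.append(msg.value)
    return values


def read_until_idle(service="service-1", topic="topic-1", timeout_ms=3000):
    """Assumed usage: the for loop ends after timeout_ms without a new message."""
    # Imported inside the function so collect_values can be tried without a broker.
    from sinetstream import MessageReader

    reader = MessageReader(service, topic, receive_timeout_ms=timeout_ms)
    return collect_values(reader)
```

Without receive_timeout_ms, the call to collect_values would never return, because the iterator never finishes.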

2. Summary of Python API classes

2.1 MessageReader Class

MessageReader()

The constructor of the MessageReader class

MessageReader(
    service,
    topics=None,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_type=None,
    value_deserializer=None,
    receive_timeout_ms=float("inf"),
    **kwargs)

Parameters

The parameters specified in kwargs are passed to the constructor of the backend messaging system. Please refer to the Messaging system-specific parameters for details.

For the arguments other than service, their default values can be specified in the configuration file. If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.
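The precedence rule can be illustrated with a plain dictionary merge. This is only a sketch of the rule as stated, not SINETStream's internal code. The helper resolve_parameters and the sample values are hypothetical.

```python
def resolve_parameters(config_values, constructor_args):
    """Merge parameters so that explicit constructor arguments win.

    Hypothetical helper that only illustrates the precedence rule.
    """
    resolved = dict(config_values)      # start from the configuration file
    resolved.update(constructor_args)   # explicit arguments override it
    return resolved


from_config = {"value_type": "byte_array", "receive_timeout_ms": 3000}
from_constructor = {"receive_timeout_ms": 10000}

print(resolve_parameters(from_config, from_constructor))
# prints {'value_type': 'byte_array', 'receive_timeout_ms': 10000}
```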

Limitation: With Kafka, SINETStream downgrades consistency to AT_LEAST_ONCE even if EXACTLY_ONCE is specified.

Exception

Properties

The following properties can be used to get the parameter value specified in the configuration file or in the constructor.

MessageReader.open()

Connects to the broker of the messaging system. Implicitly invoked when using MessageReader in a with statement; not intended for explicit invocation.

Returned value

A handler that maintains the connection status with the messaging system.

Exception

MessageReader.close()

Disconnects from the broker of the messaging system. Implicitly invoked when using MessageReader in a with statement; not intended for explicit invocation.

MessageReader.__iter__()

Returns an iterator for the messages received from the messaging system.

Exception

The following errors may occur when calling next() on the iterator.

AuthorizationError does not occur in the following cases:

  1. When using MQTT (Mosquitto)
    • Because the MQTT broker raises no error for unauthorized operations.

2.2 AsyncMessageReader Class

AsyncMessageReader()

The constructor of the AsyncMessageReader class

AsyncMessageReader(
    service,
    topics=None,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_type="byte_array",
    value_deserializer=None,
    **kwargs)

Parameters

The parameters specified in kwargs are passed to the constructor of the backend messaging system. Please refer to the Messaging system-specific parameters for details.

For the arguments other than service, their default values can be specified in the configuration file. If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.

Limitation: With Kafka, SINETStream downgrades consistency to AT_LEAST_ONCE even if EXACTLY_ONCE is specified.

Exception

Properties

The following properties can be used to get the parameter value specified in the configuration file or in the constructor.

AsyncMessageReader.open()

Connects to the broker of the messaging system.

Returned value

A handler that maintains the connection status with the messaging system.

Exception

AsyncMessageReader.close()

Disconnects from the broker of the messaging system. Implicitly invoked when using AsyncMessageReader in a with statement; not intended for explicit invocation.

Property: AsyncMessageReader.on_message

Sets the callback function to be invoked when a message is received.
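A minimal sketch of registering a callback through on_message follows. The names handle_message and listen are hypothetical. The sketch assumes the callback receives a single Message argument and that callbacks keep firing while the with block is open.

```python
import time

received = []


def handle_message(message):
    """Callback: record the payload of each message as it arrives."""
    received.append(message.value)


def listen(seconds, service="service-1", topic="topic-1"):
    """Receive messages in the background for the given number of seconds."""
    # Imported inside the function so the callback can be tried without a broker.
    from sinetstream import AsyncMessageReader

    reader = AsyncMessageReader(service, topic)
    reader.on_message = handle_message  # assumed to receive one Message argument
    with reader:
        time.sleep(seconds)  # callbacks are assumed to run while the block is open
    return list(received)
```

The callback is set before the connection is opened so that no early message is missed.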

2.3 MessageWriter Class

MessageWriter()

MessageWriter(
    service,
    topic,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_serializer=None,
    **kwargs)

The constructor of the MessageWriter class

Parameters

The parameters specified in kwargs are passed to the constructor of the backend messaging system. Please refer to the Messaging system-specific parameters for details.

For the arguments other than service, their default values can be specified in the configuration file. If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.

Limitation: With Kafka, SINETStream downgrades consistency to AT_LEAST_ONCE even if EXACTLY_ONCE is specified.

Exception

Properties

The following properties can be used to get the parameter value specified in the configuration file or in the constructor.

MessageWriter.open()

Connects to the broker of the messaging system. Implicitly invoked when using MessageWriter in a with statement; not intended for explicit invocation.

Returned value

A handler that maintains the connection status with the messaging system.

Exception

MessageWriter.close()

Disconnects from the broker of the messaging system. Implicitly invoked when using MessageWriter in a with statement; not intended for explicit invocation.

MessageWriter.publish(message)

Sends a message to the broker of the messaging system. The message is serialized according to the value_type parameter value or using the function specified by value_serializer.

Exception

AuthorizationError does not occur in the following cases:

  1. When using MQTT (Mosquitto)
    • Because the MQTT broker raises no error for unauthorized operations.
  2. When using Kafka with consistency set to AT_MOST_ONCE
    • Because the client does not wait for a response from the broker after sending a message, it cannot detect an error on the broker side.

2.4 AsyncMessageWriter Class

AsyncMessageWriter()

AsyncMessageWriter(
    service,
    topic,
    consistency=AT_MOST_ONCE,
    client_id=DEFAULT_CLIENT_ID,
    value_type="byte_array",
    value_serializer=None,
    **kwargs)

The constructor of the AsyncMessageWriter class

Parameters

The parameters specified in kwargs are passed to the constructor of the backend messaging system. Please refer to the Messaging system-specific parameters for details.

For the arguments other than service, their default values can be specified in the configuration file. If the same parameter is specified in both the configuration file and the constructor argument, the value specified in the constructor argument takes precedence.

Limitation: With Kafka, SINETStream downgrades consistency to AT_LEAST_ONCE even if EXACTLY_ONCE is specified.

Exception

Properties

The following properties can be used to get the parameter value specified in the configuration file or in the constructor.

AsyncMessageWriter.open()

Connects to the broker of the messaging system. Implicitly invoked when using AsyncMessageWriter in a with statement; not intended for explicit invocation.

Returned value

A handler that maintains the connection status with the messaging system.

Exception

AsyncMessageWriter.close()

Disconnects from the broker of the messaging system. Implicitly invoked when using AsyncMessageWriter in a with statement; not intended for explicit invocation.

AsyncMessageWriter.publish(message)

Sends a message to the broker of the messaging system. The message is serialized according to the value_type parameter of AsyncMessageWriter, or with the function specified by value_serializer, and is then sent to the broker.

publish(message) is asynchronous and returns a Promise object. With the .then() and .catch() methods of the Promise object, you can set the processing to perform according to the transmission result (success or failure). A usage example is shown below.

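from sinetstream import AsyncMessageWriter

# The topic is not passed here, so it must be set in the configuration file.
with AsyncMessageWriter('service-1') as writer:
    # The default value_type is "byte_array", so the message is passed as bytes.
    writer.publish(b"message 1").then(lambda _: print("success")).catch(lambda _: print("failure"))

Exception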

Depending on the messaging system, an AuthorizationError exception may not occur even if an unauthorized operation is performed. This applies in the following cases:

  1. When using MQTT (Mosquitto)
    • Because the MQTT broker raises no error for unauthorized operations.
  2. When using Kafka with consistency set to AT_MOST_ONCE
    • Because the client does not wait for a response from the broker after sending a message, it cannot detect an error on the broker side.

2.5 Message Class

The wrapper class for the message object provided by the messaging system.

Property

All the properties are read only.

2.6 Metrics Class

You can get metrics information by referencing the metrics property of Reader/Writer objects. After a Reader/Writer object is closed, the metrics property returns the metrics as of the time it was closed (but raw is None).

The Reader/Writer metrics are reset when the reset_metrics() method of the Reader/Writer object is called. If the reset_raw argument is set to True, the metrics of the backend messaging system will also be reset if possible.

Eclipse Paho, an MQTT client library used in the SINETStream MQTT plugin, does not provide metrics collection capability. The Kafka client library has the capability, but does not provide the reset function.

The metrics are measured at the boundary between the SINETStream main library and the specified messaging system plugin. Therefore, the encrypted messages are what is measured if the data encryption function provided by SINETStream is used.

Property

Examples

Display the number of received messages and their total size in bytes:

from sinetstream import MessageReader

reader = MessageReader('service-1', 'topic-001')
# (1)
with reader as f:
    for msg in f:
        pass
    m = reader.metrics  # Statistics on the accumulation from (1)
    print(f'COUNT: {m.msg_count_total}')
    print(f'BYTES: {m.msg_bytes_total}')

Display the receive rate for every 10 messages:

from sinetstream import MessageReader

reader = MessageReader('service-1', 'topic-001')
with reader as f:
    count = 0
    for msg in f:
        count += 1
        if count == 10:
            count = 0
            m = reader.metrics
            reader.reset_metrics()
            print(f'COUNT/s: {m.msg_count_rate}')
            print(f'BYTES/s: {m.msg_bytes_rate}')

2.7 Summary of exception

| Exception | Origin method | Reason |
|---|---|---|
| NoServiceError | MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter() | The specified service name is not defined in the configuration file. |
| UnsupportedServiceTypeError | MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter() | The specified service type is not supported or the corresponding plugin is not installed. |
| NoConfigError | MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter() | The configuration file does not exist. |
| InvalidArgumentError | MessageReader(), MessageWriter(), AsyncMessageReader(), AsyncMessageWriter(), MessageReader.open(), MessageWriter.open(), MessageWriter.publish(), AsyncMessageReader.open(), AsyncMessageWriter.open() | The argument is incorrect. |
| ConnectionError | MessageReader.open(), MessageWriter.open(), MessageWriter.publish(), AsyncMessageReader.open(), AsyncMessageWriter.open() | Error connecting to the broker. |
| AlreadyConnectedError | MessageReader.open(), MessageWriter.open(), AsyncMessageReader.open(), AsyncMessageWriter.open() | Already connected to a broker. |
| InvalidMessageError | MessageWriter.publish(), MessageReader.__iter__().__next__() | The message format is incorrect. |
| AuthorizationError | MessageWriter.publish(), MessageReader.__iter__().__next__() | An unauthorized operation was conducted. |
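As a rough sketch of how a caller might react to these exceptions, the following maps a few of the exception names listed above to a short hint. The helpers describe_failure and send_once and the hint texts are hypothetical. Exceptions are matched by class name so that the sketch makes no assumption about where SINETStream exposes its exception classes.

```python
def describe_failure(exc):
    """Map an exception to a short hint, keyed by the class names listed above."""
    hints = {
        "NoConfigError": "create the configuration file",
        "NoServiceError": "check the service name in the configuration file",
        "UnsupportedServiceTypeError": "install the plugin for this service type",
        "ConnectionError": "check that the broker is reachable",
    }
    return hints.get(type(exc).__name__, "unexpected error")


def send_once(payload, service="service-1", topic="topic-1"):
    """Publish one message and print a hint before re-raising any failure."""
    # Imported inside the function so describe_failure can be tried on its own.
    from sinetstream import MessageWriter

    try:
        with MessageWriter(service, topic) as f:
            f.publish(payload)
    except Exception as exc:
        print(f"{type(exc).__name__}: {describe_failure(exc)}")
        raise
```

Catching the specific exception classes would be more idiomatic once their import path is confirmed for the installed version.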

3. Messaging system-specific parameters

kwargs can be used to transparently pass parameters to the backend messaging system. The actual parameters that can be passed depend on the backend. No validation is performed.

3.1 Apache Kafka

In general, the constructor arguments of KafkaConsumer and KafkaProducer in kafka-python can be specified as parameters. A parameter that is valid only for KafkaConsumer affects only MessageReader, and a parameter that is valid only for KafkaProducer affects only MessageWriter.
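For illustration, consumer-side options might be passed through kwargs as in the sketch below. group_id and auto_offset_reset are constructor arguments of kafka-python's KafkaConsumer. Whether SINETStream forwards these particular options unchanged is an assumption. The helper make_kafka_reader and the option values are hypothetical.

```python
# Keyword arguments of kafka-python's KafkaConsumer, with illustrative values.
KAFKA_CONSUMER_OPTIONS = {
    "group_id": "example-group",
    "auto_offset_reset": "earliest",
}


def make_kafka_reader(service="service-1", topic="topic-1", **options):
    """Create a reader, handing the extra options to the Kafka backend via kwargs."""
    # Imported inside the function so the options can be inspected without a broker.
    from sinetstream import MessageReader

    return MessageReader(service, topic, **options)
```

A call such as make_kafka_reader(**KAFKA_CONSUMER_OPTIONS) returns an unopened reader. Use it in a with statement as in the earlier examples.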

Kafka-specific parameters

3.2 MQTT (Eclipse Paho)

In general, the constructor arguments and the arguments of the setter functions (XXX_set) of paho.mqtt.client.Client can be specified as parameters.

MQTT-specific parameters

3.3 S3

4. How to show a cheat sheet

After installing SINETStream, run python3 -m sinetstream to show a cheat sheet.

$ python3 -m sinetstream
==================================================
Default parameters:
MessageReader(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topics=TOPICS,                   # The topic to receive.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_deserializer=None          # If not specified, use default deserializer according to valueType.
)
MessageWriter(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topic=TOPIC,                     # The topic to send.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_serializer=None            # If not specified, use default serializer according to valueType.
)
AsyncMessageReader(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topics=TOPICS,                   # The topic to receive.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_deserializer=None          # If not specified, use default deserializer according to valueType.
)
AsyncMessageWriter(
    service=SERVICE,                 # Service name defined in the configuration file. (REQUIRED)
    topic=TOPIC,                     # The topic to send.
    config=CONFIG,                   # Config name on the config-server.
    consistency=AT_MOST_ONCE,        # consistency: AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE
    client_id=DEFAULT_CLIENT_ID,     # If not specified, the value is automatically generated.
    value_type=BYTE_ARRAY,           # The type of message.
    value_serializer=None            # If not specified, use default serializer according to valueType.
)