Developer Guide for Java messaging system plugin

This section describes the procedure for developing a Java plug-in to support a new messaging system with SINETStream.

1. Introduction

SINETStream v1.1 supports the following messaging systems:

To use the above messaging systems via SINETStream, we have developed plugins that implement the SINETStream Service Provider Interface (SPI). Other messaging systems can be used via SINETStream by developing a new plugin for each target system.

This document describes the steps to develop a plugin to support a new messaging system.

1.1 Target readers

Target readers of this document are as follows:

1.2 Prerequisite knowledge

This document assumes that readers have the following knowledge:

2. The software architecture of SINETStream

Before describing the procedure for developing a plugin, this section explains the parts of the SINETStream software architecture that are relevant to plugin development.

2.1 Module configuration

SINETStream module configuration is shown in the following figure:

Module configuration

The part shown in blue indicates the SINETStream modules, which provide common functions that are independent of any particular messaging system. The parts enclosed in green frames are the SINETStream plugins. All operations on a messaging system are invoked through its plugin, which implements the SPI.

Below is a brief description of each module.

2.2 Process Sequence

We explain the process sequences using the MessageReader and MessageWriter classes, which receive and send messages via SINETStream.

2.2.1 MessageReader

We show the sequence diagram when the following message reception program is executed. Here we assume that the program receives messages from Kafka brokers.

MessageReaderFactory<String> factory =
    MessageReaderFactory.<String>builder()
            .service("kafka-service")
            .build();

try (MessageReader<String> reader = factory.getReader()) {
    Message<String> msg;
    while (Objects.nonNull(msg = reader.read())) {
        System.out.print(msg.getValue());
    }
}

MessageReader

Below is a brief description of the classes in the figure.

2.2.2 MessageWriter

We show the sequence diagram when the following message transmission program is executed. Here we assume that the program sends messages to Kafka brokers.

MessageWriterFactory<String> factory =
    MessageWriterFactory.<String>builder()
            .service("kafka-service")
            .build();

try (MessageWriter<String> writer = factory.getWriter()) {
    for (String msg : messages) {
        writer.write(msg);
    }
}

MessageWriter

Below is a brief description of the classes in the figure.

2.2.3 AsyncMessageReader

We show the sequence diagram when the following message reception program, which uses the asynchronous API, is executed. Here we assume that the program receives messages from Kafka brokers.

MessageReaderFactory<String> factory =
    MessageReaderFactory.<String>builder()
            .service("kafka-service")
            .build();

try (AsyncMessageReader<String> reader = factory.getAsyncReader()) {
    reader.addOnMessageCallback((message) -> {
        System.out.print(message.getValue());
    });

    // other processing
    otherTask();
}

AsyncMessageReader

Below is a brief description of the classes in the figure.

2.2.4 AsyncMessageWriter

We show the sequence diagram when the following message transmission program, which uses the asynchronous API, is executed. Here we assume that the program sends messages to Kafka brokers.

MessageWriterFactory<String> factory =
    MessageWriterFactory.<String>builder()
            .service("kafka-service")
            .build();

try (AsyncMessageWriter<String> writer = factory.getAsyncWriter()) {
    for (String msg : messages) {
        writer.write(msg)
            .then(r -> System.err.println("success"));
    }
}

AsyncMessageWriter

Below is a brief description of the classes in the figure.

3. How to implement a plugin

3.1 Outline

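SINETStream finds plugins through the Java ServiceLoader mechanism, so a plugin becomes available to SINETStream users once it is registered as a service provider. The following steps are required to develop a plugin.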

Details of each step are described below.

3.2 Create a provider configuration file

Registering service providers in the provider configuration file allows ServiceLoader to find the plug-in.

Place the provider configuration file in META-INF/services/ in the resource directory. The file name must be the fully qualified name of the service provider interface. In the case of service providers that support SINETStream message reception and transmission, the file names are as follows:

In the configuration file, write the fully qualified class name of each implemented service provider class, one class per line.

For example, to register the jp.ad.sinet.stream.plugins.kafka.KafkaMessageProvider class, which sends messages to Kafka brokers, create the configuration file META-INF/services/jp.ad.sinet.stream.spi.MessageWriterProvider and write the following line in it.

jp.ad.sinet.stream.plugins.kafka.KafkaMessageProvider

SINETStream has four service providers, but a single plugin does not need to support all of them. Create configuration files only for the service providers that your plugin supports.
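The lookup that the provider configuration file enables can be sketched as follows. This is only an illustration of how java.util.ServiceLoader can be used to pick a provider by its type name, not SINETStream's actual lookup code. DemoProvider, ProviderLookup and findByType are hypothetical names introduced for this sketch; DemoProvider stands in for an SPI interface such as MessageWriterProvider.

```java
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical stand-in for an SPI interface such as MessageWriterProvider.
interface DemoProvider {
    String getType();
}

final class ProviderLookup {
    // ServiceLoader reads META-INF/services/<fully qualified interface name>
    // from the classpath and instantiates every class listed in those files.
    static Optional<DemoProvider> findByType(String type) {
        for (DemoProvider provider : ServiceLoader.load(DemoProvider.class)) {
            if (type.equals(provider.getType())) {
                return Optional.of(provider);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Without a provider configuration file for DemoProvider,
        // the loader has nothing to iterate over and the lookup is empty.
        System.out.println(findByType("queue").isPresent());
    }
}
```

If the configuration file is missing or misnamed, the loader simply finds no provider and reports no error, so an apparently ignored plugin is often a packaging problem rather than a coding one.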

3.3 Implement service providers

3.3.1 Service provider class to send messages (sync API)

To develop a service provider to send messages (sync API), it is necessary to create an implementation class of the following interface class.

The methods of MessageWriterProvider are described as follows:

The main methods of PluginMessageWriter are shown below.

3.3.2 Service provider class to receive messages (sync API)

To develop a service provider to receive messages, it is necessary to create an implementation class of the following interface class.

The methods of MessageReaderProvider are described as follows:

The main methods of PluginMessageReader are shown below.

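3.3.3 Service provider class to send messages (async API)

To develop a service provider to send messages (async API), it is necessary to create an implementation class of the following interface class.

The methods of AsyncMessageWriterProvider are described as follows:

The main methods of PluginAsyncMessageWriter are shown below.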

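3.3.4 Service provider class to receive messages (async API)

To develop a service provider to receive messages (async API), it is necessary to create an implementation class of the following interface class.

The methods of AsyncMessageReaderProvider are described as follows:

The main methods of PluginAsyncMessageReader are shown below.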

4. An example of a plugin implementation

An example is provided to show the steps for implementing a plug-in.

The example plugin does not access an actual broker. Instead, it passes data within a single process through java.util.Queue objects.

4.1 File organization

The following files need to be created:

4.2 Implementation classes

This section describes the classes to be implemented as plugins.

Only the main processing is explained here. See the links in “Source code” for the complete sample code.

4.2.1 QueueMessageProvider.java

QueueMessageProvider.java is an implementation class of the provider interfaces MessageReaderProvider, MessageWriterProvider, AsyncMessageReaderProvider and AsyncMessageWriterProvider.

public class QueueMessageProvider implements MessageReaderProvider, MessageWriterProvider,
        AsyncMessageReaderProvider, AsyncMessageWriterProvider {

    private static final ConcurrentMap<String, BlockingQueue<QueueMessage>> queues = new ConcurrentHashMap<>();

    @Override
    public String getType() {
        return "queue";
    }

    @Override
    public PluginMessageReader getReader(ReaderParameters params) {
        String topic = params.getTopics().get(0);
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueMessageReader(params, queue);
    }

    @Override
    public PluginMessageWriter getWriter(WriterParameters params) {
        String topic = params.getTopic();
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueMessageWriter(params, queue);
    }

    @Override
    public PluginAsyncMessageReader getAsyncReader(ReaderParameters params) {
        String topic = params.getTopics().get(0);
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueAsyncMessageReader(params, queue);
    }

    @Override
    public PluginAsyncMessageWriter getAsyncWriter(WriterParameters params) {
        String topic = params.getTopic();
        BlockingQueue<QueueMessage> queue = queues.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
        return new QueueAsyncMessageWriter(params, queue);
    }
}

getType() returns the type name of the messaging system. getReader() returns a QueueMessageReader object, the plugin's implementation of PluginMessageReader. getWriter() likewise returns a QueueMessageWriter object, the plugin's implementation of PluginMessageWriter. getAsyncReader() returns a QueueAsyncMessageReader object, the plugin's implementation of PluginAsyncMessageReader, and getAsyncWriter() returns a QueueAsyncMessageWriter object, the plugin's implementation of PluginAsyncMessageWriter.

The BlockingQueue object for the requested topic, taken from queues, is passed as an argument to the constructors of QueueMessageReader, QueueMessageWriter, QueueAsyncMessageReader and QueueAsyncMessageWriter. Messages are passed between writers and readers through these queues.
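The way a reader and a writer for the same topic end up sharing one queue can be sketched in isolation. The sketch below assumes nothing from SINETStream: SharedQueues and queueFor are hypothetical names, and plain byte arrays are used in place of QueueMessage.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

final class SharedQueues {
    // One queue per topic, shared by every reader and writer in this JVM.
    private static final ConcurrentMap<String, BlockingQueue<byte[]>> QUEUES =
            new ConcurrentHashMap<>();

    // computeIfAbsent creates the queue atomically on first use and
    // returns the existing instance on every later call for that topic.
    static BlockingQueue<byte[]> queueFor(String topic) {
        return QUEUES.computeIfAbsent(topic, key -> new LinkedBlockingQueue<>());
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<byte[]> writerSide = queueFor("topic-1");
        BlockingQueue<byte[]> readerSide = queueFor("topic-1");
        writerSide.put("hello".getBytes(StandardCharsets.UTF_8));
        // Both sides hold the same instance, so the reader sees the message.
        System.out.println(writerSide == readerSide);
        System.out.println(new String(readerSide.take(), StandardCharsets.UTF_8));
    }
}
```

Because the map is static, the sharing only works inside one process, which matches the purpose of the example plugin.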

4.2.2 QueueMessageReader.java

QueueMessageReader.java is a class that implements PluginMessageReader.

public class QueueMessageReader implements PluginMessageReader {
(omitted)
    @Override
    public PluginMessageWrapper read() {
        try {
            return queue.poll(receiveTimeout.getSeconds(), TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }
(omitted)
}

read() is the method that receives messages from the messaging system and returns their values. Here read() receives a message from the queue and returns its value.
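The timeout behaviour that read() relies on can be tried out with the JDK classes alone. In the sketch below, PollTimeoutDemo, readOnce and drain are hypothetical names, and the timeout is assumed to be a java.time.Duration. Unlike the sample above, which passes whole seconds, the sketch passes milliseconds so that sub-second timeouts are not truncated to zero; that is a choice made for this illustration only.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class PollTimeoutDemo {
    // Returns the next element, or null if none arrives within the timeout.
    static String readOnce(BlockingQueue<String> queue, Duration timeout) {
        try {
            return queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException(e);
        }
    }

    // Reads until a read times out, mirroring the
    // while (Objects.nonNull(msg = reader.read())) loop of the receiving program.
    static int drain(BlockingQueue<String> queue, Duration timeout) {
        int count = 0;
        while (readOnce(queue, timeout) != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        // Counts the two queued messages, then stops when poll() returns null.
        System.out.println(drain(queue, Duration.ofMillis(100)));
    }
}
```

Returning null on timeout is what lets a caller's read loop terminate when no more messages arrive.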

4.2.3 QueueMessageWriter.java

QueueMessageWriter.java is a class that implements PluginMessageWriter.

public class QueueMessageWriter implements PluginMessageWriter {
(XXX omitted)
    @Override
    public void write(byte[] aByte) {
        QueueMessage msg = new QueueMessage(topic, aByte);
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }
(XXX omitted)
}

write() is the method that sends a byte array to the messaging system. Here, the byte array received as an argument is wrapped in a QueueMessage object and put into the queue.
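The QueueMessage class itself is not shown in this section. As a rough idea of what such a wrapper and the corresponding write path might look like, here is a minimal sketch. DemoMessage and DemoWriter are hypothetical classes written for this illustration; they do not implement the SINETStream interfaces, and the real QueueMessage may differ.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical message wrapper pairing a payload with its topic.
final class DemoMessage {
    private final String topic;
    private final byte[] value;

    DemoMessage(String topic, byte[] value) {
        this.topic = topic;
        this.value = value.clone(); // copy so later changes by the caller are not visible
    }

    String getTopic() { return topic; }

    byte[] getValue() { return value.clone(); }
}

// Hypothetical writer: wraps each payload and puts it into the shared queue.
final class DemoWriter {
    private final String topic;
    private final BlockingQueue<DemoMessage> queue;

    DemoWriter(String topic, BlockingQueue<DemoMessage> queue) {
        this.topic = topic;
        this.queue = queue;
    }

    void write(byte[] bytes) {
        try {
            // put() waits only if the queue is bounded and full;
            // a LinkedBlockingQueue created without a capacity is unbounded.
            queue.put(new DemoMessage(topic, bytes));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<DemoMessage> queue = new LinkedBlockingQueue<>();
        new DemoWriter("topic-1", queue).write(new byte[] {1, 2, 3});
        DemoMessage received = queue.poll();
        System.out.println(received.getTopic() + " " + received.getValue().length);
    }
}
```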

4.2.4 QueueAsyncMessageReader.java

QueueAsyncMessageReader.java is a class that implements PluginAsyncMessageReader.

public class QueueAsyncMessageReader implements PluginAsyncMessageReader {
(omitted)
    public QueueAsyncMessageReader(ReaderParameters params, BlockingQueue<QueueMessage> queue) {
(omitted)
        executor = Executors.newSingleThreadExecutor();
        future = executor.submit(this::pollingTask);
    }

    private void pollingTask() {
        try {
            while (!closed.get()) {
                onMessage(queue.take());
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void onMessage(PluginMessageWrapper message) {
        for (Consumer<PluginMessageWrapper> callback : onMessageCallbacks) {
            try {
                callback.accept(message);
            } catch (Throwable ex) {
                onFailure(ex);
            }
        }
    }
(omitted)
    @Override
    public void addOnMessageCallback(Consumer<PluginMessageWrapper> onMessage, Consumer<Throwable> onFailure) {
        if (Objects.nonNull(onMessage)) {
            onMessageCallbacks.add(onMessage);
        }
        if (Objects.nonNull(onFailure)) {
            onFailureCallbacks.add(onFailure);
        }
    }
(omitted)
}

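addOnMessageCallback() registers callback functions that are called when a message is received from the messaging system. pollingTask() runs on the thread of the executor started in the constructor and takes messages from queue. When a message is obtained successfully, the registered callback functions are called with the message as the argument.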
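The polling-and-callback pattern can be exercised on its own with a reduced sketch. PollingReaderSketch is a hypothetical class, not the SINETStream one: it handles plain strings, leaves out the failure callbacks, and stops polling through thread interruption rather than a closed flag.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified asynchronous reader.
final class PollingReaderSketch implements AutoCloseable {
    private final BlockingQueue<String> queue;
    private final List<Consumer<String>> callbacks = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    PollingReaderSketch(BlockingQueue<String> queue) {
        this.queue = queue;
        executor.submit(this::pollingTask); // start polling in the background
    }

    void addOnMessageCallback(Consumer<String> callback) {
        callbacks.add(callback);
    }

    private void pollingTask() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String message = queue.take(); // blocks until a message arrives
                callbacks.forEach(callback -> callback.accept(message));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // close() interrupted take(); stop polling
        }
    }

    @Override
    public void close() {
        executor.shutdownNow(); // interrupts the polling thread
    }

    // Registers a callback, publishes one message and waits for its delivery.
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CompletableFuture<String> received = new CompletableFuture<>();
        try (PollingReaderSketch reader = new PollingReaderSketch(queue)) {
            reader.addOnMessageCallback(received::complete);
            queue.add("hello");
            return received.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Callbacks run on the polling thread, so a slow callback delays delivery of the following messages. An exception thrown by a callback would end the polling loop in this sketch, which is why the sample routes such exceptions to onFailure instead.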

4.2.5 QueueAsyncMessageWriter.java

QueueAsyncMessageWriter.java is a class that implements PluginAsyncMessageWriter.

public class QueueAsyncMessageWriter implements PluginAsyncMessageWriter {
(omitted)
    private final DefaultDeferredManager manager =
            new DefaultDeferredManager(Executors.newFixedThreadPool(4));
(omitted)
    @Override
    public Promise<?, ? extends Throwable, ?> write(byte[] bytes) {
        if (closed.get()) {
            throw new SinetStreamIOException();
        }
        return manager.when(() -> enqueue(bytes));
    }

    private void enqueue(byte[] bytes) {
        QueueMessage msg = new QueueMessage(topic, bytes);
        try {
            queue.put(msg);
        } catch (InterruptedException e) {
            throw new SinetStreamIOException(e);
        }
    }
(omitted)
}

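Because PluginAsyncMessageWriter is intended for the asynchronous API, write() does not add the message to queue directly. Instead, it calls manager.when() to hand the task of adding the message to queue over to the thread pool managed by manager. As a result, write() returns immediately without blocking.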
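The same hand-off idea can be tried without any third-party library. The sketch below swaps in the JDK's CompletableFuture for the Promise and DefaultDeferredManager types used in the sample, so it shows the pattern rather than the SINETStream API. AsyncWriterSketch is a hypothetical class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical writer that uses CompletableFuture in place of a Promise.
final class AsyncWriterSketch implements AutoCloseable {
    private final BlockingQueue<byte[]> queue;
    private final ExecutorService pool = Executors.newFixedThreadPool(4);

    AsyncWriterSketch(BlockingQueue<byte[]> queue) {
        this.queue = queue;
    }

    // Returns immediately; the enqueue itself runs on a pool thread.
    CompletableFuture<Void> write(byte[] bytes) {
        return CompletableFuture.runAsync(() -> enqueue(bytes), pool);
    }

    private void enqueue(byte[] bytes) {
        try {
            queue.put(bytes);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    @Override
    public void close() {
        pool.shutdown(); // already submitted writes still complete
    }

    // Writes one message and returns the number of messages in the queue.
    static int demo() {
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        try (AsyncWriterSketch writer = new AsyncWriterSketch(queue)) {
            writer.write(new byte[] {1, 2, 3})
                    .thenRun(() -> System.err.println("success"))
                    .join(); // wait here only so the demo can inspect the queue
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In real use the caller would normally not call join() and would attach follow-up actions to the returned future instead, much as the transmission program attaches then() to the result of write().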

4.3 Create provider configuration files

The following two files are created in META-INF/services/ of the resource directory.

4.4 Create a jar file

The steps to create a plugin jar file are as follows:

  1. Install Gradle
  2. Run gradle to create a jar file.
    $ gradle jar
    
  3. Confirm that a jar file has been created in build/libs/.
    $ ls build/libs/
    SINETStream-queue-1.2.0.jar
    

4.5 Source code

The following are links to the example files of the plugin implementation.