English

SINETStreamからKafkaのSSL/TLS認証(クライアント認証)を利用する

概要

SSL/TLSの双方向認証をおこなうように設定をされた Kafaka ブローカーをSINETStreamから利用する方法について説明する。

この文書のおもな記述の流れを以下に示す。

  1. 設定手順の前提条件について
  2. KafkaブローカーにSSL/TLSの双方向認証を設定する手順について
  3. SINETStreamからSSL/TLSの双方向認証を行うKafkaブローカーにアクセスする手順について
  4. 認証エラーとなった場合のSINETStreamの挙動について

前提条件

Kafkaブローカーの構成やSSL/TLSの設定については様々な状況が想定される。 設定手順の記述を簡潔にするために、ここでは以下の前提条件をおく。

(*) 動作確認用の証明書の作成手順については「プライベート認証局における証明書の作成手順」を参照のこと。

設定例を示す場合のホスト名などの値を以下に示す。

実際に設定を行う際は、以下の値に対応する箇所を環境に合わせて適宜変更すること。

ブローカー側の設定手順

KafkaブローカーにSSL/TLSの双方向認証を設定する手順について説明する。

以下の手順で設定を行う。

  1. Kafkaブローカーで読み込めるようにするためにPEM形式の証明書、秘密鍵をPKCS#12形式に変換する
  2. KafkaブローカーのSSL/TLS認証に関するプロパティを設定ファイルに記述する

証明書、秘密鍵のファイルを PKCS#12 形式に変換する

Kafkaブローカーが読み込む形式に合わせて証明書の形式を変換する。

トラストストアにはCA証明書とその秘密鍵を格納する。 キーストアにはKafkaブローカーのサーバ証明書とその秘密鍵を格納する。 トラストストア、キーストアともに PKCS#12形式のファイルとする。

まずトラストストアを作成する。openssl pkcs12コマンドを用いてPEM形式の証明書、秘密鍵をPKCS#12形式のファイルに変換する。 -in, -inkey でCA証明書とその秘密鍵のファイル名を指定する。-outで出力先のファイル名を指定し、-passout でトラストストアに設定するパスワードを指定する。

$ sudo mkdir -p /srv/kafka/config/cert
$ sudo openssl pkcs12 -export -in /etc/pki/CA/cacert.pem \
         -inkey /etc/pki/CA/private/cakey.pem -name private-ca \
         -CAfile /etc/pki/CA/cacert.pem -caname private-ca \
         -out /srv/kafka/config/cert/truststore.p12 \
         -passout pass:trust-pass-00

次に同様の手順でキーストアを作成する。キーストアにはブローカーのサーバ証明書を格納するので、 -in, -inkeyにはサーバ証明書とその秘密鍵のファイル名を指定する。 また-passoutにはキーストアに設定するパスワードを指定する。

$ sudo openssl pkcs12 -export -in /etc/pki/CA/certs/broker.crt \
         -inkey /etc/pki/CA/private/broker.key -name broker \
         -CAfile /etc/pki/CA/cacert.pem -caname private-ca \
         -out /srv/kafka/config/cert/keystore.p12 \
         -passout pass:key-pass-00

SSL/TLS認証に関するプロパティを設定ファイルに記述する

Kafkaブローカーのプロパティファイル /srv/kafka/config/server.properties に以下の内容を追加する。

listeners=SSL://:9093
advertised.listeners=SSL://broker.example.org:9093
ssl.truststore.location=/srv/kafka/config/cert/truststore.p12
ssl.truststore.password=trust-pass-00
ssl.truststore.type=pkcs12
ssl.keystore.location=/srv/kafka/config/cert/keystore.p12
ssl.keystore.password=key-pass-00
ssl.keystore.type=pkcs12
ssl.client.auth=required

追加したプロパティに関する簡単な説明を以下に記す。

プロパティファイルの変更内容をKafkaブローカーに反映させるため、ブローカーを再起動する。

$ sudo /srv/kafka/bin/kafka-server-stop.sh
$ sudo /srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties

サービスを中断することなく設定変更を行うにはKafkaブローカーを複数台構成とし、 ローリング再起動による設定変更の反映を行う必要がある。

SINETStream(クライアント側)の設定手順

SINETStreamからSSL/TLS認証を行うKafkaブローカーを利用するための設定について説明する。 以下の作業が必要となる。

  1. 証明書の準備
  2. SINETStreamの設定ファイルの作成
  3. SINETStreamを利用するプログラムの作成

証明書の準備

SSL/TLSの双方向認証を利用するのに必要となる証明書類をクライアント環境に準備する。 以下のものが必要となる。

プライベート認証局などで作成した証明書をクライアント環境に配置する。 SINETStreamでは証明書の配置場所を定めてはいないので、配置する場所は利用者の判断にゆだねられる。 SINETStreamは、設定ファイルに記されたパスから証明書を読み込む。

SINETStreamの設定ファイルを作成する

設定ファイルの例を以下に示す。

service-kafka-ssl:
  brokers: broker.example.org:9093
  type: kafka
  topic: topic-001
  tls:
    ca_certs: /opt/certs/cacert.pem
    certfile: /home/user01/certs/client0.crt
    keyfile: /home/user01/certs/client0.key

brokers, type, topic については認証を利用しない場合の設定ファイルと同様なので説明を省く。 SSL/TLS認証に関わる設定は tls: 以降の行が該当する。 各パラメータの意味を以下に示す。

SINETStreamを利用するプログラムを作成する

SINETStreamを利用するプログラム自体は、 SSL/TLS認証を行うKafkaブローカーを利用する場合と認証なしのKafkaブローカーを利用する場合で変わりはない。

Python APIの MessageWriter を利用する場合の例を以下に示す。認証に関わる処理は存在していない。

with sinetstream.MessageWriter(service='service-kafka-ssl') as writer:
    writer.publish(b'message 001')

認証情報をプログラムから設定したい場合は、コンストラクタの引数に認証情報のパラメータを追加すればよい。

tls = {
    'ca_certs': '/opt/certs/cacert.pem',
    'certfile': '/home/user01/certs/client0.crt',
    'keyfile': '/home/user01/certs/client0.key',
}
with sinetstream.MessageWriter(service='service-kafka', tls=tls) as writer:
    writer.publish(b'message 001')

認証エラーとなる場合の挙動について

Python API

認証でエラーになった場合、例外 sinetstream.error.ConnectionError が発生する。 例外が発生するメソッドを以下に示す。

Java API

認証でエラーになった場合、例外 jp.ad.sinet.stream.api.AuthenticationException が発生する。例外が発生するメソッドを以下に示す。