English

SINETStreamからKafkaのSASL/SCRAM認証を利用する

概要

SASL/SCRAM認証をおこなうように設定をされた Kafaka ブローカーをSINETStreamから利用する方法について説明する。

この文書のおもな記述の流れを以下に示す。

  1. 設定手順の前提条件について
  2. KafkaブローカーにSASL/SCRAM認証を設定する手順について
  3. SINETStreamからSASL/SCRAM認証を行うKafkaブローカーにアクセスする手順について
  4. 認証エラーとなった場合のSINETStreamの挙動について

前提条件

Kafkaブローカーの構成や設定については様々な状況が想定される。 設定手順の記述を簡潔にするために、ここでは以下の前提条件をおく。

(*) 動作確認用の証明書の作成手順については「プライベート認証局における証明書の作成手順」を参照のこと。

設定例を示す場合のホスト名などの値を以下に示す。

実際に設定を行う際は、以下の値に対応する箇所を環境に合わせて適宜変更すること。 また SCRAM-SHA-512を利用する場合は SCRAM-SHA-256 となっている箇所をSCRAM-SHA-512に変更すること。

ブローカー側の設定手順

KafkaブローカーにSASL/SCRAM-SHA-256 認証を設定する手順について説明する。 また、SSL/TLS接続に関する設定もあわせて行う。

以下の手順で設定を行う。

  1. KafkaブローカーのSASL認証に関するプロパティを設定ファイルに記述する
  2. ZooKeeperにSCRAM資格情報を登録する
  3. Kafkaブローカーで読み込めるようにするためにPEM形式の証明書、秘密鍵をPKCS#12形式に変換する
  4. KafkaブローカーのSSL/TLS接続に関するプロパティを設定ファイルに記述する

SASL認証に関するプロパティを設定ファイルに記述する

Kafkaブローカーのプロパティファイル /srv/kafka/config/server.properties に以下の内容を追加する。

listeners=SASL_SSL://:9094
advertised.listeners=SASL_SSL://broker.example.org:9094
sasl.enabled.mechanisms=SCRAM-SHA-256
listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;

追加したプロパティに関する簡単な説明を以下に記す。

ZooKeeperにSCRAM資格情報を登録する

kafka-configs.shコマンドを用いてZooKeeperにSCRAM資格情報を登録する。 --entity-nameにユーザ名を--add-configpasswordパラメータにパスワードを指定する。

$ /srv/kafka/bin/bin/kafka-configs.sh --zookeeper broker.example.org:2181 --alter --entity-type users \
      --entity-name user01 --add-config 'SCRAM-SHA-256=[iterations=8192,password=user01-pass]'
$ /srv/kafka/bin/bin/kafka-configs.sh --zookeeper broker.example.org:2181 --alter --entity-type users \
      --entity-name user02 --add-config 'SCRAM-SHA-256=[iterations=8192,password=user02-pass]'
$ /srv/kafka/bin/bin/kafka-configs.sh --zookeeper broker.example.org:2181 --alter --entity-type users \
      --entity-name user03 --add-config 'SCRAM-SHA-256=[iterations=8192,password=user03-pass]'

証明書、秘密鍵のファイルを PKCS#12 形式に変換する

SSL/TLS接続のための証明書を、Kafkaブローカーが読み込む形式に合わせて変換する。

トラストストアにはCA証明書とその秘密鍵を格納する。 キーストアにはKafkaブローカーのサーバ証明書とその秘密鍵を格納する。 トラストストア、キーストアともに PKCS#12形式のファイルとする。

まずトラストストアを作成する。openssl pkcs12コマンドを用いてPEM形式の証明書、秘密鍵をPKCS#12形式のファイルに変換する。 -in, -inkey でCA証明書とその秘密鍵のファイル名を指定する。-outで出力先のファイル名を指定し、-passout でトラストストアに設定するパスワードを指定する。

$ sudo mkdir -p /srv/kafka/config/cert
$ sudo openssl pkcs12 -export -in /etc/pki/CA/cacert.pem \
         -inkey /etc/pki/CA/private/cakey.pem -name private-ca \
         -CAfile /etc/pki/CA/cacert.pem -caname private-ca \
         -out /srv/kafka/config/cert/truststore.p12 \
         -passout pass:trust-pass-00

次に同様の手順でキーストアを作成する。キーストアにはブローカーのサーバ証明書を格納するので、 -in, -inkeyにはサーバ証明書とその秘密鍵のファイル名を指定する。 また-passoutにはキーストアに設定するパスワードを指定する。

$ sudo openssl pkcs12 -export -in /etc/pki/CA/certs/broker.crt \
         -inkey /etc/pki/CA/private/broker.key -name broker \
         -CAfile /etc/pki/CA/cacert.pem -caname private-ca \
         -out /srv/kafka/config/cert/keystore.p12 \
         -passout pass:key-pass-00

SSL/TLS接続に関するプロパティを設定ファイルに記述する

Kafkaブローカーのプロパティファイル /srv/kafka/config/server.properties に以下の内容を追加する。

ssl.truststore.location=/srv/kafka/config/cert/truststore.p12
ssl.truststore.password=trust-pass-00
ssl.truststore.type=pkcs12
ssl.keystore.location=/srv/kafka/config/cert/keystore.p12
ssl.keystore.password=key-pass-00
ssl.keystore.type=pkcs12

追加したプロパティに関する簡単な説明を以下に記す。

Kafkaブローカーの再起動

プロパティファイルの変更内容をKafkaブローカーに反映させるため、ブローカーを再起動する。

$ sudo /srv/kafka/bin/kafka-server-stop.sh
$ sudo /srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties

サービスを中断することなく設定変更を行うにはKafkaブローカーを複数台構成とし、 ローリング再起動による設定変更の反映を行う必要がある。

SINETStream(クライアント側)の設定手順

SINETStreamからSASL/SCRAM-SHA-256認証を行うKafkaブローカーを利用するための設定について説明する。

以下の作業が必要となる。

  1. 証明書の準備
  2. SINETStreamの設定ファイルの作成
  3. SINETStreamを利用するプログラムの作成

証明書の準備

認証には直接関係しないが、SSL/TLS接続を利用するので必要となる証明書をクライアント環境に準備する。 以下のものが必要となる。

SINETStreamでは証明書の配置場所を定めてはいないので、配置する場所は利用者の判断にゆだねられる。 SINETStreamは、設定ファイルに記されたパスから証明書を読み込む。

SINETStreamの設定ファイルを作成する

設定ファイルの例を以下に示す。

service-kafka-sasl-scram:
  brokers: broker.example.org:9094
  type: kafka
  topic: topic-001
  tls:
    ca_certs: /opt/certs/cacert.pem
  security_protocol: SASL_SSL
  sasl_mechanism: SCRAM-SHA-256
  sasl_plain_username: user01
  sasl_plain_password: user01-pass

brokers, type, topic, tls についてはSASL認証を利用しない場合の設定ファイルと同様なので説明を省く。

SASL認証に関するパラメータの設定内容について以下に示す。

SINETStreamを利用するプログラムを作成する

SINETStreamを利用するプログラム自体は、 SASL/SCRAM認証を行うKafkaブローカーを利用する場合と認証なしのKafkaブローカーを利用する場合で変わりはない。

Python APIの MessageWriter を利用する場合の例を以下に示す。認証に関わる処理は存在していない。

with sinetstream.MessageWriter(service='service-kafka-sasl-scram') as writer:
    writer.publish(b'message 001')

認証情報をプログラムから設定したい場合は、コンストラクタの引数に認証情報のパラメータを追加すればよい。

user_passwd = {
    'sasl_plain_username': 'user01',
    'sasl_plain_password': 'user01-pass',
}
with sinetstream.MessageWriter(service='service-kafka-sasl-scram', **user_passwd) as writer:
    writer.publish(b'message 001')

認証エラーとなる場合の挙動について

Python API

認証でエラーになった場合、例外 sinetstream.error.ConnectionError が発生する。 例外が発生するメソッドを以下に示す。

Java API

認証でエラーになった場合、例外 jp.ad.sinet.stream.api.AuthenticationException が発生する。例外が発生するメソッドを以下に示す。