How to use a Kafka broker with authorization
Overview
This page describes how to connect from SINETStream to a Kafka broker that requires authorization.
The description will be made in the following order.
- Prerequisites
- Configurations on the Kafka broker (server side)
- Configurations on SINETStream (client side)
- Behavior on authorization errors
Prerequisites
Though the configuration and setting of a Kafka broker may vary, the following conditions are assumed for simplicity in this document.
- The Kafka broker consists of one node
- ZooKeeper is running on the same host as the Kafka broker
- The Kafka broker has been configured to use SASL/SCRAM authentication (1)(2)
- The Kafka broker has been configured to use SSL/TLS connection (*1)
- The target user of an access control list (ACL) has been registered (*1)
(*1) Refer to How to use a Kafka broker with SASL/SCRAM authentication for configuring authentication.
(*2) The configuration procedure for a Kafka broker is similar for authentication methods other than SASL/SCRAM.
The following values are used in the examples.
In practice, use appropriate values for your environment.
- Kafka broker
    - Hostname
        - broker.example.org
 
- Port
        - 9094
 
- Installed directory
        - /srv/kafka
 
- Property file path
        - /srv/kafka/config/server.properties
 
- Username/password
        - user01/- user01-pass- Permission: write
 
- user02/- user02-pass- Permission: read
 
- user03/- user03-pass- Permission: read, write
 
 
 
- Hostname
        
- ZooKeeper
    - Hostname
        - broker.example.org
 
- Port
        - 2181
 
 
- Hostname
        
- Certificate (client side)
    - CA certificate
        - /opt/certs/cacert.pem
 
 
- CA certificate
        
Configurations on the Kafka broker (server side)
The following procedure is needed for the Kafka broker to perform authorization.
- Edit the Kafka broker’s properties file
- Register ACL settings in ZooKeeper
Edit the Kafka broker’s properties file
Add the following line to the Kafka broker’s properties file /srv/kafka/config/server.properties.
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
authorizer.class.name is the class name used for authorization.
Use kafka.security.auth.SimpleAclAuthorizer to perform authorization according to the ACL set in ZooKeeper.
The following lines may also be added.
allow.everyone.if.no.acl.found=true
super.users=User:Admin1;User:Admin2
- allow.everyone.if.no.acl.found- When no ACL is set, whether to allow all users to access.
 
- super.users- List of superusers (each preceded by “User:” and separated by semicolon “;”).
 
Restart the Kafka broker to apply the changes in the properties file.
$ sudo /srv/kafka/bin/kafka-server-stop.sh
$ sudo /srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties
In order to change the settings without interrupting the service, configure multiple Kafka brokers and reflect the changes by rolling restart.
Register ACL settings in ZooKeeper
In this example, the procedure to grant the following permissions is shown.
| Username | Permission | 
|---|---|
| user01 | write | 
| user02 | read | 
| user03 | read, write | 
Use the kafka-acls.sh command to configure ACL.
The important arguments for the command are:
- --authorizer-properties- zookeeper.connect- The address of ZooKeeper where the ACL is saved
 
 
- --allow-principal- The user name to be authorized (each preceded by “User:”)
 
- --topic- The topic name that requires authorization
- *means all topics.
 
- --group- The consumer group name that requires authorization
- *means all consumer groups.
 
- --producer- Configure ACL for producer.
 
- --consumer- Configure ACL for consumer.
 
Examples:
Grant write permission to user01.
$ sudo /srv/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
       --add --allow-principal User:user03 --producer --topic '*'
Grant read permission to user02.
$ sudo /srv/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
       --add --allow-principal User:user02 --consumer --topic '*' --group '*'
Grant read and write permission to user03.
$ sudo /srv/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
       --add --allow-principal User:user01 --producer --topic '*'
$ sudo /srv/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
       --add --allow-principal User:user01 --consumer --topic '*' --group '*'
Verify that the ACL is set on ZooKeeper.
$ sudo /srv/kafka/bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list
The following output indicates that the ACL is set on ZooKeeper.
Current ACLs for resource `Group:LITERAL:*`:
        User:user02 has Allow permission for operations: Read from hosts: *
        User:user03 has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:*`:
        User:user03 has Allow permission for operations: Write from hosts: *
        User:user03 has Allow permission for operations: Create from hosts: *
        User:user02 has Allow permission for operations: Read from hosts: *
        User:user01 has Allow permission for operations: Create from hosts: *
        User:user01 has Allow permission for operations: Write from hosts: *
        User:user03 has Allow permission for operations: Describe from hosts: *
        User:user03 has Allow permission for operations: Read from hosts: *
        User:user01 has Allow permission for operations: Describe from hosts: *
        User:user02 has Allow permission for operations: Describe from hosts: *
Refer to Authorization and ACLs of the Apache Kafka’s documentation for details on how to set ACLs.
Configurations on SINETStream (client side)
The following procedure is needed for SINETStream to connect to the Kafka broker with authorization.
- Prepare certificate
- Edit the SINETStream’s configuration file
- Create a program that uses SINETStream
Prepare certificate
The following certificate is required on the client side to use SSL/TLS connection.
- A CA certificate
Deploy the certificate to your convenient location. SINETStream reads the certificate from the path specified in the configuration file.
Edit the SINETStream’s configuration file
To connect to the Kafka broker with authorization, some configurations are required to determine the subject of authorization.
An example of SINETStream’s configuration file is shown below.
This is identical to the configuration for SASL/SCRAM authentication.
service-kafka:
  brokers: broker.example.org:9094
  type: kafka
  topic: topic-001
  consistency: AT_LEAST_ONCE
  tls:
    ca_certs: /opt/certs/cacert.pem
  security_protocol: SASL_SSL
  sasl_mechanism: SCRAM-SHA-256
  sasl_plain_username: user03
  sasl_plain_pasword: user03-pass
The settings for brokers, type, topic, consistency, tls are identical to those without authentication.
Settings related to SASL authentication and SSL/TLS connection are:
- security_protocol- Communication protocol with broker
- Specify SASL_SSLto use SASL authentication for SSL/TLS connection.
 
- sasl_mechanism- SASL authentication mechanism
- Specify SCRAM-SHA-256to use SASL/SCRAM-SHA-256.
 
- sasl_plain_username- The username for SASL/SCRAM authentication
 
- sasl_plain_password- The password for SASL/SCRAM authentication
 
Create a program that uses SINETStream
Your program will be identical with or without authorization.
For example, a program that uses MessageWriter of the SINETStream’s Python API is shown below.
with sinetstream.MessageWriter(service='service-kafka') as writer:
    writer.publish(b'message 001')
As you see, no code is written for authorization.
Behavior on authorization errors
Python API
MessageReader raises the sinetstream.error.AuthorizationError exception when it tries to read a message from a topic without read permission.
MessageWriter behaves differently depending on the consistency setting when it tries to write a message to a topic without write permission.
- If consistencyisAT_MOST_ONCE- The message will not be written to the topic.
- No exception is raised because publish()does not wait for the broker to respond.
 
- Otherwise
    - The message will not be written to the topic.
- The sinetstream.error.AuthorizationErrorexception is raised.
 
The methods listed below raises the “sinetstream.error.AuthorizationError” exception when an authorization error occurs.
- sinetstream.MessageWriter.publish()
- sinetstream.MessageReader.__iter___().__next__()
Java API
jp.ad.sinet.stream.api.MessageReader#read throws jp.ad.sinet.stream.api.AuthorizationException when it tries to read a message from a topic without read permission.
jp.ad.sinet.stream.api.MessageWriter#write behaves differently depending on the consistency setting when it tries to write a message to a topic without write permission.
- If consistencyisAT_MOST_ONCE- The message will not be written to the topic.
- No exception is thrown because write()does not wait for the broker to respond.
 
- Otherwise
    - The message will not be written to the topic.
- The jp.ad.sinet.stream.api.AuthorizationExceptionexception is thrown.