背景:准备三台服务器,需要安装zook集群和kafka集群。kafka集群的认证需要配置使用ssl
序号 | 名称 | ip |
---|---|---|
1 | zook01、kafka01 | 192.168.0.251 |
2 | zook02、kafka02 | 192.168.0.250 |
3 | zook03、kafka03 | 192.168.0.249 |
- 配置hosts解析
一、安装zookeeper集群
接着上一篇博客《zookeeper 3.7集群安装》,因此我们就直接使用上一篇的Zookeeper配置,最终会搭建一个既有SASL认证,也有SSL认证的Kafka集群。当然SSL和SASL两种安全机制是独立的,单独使用SASL或者SSL都是可以的。
导入kafka的相关jar
从kafka/libs目录下复制以下几个jar包到zookeeper的lib目录下:
kafka-clients-2.8.0.jar
lz4-java-1.7.1.jar
slf4j-api-1.7.30.jar
slf4j-log4j12-1.7.30.jar
snappy-java-1.1.8.1.jar
1、给zoo.cfg 添加SASL认证
[root@localhost bin]# cat ../conf/zoo.cfg |grep -v '^$'|grep -v '^#'
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
####新添加的两行在这里
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
#####
jaasLoginRenew=3600000
dataLog=/tmp/zookeeper/logs
server.1=0.0.0.0:2888:3888
server.2=192.168.0.250:2888:3888
server.3=192.168.0.249:2888:3888
新添加的两行,用来支持SASL认证。
2、创建Zookeeper认证文件
[root@localhost conf]# cat zookeeper_jaas.conf
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="admin1234"
user_kafka="kafka1234";
};
####这两个参数说明一下:user_admin="admin1234"这句配置的是超级用户admin,后面引号里设置的则是它的密码admin1234。因此下面的user_kafka="kafka1234"设置的是Kafka连接Zookeeper要用的账户和密码,这个账户和密码在后面Kafka的配置中还要用,请先记住这点。其意思就是,一个叫做kafka的账户名,密码是kafka1234。前面的user_就是为了识别这个配置是一个账户名用的####
3、启动时加载认证文件
回到cd ../bin
目录,使用vi zookeeper-start.sh
命令,创建一个新的启动脚本,用于Zookeeper启动时加载认证文件,把下面的内容粘贴进去,保存退出。注意这里的文件路径和启动路径都是在/bin
下的,如果你配置的东西不在这里,要修改路径。
[root@localhost bin]# cat zook-start.sh
export JVMFLAGS="-Djava.security.auth.login.config=../conf/zookeeper_jaas.conf -Dzookeeper.4lw.commands.whitelist=*"
./zkServer.sh restart &
[root@localhost bin]# chmod u+x zookeeper-start.sh
使用命令chmod u+x zookeeper-start.sh
给脚本文件赋权。
4、启动
到此如果在/bin目录下直接输入命令sh zookeeper-start.sh就可以启动Zookeeper,有如下字样Zookeeper就是启动成功了。
如果不放心还可以用ps –ef | grep “zookeeper”来看下启动的现成是不是加载了zookeeper_sasl.conf文件,截取一部分输出,下面红框就是加载的认证文件。也可以用zkCli.sh去连接服务器看看是不是真的可用。那么到此Zookeeper配置启动完毕。
二、安装kafka集群并配置Scram账户认证
需要对于Kafka做了一个权限的配置,本篇是针对Kafka如何配置账户认账做一个详细的演示,理论知识会稍有涉及但不是重点。更多内容请点击【Apache Kafka API AdminClient 目录】
官网的介绍关于kafka权限支持 Authentication of connections to brokers from clients (producers and consumers), other brokers and tools, using either SSL or SASL. Kafka supports the following SASL mechanisms: • SASL/GSSAPI (Kerberos) - starting at version 0.9.0.0 • SASL/PLAIN - starting at version 0.10.0.0 • SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512 - starting at version 0.10.2.0 • SASL/OAUTHBEARER - starting at version 2.0
Kafka本身支持的认证方式有很多,同时支持SSL或者SASL,上面展示的四种是kafka官方推荐的SASL认证方式,如何选择一个合适的认证方式呢?我们可以做一个横向对比。
认证方式 说明 SASL/GSSAPI 主要是给 Kerberos 用户使用的,如果当前已经有了Kerberos认证,只需要给集群中每个Broker和访问用户申请Principals,然后在Kafka的配置文件中开启Kerberos的支持即可,官方参考:[Authentication using SASL/Kerberos]。 SASL/PLAIN 是一种简单的用户名/密码身份验证机制,通常与TLS/SSL一起用于加密,以实现安全身份验证。是一种比较容易使用的方式,但是有一个很明显的缺点,这种方式会把用户账户文件配置到一个静态文件中,每次想要添加新的账户都需要重启Kafka去加载静态文件,才能使之生效,十分的不方便,官方参考[Authentication using SASL/PLAIN]。 SASL/SCRAM 通过将认证用户信息保存在 ZooKeeper 里面,从而动态的获取用户信息,相当于把ZK作为一个认证中心使用了。这种认证可以在使用过程中,使用 Kafka 提供的命令动态地创建和删除用户,无需重启整个集群,十分方便。官方参考[Authentication using SASL/SCRAM]。 SASL/OAUTHBEARER kafka 2.0 版本引入的新认证机制,主要是为了实现与 OAuth 2 框架的集成。Kafka 不提倡单纯使用 OAUTHBEARER,因为它生成的不安全的 Json Web Token,必须配以 SSL 加密才能用在生产环境中。官方参考[Authentication using SASL/OAUTHBEARER] 。 如果使用SASL/GSSAPI那么需要新搭建Kerberos不太划算;SASL/PLAIN的方式可能会在使用过程中频繁的重启,非常的繁琐;而SASL/OAUTHBEARER属于Kafka新提供的,而且也没有这方面的需求,可以等等市场反应再说。因此综合来说最终选择了SASL/SCRAM的认证方法增强Kafka的安全功能,这也是本篇博客推荐和使用的方式。
版本说明
apache-zookeeper-3.7.0-bin.tar.gz kafka_2.12-2.8.0.tgz CentOS Linux release 7.9.2009 (Core) [root@localhost ~]# java -version openjdk version "1.8.0_292"
Kakfa配置
1、下载安装kakfa
官网下载需要的版本kafka-2.12-2.8.0.tar.gz
[root@localhost ~]# wget https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
[root@localhost ~]# ls kafka_2.12-2.8.0.tgz
kafka_2.12-2.8.0.tgz
[root@localhost ~]# tar xaf kafka_2.12-2.8.0.tgz
[root@localhost ~]# mv kafka_2.12-2.8.0 /usr/kafka
2、配置kafka
全部配置如下,3台机器除了broker.id是唯一的,必须不一样l,listerners、advertised.listeners配置为对应的主机host。其他的配置相同。下面分段进行分析说明。
[root@localhost ~]# cd /usr/kafka/config/
[root@localhost config]# cat server.properties |grep -v "^#" |grep -v "^$"
broker.id=0
listeners=SASL_PLAINTEXT://kafka01:9092,PLAINTEXT://kafka01:9093,SASL_SSL://Kafka01:9094
advertised.listeners=SASL_PLAINTEXT://kafka01:9092,PLAINTEXT://kafka01:9093,SASL_SSL://kafka01:9094
ssl.truststore.location=/root/jks_ssl/certificates/kafka.truststore
ssl.truststore.password=kafka1234567
ssl.keystore.location=/root/jks_ssl/certificates/kafka.keystore
ssl.keystore.password=kafka1234567
ssl.key.password=kafka1234567
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.client.auth=required
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=PLAINTEXT
sasl.mechanism.inter.broker.protocol=SSL,PLAIN,SCRAM-SHA-512
sasl.enabled.mechanisms=SSL,PLAIN,SCRAM-SHA-512
zookeeper.set.acl=true
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:admin;User:kafka
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/kafka/logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.249:2181,192.168.0.250:2181,192.168.0.251:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
3、参数说明
3.1、通用配置参数
#服务器id,这个是每个机器的唯一id。每个机器都是独一无二的,习惯性就按顺序来。 broker.id=1 #监听端口,这里也可以用hostname代替,只要host里面配置好了都可以,这里为Kafka broker配置了三个listeners,一个是文本加密,一个是明文传输;另一个使用SSL加密进行数据传输 # 如果是公网环境,clients通过公网(或外网)去连接broker,那么advertiesd.listeners就必须配置成所在机器的公网IP listeners=SASL_PLAINTEXT://kafka01:9092,PLAINTEXT://kafka01:9093,SASL_SSL://Kafka01:9094 advertised.listeners=SASL_PLAINTEXT://kafka01:9092,PLAINTEXT://kafka01:9093,SASL_SSL://kafka01:9094 #存放当前数据的目录,这里所有机器尽量都一样,这样操作起来比较容易 log.dirs=/usr/kafka/logs #连接Zookeeper。 zookeeper.connect=192.168.0.249:2181,192.168.0.250:2181,192.168.0.251:2181 #如果需要用Zookeeper里面创建的“/kakfa”的节点,所以最后一个ip上跟着一个/kafka,看起来比较整洁。如果没有这样的需求不需要带。 #zookeeper.connect=192.168.0.249:2181,192.168.0.250:2181,192.168.0.251:2181/kafka
3.2、SSL参数配置
参考: Kafka配置SSL(云环境)
利用脚本setup_ssl_for_servers.sh生产SSL证书
ssl.truststore.location=/root/jks_ssl/certificates/kafka.truststore # 提供SSL truststore的文件 ssl.truststore.password=kafka1234567 # 提供truststore密码 ssl.keystore.location=/root/jks_ssl/certificates/kafka.keystore # 提供SSL keystore的文件 ssl.keystore.password=kafka1234567 # 提供keystore密码 ssl.key.password=kafka1234567 # keystore中的私钥密码 ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 ssl.client.auth=required # 设置clients也要开启认证 ssl.keystore.type=JKS #ssl文件格式 ssl.truststore.type=JKS #ssl文件格式 security.inter.broker.protocol=PLAINTEXT #Broker内部联络使用的security协议 sasl.mechanism.inter.broker.protocol=SSL,PLAIN,SCRAM-SHA-512 #Broker内部联络使用的sasl协议,这里也可以配置多个,比如SCRAM-SHA-512,SCRAM-SHA-256并列使用 sasl.enabled.mechanisms=SSL,PLAIN,SCRAM-SHA-512 #Broker允许使用的sasl协议,这里也可以配多个PLAIN,SCRAM-SHA-512,SCRAM-SHA-256
3.3、zook认证参数
zookeeper.set.acl=true #设置zookeeper是否使用ACL#设置ACL类(低于 2.4.0 版本推荐使用 SimpleAclAuthorizer)#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer#设置ACL类(高于 2.4.0 版本推荐使用 AclAuthorizer)authorizer.class.name=kafka.security.authorizer.AclAuthorizer#设置Kafka超级用户账号,这两个分别对应zookeeper_jaas.conf中的user_super="super1234"和user_kafka="kafka1234";super.users=User:admin;User:kafka
3.4其他参数
num.network.threads=3 #broker处理消息的最大线程数,一般情况下数量为cpu核数num.io.threads=8 #broker处理磁盘IO的线程数,数值为cpu核数2倍socket.send.buffer.bytes=102400 #socket的发送缓冲区,socket的调优参数SO_SNDBUFFsocket.receive.buffer.bytes=102400 #socket的接受缓冲区,socket的调优参数SO_RCVBUFFsocket.request.max.bytes=104857600 #socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖log.dirs=/usr/kafka/logs #kafka存放数据的路径。可以是多个,多个使用逗号分隔即可。num.partitions=1 #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖num.recovery.threads.per.data.dir=1 #每个数据目录用来日志恢复的线程数目。默认1。offsets.topic.replication.factor=3 #复制因子topic的offset的备份份数。建议设置更高的数字保证更高的可用性。默认3transaction.state.log.replication.factor=3 #复制因子transaction.state.log.min.isr=3 #复制因子log.retention.hours=168 #数据文件保留多长时间,也可以写成log.retention.minutes=300log.segment.bytes=1073741824 #topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖log.retention.check.interval.ms=300000 #文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
3.5、其他推荐配置
#每条最大消息设置为3MB,超过此size会报错,可以自由调整replica.fetch.max.bytes=3145728message.max.bytes=3145728#默认的备份数量,可以自由调整default.replication.factor=2#默认的partion数量,可以自由调整num.partitions=3#是否允许彻底删除topic,低版本这里设置为false则是隐藏topicdelete.topic.enable=true#如果topic不存在,是否允许创建一个新的。这里特别推荐设置为false,否则可能会因为手滑多出很多奇奇怪怪的topic出来auto.create.topics.enable=false
4、创建Kafka认证文件
在./config
目录下,我们还需要在启动的时候加载一个认证文件。所以直接vi kafka-broker-jaas.conf
创建一个认证文件,粘贴下面内容,保存退出。
[root@localhost config]# vim kafka-broker-jaas.properties KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin1234";};Client { org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka1234";};
这里要解释一下这两块的内容:首先KafkaServer这里配置的是Kafka服务器本身的超级账户admin和其密码,使用的是ScramLoginModule模式,也就是标题的登陆认证方式。直接使用这个超级账户登陆,整个Kafka集群就相当于对你打开了大门。需要设计一些Kafka工具的时候可以使用,所以好好保存不要泄露了。后面配置的Client是用来登陆Zookeeper使用的,也就是上面我们配置到zookeeper_sasl.conf 文件中的user_kafka="kafka1234"一行所对应的,这里看到登陆Zookeeper要用的账户就是kafka,密码就是kafka1234。这点设计的比较绕,需要多理解理解。
5、启动时加载认证文件
cd ./bin
文件下执行vi kafka-starter.sh
,粘贴下面内容到文件里,保存退出。然后使用命令chmod u+x kafka-starter.sh
给脚本文件赋权。
[root@localhost bin]# vim kafka-start.sh #!/bin/bashexport KAFKA_OPTS="-Djava.security.auth.login.config=/usr/kafka/config/kafka-broker-jaas.properties"export JMX_PORT=9999export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"./kafka-server-start.sh -daemon ../config/server.properties[root@localhost bin]# chmod u+x kafka-start.sh
第一行KAFKA_OPTS
配置的是加载的认证文件的路径;第二行JMX_PORT
是监控端口,可以不配置;第三行KAFKA_HEAP_OPTS
是配置启动占用内存的,随意调整,也可以不配置用默认的;第四行执行Kafka开始脚本和做好的配置文件。
6、(修改)加载类配置认证文件
当上述步骤都配置完毕以后有些/bin目录下的命令(比如kafka-console-producer.sh)都不能直接使用了,需要带着用户名密码才可以,这就给我们做一些简单的测试造成了很大的麻烦。我们可以通过在启动类中配置认证文件,从而跳过用户名密码的输入,这一步就是让Kafka服务器识别SASL/PLAIN的认证方式。具体做法就是vi kafka-run-class.sh打开这个脚本,然后把下面的一行贴进去,文件开头,文件末尾都可以,不要贴到循环或者if条件语句中就行。保存退出就可以准备启动了,再次提醒认证文件路径要写对。
[root@localhost bin]# vim kafka-run-class.sh #.......if [ -z "$KAFKA_OPTS" ]; then#修改kafka-run-class.sh下面这一行配置KAFKA_OPTS文件位置 KAFKA_OPTS="-Djava.security.auth.login.config=/usr/kafka/config/kafka-broker-jaas.properties"fi#.......
7、启动
执行sh kafka-starter.sh
启动Kafka。所有的Kafka机器按照上面的步骤配置一边,然后启动完毕,就可以开始使用了。第一运行会打印下面的started字样,如果不是第一次启动,则会把所有内容加载一遍输出,这个字段就不太好找了。
[2021-06-08 21:14:43,863] INFO Kafka commitId : 21234bee31165527 (org.apache.kafka.common.utils.AppInfoParser)[2021-06-08 21:14:43,864] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
FAQ:可能的报错
这里照例列出笔者在配置过程中遇到的报错。
inter.broker.listener.name must be a listener name defined in advertised.listeners.
[2021-02-18 15:50:42,400] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$) java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are PLAINTEXT at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1781) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1756) at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1312) at kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:34) at kafka.Kafka$.main(Kafka.scala:68) at kafka.Kafka.main(Kafka.scala)
这个报错有2种可能:
没有配置advertised.listeners这个参数,配置上即可; 参数security.inter.broker.protocol=SASL_PLAINTEXT里面配置的内容在listeners 和advertised.listeners里面都没有。因为这两个参数可以给不同的端口配置不同的安全协议,比如listeners=SASL_PLAINTEXT:// 192.168.33.101:9092, PLAINTEXT:// 192.168.33.101:9093, SASL_SSL:// 192.168.33.101:9094。这样用户就可以通过9092端口使用SASL/Scram验证连接,使用9094端口使用SSL+SASL验证连接,以及通过9093端口进行完全不要任何验证的连接,因此PLAINTEXT一定不要在生产环境上配置,太危险了。回到我们的报错,security.inter.broker.protocol配置的内容,必须在listeners里面配置的有才行,比如下面的配置就一定会报这个异常。 listeners= PLAINTEXT:// 192.168.33.101:9093 security.inter.broker.protocol= SASL_PLAINTEXT
############ 下面是一些说明 ############### #listeners和advertised.listeners参数可以根据不同的端口和需求配置不同的认证方式,建议二者保持一致,如果有内外网,就区分端口给不同的上下游
listeners=SASL_PLAINTEXT://192.168.33.101:9092,PLAINTEXT://192.168.33.101:9093,SASL_SSL://192.168.33.101:9094advertised.listeners=SASL_PLAINTEXT://192.168.33.101:9092,PLAINTEXT://192.168.33.101:9093,SASL_SSL://192.168.33.101:9094#security.inter.broker.protocol这个参数的值必须配置一个上面有的才行security.inter.broker.protocol= SASL_PLAINTEXT
ERROR SASL authentication failed using login context ‘Client’ with exception: {}
[2021-02-14 17:47:16,241] ERROR SASL authentication failed using login context 'Client' with exception: {} (org.apache.zookeeper.client.ZooKeeperSaslClient)javax.security.sasl.SaslException: Error in authenticating with a Zookeeper Quorum member: the quorum member's saslToken is null. at org.apache.zookeeper.client.ZooKeeperSaslClient.createSaslToken(ZooKeeperSaslClient.java:312) at org.apache.zookeeper.client.ZooKeeperSaslClient.respondToServer(ZooKeeperSaslClient.java:275) at org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:882) at org.apache.zookeeper.ClientCnxnSocketNIO.doIO(ClientCnxnSocketNIO.java:101) at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:363) at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1223)[2021-02-14 17:47:16,243] ERROR [ZooKeeperClient Kafka server] Auth failed. (kafka.zookeeper.ZooKeeperClient)[2021-02-14 17:47:16,244] INFO EventThread shut down for session: 0x20000121b750003 (org.apache.zookeeper.ClientCnxn)[2021-02-14 17:47:16,269] ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)org.apache.zookeeper.KeeperException$AuthFailedException: KeeperErrorCode = AuthFailed for /kafka at org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:564) at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1662) at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1560) at kafka.server.KafkaServer.$anonfun$initZkClient$2(KafkaServer.scala:461) at kafka.server.KafkaServer.$anonfun$initZkClient$2$adapted(KafkaServer.scala:458) at scala.Option.foreach(Option.scala:437) at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:458) at kafka.server.KafkaServer.startup(KafkaServer.scala:233) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:44) at kafka.Kafka$.main(Kafka.scala:82) at kafka.Kafka.main(Kafka.scala)
这种报错是Kafka无法链接Zookeeper导致的,一般有以下两种原因:
使用windows拖拽的方式会导致有无法看见的结束符,不识别conf文件而造成失败。 或者启动的时候没有用新建的bash文件加载配置文件,而是直接启动服务器脚本,导致没有加载conf文件报错。
参考文档:zookeeper和kafka的SASL认证以及生产实践
推荐:kafka可视化客户端工具(Kafka Tool)的基本使用