各位大佬,想看那种网络设备/操作系统/数据库/中间件的测评命令清单,可在留言区留言!
依据 GB/T 22239-2019《信息安全技术 网络安全等级保护基本要求》第三级”安全计算环境” 条款,结合Kafka官方安全指南及现场测评实践。
适用版本:Kafka 2.4.x / 2.5.x / 2.6.x / 2.7.x / 2.8.x / 3.0.x / 3.1.x / 3.2.x / 3.3.x / 3.4.x / 3.5.x / 3.6.x
一、身份鉴别
1.1 Broker认证配置
| 控制项 | 测评命令/配置 | 达标判据 |
|---|---|---|
| 客户端认证 | server.properties sasl.enabled.mechanisms | 启用SASL |
| 认证机制 | sasl.mechanism.inter.broker.protocol | GSSAPI/SCRAM/PLAIN |
| 超级用户 | super.users | 配置超级用户 |
| 证书认证 | ssl.client.auth | required |
| 会话超时 | connections.max.idle.ms | 合理设置 |
Kafka特有配置:
# 查看Kafka配置目录
ls-la$KAFKA_HOME/config/
# 查看Broker核心配置
cat$KAFKA_HOME/config/server.properties
# 关键安全配置检查
grep-E"security|sasl|ssl|auth|password|kerberos|acl|super"$KAFKA_HOME/config/server.properties
# 关键配置项:
# listeners=SASL_SSL://:9093,SSL://:9092,PLAINTEXT://:9092
# security.inter.broker.protocol=SASL_SSL
# sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512
# sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# ssl.client.auth=required
# super.users=User:admin;User:kafka
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# allow.everyone.if.no.acl.found=false
# connections.max.idle.ms=600000
# 查看认证机制详情
cat$KAFKA_HOME/config/server.properties |grep-E"sasl.enabled.mechanisms|sasl.mechanism"
# 支持的SASL机制:
# GSSAPI - Kerberos认证
# PLAIN - 明文认证(不推荐生产环境)
# SCRAM-SHA-256 - 推荐
# SCRAM-SHA-512 - 推荐
# OAUTHBEARER - OAuth 2.0
# 查看JAAS配置(Kafka Server)
cat$KAFKA_HOME/config/kafka_server_jaas.conf
# 关键配置:
# KafkaServer {
# org.apache.kafka.common.security.scram.ScramLoginModule required
# username="admin"
# password="admin-secret"
# user_admin="admin-secret"
# user_alice="alice-secret";
# };
# 查看Kerberos配置(如启用)
cat$KAFKA_HOME/config/kafka_server_jaas.conf |grep-A10"GSSAPI"
cat /etc/krb5.conf |grep-E"default_realm|kdc"
# 查看ZooKeeper认证配置
cat$KAFKA_HOME/config/server.properties |grep-E"zookeeper|zk"
# zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
# zookeeper.set.acl=true # 启用ZooKeeper ACL
# 查看ZooKeeper JAAS配置
cat$KAFKA_HOME/config/kafka_server_jaas.conf |grep-A5"Client"
1.2 客户端认证配置
# 查看生产者JAAS配置
cat$KAFKA_HOME/config/producer_jaas.conf 2>/dev/null
# 查看消费者JAAS配置
cat$KAFKA_HOME/config/consumer_jaas.conf 2>/dev/null
# 查看客户端属性配置
cat$KAFKA_HOME/config/client.properties
# 关键配置:
# security.protocol=SASL_SSL
# sasl.mechanism=SCRAM-SHA-256
# sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";
# 查看SSL客户端配置
cat$KAFKA_HOME/config/client-ssl.properties 2>/dev/null
# 查看控制台生产者安全配置
kafka-console-producer.sh --broker-list localhost:9093 --topictest\
--producer.config$KAFKA_HOME/config/client.properties 2>&1|head-5
# 查看控制台消费者安全配置
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topictest\
--consumer.config$KAFKA_HOME/config/client.properties 2>&1|head-5
二、访问控制
2.1 ACL权限管理
| 控制项 | 测评命令 | 达标判据 |
|---|---|---|
| Topic ACL | kafka-acls.sh --list --topic | 配置Topic权限 |
| Group ACL | kafka-acls.sh --list --group | 配置消费者组权限 |
| Cluster ACL | kafka-acls.sh --list --cluster | 配置集群权限 |
| 事务ID ACL | kafka-acls.sh --list --transactional-id | 配置事务权限 |
| Delegation Token | kafka-delegation-tokens.sh --list | 配置委托令牌 |
Kafka特有配置:
# 查看ACL授权器配置
cat$KAFKA_HOME/config/server.properties |grep"authorizer.class.name"
# 应配置为:
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# 查看默认ACL策略
cat$KAFKA_HOME/config/server.properties |grep"allow.everyone.if.no.acl.found"
# 应设置为false(生产环境)
# 查看Topic ACL列表
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic"*"\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看特定Topic ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic test-topic \
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看消费者组ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list--group"*"\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看集群级ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list--cluster\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看事务ID ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list --transactional-id "*"\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看委托令牌(Delegation Token)
kafka-delegation-tokens.sh --bootstrap-server localhost:9093 --list\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看主体权限(Principal)
kafka-acls.sh --bootstrap-server localhost:9093 --list--principal"User:alice"\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看IP白名单配置
cat$KAFKA_HOME/config/server.properties |grep-E"advertised.listeners|listeners"
2.2 资源权限详解
# Topic权限类型:
# --operation Create - 创建Topic
# --operation Delete - 删除Topic
# --operation Describe - 查看元数据
# --operation DescribeConfigs - 查看配置
# --operation Alter - 修改配置
# --operation AlterConfigs - 修改动态配置
# --operation Read - 消费消息
# --operation Write - 生产消息
# 查看Topic创建权限
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic"*"--operation Create \
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看Topic删除权限
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic"*"--operation Delete \
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看集群权限(创建Topic等管理操作)
kafka-acls.sh --bootstrap-server localhost:9093 --list--cluster--operation Create \
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看Idempotent Producer权限(精确一次语义)
kafka-acls.sh --bootstrap-server localhost:9093 --list--cluster--operation IdempotentWrite \
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看消费者组权限
kafka-acls.sh --bootstrap-server localhost:9093 --list--group"consumer-group-1"\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看生产者事务权限
kafka-acls.sh --bootstrap-server localhost:9093 --list --transactional-id "prod-1"\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
三、安全审计
3.1 日志与审计配置
| 控制项 | 测评命令/配置 | 达标判据 |
|---|---|---|
| 授权日志 | authorizer.class.name | 启用ACL日志 |
| 请求日志 | log4j.logger.kafka.request.logger | 记录请求 |
| 操作审计 | log4j.logger.kafka.authorizer.logger | 记录授权 |
| 日志保留 | log.retention.hours | ≥168(7天) |
| 日志保护 | 文件权限 | 640 kafka |
Kafka特有配置:
# 查看日志配置
cat$KAFKA_HOME/config/log4j.properties
cat$KAFKA_HOME/config/tools-log4j.properties
# 关键审计配置:
# log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
# log4j.logger.kafka.request.logger=WARN, requestAppender
# log4j.logger.kafka.network.Processor=INFO
# log4j.logger.kafka.server.KafkaApis=INFO
# 查看授权日志配置
cat$KAFKA_HOME/config/log4j.properties |grep-i"authorizer"
# 查看请求日志配置
cat$KAFKA_HOME/config/log4j.properties |grep-i"request"
# 查看日志目录
ls-la$KAFKA_HOME/logs/
ls-la /var/log/kafka/ 2>/dev/null
ls-la /tmp/kafka-logs/ 2>/dev/null
# 查看日志文件
ls-la$KAFKA_HOME/logs/server.log
ls-la$KAFKA_HOME/logs/controller.log
ls-la$KAFKA_HOME/logs/state-change.log
ls-la$KAFKA_HOME/logs/kafka-authorizer.log 2>/dev/null
# 查看授权日志内容
cat$KAFKA_HOME/logs/kafka-authorizer.log 2>/dev/null |tail-50
grep-i"acl\|authorized\|denied"$KAFKA_HOME/logs/server.log |tail-30
# 查看请求日志内容
cat$KAFKA_HOME/logs/kafka-request.log 2>/dev/null |tail-50
# 查看日志保留配置
cat$KAFKA_HOME/config/server.properties |grep-E"log.retention|log.segment"
# 关键配置:
# log.retention.hours=168 # 7天
# log.retention.bytes=1073741824 # 1GB
# log.segment.bytes=1073741824
# log.retention.check.interval.ms=300000
# 查看日志清理策略
cat$KAFKA_HOME/config/server.properties |grep"log.cleanup.policy"
# delete - 删除旧日志
# compact - 日志压缩
# 查看审计日志文件权限
ls-la$KAFKA_HOME/logs/*.log |head-5
stat-c'%a %U:%G'$KAFKA_HOME/logs/server.log
3.2 审计事件分析
# 查看授权失败事件
grep-i"Principal = Denied"$KAFKA_HOME/logs/server.log |tail-20
grep-i"not authorized"$KAFKA_HOME/logs/server.log |tail-20
# 查看认证失败事件
grep-i"authentication failed\|sasl\|login failed"$KAFKA_HOME/logs/server.log |tail-20
# 查看Topic创建/删除事件
grep-i"Creating topic\|Deleting topic"$KAFKA_HOME/logs/controller.log |tail-20
# 查看配置变更事件
grep-i"DynamicConfig\|Config changed"$KAFKA_HOME/logs/server.log |tail-20
# 查看分区重分配事件
grep-i"Partition reassign"$KAFKA_HOME/logs/server.log |tail-20
# 查看消费者组变更
grep-i"GroupMetadataManager\|consumer group"$KAFKA_HOME/logs/server.log |tail-20
# 使用kafka-acls.sh查看操作日志(需配置)
kafka-acls.sh --bootstrap-server localhost:9093 --list\
--command-config $KAFKA_HOME/config/client.properties 2>&1|head-20
# 查看连接日志
grep-i"Accepted connection\|Connection from"$KAFKA_HOME/logs/server.log |tail-20
# 查看断开连接日志
grep-i"Connection disconnected\|Expired session"$KAFKA_HOME/logs/server.log |tail-20
四、入侵防范
4.1 系统加固
# 查看Kafka版本
cat$KAFKA_HOME/RELEASE 2>/dev/null
ls$KAFKA_HOME/libs/kafka_* |head-1
# 查看CVE漏洞(需外部扫描)
# 检查是否使用存在漏洞的版本(如CVE-2021-44228 Log4Shell)
# 查看Log4j版本(Log4Shell检查)
ls-la$KAFKA_HOME/libs/log4j-core-*.jar
# 应使用2.17.1+版本
# 查看ZooKeeper版本(依赖安全)
ls-la$KAFKA_HOME/libs/zookeeper-*.jar
# 查看Scala版本
ls-la$KAFKA_HOME/libs/scala-library-*.jar
# 查看网络线程配置
cat$KAFKA_HOME/config/server.properties |grep-E"num.network.threads|num.io.threads"
# 查看连接限制
cat$KAFKA_HOME/config/server.properties |grep-E"connections.max.idle|max.connections"
# 查看请求大小限制
cat$KAFKA_HOME/config/server.properties |grep-E"max.request.size|message.max.bytes"
# 关键配置:
# max.request.size=1048576 # 1MB
# message.max.bytes=1000012
# 查看重试和超时配置
cat$KAFKA_HOME/config/server.properties |grep-E"retry|timeout"
# 查看线程池配置
cat$KAFKA_HOME/config/server.properties |grep-E"num.threads|num.network|num.io"
# 查看内存配置
cat$KAFKA_HOME/config/server.properties |grep-E"heap|memory|buffer"
ps-ef|grep kafka |grep-oE"\-Xmx[0-9]+[mg]|\-Xms[0-9]+[mg]"
# 查看文件描述符限制
cat /proc/$(pgrep -f"kafka.Kafka")/limits |grep"Max open files"
ulimit-n
4.2 网络安全防护
# 查看监听配置
cat$KAFKA_HOME/config/server.properties |grep-E"listeners|advertised.listeners"
# 关键配置:
# listeners=SASL_SSL://:9093,CONTROLLER://:9094
# advertised.listeners=SASL_SSL://broker1.example.com:9093
# listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 查看协议映射
cat$KAFKA_HOME/config/server.properties |grep"listener.security.protocol.map"
# 查看防火墙规则
iptables -L-n|grep-E"9092|9093|9094|2181"
firewall-cmd --list-all |grep-E"9092|9093|9094|2181"
# 查看Controller配置(KRaft模式,Kafka 3.0+)
cat$KAFKA_HOME/config/server.properties |grep-E"controller|kraft|node.id|process.roles"
# KRaft模式关键配置:
# process.roles=broker,controller
# node.id=1
# controller.quorum.voters=1@localhost:9093
# 查看ZooKeeper连接(传统模式)
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.connect"
# 查看ZooKeeper安全
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.set.acl"
# 应设置为true
# 查看Broker间安全
cat$KAFKA_HOME/config/server.properties |grep"security.inter.broker.protocol"
# 应设置为SASL_SSL或SSL
五、传输与数据安全
5.1 SSL/TLS加密
# 查看SSL配置
cat$KAFKA_HOME/config/server.properties |grep-E"ssl.|ssl.keystore|ssl.truststore"
# 关键配置:
# ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
# ssl.keystore.password=test1234
# ssl.key.password=test1234
# ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
# ssl.truststore.password=test1234
# ssl.client.auth=required
# ssl.enabled.protocols=TLSv1.2
# ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
# 查看证书详情
keytool -list-v-keystore$KAFKA_HOME/config/kafka.server.keystore.jks -storepass test1234 2>/dev/null |head-20
# 查看证书有效期
keytool -list-v-keystore$KAFKA_HOME/config/kafka.server.keystore.jks 2>/dev/null |grep-E"Valid from|until"
# 查看协议版本(应禁用TLSv1.0/1.1)
cat$KAFKA_HOME/config/server.properties |grep"ssl.enabled.protocols"
# 应配置为:TLSv1.2,TLSv1.3
# 查看密码套件
cat$KAFKA_HOME/config/server.properties |grep"ssl.cipher.suites"
# 应禁用弱密码套件
# 查看客户端证书认证
cat$KAFKA_HOME/config/server.properties |grep"ssl.client.auth"
# 应设置为required
# 查看国密SSL配置(如使用国密版JDK)
cat$KAFKA_HOME/config/server.properties |grep-E"gmssl|sm2|sm3|sm4"
# 查看端点识别算法
cat$KAFKA_HOME/config/server.properties |grep"ssl.endpoint.identification.algorithm"
# 应设置为HTTPS
5.2 数据加密与压缩
# 查看数据加密配置(Kafka 3.0+支持KMS集成)
cat$KAFKA_HOME/config/server.properties |grep-E"encryption|kms|key"
# 查看日志压缩配置
cat$KAFKA_HOME/config/server.properties |grep-E"compression|compression.type"
# 查看消息格式版本
cat$KAFKA_HOME/config/server.properties |grep"log.message.format.version"
# 查看事务配置
cat$KAFKA_HOME/config/server.properties |grep-E"transaction|transactional"
# 关键配置:
# transaction.state.log.replication.factor=3
# transaction.state.log.min.isr=2
# transaction.state.log.num.partitions=50
# 查看幂等生产者配置
cat$KAFKA_HOME/config/server.properties |grep"enable.idempotence"
# 查看Exactly-Once语义配置
cat$KAFKA_HOME/config/server.properties |grep-E"transaction|isolation"
六、高可用与灾备
# 查看副本因子配置
cat$KAFKA_HOME/config/server.properties |grep"default.replication.factor"
cat$KAFKA_HOME/config/server.properties |grep"offsets.topic.replication.factor"
cat$KAFKA_HOME/config/server.properties |grep"transaction.state.log.replication.factor"
# 关键配置(生产环境):
# default.replication.factor=3
# offsets.topic.replication.factor=3
# transaction.state.log.replication.factor=3
# min.insync.replicas=2
# 查看最小同步副本
cat$KAFKA_HOME/config/server.properties |grep"min.insync.replicas"
# 查看Unclean Leader选举
cat$KAFKA_HOME/config/server.properties |grep"unclean.leader.election.enable"
# 应设置为false
# 查看Controller配置
cat$KAFKA_HOME/config/server.properties |grep-E"controller|zookeeper"
# 查看分区重分配配置
cat$KAFKA_HOME/config/server.properties |grep"leader.imbalance"
# 查看备份配置
ls-la /backup/kafka/ 2>/dev/null
ls-la$KAFKA_HOME/backup/ 2>/dev/null
# 查看Topic列表
kafka-topics.sh --bootstrap-server localhost:9093 --list\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看Topic详情(副本分布)
kafka-topics.sh --bootstrap-server localhost:9093 --describe--topictest\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9093 --list\
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null
# 查看Broker状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9093 \
--command-config $KAFKA_HOME/config/client.properties 2>/dev/null |head-10
七、Kafka与大数据生态安全集成
7.1 ZooKeeper安全集成
# 查看ZooKeeper连接
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.connect"
# 查看ZooKeeper ACL
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.set.acl"
# 应设置为true
# 查看ZooKeeper JAAS配置
cat$KAFKA_HOME/config/kafka_server_jaas.conf |grep-A5"Client"
# 查看ZooKeeper SSL配置(ZooKeeper 3.5+)
cat$KAFKA_HOME/config/server.properties |grep-E"zookeeper.ssl|zookeeper.clientCnxnSocket"
7.2 Schema Registry安全
# 查看Schema Registry配置(如使用Confluent)
cat /etc/schema-registry/schema-registry.properties 2>/dev/null |grep-E"security|ssl|auth"
# 查看Schema Registry认证
cat /etc/schema-registry/schema-registry.properties 2>/dev/null |grep-E"authentication|kafkastore.security"
# 查看Avro序列化安全
cat$KAFKA_HOME/config/producer.properties |grep"schema.registry"
7.3 Kafka Connect安全
# 查看Connect Worker配置
cat$KAFKA_HOME/config/connect-distributed.properties |grep-E"security|ssl|sasl"
# 查看Connect连接器配置
cat$KAFKA_HOME/config/connect-standalone.properties |grep-E"security|ssl|sasl"
# 查看Connect密钥配置
cat$KAFKA_HOME/config/connect-distributed.properties |grep-E"config.providers|config.providers.file"
八、一键巡检脚本(Kafka)
#!/bin/bash
# Apache Kafka 等保三级一键巡检脚本
# 适用:Kafka 2.4.x - 3.6.x
KAFKA_HOME=${1:-/opt/kafka}
BOOTSTRAP_SERVER=${2:-localhost:9093}
CLIENT_CONFIG=${3:-$KAFKA_HOME/config/client.properties}
echo"===== Apache Kafka 等保三级巡检 ====="
echo"巡检时间:$(date)"
echo"主机名:$(hostname)"
echo"KAFKA_HOME: $KAFKA_HOME"
echo"BOOTSTRAP_SERVER: $BOOTSTRAP_SERVER"
echo""
if[!-d"$KAFKA_HOME"];then
echo"错误:未找到Kafka安装目录 $KAFKA_HOME"
exit1
fi
echo"===== 1 身份鉴别 ====="
echo"--- 版本信息 ---"
ls$KAFKA_HOME/libs/kafka_* 2>/dev/null |head-1|grep-oP'kafka_\K[0-9]+\.[0-9]+\.[0-9]+'
echo"--- 认证机制 ---"
grep-E"sasl.enabled.mechanisms|sasl.mechanism"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-3
echo"--- 超级用户 ---"
grep"super.users"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- JAAS配置 ---"
cat${KAFKA_HOME}/config/kafka_server_jaas.conf 2>/dev/null |head-10||echo"未找到JAAS配置"
echo"--- 客户端认证 ---"
grep"ssl.client.auth"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo""
echo"===== 2 访问控制 ====="
echo"--- 授权器配置 ---"
grep"authorizer.class.name"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 默认ACL策略 ---"
grep"allow.everyone.if.no.acl.found"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- ACL列表(需认证)---"
kafka-acls.sh --bootstrap-server ${BOOTSTRAP_SERVER}--list\
--command-config ${CLIENT_CONFIG}2>/dev/null |head-20||echo"无法获取ACL(需检查认证配置)"
echo"--- 超级用户权限 ---"
grep"super.users"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo""
echo"===== 3 安全审计 ====="
echo"--- 授权日志配置 ---"
grep-i"authorizer"${KAFKA_HOME}/config/log4j.properties 2>/dev/null |head-3
echo"--- 日志目录 ---"
ls-la${KAFKA_HOME}/logs/ 2>/dev/null |head-5
ls-la /var/log/kafka/ 2>/dev/null |head-5
echo"--- 日志保留配置 ---"
grep-E"log.retention"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-3
echo"--- 最近授权事件 ---"
grep-i"acl\|authorized\|denied"${KAFKA_HOME}/logs/server.log 2>/dev/null |tail-10||echo"无授权日志"
echo""
echo"===== 4 入侵防范 ====="
echo"--- Log4j版本检查 ---"
ls${KAFKA_HOME}/libs/log4j-core-*.jar 2>/dev/null |head-1
echo"[检查] 确认版本>=2.17.1(防Log4Shell)"
echo"--- 网络线程配置 ---"
grep-E"num.network.threads|num.io.threads"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 请求大小限制 ---"
grep-E"max.request.size|message.max.bytes"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 连接限制 ---"
grep-E"connections.max.idle|max.connections"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 内存配置 ---"
ps-ef|grep kafka |grep-oE"\-Xmx[0-9]+[mg]|\-Xms[0-9]+[mg]"|head-1
echo""
echo"===== 5 传输安全 ====="
echo"--- SSL配置 ---"
grep-E"ssl.keystore|ssl.truststore|ssl.client.auth"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-5
echo"--- SSL协议版本 ---"
grep"ssl.enabled.protocols"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 密码套件 ---"
grep"ssl.cipher.suites"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 证书文件 ---"
ls-la${KAFKA_HOME}/config/*.jks ${KAFKA_HOME}/config/*.p12 2>/dev/null |head-3
echo"--- 监听配置 ---"
grep-E"^listeners|advertised.listeners"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo""
echo"===== 6 高可用与灾备 ====="
echo"--- 副本因子配置 ---"
grep-E"default.replication.factor|offsets.topic.replication.factor"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- 最小同步副本 ---"
grep"min.insync.replicas"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- Unclean Leader选举 ---"
grep"unclean.leader.election.enable"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo"--- KRaft模式 ---"
grep-E"process.roles|node.id|controller.quorum.voters"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-3
echo"--- ZooKeeper模式 ---"
grep"zookeeper.connect"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-1
echo"--- ZooKeeper ACL ---"
grep"zookeeper.set.acl"${KAFKA_HOME}/config/server.properties 2>/dev/null
echo""
echo"===== 通用安全检查 ====="
echo"--- 进程运行用户 ---"
ps-ef|grep-E"kafka.Kafka|kafka-server"|grep-vgrep|awk'{print $1}'|sort|uniq-c
echo"--- 配置文件权限 ---"
ls-la${KAFKA_HOME}/config/server.properties ${KAFKA_HOME}/config/kafka_server_jaas.conf 2>/dev/null |awk'{print $1, $3, $4, $9}'
echo"--- 日志文件权限 ---"
ls-la${KAFKA_HOME}/logs/server.log 2>/dev/null |awk'{print $1, $3, $4, $9}'
echo"--- 端口监听 ---"
ss -tulnp|grep-E"9092|9093|9094|2181"2>/dev/null |head-5
echo"--- 环境变量检查 ---"
env|grep-E"KAFKA|PASSWORD|SECRET|KEY"|grep-v"HIST"|awk-F='{print $1}'|head-5
echo"[警告] 避免在环境变量中存储敏感信息"
echo""
echo"===== 巡检完成 ====="
echo"重点关注以下高风险项:"
echo"1. 未启用SASL认证(listeners=PLAINTEXT)"
echo"2. 未配置ACL授权器(authorizer.class.name为空)"
echo"3. 允许无ACL访问(allow.everyone.if.no.acl.found=true)"
echo"4. 未启用SSL/TLS加密(listeners=SASL_PLAINTEXT)"
echo"5. 客户端证书认证非强制(ssl.client.auth!=required)"
echo"6. 使用弱SSL协议(ssl.enabled.protocols包含TLSv1.0/1.1)"
echo"7. Log4j版本过低(<2.17.1,存在Log4Shell漏洞)"
echo"8. 副本因子过低(default.replication.factor<3)"
echo"9. 允许Unclean Leader选举(unclean.leader.election.enable=true)"
echo"10. ZooKeeper未启用ACL(zookeeper.set.acl=false)"
九、高风险项重点核查清单
| 检查项 | 验证命令 | 不合规判定 | 整改建议 |
|---|---|---|---|
| 未启用SASL认证 | grep listeners server.properties | 包含PLAINTEXT | 启用SASL_SSL或SSL |
| 未配置ACL授权器 | grep authorizer.class.name | 为空或SimpleAclAuthorizer | 配置AclAuthorizer |
| 允许无ACL访问 | grep allow.everyone.if.no.acl.found | true或未配置 | 设置为false |
| 超级用户未配置 | grep super.users | 为空 | 配置超级用户 |
| 未启用SSL | grep listeners | 无SASL_SSL或SSL | 启用SSL监听 |
| 客户端证书非强制 | grep ssl.client.auth | 不为required | 设置为required |
| 弱SSL协议 | grep ssl.enabled.protocols | 包含TLSv1.0/1.1 | 仅启用TLSv1.2+ |
| Log4j版本过低 | ls log4j-core-*.jar | <2.17.1 | 升级Log4j |
| 副本因子过低 | grep default.replication.factor | <3 | 设置为≥3 |
| ZooKeeper无ACL | grep zookeeper.set.acl | false或未配置 | 设置为true |
十、Kafka与RabbitMQ/RocketMQ对比
| 对比项 | Apache Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|
| 架构 | 分布式日志 | 消息代理 | 分布式消息 |
| 吞吐量 | 极高 | 高 | 高 |
| 延迟 | 毫秒级 | 微秒级 | 毫秒级 |
| 安全认证 | SASL/SSL | SASL/SSL | AK/SK/SSL |
| ACL支持 | 完善 | 完善 | 完善 |
| 事务消息 | 支持 | 支持 | 支持 |
| 消息追踪 | 有限 | 完善 | 完善 |
| 云原生 | 优秀 | 良好 | 良好 |
| 等保合规 | 中 | 中 | 中 |
| 国密支持 | 需配置 | 需配置 | 需配置 |
十一、等保测评执行要点
1. 关键安全配置检查
# server.properties 生产环境推荐配置
# 监听配置(仅启用安全协议)
listeners=SASL_SSL://:9093
advertised.listeners=SASL_SSL://broker1.example.com:9093
listener.security.protocol.map=SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_SSL
# SASL认证
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# SSL配置
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=${file:/var/private/ssl/keystore-password:keystore_password}
ssl.key.password=${file:/var/private/ssl/key-password:key_password}
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=${file:/var/private/ssl/truststore-password:truststore_password}
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.endpoint.identification.algorithm=HTTPS
# ACL授权
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin;User:kafka
# 高可用配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
# ZooKeeper安全
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.set.acl=true
2. JAAS配置示例
// kafka_server_jaas.conf
KafkaServer{
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_alice="alice-secret"
user_bob="bob-secret";
};
Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="zookeeper-secret";
};
3. 现场访谈要点
- 是否启用SASL/SSL认证(生产环境必须)
- 是否配置ACL并禁用默认允许策略
- 是否配置超级用户进行紧急管理
- 是否启用SSL客户端证书认证
- 是否配置审计日志并定期分析
- 是否启用ZooKeeper ACL保护元数据
- 是否配置副本因子≥3和min.insync.replicas≥2
- 是否禁用Unclean Leader选举
- 是否升级到无漏洞的Log4j版本
4. 版本差异
| 功能项 | Kafka 2.x | Kafka 3.0 | Kafka 3.5+ |
|---|---|---|---|
| KRaft模式 | 预览 | 支持 | 推荐 |
| ZK依赖 | 必需 | 可选 | 可选 |
| 安全增强 | 基础 | 增强 | 完善 |
| 云原生 | 良好 | 优秀 | 优秀 |
| 等保合规 | 基础 | 增强 | 完善 |
参考标准:GB/T 22239-2019、GB/T 28448-2019、Apache Kafka Security Documentation
适用版本:Kafka 2.4.x / 2.5.x / 2.6.x / 2.7.x / 2.8.x / 3.0.x / 3.1.x / 3.2.x / 3.3.x / 3.4.x / 3.5.x / 3.6.x
验证环境:x86_64 / ARM64 / 国产化芯片(飞腾/鲲鹏/龙芯/海光/兆芯/申威)
声明:来自汪汪虚拟空间,仅代表创作者观点。链接:https://eyangzhen.com/7391.html