等保测评命令——Kafka

各位大佬,想看那种网络设备/操作系统/数据库/中间件的测评命令清单,可在留言区留言!

依据 GB/T 22239-2019《信息安全技术 网络安全等级保护基本要求》第三级”安全计算环境” 条款,结合Kafka官方安全指南及现场测评实践。

适用版本:Kafka 2.4.x / 2.5.x / 2.6.x / 2.7.x / 2.8.x / 3.0.x / 3.1.x / 3.2.x / 3.3.x / 3.4.x / 3.5.x / 3.6.x


一、身份鉴别

1.1 Broker认证配置

控制项测评命令/配置达标判据
客户端认证server.properties sasl.enabled.mechanisms启用SASL
认证机制sasl.mechanism.inter.broker.protocolGSSAPI/SCRAM/PLAIN
超级用户super.users配置超级用户
证书认证ssl.client.authrequired
会话超时connections.max.idle.ms合理设置

Kafka特有配置:

# 查看Kafka配置目录
ls-la$KAFKA_HOME/config/

# 查看Broker核心配置
cat$KAFKA_HOME/config/server.properties

# 关键安全配置检查
grep-E"security|sasl|ssl|auth|password|kerberos|acl|super"$KAFKA_HOME/config/server.properties

# 关键配置项:
# listeners=SASL_SSL://:9093,SSL://:9092,PLAINTEXT://:9092
# security.inter.broker.protocol=SASL_SSL
# sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512
# sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# ssl.client.auth=required
# super.users=User:admin;User:kafka
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# allow.everyone.if.no.acl.found=false
# connections.max.idle.ms=600000

# 查看认证机制详情
cat$KAFKA_HOME/config/server.properties |grep-E"sasl.enabled.mechanisms|sasl.mechanism"

# 支持的SASL机制:
# GSSAPI - Kerberos认证
# PLAIN - 明文认证(不推荐生产环境)
# SCRAM-SHA-256 - 推荐
# SCRAM-SHA-512 - 推荐
# OAUTHBEARER - OAuth 2.0

# 查看JAAS配置(Kafka Server)
cat$KAFKA_HOME/config/kafka_server_jaas.conf

# 关键配置:
# KafkaServer {
#   org.apache.kafka.common.security.scram.ScramLoginModule required
#   username="admin"
#   password="admin-secret"
#   user_admin="admin-secret"
#   user_alice="alice-secret";
# };

# 查看Kerberos配置(如启用)
cat$KAFKA_HOME/config/kafka_server_jaas.conf |grep-A10"GSSAPI"
cat /etc/krb5.conf |grep-E"default_realm|kdc"

# 查看ZooKeeper认证配置
cat$KAFKA_HOME/config/server.properties |grep-E"zookeeper|zk"
# zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
# zookeeper.set.acl=true  # 启用ZooKeeper ACL

# 查看ZooKeeper JAAS配置
cat$KAFKA_HOME/config/kafka_server_jaas.conf |grep-A5"Client"

1.2 客户端认证配置

# 查看生产者JAAS配置
cat$KAFKA_HOME/config/producer_jaas.conf 2>/dev/null

# 查看消费者JAAS配置
cat$KAFKA_HOME/config/consumer_jaas.conf 2>/dev/null

# 查看客户端属性配置
cat$KAFKA_HOME/config/client.properties

# 关键配置:
# security.protocol=SASL_SSL
# sasl.mechanism=SCRAM-SHA-256
# sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="alice" password="alice-secret";

# 查看SSL客户端配置
cat$KAFKA_HOME/config/client-ssl.properties 2>/dev/null

# 查看控制台生产者安全配置
kafka-console-producer.sh --broker-list localhost:9093 --topictest\
--producer.config$KAFKA_HOME/config/client.properties 2>&1|head-5

# 查看控制台消费者安全配置
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topictest\
--consumer.config$KAFKA_HOME/config/client.properties 2>&1|head-5

二、访问控制

2.1 ACL权限管理

控制项测评命令达标判据
Topic ACLkafka-acls.sh --list --topic配置Topic权限
Group ACLkafka-acls.sh --list --group配置消费者组权限
Cluster ACLkafka-acls.sh --list --cluster配置集群权限
事务ID ACLkafka-acls.sh --list --transactional-id配置事务权限
Delegation Tokenkafka-delegation-tokens.sh --list配置委托令牌

Kafka特有配置:

# 查看ACL授权器配置
cat$KAFKA_HOME/config/server.properties |grep"authorizer.class.name"

# 应配置为:
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer

# 查看默认ACL策略
cat$KAFKA_HOME/config/server.properties |grep"allow.everyone.if.no.acl.found"
# 应设置为false(生产环境)

# 查看Topic ACL列表
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic"*"\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看特定Topic ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic test-topic \
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看消费者组ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list--group"*"\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看集群级ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list--cluster\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看事务ID ACL
kafka-acls.sh --bootstrap-server localhost:9093 --list --transactional-id "*"\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看委托令牌(Delegation Token)
kafka-delegation-tokens.sh --bootstrap-server localhost:9093 --list\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看主体权限(Principal)
kafka-acls.sh --bootstrap-server localhost:9093 --list--principal"User:alice"\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看IP白名单配置
cat$KAFKA_HOME/config/server.properties |grep-E"advertised.listeners|listeners"

2.2 资源权限详解

# Topic权限类型:
# --operation Create    - 创建Topic
# --operation Delete    - 删除Topic
# --operation Describe  - 查看元数据
# --operation DescribeConfigs - 查看配置
# --operation Alter     - 修改配置
# --operation AlterConfigs - 修改动态配置
# --operation Read      - 消费消息
# --operation Write     - 生产消息

# 查看Topic创建权限
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic"*"--operation Create \
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看Topic删除权限
kafka-acls.sh --bootstrap-server localhost:9093 --list--topic"*"--operation Delete \
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看集群权限(创建Topic等管理操作)
kafka-acls.sh --bootstrap-server localhost:9093 --list--cluster--operation Create \
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看Idempotent Producer权限(精确一次语义)
kafka-acls.sh --bootstrap-server localhost:9093 --list--cluster--operation IdempotentWrite \
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看消费者组权限
kafka-acls.sh --bootstrap-server localhost:9093 --list--group"consumer-group-1"\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看生产者事务权限
kafka-acls.sh --bootstrap-server localhost:9093 --list --transactional-id "prod-1"\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

三、安全审计

3.1 日志与审计配置

控制项测评命令/配置达标判据
授权日志authorizer.class.name启用ACL日志
请求日志log4j.logger.kafka.request.logger记录请求
操作审计log4j.logger.kafka.authorizer.logger记录授权
日志保留log.retention.hours≥168(7天)
日志保护文件权限640 kafka

Kafka特有配置:

# 查看日志配置
cat$KAFKA_HOME/config/log4j.properties
cat$KAFKA_HOME/config/tools-log4j.properties

# 关键审计配置:
# log4j.logger.kafka.authorizer.logger=INFO, authorizerAppender
# log4j.logger.kafka.request.logger=WARN, requestAppender
# log4j.logger.kafka.network.Processor=INFO
# log4j.logger.kafka.server.KafkaApis=INFO

# 查看授权日志配置
cat$KAFKA_HOME/config/log4j.properties |grep-i"authorizer"

# 查看请求日志配置
cat$KAFKA_HOME/config/log4j.properties |grep-i"request"

# 查看日志目录
ls-la$KAFKA_HOME/logs/
ls-la /var/log/kafka/ 2>/dev/null
ls-la /tmp/kafka-logs/ 2>/dev/null

# 查看日志文件
ls-la$KAFKA_HOME/logs/server.log
ls-la$KAFKA_HOME/logs/controller.log
ls-la$KAFKA_HOME/logs/state-change.log
ls-la$KAFKA_HOME/logs/kafka-authorizer.log 2>/dev/null

# 查看授权日志内容
cat$KAFKA_HOME/logs/kafka-authorizer.log 2>/dev/null |tail-50
grep-i"acl\|authorized\|denied"$KAFKA_HOME/logs/server.log |tail-30

# 查看请求日志内容
cat$KAFKA_HOME/logs/kafka-request.log 2>/dev/null |tail-50

# 查看日志保留配置
cat$KAFKA_HOME/config/server.properties |grep-E"log.retention|log.segment"

# 关键配置:
# log.retention.hours=168  # 7天
# log.retention.bytes=1073741824  # 1GB
# log.segment.bytes=1073741824
# log.retention.check.interval.ms=300000

# 查看日志清理策略
cat$KAFKA_HOME/config/server.properties |grep"log.cleanup.policy"
# delete - 删除旧日志
# compact - 日志压缩

# 查看审计日志文件权限
ls-la$KAFKA_HOME/logs/*.log |head-5
stat-c'%a %U:%G'$KAFKA_HOME/logs/server.log

3.2 审计事件分析

# 查看授权失败事件
grep-i"Principal = Denied"$KAFKA_HOME/logs/server.log |tail-20
grep-i"not authorized"$KAFKA_HOME/logs/server.log |tail-20

# 查看认证失败事件
grep-i"authentication failed\|sasl\|login failed"$KAFKA_HOME/logs/server.log |tail-20

# 查看Topic创建/删除事件
grep-i"Creating topic\|Deleting topic"$KAFKA_HOME/logs/controller.log |tail-20

# 查看配置变更事件
grep-i"DynamicConfig\|Config changed"$KAFKA_HOME/logs/server.log |tail-20

# 查看分区重分配事件
grep-i"Partition reassign"$KAFKA_HOME/logs/server.log |tail-20

# 查看消费者组变更
grep-i"GroupMetadataManager\|consumer group"$KAFKA_HOME/logs/server.log |tail-20

# 使用kafka-acls.sh查看操作日志(需配置)
kafka-acls.sh --bootstrap-server localhost:9093 --list\
  --command-config $KAFKA_HOME/config/client.properties 2>&1|head-20

# 查看连接日志
grep-i"Accepted connection\|Connection from"$KAFKA_HOME/logs/server.log |tail-20

# 查看断开连接日志
grep-i"Connection disconnected\|Expired session"$KAFKA_HOME/logs/server.log |tail-20

四、入侵防范

4.1 系统加固

# 查看Kafka版本
cat$KAFKA_HOME/RELEASE 2>/dev/null
ls$KAFKA_HOME/libs/kafka_* |head-1

# 查看CVE漏洞(需外部扫描)
# 检查是否使用存在漏洞的版本(如CVE-2021-44228 Log4Shell)

# 查看Log4j版本(Log4Shell检查)
ls-la$KAFKA_HOME/libs/log4j-core-*.jar
# 应使用2.17.1+版本

# 查看ZooKeeper版本(依赖安全)
ls-la$KAFKA_HOME/libs/zookeeper-*.jar

# 查看Scala版本
ls-la$KAFKA_HOME/libs/scala-library-*.jar

# 查看网络线程配置
cat$KAFKA_HOME/config/server.properties |grep-E"num.network.threads|num.io.threads"

# 查看连接限制
cat$KAFKA_HOME/config/server.properties |grep-E"connections.max.idle|max.connections"

# 查看请求大小限制
cat$KAFKA_HOME/config/server.properties |grep-E"max.request.size|message.max.bytes"

# 关键配置:
# max.request.size=1048576  # 1MB
# message.max.bytes=1000012

# 查看重试和超时配置
cat$KAFKA_HOME/config/server.properties |grep-E"retry|timeout"

# 查看线程池配置
cat$KAFKA_HOME/config/server.properties |grep-E"num.threads|num.network|num.io"

# 查看内存配置
cat$KAFKA_HOME/config/server.properties |grep-E"heap|memory|buffer"
ps-ef|grep kafka |grep-oE"\-Xmx[0-9]+[mg]|\-Xms[0-9]+[mg]"

# 查看文件描述符限制
cat /proc/$(pgrep -f"kafka.Kafka")/limits |grep"Max open files"
ulimit-n

4.2 网络安全防护

# 查看监听配置
cat$KAFKA_HOME/config/server.properties |grep-E"listeners|advertised.listeners"

# 关键配置:
# listeners=SASL_SSL://:9093,CONTROLLER://:9094
# advertised.listeners=SASL_SSL://broker1.example.com:9093
# listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 查看协议映射
cat$KAFKA_HOME/config/server.properties |grep"listener.security.protocol.map"

# 查看防火墙规则
iptables -L-n|grep-E"9092|9093|9094|2181"
firewall-cmd --list-all |grep-E"9092|9093|9094|2181"

# 查看Controller配置(KRaft模式,Kafka 3.0+)
cat$KAFKA_HOME/config/server.properties |grep-E"controller|kraft|node.id|process.roles"

# KRaft模式关键配置:
# process.roles=broker,controller
# node.id=1
# controller.quorum.voters=1@localhost:9093

# 查看ZooKeeper连接(传统模式)
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.connect"

# 查看ZooKeeper安全
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.set.acl"
# 应设置为true

# 查看Broker间安全
cat$KAFKA_HOME/config/server.properties |grep"security.inter.broker.protocol"
# 应设置为SASL_SSL或SSL

五、传输与数据安全

5.1 SSL/TLS加密

# 查看SSL配置
cat$KAFKA_HOME/config/server.properties |grep-E"ssl.|ssl.keystore|ssl.truststore"

# 关键配置:
# ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
# ssl.keystore.password=test1234
# ssl.key.password=test1234
# ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
# ssl.truststore.password=test1234
# ssl.client.auth=required
# ssl.enabled.protocols=TLSv1.2
# ssl.cipher.suites=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

# 查看证书详情
keytool -list-v-keystore$KAFKA_HOME/config/kafka.server.keystore.jks -storepass test1234 2>/dev/null |head-20

# 查看证书有效期
keytool -list-v-keystore$KAFKA_HOME/config/kafka.server.keystore.jks 2>/dev/null |grep-E"Valid from|until"

# 查看协议版本(应禁用TLSv1.0/1.1)
cat$KAFKA_HOME/config/server.properties |grep"ssl.enabled.protocols"
# 应配置为:TLSv1.2,TLSv1.3

# 查看密码套件
cat$KAFKA_HOME/config/server.properties |grep"ssl.cipher.suites"
# 应禁用弱密码套件

# 查看客户端证书认证
cat$KAFKA_HOME/config/server.properties |grep"ssl.client.auth"
# 应设置为required

# 查看国密SSL配置(如使用国密版JDK)
cat$KAFKA_HOME/config/server.properties |grep-E"gmssl|sm2|sm3|sm4"

# 查看端点识别算法
cat$KAFKA_HOME/config/server.properties |grep"ssl.endpoint.identification.algorithm"
# 应设置为HTTPS

5.2 数据加密与压缩

# 查看数据加密配置(Kafka 3.0+支持KMS集成)
cat$KAFKA_HOME/config/server.properties |grep-E"encryption|kms|key"

# 查看日志压缩配置
cat$KAFKA_HOME/config/server.properties |grep-E"compression|compression.type"

# 查看消息格式版本
cat$KAFKA_HOME/config/server.properties |grep"log.message.format.version"

# 查看事务配置
cat$KAFKA_HOME/config/server.properties |grep-E"transaction|transactional"

# 关键配置:
# transaction.state.log.replication.factor=3
# transaction.state.log.min.isr=2
# transaction.state.log.num.partitions=50

# 查看幂等生产者配置
cat$KAFKA_HOME/config/server.properties |grep"enable.idempotence"

# 查看Exactly-Once语义配置
cat$KAFKA_HOME/config/server.properties |grep-E"transaction|isolation"

六、高可用与灾备

# 查看副本因子配置
cat$KAFKA_HOME/config/server.properties |grep"default.replication.factor"
cat$KAFKA_HOME/config/server.properties |grep"offsets.topic.replication.factor"
cat$KAFKA_HOME/config/server.properties |grep"transaction.state.log.replication.factor"

# 关键配置(生产环境):
# default.replication.factor=3
# offsets.topic.replication.factor=3
# transaction.state.log.replication.factor=3
# min.insync.replicas=2

# 查看最小同步副本
cat$KAFKA_HOME/config/server.properties |grep"min.insync.replicas"

# 查看Unclean Leader选举
cat$KAFKA_HOME/config/server.properties |grep"unclean.leader.election.enable"
# 应设置为false

# 查看Controller配置
cat$KAFKA_HOME/config/server.properties |grep-E"controller|zookeeper"

# 查看分区重分配配置
cat$KAFKA_HOME/config/server.properties |grep"leader.imbalance"

# 查看备份配置
ls-la /backup/kafka/ 2>/dev/null
ls-la$KAFKA_HOME/backup/ 2>/dev/null

# 查看Topic列表
kafka-topics.sh --bootstrap-server localhost:9093 --list\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看Topic详情(副本分布)
kafka-topics.sh --bootstrap-server localhost:9093 --describe--topictest\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看消费者组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9093 --list\
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null

# 查看Broker状态
kafka-broker-api-versions.sh --bootstrap-server localhost:9093 \
  --command-config $KAFKA_HOME/config/client.properties 2>/dev/null |head-10

七、Kafka与大数据生态安全集成

7.1 ZooKeeper安全集成

# 查看ZooKeeper连接
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.connect"

# 查看ZooKeeper ACL
cat$KAFKA_HOME/config/server.properties |grep"zookeeper.set.acl"
# 应设置为true

# 查看ZooKeeper JAAS配置
cat$KAFKA_HOME/config/kafka_server_jaas.conf |grep-A5"Client"

# 查看ZooKeeper SSL配置(ZooKeeper 3.5+)
cat$KAFKA_HOME/config/server.properties |grep-E"zookeeper.ssl|zookeeper.clientCnxnSocket"

7.2 Schema Registry安全

# 查看Schema Registry配置(如使用Confluent)
cat /etc/schema-registry/schema-registry.properties 2>/dev/null |grep-E"security|ssl|auth"

# 查看Schema Registry认证
cat /etc/schema-registry/schema-registry.properties 2>/dev/null |grep-E"authentication|kafkastore.security"

# 查看Avro序列化安全
cat$KAFKA_HOME/config/producer.properties |grep"schema.registry"

7.3 Kafka Connect安全

# 查看Connect Worker配置
cat$KAFKA_HOME/config/connect-distributed.properties |grep-E"security|ssl|sasl"

# 查看Connect连接器配置
cat$KAFKA_HOME/config/connect-standalone.properties |grep-E"security|ssl|sasl"

# 查看Connect密钥配置
cat$KAFKA_HOME/config/connect-distributed.properties |grep-E"config.providers|config.providers.file"

八、一键巡检脚本(Kafka)

#!/bin/bash
# Apache Kafka 等保三级一键巡检脚本
# 适用:Kafka 2.4.x - 3.6.x

KAFKA_HOME=${1:-/opt/kafka}
BOOTSTRAP_SERVER=${2:-localhost:9093}
CLIENT_CONFIG=${3:-$KAFKA_HOME/config/client.properties}

echo"===== Apache Kafka 等保三级巡检 ====="
echo"巡检时间:$(date)"
echo"主机名:$(hostname)"
echo"KAFKA_HOME: $KAFKA_HOME"
echo"BOOTSTRAP_SERVER: $BOOTSTRAP_SERVER"
echo""

if[!-d"$KAFKA_HOME"];then
echo"错误:未找到Kafka安装目录 $KAFKA_HOME"
exit1
fi

echo"===== 1 身份鉴别 ====="
echo"--- 版本信息 ---"
ls$KAFKA_HOME/libs/kafka_* 2>/dev/null |head-1|grep-oP'kafka_\K[0-9]+\.[0-9]+\.[0-9]+'

echo"--- 认证机制 ---"
grep-E"sasl.enabled.mechanisms|sasl.mechanism"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-3

echo"--- 超级用户 ---"
grep"super.users"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- JAAS配置 ---"
cat${KAFKA_HOME}/config/kafka_server_jaas.conf 2>/dev/null |head-10||echo"未找到JAAS配置"

echo"--- 客户端认证 ---"
grep"ssl.client.auth"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo""
echo"===== 2 访问控制 ====="
echo"--- 授权器配置 ---"
grep"authorizer.class.name"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 默认ACL策略 ---"
grep"allow.everyone.if.no.acl.found"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- ACL列表(需认证)---"
kafka-acls.sh --bootstrap-server ${BOOTSTRAP_SERVER}--list\
  --command-config ${CLIENT_CONFIG}2>/dev/null |head-20||echo"无法获取ACL(需检查认证配置)"

echo"--- 超级用户权限 ---"
grep"super.users"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo""
echo"===== 3 安全审计 ====="
echo"--- 授权日志配置 ---"
grep-i"authorizer"${KAFKA_HOME}/config/log4j.properties 2>/dev/null |head-3

echo"--- 日志目录 ---"
ls-la${KAFKA_HOME}/logs/ 2>/dev/null |head-5
ls-la /var/log/kafka/ 2>/dev/null |head-5

echo"--- 日志保留配置 ---"
grep-E"log.retention"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-3

echo"--- 最近授权事件 ---"
grep-i"acl\|authorized\|denied"${KAFKA_HOME}/logs/server.log 2>/dev/null |tail-10||echo"无授权日志"

echo""
echo"===== 4 入侵防范 ====="
echo"--- Log4j版本检查 ---"
ls${KAFKA_HOME}/libs/log4j-core-*.jar 2>/dev/null |head-1
echo"[检查] 确认版本>=2.17.1(防Log4Shell)"

echo"--- 网络线程配置 ---"
grep-E"num.network.threads|num.io.threads"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 请求大小限制 ---"
grep-E"max.request.size|message.max.bytes"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 连接限制 ---"
grep-E"connections.max.idle|max.connections"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 内存配置 ---"
ps-ef|grep kafka |grep-oE"\-Xmx[0-9]+[mg]|\-Xms[0-9]+[mg]"|head-1

echo""
echo"===== 5 传输安全 ====="
echo"--- SSL配置 ---"
grep-E"ssl.keystore|ssl.truststore|ssl.client.auth"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-5

echo"--- SSL协议版本 ---"
grep"ssl.enabled.protocols"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 密码套件 ---"
grep"ssl.cipher.suites"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 证书文件 ---"
ls-la${KAFKA_HOME}/config/*.jks ${KAFKA_HOME}/config/*.p12 2>/dev/null |head-3

echo"--- 监听配置 ---"
grep-E"^listeners|advertised.listeners"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo""
echo"===== 6 高可用与灾备 ====="
echo"--- 副本因子配置 ---"
grep-E"default.replication.factor|offsets.topic.replication.factor"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- 最小同步副本 ---"
grep"min.insync.replicas"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- Unclean Leader选举 ---"
grep"unclean.leader.election.enable"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo"--- KRaft模式 ---"
grep-E"process.roles|node.id|controller.quorum.voters"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-3

echo"--- ZooKeeper模式 ---"
grep"zookeeper.connect"${KAFKA_HOME}/config/server.properties 2>/dev/null |head-1

echo"--- ZooKeeper ACL ---"
grep"zookeeper.set.acl"${KAFKA_HOME}/config/server.properties 2>/dev/null

echo""
echo"===== 通用安全检查 ====="
echo"--- 进程运行用户 ---"
ps-ef|grep-E"kafka.Kafka|kafka-server"|grep-vgrep|awk'{print $1}'|sort|uniq-c

echo"--- 配置文件权限 ---"
ls-la${KAFKA_HOME}/config/server.properties ${KAFKA_HOME}/config/kafka_server_jaas.conf 2>/dev/null |awk'{print $1, $3, $4, $9}'

echo"--- 日志文件权限 ---"
ls-la${KAFKA_HOME}/logs/server.log 2>/dev/null |awk'{print $1, $3, $4, $9}'

echo"--- 端口监听 ---"
ss -tulnp|grep-E"9092|9093|9094|2181"2>/dev/null |head-5

echo"--- 环境变量检查 ---"
env|grep-E"KAFKA|PASSWORD|SECRET|KEY"|grep-v"HIST"|awk-F='{print $1}'|head-5
echo"[警告] 避免在环境变量中存储敏感信息"

echo""
echo"===== 巡检完成 ====="
echo"重点关注以下高风险项:"
echo"1. 未启用SASL认证(listeners=PLAINTEXT)"
echo"2. 未配置ACL授权器(authorizer.class.name为空)"
echo"3. 允许无ACL访问(allow.everyone.if.no.acl.found=true)"
echo"4. 未启用SSL/TLS加密(listeners=SASL_PLAINTEXT)"
echo"5. 客户端证书认证非强制(ssl.client.auth!=required)"
echo"6. 使用弱SSL协议(ssl.enabled.protocols包含TLSv1.0/1.1)"
echo"7. Log4j版本过低(<2.17.1,存在Log4Shell漏洞)"
echo"8. 副本因子过低(default.replication.factor<3)"
echo"9. 允许Unclean Leader选举(unclean.leader.election.enable=true)"
echo"10. ZooKeeper未启用ACL(zookeeper.set.acl=false)"

九、高风险项重点核查清单

检查项验证命令不合规判定整改建议
未启用SASL认证grep listeners server.properties包含PLAINTEXT启用SASL_SSL或SSL
未配置ACL授权器grep authorizer.class.name为空或SimpleAclAuthorizer配置AclAuthorizer
允许无ACL访问grep allow.everyone.if.no.acl.foundtrue或未配置设置为false
超级用户未配置grep super.users为空配置超级用户
未启用SSLgrep listeners无SASL_SSL或SSL启用SSL监听
客户端证书非强制grep ssl.client.auth不为required设置为required
弱SSL协议grep ssl.enabled.protocols包含TLSv1.0/1.1仅启用TLSv1.2+
Log4j版本过低ls log4j-core-*.jar<2.17.1升级Log4j
副本因子过低grep default.replication.factor<3设置为≥3
ZooKeeper无ACLgrep zookeeper.set.aclfalse或未配置设置为true

十、Kafka与RabbitMQ/RocketMQ对比

对比项Apache KafkaRabbitMQRocketMQ
架构分布式日志消息代理分布式消息
吞吐量极高
延迟毫秒级微秒级毫秒级
安全认证SASL/SSLSASL/SSLAK/SK/SSL
ACL支持完善完善完善
事务消息支持支持支持
消息追踪有限完善完善
云原生优秀良好良好
等保合规
国密支持需配置需配置需配置

十一、等保测评执行要点

1. 关键安全配置检查

# server.properties 生产环境推荐配置

# 监听配置(仅启用安全协议)
listeners=SASL_SSL://:9093
advertised.listeners=SASL_SSL://broker1.example.com:9093
listener.security.protocol.map=SASL_SSL:SASL_SSL
security.inter.broker.protocol=SASL_SSL

# SASL认证
sasl.enabled.mechanisms=SCRAM-SHA-256,SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256

# SSL配置
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=${file:/var/private/ssl/keystore-password:keystore_password}
ssl.key.password=${file:/var/private/ssl/key-password:key_password}
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=${file:/var/private/ssl/truststore-password:truststore_password}
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.3
ssl.endpoint.identification.algorithm=HTTPS

# ACL授权
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin;User:kafka

# 高可用配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

# ZooKeeper安全
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.set.acl=true

2. JAAS配置示例

// kafka_server_jaas.conf
KafkaServer{
org.apache.kafka.common.security.scram.ScramLoginModule required
  username="admin"
  password="admin-secret"
  user_admin="admin-secret"
  user_alice="alice-secret"
  user_bob="bob-secret";
};

Client{
org.apache.kafka.common.security.plain.PlainLoginModule required
  username="kafka"
  password="zookeeper-secret";
};

3. 现场访谈要点

  • 是否启用SASL/SSL认证(生产环境必须)
  • 是否配置ACL并禁用默认允许策略
  • 是否配置超级用户进行紧急管理
  • 是否启用SSL客户端证书认证
  • 是否配置审计日志并定期分析
  • 是否启用ZooKeeper ACL保护元数据
  • 是否配置副本因子≥3和min.insync.replicas≥2
  • 是否禁用Unclean Leader选举
  • 是否升级到无漏洞的Log4j版本

4. 版本差异

功能项Kafka 2.xKafka 3.0Kafka 3.5+
KRaft模式预览支持推荐
ZK依赖必需可选可选
安全增强基础增强完善
云原生良好优秀优秀
等保合规基础增强完善

参考标准:GB/T 22239-2019、GB/T 28448-2019、Apache Kafka Security Documentation

适用版本:Kafka 2.4.x / 2.5.x / 2.6.x / 2.7.x / 2.8.x / 3.0.x / 3.1.x / 3.2.x / 3.3.x / 3.4.x / 3.5.x / 3.6.x

验证环境:x86_64 / ARM64 / 国产化芯片(飞腾/鲲鹏/龙芯/海光/兆芯/申威)

声明:来自汪汪虚拟空间,仅代表创作者观点。链接:https://eyangzhen.com/7391.html

汪汪虚拟空间的头像汪汪虚拟空间

相关推荐

添加微信
添加微信
Ai学习群
返回顶部