Enabling authentication in Kafka


Kafka authentication (SASL/SCRAM)

Enabling authentication: the configuration files involved

# server.properties: broker-side security settings
# (the broker also needs a matching listeners entry, e.g. listeners=SASL_PLAINTEXT://0.0.0.0:8318)
advertised.listeners=SASL_PLAINTEXT://121.37.175.249:8318
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# SimpleAclAuthorizer is the ZooKeeper-era authorizer (Kafka 2.4+ ships kafka.security.authorizer.AclAuthorizer)
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# resources that have no ACLs defined remain open to everyone
allow.everyone.if.no.acl.found=true
// kafka_server_jaas.conf: broker-side JAAS configuration.
// With SCRAM, only username/password (the broker's own inter-broker credentials) are used here;
// user_* entries are honored by the PLAIN mechanism only, since SCRAM credentials are stored in ZooKeeper.
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="123123"
    user_admin="123123"
    user_producer="proD#pW2120";
};

// The Client section is used when the broker authenticates to ZooKeeper
// (the official docs typically use org.apache.zookeeper.server.auth.DigestLoginModule here).
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka#pW2120";
};
# admin.conf: client-side settings for the admin user
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="123123";

# test.conf: client-side settings for the test user
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test123";
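
Two steps the snippets above assume but do not show: the admin SCRAM credentials must exist in ZooKeeper before a SASL broker starts (otherwise inter-broker authentication fails), and the broker JVM must be pointed at the JAAS file. A minimal sketch, assuming the JAAS file is saved as /data/app/kafka/config/kafka_server_jaas.conf (that path is illustrative):

# Create the admin (inter-broker) SCRAM credentials in ZooKeeper first
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=123123]' --entity-type users --entity-name admin

# Point the broker JVM at the JAAS file, then start the broker
export KAFKA_OPTS="-Djava.security.auth.login.config=/data/app/kafka/config/kafka_server_jaas.conf"
bin/kafka-server-start.sh -daemon config/server.properties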

User and ACL operations

# Produce / consume with the test user's credentials
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9002 --topic zto-data1 --producer.config /data/app/kafka/test.conf
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9002 --topic zto-data1 --consumer.config /data/app/kafka/test.conf

# Create the SCRAM user "test" (re-running with a new password overwrites the credentials)
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=test123]' --entity-type users --entity-name test
bin/kafka-configs.sh --zookeeper 127.0.0.1:2181 --alter --add-config 'SCRAM-SHA-512=[password=123456]' --entity-type users --entity-name test

# Create a topic with the admin credentials
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9002 --replication-factor 1 --partitions 12 --topic first --command-config /data/app/kafka/config/admin.conf

# Grant the test user consume / produce permission on topic "first", then list all ACLs
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:"test" --consumer --topic 'first' --group '*'
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --add --allow-principal User:"test" --producer --topic 'first' --group '*'
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=127.0.0.1:2181 --list

# Inspect consumer groups with the admin credentials
bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9002 --list --command-config /data/app/kafka/config/admin.conf
bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9002 --group bigdata1 --command-config /data/app/kafka/config/admin.conf
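
The CLI calls above also have programmatic equivalents. As a minimal sketch (not from the original post; the class name MyAdmin is illustrative), the topic creation can be done from Java with AdminClient, reusing the admin credentials from admin.conf:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class MyAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9002");
        // Same admin credentials as admin.conf
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"123123\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // Equivalent of the kafka-topics.sh --create call above: 12 partitions, replication factor 1
            admin.createTopics(Collections.singletonList(new NewTopic("first", 12, (short) 1))).all().get();
        }
    }
}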

Java producer and consumer

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer2 {

    public static void main(String[] args) {
        // 1. Build the producer configuration
        Properties props = new Properties();
        // Broker address (ProducerConfig, not ConsumerConfig, in a producer)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
        // Ack level
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Retries
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        // Batch size (bytes)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Linger time (ms)
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        // RecordAccumulator buffer size (32 MB)
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        // Key/value serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test123\";");

        // 2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 3. Send data
        producer.send(new ProducerRecord<>("first", "hello: test"));

        // 4. Close the producer (this flushes pending batches; without it the message may never be sent)
        producer.close();
    }
}
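
Note that send() is asynchronous: in the fire-and-forget form above, an authorization failure (for example a missing WRITE ACL on the topic) is silently dropped. A minimal sketch of step 3 with a callback, which makes such errors visible:

        // 3. Send data, with a callback so SASL/ACL errors surface
        producer.send(new ProducerRecord<>("first", "hello: test"), (metadata, exception) -> {
            if (exception != null) {
                // e.g. TopicAuthorizationException when the user lacks WRITE on the topic
                exception.printStackTrace();
            } else {
                System.out.printf("sent to %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
            }
        });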
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer2 {
    public static void main(String[] args) {
        // 1. Build the consumer configuration
        Properties props = new Properties();
        // Broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
        // Disable auto-commit; offsets are committed manually below
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Auto-commit interval (only takes effect when auto-commit is enabled)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // Key/value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");
        // Offset reset policy: "earliest" plus a new group (or expired offsets) = read from the beginning
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Authentication
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"test\" password=\"test123\";");

        // 2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. Subscribe to the topic
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            // 4. Poll for data (poll(long) is deprecated; use the Duration overload)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // 5. Iterate over the records
            for (ConsumerRecord<String, String> consumerRecord : records) {
                System.out.println(" polled record --- " + consumerRecord.value());
            }
            // Synchronous commit: blocks until the offsets are committed
            consumer.commitSync();
        }
    }
}
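
The loop above never closes the consumer. A minimal sketch of the usual shutdown pattern (assuming a single consumer thread): wakeup(), the only thread-safe KafkaConsumer method, breaks the poll loop from a shutdown hook.

        // Usual shutdown pattern around the poll loop above
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(" polled record --- " + record.value());
                }
                consumer.commitSync();
            }
        } catch (org.apache.kafka.common.errors.WakeupException e) {
            // expected: thrown inside poll() after wakeup()
        } finally {
            consumer.close(); // leave the group cleanly so partitions rebalance promptly
        }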

Offset Explorer (GUI client)

Offset Explorer (formerly Kafka Tool) can connect to the secured cluster as well: in the connection settings, set the security type to SASL Plaintext, set the SASL mechanism to SCRAM-SHA-512 under Advanced, and paste the following into the JAAS Config tab:

org.apache.kafka.common.security.scram.ScramLoginModule required username="test" password="test123";