kafka

1. Kafka overview

Kafka is a distributed message queue based on the publish/subscribe model.
Typical uses: asynchronous processing, decoupling producers from consumers, and peak shaving (smoothing traffic bursts).

Installation

unzip kafka_2.11-2.1.0.zip

# config/server.properties
broker.id=0  # must be unique for every broker in the cluster
delete.topic.enable=true  # allow topics to be deleted
log.dirs=/data/kafka/logs  # where Kafka stores its log segments
zookeeper.connect=192.168.153.128:2181  # ZooKeeper addresses, comma-separated (note: ZooKeeper itself must also be configured as a cluster)

# config/zookeeper.properties
dataDir=/data/kafka/zookeeper/data  # ZooKeeper data directory
clientPort=2181  # client port
maxClientCnxns=0  # max connections per client (0 = unlimited)

2. Kafka commands

jps  # verify which Java processes are running
./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties  # start ZooKeeper as a daemon
./bin/kafka-server-start.sh -daemon config/server.properties  # start the Kafka broker as a daemon
--partitions: number of partitions
--replication-factor: number of replicas

bin/kafka-topics.sh --create --zookeeper 192.168.31.129:2181 --topic first --partitions 1 --replication-factor 3   # create a topic
bin/kafka-topics.sh --delete --zookeeper 192.168.31.129:2181 --topic first   # delete a topic
bin/kafka-topics.sh --list --zookeeper 192.168.31.129:2181   # list topics
bin/kafka-topics.sh --describe --zookeeper 192.168.31.129:2181 --topic first   # show partition and replica details
bin/kafka-console-producer.sh --topic first --broker-list 192.168.31.129:9092   # console producer
bin/kafka-console-consumer.sh --topic first --bootstrap-server 192.168.31.129:9092   # console consumer

3.1 Kafka producer

File storage mechanism

Partition architecture

Data reliability guarantee (ISR, the set of in-sync replicas maintained by the leader)

Ack mechanism (the acks producer setting)
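
The acks levels trade durability against latency. A minimal producer-config sketch (the broker address is a placeholder reused from the demos below):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
    // acks=0: don't wait for any acknowledgement (lowest latency, data may be lost)
    // acks=1: wait for the partition leader only (data is lost if the leader dies before followers sync)
    // acks=all (or -1): wait until every replica in the ISR has the record (strongest durability)
    props.put(ProducerConfig.ACKS_CONFIG, "all");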

HW and LEO (LEO is each replica's log end offset; HW, the high watermark, is the smallest LEO in the ISR, and consumers can only read up to it)

Exactly Once

  1. At Least Once: delivered at least once (no data is lost, but duplicates are possible)
  2. At Most Once: delivered at most once (no duplicates, but data may be lost)
  3. At Least Once + idempotence = Exactly Once
  4. Idempotence: enable.idempotence=true (a config sketch follows this list)
    • On initialization the producer is assigned a PID, and every message sent to the same partition carries a sequence number. The broker caches <PID, Partition, SeqNumber>, and when messages with the same key are submitted it persists only one of them.
    • However, the PID changes whenever the producer restarts, and different partitions have different keys, so idempotence alone cannot guarantee exactly-once across partitions or across sessions.
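
A minimal sketch of turning idempotence on (note the correct spelling of the property; the broker address is the placeholder used elsewhere in this post):

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
    // enabling idempotence implicitly requires acks=all and safe retries
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);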

3.2 Kafka consumer

Partition assignment strategies (a rebalance is triggered whenever the number of consumers in the group changes; selecting a strategy is sketched after the list)

  1. RoundRobin: round-robin assignment (computed across the whole group)

  2. Range: range assignment (computed per topic; the default)
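
A sketch of choosing the strategy through partition.assignment.strategy (group id reused from the consumer demo below):

    Properties props = new Properties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");
    // replace the default RangeAssignor with round-robin assignment
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");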

4. Kafka client API

Kafka's producer sends messages asynchronously. Sending involves two threads, the main thread and the Sender thread, plus a shared buffer, the RecordAccumulator: the main thread appends records to the RecordAccumulator, and the Sender thread drains it and transmits the batches to the broker.

1. Sending messages to the broker

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public static void main(String[] args) {

        //1. Build the producer configuration
        Properties props = new Properties();
        //broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
        //ack level
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //number of retries
        props.put("retries", 1);
        //batch size in bytes (16 KB)
        props.put("batch.size", 16384);
        //how long to wait before sending a not-yet-full batch (milliseconds)
        props.put("linger.ms", 1);
        //RecordAccumulator buffer size (32 MB)
        props.put("buffer.memory", 33554432);
        //serializers for key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //3. Send data
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "hello: " + i));
        }

        //4. Close the producer (required: close() flushes the buffer, otherwise pending messages may never be sent)
        producer.close();
    }

2. Producer API with a callback

    public static void main(String[] args) {
        //1. Build the producer configuration
        Properties props = new Properties();
        //broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
        //serializers for key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //2. Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        //3. Send data to partition 0 with key "at"; the callback fires once the broker responds
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", 0, "at", "hello: " + i), (metadata, e) -> {
                if (e == null) {
                    System.out.println(metadata.partition() + "----" + metadata.offset());
                    System.out.println(metadata.topic() + "----" + metadata);
                } else {
                    e.printStackTrace();
                }
            });
        }

        //4. Close the producer (required: flushes buffered messages)
        producer.close();
    }

3. Consumer demo

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public static void main(String[] args) {
        //1. Build the consumer configuration
        Properties props = new Properties();
        //broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.129:9092");
        //disable auto commit (offsets are committed manually below)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //auto-commit interval (only takes effect when auto commit is enabled)
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //deserializers for key and value
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //consumer group
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bigdata1");
        //offset reset policy: with earliest, a new consumer group (or one whose committed offset has expired) starts from the oldest available data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        //2. Create the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //3. Subscribe to a topic
        consumer.subscribe(Arrays.asList("first"));

        while (true) {
            //4. Poll for data
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

            //5. Iterate over the records
            for (ConsumerRecord<String, String> consumerRecord : records) {
                System.out.println(consumerRecord.key() + " --- " + consumerRecord.value());
                System.out.println(consumerRecord.partition() + " --- " + consumerRecord.offset());
            }
            //synchronous commit: the current thread blocks until the offsets are committed
            consumer.commitSync();
        }
    }

Consumer offset commits: automatic vs. manual (synchronous / asynchronous)

    //enable auto commit
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    //disable auto commit
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    //synchronous commit: the current thread blocks until the offsets are committed
    consumer.commitSync();

    //asynchronous commit: returns immediately, failures are reported to the callback
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                System.err.println("Commit failed for " + offsets);
            }
        }
    });
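
Manual commits can also target explicit per-partition offsets, which limits re-consumption when a rebalance interrupts a batch. A sketch (lastProcessedOffset is a hypothetical variable tracking the last record handled):

    //commit the offset of the next record to consume for a single partition
    //lastProcessedOffset is assumed to be tracked by the caller
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    toCommit.put(new TopicPartition("first", 0), new OffsetAndMetadata(lastProcessedOffset + 1));
    consumer.commitSync(toCommit);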

Custom partitioner

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * Custom producer partitioner
 * @author langao_q
 * @since 2021-07-20 15:39
 */
public class MyPartitioner implements Partitioner {
    /**
     * Decide which partition a record goes to: return the partition number.
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //number of partitions for this topic
        Integer partitionCount = cluster.partitionCountForTopic(topic);
        //add routing logic here....
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

    //register the partitioner on the producer
    props.put("partitioner.class", "io.imwj.kafka.partitioner.MyPartitioner");

Custom interceptor (org.apache.kafka.clients.producer.ProducerInterceptor)

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountIntercept implements ProducerInterceptor<String, String> {

    Integer successCount = 0;
    Integer errorCount = 0;

    @Override
    public void configure(Map<String, ?> configs) {

    }

    /**
     * Prepend a timestamp to the message value
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //1. take the original value
        String value = record.value();
        //2. prepend the current timestamp
        value = System.currentTimeMillis() + "," + value;
        //3. return a new record carrying the modified value
        return new ProducerRecord<String, String>(record.topic(), record.partition(), record.timestamp(), record.key(), value);
    }

    /**
     * Count successful and failed sends
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //metadata can be non-null even on failure, so test the exception instead
        if (exception == null) {
            successCount++;
        } else {
            errorCount++;
        }
    }

    @Override
    public void close() {
        System.out.println("successCount:" + successCount);
        System.out.println("errorCount:" + errorCount);
    }
}
    //1. Build the producer configuration
    Properties props = new Properties();

    //2. Register the interceptor chain
    ArrayList<String> interceptorList = new ArrayList<>();
    interceptorList.add("io.imwj.kafka.intercept.CountIntercept");
    props.put("interceptor.classes", interceptorList);

Kafka monitoring (Eagle)

    # added to bin/kafka-server-start.sh: heap settings, plus a JMX port so Eagle can collect metrics
    export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
    export JMX_PORT="9999"

# Eagle's conf/system-config.properties
cluster1.zk.list=127.0.0.1:2181
cluster1.kafka.eagle.offset.storage=kafka
kafka.eagle.metrics.charts=true

# database settings
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

# environment variables (e.g. in /etc/profile)
export KE_HOME=/data/kafka/eagle
export PATH=$PATH:$KE_HOME/bin

# start Eagle
bin/ke.sh start

Kafka in practice

For using Kafka with Spring Boot, see the dedicated write-up.

Troubleshooting: a Java producer failing to connect to a remote Kafka

18:55:46.237 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error connecting to node iZuf688uiv7i1onjv82rf8Z:8318 (id: 10 rack: null)
java.net.UnknownHostException: iZuf688uiv7i1onjv82rf8Z
	at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
	at java.net.InetAddress.getAllByName(InetAddress.java:1192)
	at java.net.InetAddress.getAllByName(InetAddress.java:1126)
The broker is advertising its internal hostname (iZuf688uiv7i1onjv82rf8Z), which the remote client cannot resolve. The fix is to advertise a reachable address in server.properties:

listeners=PLAINTEXT://:8318
advertised.listeners=PLAINTEXT://[public IP of the host]:[external port]

Checking current consumer-group status

bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:8318 --list   # list all consumer groups
./bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:8318 --group [group name]   # show per-partition offsets and lag