1. 生产者

1.1 消息发送

1.1.1 数据生产流程解析

image.png

  1. Producer创建时,会创建一个Sender线程并设置为守护线程。
  2. 生产消息时,内部其实是异步流程;生产的消息先经过拦截器()->序列化器->分区器,然后将消息缓存在缓冲区(该缓冲区也是在Producer创建时创建)。
  3. 批次发送的条件为:缓冲区数据大小达到batch.size或者linger.ms达到上限,哪个先达到就算哪个。
  4. 批次发送后,发往指定分区,然后落盘到broker;如果生产者配置了retrires参数大于0并且失败原因允许重试,那么客户端内部会对该消息进行重试。
  5. 落盘到broker成功,返回生产元数据给生产者。
  6. 元数据返回有两种方式:一种是通过阻塞直接返回,另一种是通过回调返回。

1.1.2 必要参数配置

  1. 配置条目的使用方式:
    image.png
  2. 配置参数:
    image.png

1.1.3 序列化器

image.png

由于Kafka中的数据都是字节数组,在将消息发送到Kafka之前需要先将数据序列化为字节数组。
序列化器的作用就是用于序列化要发送的消息的。

Kafka使用 org.apache.kafka.common.serialization.Serializer 接口用于定义序列化器,将泛型指定类型的数据转换为字节数组。

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.common.serialization;

import org.apache.kafka.common.header.Headers;

import java.io.Closeable;
import java.util.Map;

/**
 * An interface for converting objects to bytes.
 *
 * A class that implements this interface is expected to have a constructor with no parameter.
 * <p>
 * Implement {@link org.apache.kafka.common.ClusterResourceListener} to receive cluster metadata once it's available. Please see the class documentation for ClusterResourceListener for more information.
 *
 * @param <T> Type to be serialized from.
 * 将对象转换成byte数组
 */
public interface Serializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    byte[] serialize(String topic, T data);

    /**
     * Convert {@code data} into a byte array.
     *
     * @param topic topic associated with data
     * @param headers headers associated with the record
     * @param data typed data
     * @return serialized bytes
     */
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    /**
     * Close this serializer.
     * <p>
     * This method must be idempotent as it may be called multiple times.
     *关闭序列化器 此方法需要是幂等的 可能会被调用多次
     */
    @Override
    default void close() {
        // intentionally left blank
    }
}

系统提供了该接口的子接口以及实现类:
org.apache.kafka.common.serialization.ByteArraySerializer
image.png
org.apache.kafka.common.serialization.ByteBufferSerializer
image.png
org.apache.kafka.common.serialization.BytesSerializer
image.png
org.apache.kafka.common.serialization.DoubleSerializer
image.png
org.apache.kafka.common.serialization.FloatSerializer
image.png
org.apache.kafka.common.serialization.IntegerSerializer
image.png
org.apache.kafka.common.serialization.StringSerializer
image.png
org.apache.kafka.common.serialization.LongSerializer
image.png
org.apache.kafka.common.serialization.ShortSerializer
image.png

自定义序列化器
数据的序列化一般生产中使用avro。
自定义序列化器需要实现org.apache.kafka.common.serialization.Serializer接口,并实现其中的 serialize 方法。
示例:

package com.lagou.kafka.demo.entity;

/**
 * 用户自定义的封装消息的实体类
 */
public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }
}

序列化类:
package com.lagou.kafka.demo.serialization;

import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class UserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // do nothing
        // 用于接收对序列化器的配置参数,并对当前序列化器进行配置和初始化的
    }

    @Override
    public byte[] serialize(String topic, User data) {
        try {
            if (data == null) {
                return null;
            } else {
                final Integer userId = data.getUserId();
                final String username = data.getUsername();

                if (userId != null) {
                    if (username != null) {
                        final byte[] bytes = username.getBytes("UTF-8");
                        int length = bytes.length;
                        // 第一个4个字节用于存储userId的值
                        // 第二个4个字节用于存储username字节数组的长度int值
                        // 第三个长度,用于存放username序列化之后的字节数组
                        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
                        // 设置userId
                        buffer.putInt(userId);
                        // 设置username字节数组长度
                        buffer.putInt(length);
                        // 设置username字节数组
                        buffer.put(bytes);
                        // 以字节数组形式返回user对象的值
                        return buffer.array();
                    }
                }
            }
        } catch (Exception e) {
            throw new SerializationException("数据序列化失败");
        }
        return null;
    }

    @Override
    public void close() {
        // do nothing
        // 用于关闭资源等操作。需要幂等,即多次调用,效果是一样的。
    }
}

生产者:
package com.lagou.kafka.demo.producer;

import com.lagou.kafka.demo.entity.User;
import com.lagou.kafka.demo.serialization.UserSerializer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class MyProducer {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // 设置自定义的序列化器
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class);

        KafkaProducer<String, User> producer = new KafkaProducer<String, User>(configs);

        User user = new User();
//        user.setUserId(1001);
//        user.setUsername("张三");
//        user.setUsername("李四");
//        user.setUsername("王五");
        user.setUserId(400);
        user.setUsername("赵四");

        ProducerRecord<String, User> record = new ProducerRecord<String, User>(
                "tp_user_01",   // topic
                user.getUsername(),   // key
                user                  // value
        );


        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    System.out.println("消息发送异常");
                } else {
                    System.out.println("主题:" + metadata.topic() + "\t"
                    + "分区:" + metadata.partition() + "\t"
                    + "生产者偏移量:" + metadata.offset());
                }
            }
        });

        // 关闭生产者
        producer.close();

    }
}

1.1.4 分区器

image.png

默认(DefaultPartitioner)分区计算:

  1. 如果record提供了分区号,则使用record提供的分区号 。
  2. 如果record没有提供分区号,则使用key的序列化后的值的hash值对分区数量取模 。
  3. 如果record没有提供分区号,也没有提供key,则使用轮询的方式分配分区号。
  4. 会首先在可用的分区中分配分区号 。
  5. 如果没有可用的分区,则在该主题所有分区中分配分区号。

image.png
image.png
如果要自定义分区器,则需要

  1. 首先开发Partitioner接口的实现类 。
  2. 在KafkaProducer中进行设置:configs.put(“partitioner.class”, “xxx.xx.Xxx.class”)。

位于 org.apache.kafka.clients.producer 中的分区器接口:

image.png

包 org.apache.kafka.clients.producer.internals 中分区器的默认实现:

image.png

image.png

image.png

可以实现Partitioner接口自定义分区器:
image.png

然后在生产者中配置:
image.png

1.1.5 拦截器

image.png

Producer拦截器(interceptor)和Consumer端Interceptor是在Kafka 0.10版本被引入的,主要用于实现Client端的定制化控制逻辑。
对于Producer而言,Interceptor使得用户在消息发送前以及Producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,Producer允许用户指定多个Interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。 例如 拦截器顺序为1-2-3-4 回调时顺序仍然为1-2-3-4 此处与web开发中拦截器链调用顺序不同。
Intercetpor的实现接口是 org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • onSend(ProducerRecord):该方法封装进KafkaProducer.send方法中,即运行在用户主线程中。Producer确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在Producer回调逻辑触发之前。onAcknowledgement运行在Producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢Producer的消息发送效率。
  • close:关闭Interceptor,主要用于执行一些资源清理工作。

如前所述,Interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。 另外倘若指定了多个Interceptor,则Producer将按照指定顺序调用它们,并仅仅是捕获每个 Interceptor可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

自定义拦截器:

  1. 实现ProducerInterceptor接口
  2. 在KafkaProducer的设置中设置自定义的拦截器
    image.png
    案例:
    自定义拦截器1:
    package com.lagou.kafka.demo.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Headers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    public class InterceptorOne implements ProducerInterceptor<Integer, String> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorOne.class);
    
        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
            System.out.println("拦截器1 -- go");
    
    
            // 消息发送的时候,经过拦截器,调用该方法
    
            // 要发送的消息内容
            final String topic = record.topic();
            final Integer partition = record.partition();
            final Integer key = record.key();
            final String value = record.value();
            final Long timestamp = record.timestamp();
            final Headers headers = record.headers();
    
    
            // 拦截器拦下来之后根据原来消息创建的新的消息
            // 此处对原消息没有做任何改动
            ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                    topic,
                    partition,
                    timestamp,
                    key,
                    value,
                    headers
            );
            // 传递新的消息
            return newRecord;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            System.out.println("拦截器1 -- back");
            // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
            // 会影响kafka生产者的性能。
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
            final Object classContent = configs.get("classContent");
            System.out.println(classContent);
        }
    }
    
    自定义拦截器2:
    package com.lagou.kafka.demo.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Headers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    public class InterceptorTwo implements ProducerInterceptor<Integer, String> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
    
        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
            System.out.println("拦截器2 -- go");
    
    
            // 消息发送的时候,经过拦截器,调用该方法
    
            // 要发送的消息内容
            final String topic = record.topic();
            final Integer partition = record.partition();
            final Integer key = record.key();
            final String value = record.value();
            final Long timestamp = record.timestamp();
            final Headers headers = record.headers();
    
    
            // 拦截器拦下来之后根据原来消息创建的新的消息
            // 此处对原消息没有做任何改动
            ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                    topic,
                    partition,
                    timestamp,
                    key,
                    value,
                    headers
            );
            // 传递新的消息
            return newRecord;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            System.out.println("拦截器2 -- back");
            // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
            // 会影响kafka生产者的性能。
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
            final Object classContent = configs.get("classContent");
            System.out.println(classContent);
        }
    }
    
    自定义拦截器3:
    package com.lagou.kafka.demo.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.header.Headers;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    public class InterceptorThree implements ProducerInterceptor<Integer, String> {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorThree.class);
    
        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
            System.out.println("拦截器3 -- go");
    
    
            // 消息发送的时候,经过拦截器,调用该方法
    
            // 要发送的消息内容
            final String topic = record.topic();
            final Integer partition = record.partition();
            final Integer key = record.key();
            final String value = record.value();
            final Long timestamp = record.timestamp();
            final Headers headers = record.headers();
    
    
            // 拦截器拦下来之后根据原来消息创建的新的消息
            // 此处对原消息没有做任何改动
            ProducerRecord<Integer, String> newRecord = new ProducerRecord<Integer, String>(
                    topic,
                    partition,
                    timestamp,
                    key,
                    value,
                    headers
            );
            // 传递新的消息
            return newRecord;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            System.out.println("拦截器3 -- back");
            // 消息确认或异常的时候,调用该方法,该方法中不应实现较重的任务
            // 会影响kafka生产者的性能。
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
            final Object classContent = configs.get("classContent");
            System.out.println(classContent);
        }
    }
    
    生产者:
    package com.lagou.kafka.demo.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.IntegerSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class MyProducer {
        public static void main(String[] args) {
    
            Map<String, Object> configs = new HashMap<>();
            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            // 保证等待确认的消息只有设置的这几个。如果设置为1,则只有一个请求在等待响应
            // 此时可以保证发送消息即使在重试的情况下也是有序的。
            configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    //        configs.put("max.in.flight.requests.per.connection", 1);
    
    //        interceptor.classes
            // 如果有多个拦截器,则设置为多个拦截器类的全限定类名,中间用逗号隔开
            configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "com.lagou.kafka.demo.interceptor.InterceptorOne," +
                    "com.lagou.kafka.demo.interceptor.InterceptorTwo," +
                    "com.lagou.kafka.demo.interceptor.InterceptorThree");
    
    
            configs.put("classContent", "this is lagou's kafka class");
    
            KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
    
            ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                    "tp_inter_01",
                    0,
                    1001,
                    "this is lagou's 1001 message"
            );
    
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println(metadata.offset());
                    }
                }
            });
    
            // 关闭生产者
            producer.close();
    
    
        }
    }
    
    运行结果:
    image.png

1.2 原理剖析

image.png

由上图可以看出:KafkaProducer有两个基本线程:

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器 RecoderAccumulator中;
    • 消息收集器RecoderAccumulator为每个分区都维护了一个Deque 类型的双端队列。
    • ProducerBatch 可以理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐 量,降低网络影响;
    • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了 一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
    • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一 个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写 入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参 数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch , 缺点就是该内存不能被复用了。
  • Sender线程:
    • 该线程从消息收集器获取缓存的消息,将其处理为<Node, List<ProducerBatch>的形式, Node 表示集群的broker节点。
    • 进一步将<Node, List<ProducerBatch>转化为形式,此时才可以向服务端发送数据。
    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>>的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中 负载压力最小的一个,以实现消息的尽快发出。

1.3 生产者参数配置补充

  1. 参数设置方式:
    image.png
    image.png
  2. 补充参数:
    image.png
    image.png
    image.png
    image.png

2. 消费者

2.1 概念入门

2.1.1 消费者、消费组

   消费者从订阅的主题消费消息,消费消息的偏移量保存在Kafka的名字是 __consumer_offsets 的主题中。 
  消费者还可以将自己的偏移量存储到Zookeeper,需要设置offset.storage=zookeeper。 

推荐使用Kafka存储消费者的偏移量。因为Zookeeper不适合高并发。

 多个从同一个主题消费的消费者可以加入到一个消费组中。 
 消费组中的消费者共享group_id。 
 configs.put("group.id", "xxx"); 
group_id一般设置为应用的逻辑名称。比如多个订单处理程序组成一个消费组,可以设置group_id为"order_process"。 
group_id通过消费者的配置指定: group.id=xxxxx 
消费组均衡地给消费者分配分区,每个分区只由消费组中一个消费者消费。

image.png
一个拥有四个分区的主题,包含一个消费者的消费组。

此时,消费组中的消费者消费主题中的所有分区。并且没有重复的可能。
如果在消费组中添加一个消费者2,则每个消费者分别从两个分区接收消息。
image.png
如果消费组有四个消费者,则每个消费者可以分配到一个分区。
image.png
如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
image.png
向消费组添加消费者是横向扩展消费能力的主要方式。
必要时,需要为主题创建大量分区,在负载增长时可以加入更多的消费者。但是不要让消费者的数量超过主题分区的数量。

image.png

除了通过增加消费者来横向扩展单个应用的消费能力之外,经常出现多个应用程序从同一个主题消费的情况。
此时,每个应用都可以获取到所有的消息。只要保证每个应用都有自己的消费组,就可以让它们获取到主题所有的消息。
横向扩展消费者和消费组不会对性能造成负面影响。
为每个需要获取一个或多个主题全部消息的应用创建一个消费组,然后向消费组添加消费者来横向扩展消费能力和应用的处理能力,则每个消费者只处理一部分消息。

2.1.2 心跳机制

image.png
消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
image.png

由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。
image.png
Kafka 的心跳是 Kafka Consumer 和 Broker 之间的健康检查,只有当 Broker Coordinator 正常时,Consumer 才会发送心跳。
Consumer 和 Rebalance 相关的 2 个配置参数:
image.png
broker 端,sessionTimeoutMs 参数
broker 处理心跳的逻辑在 GroupCoordinator 类中:如果心跳超期, broker coordinator 会把消费者从 group 中移除,并触发 rebalance。

image.png
image.png
consumer 端:sessionTimeoutMs,rebalanceTimeoutMs 参数
如果客户端发现心跳超期,客户端会标记 coordinator 为不可用,并阻塞心跳线程;如果超过了poll 消息的间隔超过了 rebalanceTimeoutMs,则 consumer 告知 broker 主动离开消费组,也会触发rebalance
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread
image.png
image.png

2.2 消息接收

2.2.1 必要参数配置

image.png

2.2.2 订阅

主题和分区

  • Topic,Kafka用于分类管理消息的逻辑单元,类似与MySQL的数据库。
  • Partition,是Kafka下数据存储的基本单元,这个是物理上的概念。同一个topic的数据,会被分散的存储到多个partition中,这些partition可以在同一台机器上,也可以是在多台机器上。优势在于:有利于水平扩展,避免单台机器在磁盘空间和性能上的限制,同时可以通过复制来增加数据冗余性,提高容灾能力。为了做到均匀分布,通常partition的数量通常是Broker Server数量的整数倍。
  • Consumer Group,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部的消息。在消费组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。

image.png
consumer 采用 pull 模式从 broker 中读取数据。
采用 pull 模式,consumer 可自主控制消费消息的速率, 可以自己控制消费方式(批量消费/逐条消费),还可以选择不同的提交方式从而实现不同的传输语义。
consumer.subscribe(“tp_demo_01,tp_demo_02”)

2.2.3 反序列化

Kafka的broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交给用户程序消费处理。
消费者的反序列化器包括key的和value的反序列化器。
key.deserializer
value.deserializer
IntegerDeserializer
StringDeserializer
需要实现 org.apache.kafka.common.serialization.Deserializer 接口。

消费者从订阅的主题拉取消息:
consumer.poll(3_000);
在Fetcher类中,对拉取到的消息首先进行反序列化处理。

image.png

Kafka默认提供了几个反序列化的实现:
org.apache.kafka.common.serialization 包下包含了这几个实现:
image.png
image.png

image.png
image.png
image.png
image.png
image.png
image.png
image.png
自定义反序列化
自定义反序列化类,需要实现 org.apache.kafka.common.serialization.Deserializer 接口。
com.lagou.kafka.demo.deserializer.UserDeserializer

package com.lagou.kafka.demo.deserializer;

import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.util.Map;

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public User deserialize(String topic, byte[] data) {
        ByteBuffer buffer = ByteBuffer.allocate(data.length);

        buffer.put(data);
        buffer.flip();

        final int userId = buffer.getInt();
        final int usernameLength = buffer.getInt();

        String username = new String(data, 8, usernameLength);

        return new User(userId, username);
    }

    @Override
    public void close() {

    }
}

消费者:

package com.lagou.kafka.demo.consumer;

import com.lagou.kafka.demo.deserializer.UserDeserializer;
import com.lagou.kafka.demo.entity.User;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class UserConsumer {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 设置自定义的反序列化器
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class);

        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "user_consumer");
        configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_id");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(configs);

        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_user_01"));

        final ConsumerRecords<String, User> records = consumer.poll(Long.MAX_VALUE);

        records.forEach(new Consumer<ConsumerRecord<String, User>>() {
            @Override
            public void accept(ConsumerRecord<String, User> record) {
                System.out.println(record.value());
            }
        });

        // 关闭消费者
        consumer.close();

    }

}

2.2.4 位移提交

  1. Consumer需要向Kafka记录自己的位移数据,这个汇报过程称为 提交位移(Committing Offsets)
  2. Consumer 需要为分配给它的每个分区提交各自的位移数据
  3. 位移提交的由Consumer端负责的,Kafka只负责保管。kafka内部主题 __consumer_offsets 保存偏移量
  4. 位移提交分为自动提交和手动提交
  5. 位移提交分为同步提交和异步提交

(1) 自动提交
Kafka Consumer 后台提交

  • 开启自动提交: enable.auto.commit=true
  • 配置自动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s
    Map<String, Object> configs = new HashMap<>(); 
    configs.put("bootstrap.servers", "node1:9092");
    configs.put("group.id", "mygrp"); 
    // 设置偏移量自动提交。自动提交是默认值。这里做示例。 
    configs.put("enable.auto.commit", "true"); 
    // 偏移量自动提交的时间间隔 
    configs.put("auto.commit.interval.ms", "3000");
    configs.put("key.deserializer", StringDeserializer.class); 
    configs.put("value.deserializer", StringDeserializer.class);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String> (configs); 
    consumer.subscribe(Collections.singleton("tp_demo_01"));
    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(100); 
        for (ConsumerRecord<String, String> record : records) { 
            System.out.println(record.topic()
                               + "\t" + record.partition() 
                               + "\t" + record.offset()
                               + "\t" + record.key() 
                               + "\t" + record.value());
        }
    }
  • 自动提交位移的顺序
    • 配置 enable.auto.commit = true
    • Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息
    • 因此自动提交不会出现消息丢失,但会 重复消费
  • 重复消费举例
    • Consumer 每 5s 提交 offset
    • 假设提交 offset 后的 3s 发生了 Rebalance
    • Rebalance 之后的所有 Consumer 从上一次提交的 offset 处继续消费
    • 因此 Rebalance 发生前 3s 的消息会被重复消费

(2) 异步提交

  • 使用 KafkaConsumer#commitSync():会提交 KafkaConsumer#poll() 返回的最新 offset
  • 该方法为同步操作,等待直到 offset 被成功提交才返回

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); 
        process(records);
        // 处理消息
        try {
            consumer.commitSync(); 
        } catch (CommitFailedException e) {
            handle(e);
            // 处理提交失败异常 
        }
    }
  • commitSync 在处理完所有消息之后

  • 手动同步提交可以控制offset提交的时机和频率
  • 手动同步提交会:
    • 调用 commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
    • 会影响 TPS
    • 可以选择拉长提交间隔,但有以下问题
      • 会导致 Consumer 的提交频率下降
      • Consumer 重启后,会有更多的消息被消费

异步提交

  • KafkaConsumer#commitAsync()

    while (true) { 
        ConsumerRecords<String, String> records = consumer.poll(3_000);
        process(records); 
        // 处理消息
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                handle(exception); 
            } 
        });
    }
  • commitAsync出现问题不会自动重试

  • 处理方式:
    try {
        while(true) { 
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); 
            process(records);
            // 处理消息 
            commitAysnc();
            // 使用异步提交规避阻塞 
        } 
    } catch(Exception e) { 
        handle(e); 
        // 处理异常 
    } finally { 
        try {
            consumer.commitSync();
            // 最后一次提交使用同步阻塞式提交 
        } finally { 
            consumer.close(); 
        } 
    }

    2.2.5 消费者位移管理

    Kafka中,消费者根据消息的位移顺序消费消息。
    消费者的位移由消费者管理,可以存储于zookeeper中,也可以存储于Kafka主题 __consumer_offsets中。

Kafka提供了消费者API,让消费者可以管理自己的位移。
API如下:KafkaConsumer

image.png
image.png
image.png

  1. 准备数据
    # 生成消息文件
    [root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done
    
    # 创建主题,三个分区,每个分区一个副本 
    [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create -- topic tp_demo_01 --partitions 3 --replication-factor 1 
        
    # 将消息生产到主题中 
    [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_demo_01 < nm.txt
  2. API实战
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Node;
    import org.apache.kafka.common.PartitionInfo; 
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer; 
    import java.util.*;
    
    
    
    /*** # 生成消息文件
    * [root@node1 ~]# for i in `seq 60`; do echo "hello lagou $i" >> nm.txt; done
    * # 创建主题,三个分区,每个分区一个副本
    * [root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_demo_01 --partitions 3 --replication-factor 1 
    * # 将消息生产到主题中 
    * [root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 -- topic tp_demo_01 < nm.txt 
    ** 消费者位移管理 
    */ 
    public class MyConsumerMgr1 {
        public static void main(String[] args) { 
            Map<String, Object> configs = new HashMap<>(); 
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
            
            /*** 给当前消费者手动分配一系列主题分区。 
            * 手动分配分区不支持增量分配,如果先前有分配分区,则该操作会覆盖之前的分配。 
            * 如果给出的主题分区是空的,则等价于调用unsubscribe方法。 
            * 手动分配主题分区的方法不使用消费组管理功能。当消费组成员变了,或者集群或主题 的元数据改变了,不会触发分区分配的再平衡。 
            ** 手动分区分配assign(Collection)不能和自动分区分配subscribe(Collection, ConsumerRebalanceListener)一起使用。 
            * 如果启用了自动提交偏移量,则在新的分区分配替换旧的分区分配之前,会对旧的分区 分配中的消费偏移量进行异步提交。
            **/
            // consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0))); 
            //// Set<TopicPartition> assignment = consumer.assignment();
            // for (TopicPartition topicPartition : assignment) {
            // System.out.println(topicPartition); 
            // } 
            // 获取对用户授权的所有主题分区元数据。该方法会对服务器发起远程调用。 
            // Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics(); 
            //// stringListMap.forEach((k, v) -> { 
            // System.out.println("主题:" + k); 
            // v.forEach(info -> { 
            // System.out.println(info); 
            // }); 
            // }); 
            // Set<String> strings = consumer.listTopics().keySet(); 
            //// strings.forEach(topicName -> { 
            // System.out.println(topicName);
            // });
            // List<PartitionInfo> partitionInfos = consumer.partitionsFor("tp_demo_01"); 
            // for (PartitionInfo partitionInfo : partitionInfos) { 
            // Node leader = partitionInfo.leader(); 
            // System.out.println(leader); 
            // System.out.println(partitionInfo); 
            // // 当前分区在线副本
            // Node[] nodes = partitionInfo.inSyncReplicas(); 
            // // 当前分区下线副本 
            // Node[] nodes1 = partitionInfo.offlineReplicas(); 
            // }
            // 手动分配主题分区给当前消费者
            consumer.assign(Arrays.asList( new TopicPartition("tp_demo_01", 0), new TopicPartition("tp_demo_01", 1), new TopicPartition("tp_demo_01", 2) ));
            // 列出当前主题分配的所有主题分区 
            // Set<TopicPartition> assignment = consumer.assignment();
            // assignment.forEach(k -> { 
            // System.out.println(k);
            // });
            
            // 对于给定的主题分区,列出它们第一个消息的偏移量。
            // 注意,如果指定的分区不存在,该方法可能会永远阻塞。 
            // 该方法不改变分区的当前消费者偏移量。
            // Map<TopicPartition, Long> topicPartitionLongMap = consumer.beginningOffsets(consumer.assignment()); 
            //// topicPartitionLongMap.forEach((k, v) -> { 
            // System.out.println("主题:" + k.topic() + "\t分区:" + k.partition() + "偏移量\t" + v); 
            // }); 
            // 将偏移量移动到每个给定分区的最后一个。
            // 该方法延迟执行,只有当调用过poll方法或position方法之后才可以使用。
            // 如果没有指定分区,则将当前消费者分配的所有分区的消费者偏移量移动到最后。
            // 如果设置了隔离级别为:isolation.level=read_committed,则会将分区的消费 偏移量移动到
            // 最后一个稳定的偏移量,即下一个要消费的消息现在还是未提交状态的事务消息。 
            // consumer.seekToEnd(consumer.assignment()); 
            // 将给定主题分区的消费偏移量移动到指定的偏移量,即当前消费者下一条要消费的消息 偏移量。
            // 若该方法多次调用,则最后一次的覆盖前面的。 
            // 如果在消费中间随意使用,可能会丢失数据。
            // consumer.seek(new TopicPartition("tp_demo_01", 1), 10); 
            //// // 检查指定主题分区的消费偏移量
            // long position = consumer.position(new TopicPartition("tp_demo_01", 1)); 
            // System.out.println(position); consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 1))); 
            // 检查指定主题分区的消费偏移量
            long position = consumer.position(new TopicPartition("tp_demo_01", 1));
            System.out.println(position); 
            // 关闭生产者 
            consumer.close();
        } 
    }
    package com.lagou.kafka.demo;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.*;
    import java.util.function.BiConsumer;
    
    public class MyOffsetManager {
        public static void main(String[] args) {
    
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // group.id很重要
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    
    //        consumer.subscribe(Collections.singleton("tp_demo_01"));
    
            // 如何手动给消费者分配分区?
            // 1、需要知道有哪些主题可以访问,和消费
    
            // 获取当前消费者可以访问和消费的主题以及它们的分区信息
    //        final Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
    //
    //        stringListMap.forEach(new BiConsumer<String, List<PartitionInfo>>() {
    //            @Override
    //            public void accept(String topicName, List<PartitionInfo> partitionInfos) {
    //                System.out.println("主题名称:" + topicName);
    //                for (PartitionInfo partitionInfo : partitionInfos) {
    //                    System.out.println(partitionInfo);
    //                }
    //            }
    //        });
    
    //        final Set<TopicPartition> assignment1 = consumer.assignment();
    //
    //        for (TopicPartition partition : assignment1) {
    //            System.out.println(partition);
    //        }
    //        System.out.println("----------------------------");
    
            // 给当前消费者分配指定的主题分区
            consumer.assign(Arrays.asList(
                    new TopicPartition("tp_demo_01", 0),
                    new TopicPartition("tp_demo_01", 1),
                    new TopicPartition("tp_demo_01", 2)
            ));
    
            // 获取给当前消费者分配的主题分区信息
    //        final Set<TopicPartition> assignment = consumer.assignment();
    //
    //        for (TopicPartition partition : assignment) {
    //            System.out.println(partition);
    //        }
    
            // 查看当前消费者在指定主题的分区上的消费者偏移量
    //        final long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
    //
    //        System.out.println("当前主题在0号分区上的位移:" + offset0);
    
    //        consumer.seekToBeginning(Arrays.asList(
    //                new TopicPartition("tp_demo_01", 0),
    //                new TopicPartition("tp_demo_01", 2)
    //        ));
            long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
            long offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));
            long offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));
    
            System.out.println(offset0);
            System.out.println(offset1);
            System.out.println(offset2);
    
    //        consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 2)));
    
            consumer.seek(new TopicPartition("tp_demo_01", 2), 14);
    
            offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
            offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));
            offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));
    
            System.out.println(offset0);
            System.out.println(offset1);
            System.out.println(offset2);
    
            consumer.close();
        }
    
    
    
    }
    
    package com.lagou.kafka.demo;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;
    
    public class MyConsumer {
        public static void main(String[] args) {
            Map<String, Object> configs = new HashMap<>();
            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // group.id很重要
            configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
    
            consumer.subscribe(Arrays.asList("tp_demo_01"));
    
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(1_000);
    
                records.forEach(new Consumer<ConsumerRecord<String, String>>() {
                    @Override
                    public void accept(ConsumerRecord<String, String> record) {
                        System.out.println(record);
                    }
                });
    
            }
    
        }
    }
    

    2.2.6 再均衡

    重平衡可以说是kafka为人诟病最多的一个点了。
    重平衡其实就是一个协议,它规定了如何让消费者组下的所有消费者来分配topic中的每一个分区。 比如一个topic有100个分区,一个消费者组内有20个消费者,在协调者的控制下让组内每一个消费者分配到5个分区,这个分配的过程就是重平衡。

重平衡的触发条件主要有三个:

1. 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。 
    2. 主题的分区数发生变更,kafka目前只支持增加分区,当增加的时候就会触发重平衡 
    3. 订阅的主题发生变化,当消费者组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会触发重平衡
        ![image.png](https://note-1259153703.cos.ap-nanjing.myqcloud.com/images/202201041511309.png)
        消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区。
        ![image.png](https://note-1259153703.cos.ap-nanjing.myqcloud.com/images/202201041511890.png)
        由于broker宕机,主题X的分区3宕机,此时分区3没有Leader副本,触发再平衡,消费者4没有对应的主题分区,则消费者4闲置。
        ![image.png](https://note-1259153703.cos.ap-nanjing.myqcloud.com/images/202201041511473.png)

主题增加分区,需要主题分区和消费组进行再均衡。
image.png
由于使用正则表达式订阅主题,当增加的主题匹配正则表达式的时候,也要进行再均衡。
image.png

为什么说重平衡为人诟病呢?因为重平衡过程中,消费者无法从kafka消费消息,这对kafka的TPS影响极大,而如果kafka集内节点较多,比如数百个,那重平衡可能会耗时极多。数分钟到数小时都有可能,而这段时间kafka基本处于不可用状态。所以在实际环境中,应该尽量避免重平衡发生。

避免重平衡
要说完全避免重平衡,是不可能,因为你无法完全保证消费者不会故障。而消费者故障其实也是最常见的引发重平衡的地方,所以我们需要保证尽力避免消费者故障
而其他几种触发重平衡的方式,增加分区,或是增加订阅的主题,抑或是增加消费者,更多的是主 动控制。
如果消费者真正挂掉了,就没办法了,但实际中,会有一些情况,kafka错误地认为一个正常的消费者已经挂掉了,我们要的就是避免这样的情况出现。
首先要知道哪些情况会出现错误判断挂掉的情况。
在分布式系统中,通常是通过心跳来维持分布式系统的,kafka也不例外。

在分布式系统中,由于网络问题你不清楚没接收到心跳,是因为对方真正挂了还是只是因为负载过重没来得及发生心跳或是网络堵塞。所以一般会约定一个时间,超时即判定对方挂了。而在kafka消费者场景中,session.timout.ms参数就是规定这个超时时间是多少
还有一个参数,heartbeat.interval.ms,这个参数控制发送心跳的频率,频率越高越不容易被误判,但也会消耗更多资源。
此外,还有最后一个参数,max.poll.interval.ms,消费者poll数据后,需要一些处理,再进行拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。也就是说,拉取,然后处理,这个处理的时间不能超过 max.poll.interval.ms 这个参数的值。这个参数的默认值是5分钟,而如果消费者接收到数据后会执行耗时的操作,则应该将其设置得大一些。

三个参数,
session.timout.ms控制心跳超时时间,
heartbeat.interval.ms控制心跳发送频率,
max.poll.interval.ms控制poll的间隔。

这里给出一个相对较为合理的配置,如下:

  • session.timout.ms:设置为6s
  • heartbeat.interval.ms:设置2s
  • max.poll.interval.ms:推荐为消费者处理消息最长耗时再加1分钟

2.2.7 消费者拦截器

消费者在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理。
处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程序进行处理。
image.png

消费端定义消息拦截器,需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口。

  1. 一个可插拔接口,允许拦截甚至更改消费者接收到的消息。首要的用例在于将第三方组件引入消费者应用程序,用于定制的监控、日志处理等。
  2. 该接口的实现类通过configre方法获取消费者配置的属性,如果消费者配置中没有指定 clientID,还可以获取KafkaConsumer生成的clientId。获取的这个配置是跟其他拦截器共享的,需要保证不会在各个拦截器之间产生冲突。
  3. ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
  4. ConsumerInterceptor回调发生在 org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)方法同一个线程。

该接口中有如下方法:

package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable; 
import org.apache.kafka.common.TopicPartition;
import java.util.Map; 

public interface ConsumerInterceptor<K, V> extends Configurable { 
    /**** 该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。 
    ** 该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。 
    * 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。 
    ** @param records 由上个拦截器返回的由客户端消费的消息。 
    */ 
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
    
    /*** 当消费者提交偏移量时,调用该方法。 * 该方法抛出的任何异常调用者都会忽略。 */
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets); 
    public void close(); 
}

消费者:
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "mygrp");
//        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "myclient");
        // 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 配置拦截器
        // One -> Two -> Three,接收消息和发送偏移量确认都是这个顺序
        props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                        "com.lagou.kafka.demo.interceptor.OneInterceptor" +
                        ",com.lagou.kafka.demo.interceptor.TwoInterceptor" +
                        ",com.lagou.kafka.demo.interceptor.ThreeInterceptor"
                );

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_demo_01"));

        while (true) {
            final ConsumerRecords<String, String> records = consumer.poll(3_000);

            records.forEach(record -> {
                System.out.println(record.topic()
                        + "\t" + record.partition()
                        + "\t" + record.offset()
                        + "\t" + record.key()
                        + "\t" + record.value());
            });

//            consumer.commitAsync();
//            consumer.commitSync();

        }

//        consumer.close();

    }
}

package com.lagou.kafka.demo.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class OneInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // poll方法返回结果之前最后要调用的方法

        System.out.println("One -- 开始");

        // 消息不做处理,直接返回
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 消费者提交偏移量的时候,经过该方法
        System.out.println("One -- 结束");
    }

    @Override
    public void close() {
        // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 用于获取消费者的设置参数
        configs.forEach((k, v) -> {
            System.out.println(k + "\t" + v);
        });
    }
}

package com.lagou.kafka.demo.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class TwoInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // poll方法返回结果之前最后要调用的方法

        System.out.println("Two -- 开始");

        // 消息不做处理,直接返回
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 消费者提交偏移量的时候,经过该方法
        System.out.println("Two -- 结束");
    }

    @Override
    public void close() {
        // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 用于获取消费者的设置参数
        configs.forEach((k, v) -> {
            System.out.println(k + "\t" + v);
        });
    }
}

package com.lagou.kafka.demo.interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

public class ThreeInterceptor implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // poll方法返回结果之前最后要调用的方法

        System.out.println("Three -- 开始");

        // 消息不做处理,直接返回
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // 消费者提交偏移量的时候,经过该方法
        System.out.println("Three -- 结束");
    }

    @Override
    public void close() {
        // 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 用于获取消费者的设置参数
        configs.forEach((k, v) -> {
            System.out.println(k + "\t" + v);
        });
    }
}

2.2.8 消费者参数配置补充

image.png
image.png
image.png
image.png
image.png

2.3 消费组管理

一、消费者组 (Consumer Group)
1. 什么是消费者组
consumer group是kafka提供的可扩展且具有容错性的消费者机制。
三个特性:

  1. 消费组有一个或多个消费者,消费者可以是一个进程,也可以是一个线程
  2. group.id是一个字符串,唯一标识一个消费组
  3. 消费组订阅的主题每个分区只能分配给消费组一个消费者。

2. 消费者位移(consumer position)
消费者在消费的过程中记录已消费的数据,即消费位移(offset)信息。
每个消费组保存自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入 checkpoint机制定期持久化。

3. 位移管理(offset management)
3.1 自动VS手动
Kafka默认定期自动提交位移( enable.auto.commit = true ),也手动提交位移。另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:
image.png
3.2 位移提交
位移是提交到Kafka中的 consumer_offsets 主题。 consumer_offsets 中的消息保存了每个消费组某一时刻提交的offset信息。

[root@node1 __consumer_offsets-0]# kafka-console-consumer.sh --topic 
__consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 
--consumer.config /opt/kafka_2.12-1.0.2/config/consumer.properties --from- beginning | head

image.png
上图中,标出来的,表示消费组为 test-consumer-group ,消费的主题为 __consumer_offsets , 消费的分区是4,偏移量为5。

__consumers_offsets 主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的

上图中,标出来的,表示消费组为 test-consumer-group ,消费的主题为 consumer_offsets , 消费的分区是4,偏移量为5。 consumers_offsets 主题配置了compact策略,使得它总是能够保存最新的位移信息,既控制 了该topic总体的日志容量,也能实现保存最新offset的目的。​

2.4 再谈再均衡

2.4.1 什么是再均衡?

再均衡(Rebalance)本质上是一种协议,规定了一个消费组中所有消费者如何达成一致来分配订阅主题的每个分区。
比如某个消费组有20个消费组,订阅了一个具有100个分区的主题。正常情况下,Kafka平均会为每个消费者分配5个分区。这个分配的过程就叫再均衡。

2.4.2 什么时候再均衡?

再均衡的触发条件:

  1. 组成员发生变更(新消费者加入消费组组、已有消费者主动离开或崩溃了)
  2. 订阅主题数发生变更。如果正则表达式进行订阅,则新建匹配正则表达式的主题触发再均衡。
  3. 订阅主题的分区数发生变更

    2.4.3 如何进行组内分区分配?

    三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。后面讲。

2.4.4 谁来执行再均衡和消费组管理?

Kafka提供了一个角色:Group Coordinator来执行对于消费组的管理。
Group Coordinator——每个消费组分配一个消费组协调器用于组管理和位移管理。当消费组的第 一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费 者和该组协调器协调通信。

2.4.5 如何确定coordinator?

两步:

  1. 确定消费组位移信息写入 consumers_offsets 的哪个分区。具体计算公式: consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
    注意:groupMetadataTopicPartitionCount 由 offsets.topic.num.partitions 指定,默认是50个分区。

  2. 该分区leader所在的broker就是组协调器。

2.4.6 Rebalance Generation

它表示Rebalance之后主题分区到消费组中消费者映射关系的一个版本,主要是用于保护消费组, 隔离无效偏移量提交的。如上一个版本的消费者无法提交位移到新版本的消费组中,因为映射关系变 了,你消费的或许已经不是原来的那个分区了。每次group进行Rebalance之后,Generation号都会加 1,表示消费组和分区的映射关系到了一个新版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,消费组协调器触发Rebalance,消费组进入Generation 2,之后成员4加入,再次触发Rebalance,消费组进入Generation 3.
image.png

2.4.7 协议(protocol)

kafka提供了5个协议来处理与消费组协调相关的问题:
Heartbeat请求:consumer需要定期给组协调器发送心跳来表明自己还活着
LeaveGroup请求:主动告诉组协调器我要离开消费组
SyncGroup请求:消费组Leader把分配方案告诉组内所有成员
JoinGroup请求:成员请求加入组
DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用组协调器在再均衡的时候主要用到了前面4种请求。

2.4.8 liveness

消费者如何向消费组协调器证明自己还活着? 通过定时向消费组协调器发送Heartbeat请求。如果超过了设定的超时时间,那么协调器认为该消费者已经挂了。一旦协调器认为某个消费者挂了,那么它就会开启新一轮再均衡,并且在当前其他消费者的心跳响应中添加“REBALANCE_IN_PROGRESS”,告诉其他消费者:重新分配分区。

2.4.9 再均衡过程

再均衡分为2步:Join和Sync

  1. Join, 加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调i器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader。
  2. Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup的response中发给各个消费者。
    image.png
    注意:在协调器收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。然后是分发分配方案的过程,即SyncGroup请求:
    image.png
    注意:消费组的分区分配方案在客户端执行。Kafka交给客户端可以有更好的灵活性。Kafka默认提 供三种分配策略:range和round-robin和sticky。可以通过消费者的参数: partition.assignment.strategy 来实现自己分配策略。

2.4.10 消费组状态机

消费组组协调器根据状态机对消费组做不同的处理:
image.png
说明:

  1. Dead:组内已经没有任何成员的最终状态,组的元数据也已经被组协调器移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
  2. Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
  3. PreparingRebalance:组准备开启新的rebalance,等待成员加入
  4. AwaitingSync:正在等待leader consumer将分配方案传给各个成员
  5. Stable:再均衡完成,可以开始消费。

3. 主题

3.1 管理

使用kafka-topics.sh脚本:
image.png
image.png
image.png

主题中可以使用的参数定义:
image.png
image.png
image.png

3.1.1 创建主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_x --partitions 1 --replication-factor 1
kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_02 --partitions 3 --replication-factor 1 --config max.message.bytes=1048576 --config segment.bytes=10485760

3.1.2 查看主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --list 
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_x 
kafka-topics.sh --zookeeper localhost:2181/myKafka --topics-with-overrides --describe

3.1.3 修改主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_test_01 --partitions 2 --replication-factor 1 
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config max.message.bytes=1048576 
kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_test_01 
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --topic topic_test_01 --config segment.bytes=10485760 
kafka-topics.sh --zookeeper localhost:2181/myKafka --alter --delete-config max.message.bytes --topic topic_test_01

3.1.4 删除主题

kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_x

image.png
给主题添加删除的标记:
image.png
要过一段时间删除。

3.2 增加分区

通过命令行工具操作,主题的分区只能增加,不能减少。否则报错:

ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. 
Topic myTop1 currently has 2 partitions, 1 would not be an increase.

通过—alter修改主题的分区数,增加分区。
kafka-topics.sh --zookeeper localhost/myKafka --alter --topic myTop1 -- partitions 2

3.3 分区副本的分配-了解

副本分配的三个目标:

  1. 均衡地将副本分散于各个broker上
  2. 对于某个broker上分配的分区,它的其他副本在其他broker上
  3. 如果所有的broker都有机架信息,尽量将分区的各个副本分配到不同机架上的broker。

在不考虑机架信息的情况下:

  1. 第一个副本分区通过轮询的方式挑选一个broker,进行分配。该轮询从broker列表的随机位置 进行轮询。
  2. 其余副本通过增加偏移进行分配。

分配案例:
image.png
image.png
考虑到机架信息,首先为每个机架创建一个broker列表。如:
三个机架(rack1,rack2,rack3),六个broker(0,1,2,3,4,5)
brokerID -> rack

  • 0 -> “rack1”, 1 -> “rack3”, 2 -> “rack3”, 3 -> “rack2”, 4 -> “rack2”, 5 -> “rack1”

rack1:0,5
rack2:3,4
rack3:1,2
这broker列表为rack1的0,rack2的3,rack3的1,rack1的5,rack2的4,rack3的2
即:0, 3, 1, 5, 4, 2
通过简单的轮询将分区分配给不同机架上的broker:
image.png
每个分区副本在分配的时候在上一个分区第一个副本开始分配的位置右移一位。
六个broker,六个分区,正好最后一个分区的第一个副本分配的位置是该broker列表的最后一个。
如果有更多的分区需要分配,则算法开始对follower副本进行移位分配。
这主要是为了避免每次都得到相同的分配序列。
此时,如果有一个分区等待分配(分区6),这按照如下方式分配:
6 -> 0,4,2 (而不是像分区0那样重复0,3,1)

跟机架相关的副本分配中,永远在机架相关的broker列表中轮询地分配第一个副本。 其余的副本,倾向于机架上没有副本的broker进行副本分配,除非每个机架有一个副本。 然后其他的副本又通过轮询的方式分配给broker。

结果是,如果副本的个数大于等于机架数,保证每个机架最少有一个副本。
否则每个机架最多保有一个副本。
如果副本的个数和机架的个数相同,并且每个机架包含相同个数的broker,可以保证副本在机架和 broker之间均匀分布。
image.png
上图,tp_eagle_01主题的分区0分配信息:leader分区在broker1上,同步副本分区是1和2,也就是在broker1和broker2上的两个副本分区是同步副本分区,其中一个是leader分区。

3.4 必要参数配置

kafka-topics.sh --config xx=xx --config yy=yy
配置给主题的参数。
image.png
image.png

3.5 KafkaAdminClient应用

说明
除了使用Kafka的bin目录下的脚本工具来管理Kafka,还可以使用管理Kafka的API将某些管理查看的功能集成到系统中。在Kafka0.11.0.0版本之前,可以通过kafka-core包(Kafka的服务端,采用Scala编写)中的AdminClient和AdminUtils来实现部分的集群管理操作。Kafka0.11.0.0之后,又多了一个AdminClient,在kafka-client包下,一个抽象类,具体的实现是 org.apache.kafka.clients.admin.KafkaAdminClient。

功能与原理介绍
Kafka官网:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects。
KafkaAdminClient包含了一下几种功能(以Kafka1.0.2版本为准):

  1. 创建主题:
  2. createTopics(final Collection newTopics, final CreateTopicsOptions options)
  3. 删除主题:
  4. deleteTopics(final Collection topicNames, DeleteTopicsOptions options)
  5. 列出所有主题:
  6. listTopics(final ListTopicsOptions options)
  7. 查询主题:
  8. describeTopics(final Collection topicNames, DescribeTopicsOptions options)
  9. 查询集群信息:
  10. describeCluster(DescribeClusterOptions options)
  11. 查询配置信息:
  12. describeConfigs(Collection configResources, final DescribeConfigsOptions options)
  13. 修改配置信息:
  14. alterConfigs(Map configs, final AlterConfigsOptions options)
  15. 修改副本的日志目录:
  16. alterReplicaLogDirs(Map replicaAssignment, final AlterReplicaLogDirsOptions options)
  17. 查询节点的日志目录信息:
  18. describeLogDirs(Collection brokers, DescribeLogDirsOptions options)
  19. 查询副本的日志目录信息:
  20. describeReplicaLogDirs(Collection replicas, DescribeReplicaLogDirsOptions options)
  21. 增加分区:
  22. createPartitions(Map newPartitions, final CreatePartitionsOptions options)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。
用到的参数:
image.png
image.png
image.png
主要操作步骤:
客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送 CreateTopicRequest请求。
客户端发送请求至Kafka Broker。

Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是 CreateTopicResponse。
客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中, AbstractRequest和AbstractResponse是这些请求和响应类的两个父类。
综上,如果要自定义实现一个功能,只需要三个步骤:

  1. 自定义XXXOptions;
  2. 自定义XXXResult返回值;
  3. 自定义Call,然后挑选合适的XXXRequest和XXXResponse来实现Call类中的3个抽象方法。
package com.lagou.kafka.demo;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class MyAdminClient {

    private KafkaAdminClient client;

    @Before
    public void before() {

        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");
        configs.put("client.id", "admin_001");

        client = (KafkaAdminClient) KafkaAdminClient.create(configs);
    }

    @After
    public void after() {
        // 关闭admin客户端
        client.close();
    }

    @Test
    public void testListTopics() throws ExecutionException, InterruptedException {
        // 列出主题
//        final ListTopicsResult listTopicsResult = client.listTopics();

        ListTopicsOptions options = new ListTopicsOptions();
        // 列出内部主题
        options.listInternal(true);
        // 设置请求超时时间,单位是毫秒
        options.timeoutMs(500);

        final ListTopicsResult listTopicsResult = client.listTopics(options);

//        final Set<String> strings = listTopicsResult.names().get();
//
//        strings.forEach(name -> {
//            System.out.println(name);
//        });

        // 将请求变成同步的请求,直接获取结果
        final Collection<TopicListing> topicListings = listTopicsResult.listings().get();

        topicListings.forEach(new Consumer<TopicListing>() {
            @Override
            public void accept(TopicListing topicListing) {

                // 该主题是否是内部主题
                final boolean internal = topicListing.isInternal();
                // 该主题的名字
                final String name = topicListing.name();


                System.out.println("主题是否是内部主题:" + internal);
                System.out.println("主题的名字:" + name);
                System.out.println(topicListing);
                System.out.println("=====================================");
            }
        });

    }


    @Test
    public void testDescribeLogDirs() throws ExecutionException, InterruptedException {
        final DescribeLogDirsResult describeLogDirsResult = client.describeLogDirs(Collections.singleton(0));

        final Map<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>> integerMapMap
                = describeLogDirsResult.all().get();

        integerMapMap.forEach(new BiConsumer<Integer, Map<String, DescribeLogDirsResponse.LogDirInfo>>() {
            @Override
            public void accept(Integer integer, Map<String, DescribeLogDirsResponse.LogDirInfo> stringLogDirInfoMap) {
                System.out.println("broker.id = " + integer);
//                log.dirs可以设置多个目录
                stringLogDirInfoMap.forEach(new BiConsumer<String, DescribeLogDirsResponse.LogDirInfo>() {
                    @Override
                    public void accept(String s, DescribeLogDirsResponse.LogDirInfo logDirInfo) {
                        System.out.println("logdir = " + s);
                        final Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> replicaInfos = logDirInfo.replicaInfos;

                        replicaInfos.forEach(new BiConsumer<TopicPartition, DescribeLogDirsResponse.ReplicaInfo>() {
                            @Override
                            public void accept(TopicPartition topicPartition, DescribeLogDirsResponse.ReplicaInfo replicaInfo) {
                                System.out.println("主题分区:" + topicPartition.partition());
                                System.out.println("主题:" + topicPartition.topic());
//                                final boolean isFuture = replicaInfo.isFuture;
//                                final long offsetLag = replicaInfo.offsetLag;
//                                final long size = replicaInfo.size;
                            }
                        });

                    }
                });
            }
        });


    }


}

3.6 偏移量管理

Kafka 1.0.2,__consumer_offsets主题中保存各个消费组的偏移量。
早期由zookeeper管理消费组的偏移量。

查询方法:
通过原生 kafka 提供的工具脚本进行查询。
工具脚本的位置与名称为 bin/kafka-consumer-groups.sh
首先运行脚本,查看帮助:
image.png
image.png
image.png
image.png
这里我们先编写一个生产者,消费者的例子:
我们先启动消费者,再启动生产者, 再通过 bin/kafka-consumer-groups.sh 进行消费偏移量查询,
由于kafka 消费者记录group的消费偏移量有两种方式 :
1)kafka 自维护 (新)
2)zookpeer 维护 (旧) ,已经逐渐被废弃
所以 ,脚本只查看由broker维护的,由zookeeper维护的可以将 —bootstrap-server 换成 —zookeeper 即可。

1. 查看有那些 group ID 正在进行消费:

[root@node11 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list 
Note: This will not show information about old Zookeeper-based consumers. group

image.png
注意:

  1. 这里面是没有指定 topic,查看的是所有topic消费者的 group.id 的列表。
  2. 注意: 重名的 group.id 只会显示一次

2.查看指定group.id 的消费者消费情况
image.png
如果消费者停止,查看偏移量信息:
image.png
将偏移量设置为最早的:
image.png
将偏移量设置为最新的:
image.png
分别将指定主题的指定分区的偏移量向前移动10个消息:
image.png
代码:
KafkaProducerSingleton.java

package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.Random;

public class KafkaProducerSingleton {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerSingleton.class);
    private static KafkaProducer<String, String> kafkaProducer;
    private Random random = new Random();
    private String topic;
    private int retry;

    private KafkaProducerSingleton() {
    }

    /**
     * 静态内部类
     *
     * @author tanjie
     */
    private static class LazyHandler {
        private static final KafkaProducerSingleton instance = new KafkaProducerSingleton();
    }

    /**
     * 单例模式,kafkaProducer是线程安全的,可以多线程共享一个实例
     * @return
     */
    public static final KafkaProducerSingleton getInstance() {
        return LazyHandler.instance;
    }

    /**
     * kafka生产者进行初始化
     *
     * @return KafkaProducer
     */
    public void init(String topic, int retry) {
        this.topic = topic;
        this.retry = retry;
        if (null == kafkaProducer) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
            props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "1");

            kafkaProducer = new KafkaProducer<String, String>(props);
        }
    }

    /**
     * 通过kafkaProducer发送消息
     * @param message
     */
    public void sendKafkaMessage(final String message) {

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", message);

        kafkaProducer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata,
                                     Exception exception) {
                if (null != exception) {
                    LOGGER.error("kafka发送消息失败:" + exception.getMessage(), exception);
                    retryKakfaMessage(message);
                }
            }
        });
    }

    /**
     * 当kafka消息发送失败后,重试
     *
     * @param retryMessage
     */
    private void retryKakfaMessage(final String retryMessage) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                topic, random.nextInt(3), "", retryMessage);
        for (int i = 1; i <= retry; i++) {
            try {
                kafkaProducer.send(record);
                return;
            } catch (Exception e) {
                LOGGER.error("kafka发送消息失败:" + e.getMessage(), e);
                retryKakfaMessage(retryMessage);
            }
        }
    }

    /**
     * kafka实例销毁
     */
    public void close() {
        if (null != kafkaProducer) {
            kafkaProducer.close();
        }
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getRetry() {
        return retry;
    }

    public void setRetry(int retry) {
        this.retry = retry;
    }
}

ProducerHandler.java

package com.lagou.kafka.demo.producer;

public class ProducerHandler implements Runnable {
    private String message;

    public ProducerHandler(String message) {
        this.message = message;
    }

    @Override
    public void run() {
        KafkaProducerSingleton kafkaProducerSingleton = KafkaProducerSingleton.getInstance();
        kafkaProducerSingleton.init("tp_demo_02", 3);
        int i = 0;

        while (true) {
            try {
                System.out.println("当前线程:" + Thread.currentThread().getName()
                        + "\t获取的kafka实例:" + kafkaProducerSingleton);
                kafkaProducerSingleton.sendKafkaMessage("发送消息: " + message + " " + (++i));
                Thread.sleep(100);
            } catch (Exception e) {
            }
        }
    }
}

MyProducer.java
package com.lagou.kafka.demo.producer;

public class MyProducer {
    public static void main(String[] args){
        Thread thread = new Thread(new ProducerHandler("hello lagou "));
        thread.start();
    }
}

KafkaConsumerAuto.java

package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class KafkaConsumerAuto {
    /**
     * kafka消费者不是线程安全的
     */
    private final KafkaConsumer<String, String> consumer;

    private ExecutorService executorService;

    public KafkaConsumerAuto() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        // 打开自动提交
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put("auto.commit.interval.ms", "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<String, String>(props);
        // 订阅主题
        consumer.subscribe(Collections.singleton("tp_demo_02"));
    }

    public void execute() throws InterruptedException {
        executorService = Executors.newFixedThreadPool(2);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(2_000);
            if (null != records) {
                executorService.submit(new ConsumerThreadAuto(records, consumer));
            }
            Thread.sleep(1000);
        }
    }

    public void shutdown() {
        try {
            if (consumer != null) {
                consumer.close();
            }
            if (executorService != null) {
                executorService.shutdown();
            }
            if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("关闭线程池超时。。。");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }
}

ConsumerThreadAuto.java
package com.lagou.kafka.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerThreadAuto implements Runnable {
    private ConsumerRecords<String, String> records;
    private KafkaConsumer<String, String> consumer;

    public ConsumerThreadAuto(ConsumerRecords<String, String> records,
                              KafkaConsumer<String, String> consumer) {
        this.records = records;
        this.consumer = consumer;
    }

    @Override
    public void run() {

        for(ConsumerRecord<String,String> record : records){
            System.out.println("当前线程:" + Thread.currentThread()
                    + "\t主题:" + record.topic()
                    + "\t偏移量:" + record.offset() + "\t分区:" + record.partition()
                    + "\t获取的消息:" + record.value());
        }
    }
}

ConsumerAutoMain.java
package com.lagou.kafka.demo.consumer;

public class ConsumerAutoMain {
    public static void main(String[] args) {
        KafkaConsumerAuto kafka_consumerAuto = new KafkaConsumerAuto();
        try {
            kafka_consumerAuto.execute();
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            kafka_consumerAuto.shutdown();
        }
    }
}