kafka的使用

anjingsi 1年前 ⋅ 862 阅读

kafka的使用

1、构建项目

1.1 pom引入依赖

这儿我使用的springboot的版本是2.6.5

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

1.2 配置

spring:
  kafka:
    bootstrap-servers: localhost:9092 # kafka集群信息,多个用逗号间隔
    producer:
      # 重试次数,设置大于0的值,则客户端会将发送失败的记录重新发送
      retries: 3
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      # 开启事务时,必须设置为all
      acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
      batch-size: 16384 #批量大小
      #transaction-id-prefix: transaction
      buffer-memory: 33554432 #缓冲存储大,32M
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: gzip # 消息压缩:none、lz4、gzip、snappy,默认为 none。
      properties:
        linger:
          ms: 100 #提交延迟
        #        max.request.size: 12695150
        #        message.max.bytes: 32695150
        max.block.ms: 60000 #限制获取Metadata的最大阻塞时间(默认60000ms)
        partitioner:
          #          class: com.txcf.mq.kafka.FastPartitioner    #指定自定义分区器
          linger:
            ms: 1000  # 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
          max:
            block:
              ms: 6000  # KafkaProducer.send() 和 partitionsFor() 方法的最长阻塞时间 单位 ms
    consumer:
      group-id: testGroup #默认的消费组ID
      enable-auto-commit: true #是否自动提交offset
      auto-commit-interval: 100 #提交offset延时
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # none:如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常
      # earliest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从头开始消费
      # latest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从最新的数据开始消费
      auto-offset-reset: latest
        # 这个参数定义了poll方法最多可以拉取多少条消息,默认值为500。如果在拉取消息的时候新消息不足500条,那有多少返回多少;如果超过500条,每次只返回500。
        # 这个默认值在有些场景下太大,有些场景很难保证能够在5min内处理完500条消息,
        # 如果消费者无法在5分钟内处理完500条消息的话就会触发reBalance,
        # 然后这批消息会被分配到另一个消费者中,还是会处理不完,这样这批消息就永远也处理不完。
      # 要避免出现上述问题,提前评估好处理一条消息最长需要多少时间,然后覆盖默认的max.poll.records参数
      # 注:需要开启BatchListener批量监听才会生效,如果不开启BatchListener则不会出现reBalance情况
      max-poll-records: 100 #单次拉取消息的最大条数
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # 两次poll之间的最大间隔,默认值为5分钟。如果超过这个间隔会触发reBalance
        max:
          poll:
            interval:
              ms: 600000
        # 当broker多久没有收到consumer的心跳请求后就触发reBalance,默认值是10s
        session:
          timeout:
            ms: 120000
        request:
          timeout:
            ms: 18000 # 消费请求的超时时间
    # 监听
    listener:
#      type: batch
      missing-topics-fatal: false # consumer listener topics 不存在时,启动项目就会报错
#       record:当每一条记录被消费者监听器ListenerConsumer处理之后提交
#       batch:当每一批poll()的数据被ListenerConsumer处理之后提交
#       time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交
#       count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交
#       count_time:TIME或COUNT中有一个条件满足时提交
#       manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交
#       manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
#      ack-mode: record

2.先来简单的使用

2.1 生产者

@RestController
public class KafkaSimpleProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @GetMapping("simple/{msg}")
    public void sendSimpleMessage(@PathVariable("msg") String msg) {
        kafkaTemplate.send("simple_topic", LocalDateTime.now()+"  "+msg);
    }
}

2.2 消费者

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"simple_topic"})
    public void onNormalMessage(ConsumerRecord<String, Object> record) {
        log.info("简单消费:topic:{}  partition:{}   offset:{}  key:{}  msg:{}  timestamp:{}", record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }
}

3. 生产者

3.1 带回调的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功或失败时做补偿处理,有两种方式实现

    /**
     * 回调--方式1
     * @param msg
     */
    @GetMapping("callback1/{msg}")
    public void sendCallback1Message(@PathVariable("msg") String msg) {
        kafkaTemplate.send("simple_topic", LocalDateTime.now()+"  "+msg).addCallback(new SuccessCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                // 消息发送到的topic
                String topic = result.getRecordMetadata().topic();
                // 消息发送到的分区
                int partition = result.getRecordMetadata().partition();
                // 消息在分区内的offset
                long offset = result.getRecordMetadata().offset();
                log.info("发送回调消息成功: topic:{}  partition:{}   offset:{}",topic,partition,offset);
            }
        }, new FailureCallback() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("发送回调消息失败:" + ex.getMessage());
            }
        });
    }

    /**
     * 回调--方式2
     * @param msg
     */
    @GetMapping("callback2/{msg}")
    public void sendCallback2Message(@PathVariable("msg") String msg) {
        kafkaTemplate.send("simple_topic", LocalDateTime.now()+"  "+msg).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.info("发送回调消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("发送回调消息成功: topic:{}  partition:{}   offset:{}",result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());
            }
        });
    }

3.2 监听器

Kafka提供了ProducerListener 监听器来异步监听生产者消息是否发送成功,我们可以自定义一个kafkaTemplate添加ProducerListener,当消息发送失败我们可以拿到消息进行重试或者把失败消息记录到数据库定时重试

@Slf4j
@Configuration
public class KafkaConfig {
    
    @Autowired
    ProducerFactory producerFactory;
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<String, Object>(producerFactory);
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                log.info("ProducerListener 发送成功  topic:{}  partition:{}  offset:{}  msg:{}",recordMetadata.topic(),recordMetadata.partition(),recordMetadata.offset(),producerRecord.value());
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord, @Nullable RecordMetadata recordMetadata, Exception e) {
                log.info("ProducerListener 发送失败  msg:{} errMsg:{}",producerRecord.value(),e.getMessage());
            }
        });
        return kafkaTemplate;
    }
}

当我们发送一条消息,先走 ListenableFutureCallback 回调在走ProducerListener回调

3.3 监听器

Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区; 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区; patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition; 我们来自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区

public class FastPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //自定义分区规则,默认全部发送到0号分区
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

配置文件中增加spring.kafka.producer.properties.partitioner.class=xx.config.FastPartitioner

3.4 事务消息

@GetMapping("/transaction/{msg}")
public void sendTransactionMessage(@PathVariable("msg") String msg) {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback<String, Object, Object>() {
        @Override
        public Object doInOperations(KafkaOperations<String, Object> operations) {
            operations.send("simple_topic", LocalDateTime.now()+"  "+msg + " test executeInTransaction");
            //后面出错消息就不会发出去
//                throw new RuntimeException("fail");
            return null;
        }
    });
    //后面出错消息也会发出去
     throw new RuntimeException("fail");
}

需要注意的是:

  • 需要配置spring.kafka.producer.transaction-id-prefix=tx_ #事务id前缀
  • 需要配置spring.kafka.producer.acks=tall

4. 消费者

4.1 异常处理

ConsumerAwareListenerErrorHandler 异常处理器,新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

    @Bean
    public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
                log.info("消费异常:" + message.getPayload());
                return null;
            }
        };
    }
    
    @KafkaListener(topics = {"simple_topic"},errorHandler = "consumerAwareErrorHandler")
    public void onNormalMessage(ConsumerRecord<String, Object> record) {
        log.info("简单消费:topic:{}  partition:{}   msg:{}",record.topic(),record.partition(),record.value());
        throw new RuntimeException("---------->");
    }

4.2 消息过滤

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。

配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

    @Autowired
    private ConsumerFactory consumerFactory;

    //消息过滤器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        //消息过滤策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                if (consumerRecord.value().toString().hashCode() % 2 == 0) {
                    return false;
                }
                return true;
            }
        });
        return factory;
    }
    
    @KafkaListener(topics = {"simple_topic"},errorHandler = "consumerAwareErrorHandler",containerFactory = "filterContainerFactory")
    public void onNormalMessage(ConsumerRecord<String, Object> record) {
        log.info("简单消费:topic:{}  partition:{}   offset:{}  key:{}  msg:{}  timestamp:{}", record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }

4.3 定时启动、停止

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:

  • 禁止监听器自启动;
  • 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;

新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在Spring中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动

    //消息过滤器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        //被过滤的消息将被丢弃
        factory.setAckDiscarded(true);
        //禁止KafkaListener自启动
        factory.setAutoStartup(true);
        //消息过滤策略
        factory.setRecordFilterStrategy(new RecordFilterStrategy() {
            @Override
            public boolean filter(ConsumerRecord consumerRecord) {
                if (consumerRecord.value().toString().hashCode() % 2 == 0) {
                    return false;
                }
                return true;
            }
        });
        return factory;
    }
    
    //---------------------------------------------------------------------------------------------
    
    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     **/
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // 监听器
    @KafkaListener(id="timingConsumer",topics = "simple_topic",containerFactory = "filterContainerFactory")
    public void onMessage1(ConsumerRecord<?, ?> record){
       log.info("简单消费:topic:{}  partition:{}   msg:{}",record.topic(),record.partition(),record.value());
    }

    // 定时启动监听器
    @Scheduled(cron = "0 33 13 * * ? ")
    public void startListener() {
        System.out.println("启动监听器...");
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            registry.getListenerContainer("timingConsumer").start();
        }
        //registry.getListenerContainer("timingConsumer").resume();
    }

    // 定时停止监听器
    @Scheduled(cron = "0 34 13 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器...");
        registry.getListenerContainer("timingConsumer").pause();
    }

5. 消费者的常见示例

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"simple_topic"})
    public void onSimpleMessage(ConsumerRecord<String, Object> record) {
        log.info("简单消费:topic:{}  partition:{}   offset:{}  key:{}  msg:{}  timestamp:{}", record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }

    @KafkaListener(topics = {"simple_topic1", "TEST4_TOPIC"})
    public void topics(ConsumerRecord<String, Object> record) {
        log.info("多个消费者:topic:{}  partition:{}   offset:{}  key:{}  msg:{}  timestamp:{}", record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }

    /**
     * 监听一个主题,且指定消费主题的哪些分区。
     * 参数详解:消费者组=TEST_GROUP;监听主题=TEST3_TOPIC;只消费的分区=0,1;消费者数量=2
     *
     * @param record
     */
    @KafkaListener(
            groupId = "TEST_GROUP",
            topicPartitions = {
                    @TopicPartition(topic = "TEST4_TOPIC", partitions = {"0", "1"})
            },
            concurrency = "2"
    )
    public void consumeByPattern(ConsumerRecord<String, String> record) {
        log.info("指定多个分区从哪个偏移量开始消费:topic:{}  partition:{}   offset:{}  key:{}  msg:{}  timestamp:{}", record.topic(), record.partition(), record.offset(), record.key(), record.value(), record.timestamp());
    }

}

全部评论: 0

    我有话说: