java使用kafka,kafka 指定分区发送
00-1010前言业务场景业务实现不指定分区,指定分区主题分区初始化并配置生产者分区发送方案消费者
目录
Kafka是一个非常流行的分布式消息队列,常用于微服务之间的异步通信、业务解耦等场景。Kafka的性能非常强大,但是单个微服务的吞吐性能是有上限的。我们会使用分布式微服务,多消费者多生产者处理数据,保证性能也可以根据业务量水平扩展。对于同一个微服务的多个实例,输入和输出的主题是相同的。这时候我们可以用卡夫卡分区消费来解决这个问题。
00-1010我们开发了一个物联网系统。大量设备连接到平台上实时发送数据,包括秒级数据和分钟级数据等。处理流程包括访问、处理和存储。kafka用于在这三个模块之间传输数据。数据处理模块包含多个微服务,单个数据会被多次处理,有些服务耗时较长,导致单个服务在高频接收数据时达不到吞吐量平衡,所以进行这些服务。
前言
00-1010我们给卡夫卡发消息的时候,如果不指定分区,就不需要手动创建主题。在没有主题的情况下发送时,kafka将自动创建一个分区为1的主题,如下所示:
@ service public class product service { @ Autowired private Kafka template Kafka template;public void send(String msg,String topic){ kafkatemplate . send(topic,msg);}}
业务场景
00-1010指定要发送的分区时,如果没有配置主题分区数,指定了0的分区,会提示该分区不存在。这时候就需要提前创建题目和分区了。
手动,在服务启动之前,使用kafka工具手动创建主题不推荐 x.
当服务启动时,使用KafkaClient自动创建推荐 .
/* * *初始化多分区主题基于Spring boot 2 */@ Component Public Void Topic Runner实现Application Runner { @ autowired private admin client admin client;@ override public void run(application arguments args)throws exception {//省略通过配置文件读取的自定义配置的主题名称和分区号.//关键话题V分区数mapstring,整数话题分区图=new hashmap();对于(图。EntryString,Integer e : topicpartitionmap . entry set()){ create topic(e . getkey(),e . getvalue());} } public void createTopic(String topic,int partition){ NewTopic NewTopic=new NewTopic(topic,partition);admin client . create topics(lists . newarraylist(new topics));}}/* * *配置类引用基于springboot2 *如果只发送普通的单个消息,则不需要将此配置添加到项目*/@配置公共类Kafka Config { @ value( $ { spring . Kafka . bootstrap-Servers } )私有字符串服务器中;@ Bean public admin client admin client(){ return admin client . create(kafkaAdmin()。getConfig());} @ Bean public KafkaAdmin KafkaAdmin(){
Map<String, Object> props = Maps.newHashMap(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); return new KafkaAdmin(props); }}
生产者分区发送方案
上面讲到如何初始化分区topic,这时候我们的kafka环境已经准备好了,我们先使用TopicInitRunner为我们创建一个名称为 partition-topic 分区数为三,现在讲一讲如何均匀的讲消息发送的每个分区上,如何保证多消费者实例是负载均衡的,具体方案如下:
1.因为每条消息都是设备上传的,都会有设备id,先给每个设备生成一个自增号,这样1000个设备,每个设备就会有0到999的自增号,放到缓存中,每次根据消息中的设备id获取到该设备的自增号2.使用自增号对分区数进行取模操作,代码实现如下:
public class ProductService { /** * data为需要发送的数据 */ public void partitionSend(String topic, int partition, JSONObject data) { // 获取设备id String deviceId = data.getString("deviceId"); // 获取自增数 如果是新设备会创建一个并放入缓存中 int inc = getDeviceInc(deviceId); // 如果分区数为3 设备自增id为1 取模结果为1 就是发送到1分区 这样1000个设备就可以保证每个分区发送数据量是1000 / 3 int targetPartition = Math.floorMod(inc, partition); // 分区发送时候 需要指定一个唯一k 可以使用uuid或者百度提供的雪花算法获取id 字符串即可 kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString()); }}
消费者
我们讲到消费者使用分布式部署,一个微服务有多个实例,我们只需要按照服务监听的topic分区数创建对应数目的服务实例即可,这样kafka就会自动分配对应分区的数据到每个实例。
我们采取批量消费,进一步提高服务吞吐性能,消费及配置代码如下,配置文件参考springbootkafka配置即可,主要设计kafka服务配置,消费及生产配置,比较核心的是
@Componentpublic class DataListener { @Autowired private MongoTemplate mongoTemplate; /** * 站点报文监听消费 * * @param records */ @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory") public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) { } /** * 消费者配置 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 批量消费配置 */ @Bean public KafkaListenerContainerFactory batchConsumerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setBatchListener(true); return factory; }}
到此这篇关于Java Kafka分区发送及消费实战的文章就介绍到这了,更多相关Kafka分区发送及消费内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。