[toc]

本篇在windows下演示,不严谨测试,在Linux下操作一样!(虚拟机三台太卡,所以在windows下测试学习)

一、前言

前边我们知道了,kafka的架构运行原理,生产者,消费者以及topic主题之间的通信和确保数据的安全。

当我们创建了一个主题topic,之前的做法是命令形式,进行发送数据

bin/kafka-console-producer.sh --broker-list master:9092 --topic first

那么代码中,怎么实现呢?所以引出了本篇探讨主题:maven整合kafka

二、生产者发送流程

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。
main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。
image.png
相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

三、项目准备

1. 创建项目kafka_demo

image.png

2. 修改pom文件

添加kafka和junit依赖包

<dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

3. 创建测试类

在test包中创建测试类,并编写代码
需要用到的类:

  1. KafkaProducer:需要创建一个生产者对象,用来发送数据
  2. ProducerConfig:获取所需的一系列配置参数
  3. ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象
@Test
    public void connectTest(){

        Properties props = new Properties();
        //kafka 集群,broker-list
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        //重试次数
        props.put("retries", 1);
        //批次大小
        props.put("batch.size", 16384);
        //等待时间
        props.put("linger.ms", 1);
        //RecordAccumulator 缓冲区大小
        props.put("buffer.memory", 33554432);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("first", "producerTest"));

        producer.close();
    }

参数解读:

  1. bootstrap.servers:连接zookeeper配置
  2. acks:发送过程确保数据是否丢失凭证,有0,1,-1(all)
  3. retries:发送失败时,客户端进行重试
  4. batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
  5. linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。
  6. buffer.memory:缓存区RecordAccumulator容量大小
  7. key.serializer:key值得序列化实现类

image.png
image.png

四、生产者发送函数测试

1. 无回调函数API测试

@Test
    public void connect2Test(){

        Properties props = new Properties();
        //kafka 集群,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //等待时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //RecordAccumulator 缓冲区大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("first", "configTest"));

        producer.close();
    }

image.png

2. 带回调函数API测试

@Test
    public void connect2Test(){

        Properties props = new Properties();
        //kafka 集群,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //等待时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //RecordAccumulator 缓冲区大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("first",
                "learn-kafka-callback"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != recordMetadata){
                    System.out.println("success->" + recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            }
        });

        producer.close();
    }

image.png

五、生产者发送partition策略

还记得分区的原则吗?
传送门:https://liudongdong.top/archives/kafkasi-kafka-jia-gou-yuan-li--shang-pian-

image.png

0. 代码提取

重复了几遍的配置,差不多也熟悉了,下边测试就进行了代码提取,把配置和关闭进行提取,我们只关注测试用例就行!

package com.learn.demo;

import org.apache.kafka.clients.producer.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class kafkaDemoTest2 {

    KafkaProducer<String, String> producer = null;


    @Before
    public void beforeKafkaTest(){
        Properties props = new Properties();
        //kafka 集群,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //等待时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //RecordAccumulator 缓冲区大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);
    }


    /**
     * 生产者:测试无参发送
     */
    @Test
    public void kafkaTest(){
        producer.send(new ProducerRecord<String, String>("first",
                "learn-kafka2"));
    }

    @After
    public void afterKafkaTest(){
        if (null != producer){
            producer.close();
        }
    }

}

1. 指定分区,进行发送数据

    /**
     * 生产者:指定分区
     */
    @Test
    public void kafkaPro1Test(){
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first",0, "learn-kafka", Integer.toString(i)));
        }
    }

image.png

2. 指定key进行hash,进行发送数据

    /**
     * 生产者:指定key进行hash
     */
    @Test
    public void kafkaPro2Test(){
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first","learn-kafka", Integer.toString(i)));
        }
    }

image.png

3. 随机发送

先看看源码,路径为:KafkaProducer类下->doSend方法里->partition;
进入partition方法下的partitioner.partition()方法。
最后在DefaultPartitioner下的partition方法
源码如下可以看的下边的随机生成值counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
第一次随机生成,之后累计!
image.png

image.png

    /**
     * 生产者:第一次随机发送,之后确认
     */
    @Test
    public void kafkaPro3Test(){
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i)));
        }
    }

image.png

4. 自定义分区策略

除了上述三种默认的策略,当然还可以自己去实现分区策略

创建一个自定义类,实现Partitioner接口

package com.learn.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartition implements Partitioner {


    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        // todo 自定义实现策略

        return 0;
    }

    public void close() {

    }

    public void configure(Map<String, ?> map) {

    }
}

在使用自定义的实现类时,需要告诉配置文件,如下测试用例


    @Test
    public void connect3Test(){

        Properties props = new Properties();
        //kafka 集群,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //等待时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //RecordAccumulator 缓冲区大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // 自定义分区
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.learn.config.MyPartition");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        producer.send(new ProducerRecord<String, String>("first", "myPartitionTest"));

        producer.close();
    }

六、生产者同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。
由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。

    /**
     * 生产者:测试同步发送
     */
    @Test
    public void kafkaSyncTest()throws Exception{
        producer.send(new ProducerRecord<String, String>("first",
                "learn-kafka-sync")).get();
    }

image.png

整体测试代码

package com.learn.demo;

import org.apache.kafka.clients.producer.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class kafkaDemoTest2 {

    KafkaProducer<String, String> producer = null;


    @Before
    public void beforeKafkaTest(){
        Properties props = new Properties();
        //kafka 集群,broker-list
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //批次大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //等待时间
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //RecordAccumulator 缓冲区大小
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(props);
    }


    /**
     * 生产者:测试无参发送
     */
    @Test
    public void kafkaTest(){
        producer.send(new ProducerRecord<String, String>("first",
                "learn-kafka2"));
    }

    /**
     * 生产者:测试无参发送
     */
    @Test
    public void kafka2Test(){
        producer.send(new ProducerRecord<String, String>("first",
                "learn-kafka-callback"), new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (null != recordMetadata){
                    System.out.println("success->" + recordMetadata.offset());
                }else {
                    e.printStackTrace();
                }
            }
        });
    }





    /**
     * 生产者:指定分区
     */
    @Test
    public void kafkaPro1Test(){
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first",0, "learn-kafka", Integer.toString(i)));
        }
    }

    /**
     * 生产者:指定key进行hash
     */
    @Test
    public void kafkaPro2Test(){
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first","learn-kafka", Integer.toString(i)));
        }
    }

    /**
     * 生产者:第一次随机发送,之后确认
     */
    @Test
    public void kafkaPro3Test(){
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", Integer.toString(i)));
        }
    }


    /**
     * 生产者:测试同步发送
     */
    @Test
    public void kafkaSyncTest()throws Exception{
        producer.send(new ProducerRecord<String, String>("first",
                "learn-kafka-sync")).get();
    }

    @After
    public void afterKafkaTest(){
        if (null != producer){
            producer.close();
        }
    }

}

Q.E.D.


只有创造,才是真正的享受,只有拚搏,才是充实的生活。