Skip to main content
  1. Posts/

Springboot中使用kafka

·306 words·2 mins
Blogs Tutorials Kafka Springboot Middleware
Gwen0x4c3
Author
Gwen0x4c3
java n go is my mother tongue

首先说明,本人之前没用过zookeeper、kafka等,尚硅谷十几个小时的教程实在没有耐心看,现在我也不知道分区、副本之类的概念。用kafka只是听说他比RabbitMQ快,我也是昨天晚上刚使用,下文中若有讲错的地方或者我的理解与它的本质有偏差的地方请包涵。

此文背景的环境是windows,linux流程也差不多。

  • 解压在D盘下或者什么地方,注意不要放在桌面等绝对路径太长的地方

  • 打开config中的 zookeeper.properties,自己选择性修改clientPort,不想改也行

  • 修改config中的 server.properties

1.取消 advertised.listeners 注释,修改kafka地址与端口。如果要集群部署,broker.id不能重复,1号机是0,2号机是1这样的。

2.修改 zookeeper.connect 为你上面zookeeper.properties中配置的地址

  • 配置好了,尝试开启kafka。

来到bin/windows,shift右键在此处打开cmd,输入 zookeeper-server-start.bat ../../config/zookeeper.properties,等待其启动。(注意你config的路径不要写错)

启动完再开一个cmd,输入 kafka-server-start.bat ../../config/server.properties

如果在此环节出现问题,请查看logs中的日志,面向csdn。

我遇到的问题是 他显示什么什么路径太长了,问题的原因是我把他放桌面上了。

  • 新建springboot项目,pom中添加依赖
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.28</version>
        </dependency>
    </dependencies>
  • 配置application.yml,写启动类,controller,创建User类,创建consumer

application.yml

    spring:
      application:
        name: kafka
      kafka:
        bootstrap-servers: localhost:9092 #这个是你server.properties中配置的
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: test-consumer-group #这个去config/consumer.properties中查看和修改
                                    # 不过好像不一样也不影响?
    server:
      port: 8001

controller

    @RestController
    public class ProducerController {
        @Autowired
        KafkaTemplate<String, String> kafka;
    
        @RequestMapping("register")
        public String register(User user) {
            String message = JSON.toJSONString(user);
            System.out.println("接收到用户信息:" + message);
            kafka.send("register", message);
            //kafka.send(String topic, @Nullable V data) {
            return "OK";
        }
    }

user

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class User implements Serializable {
    
        private String id;
    
        private String name;
    
        private Integer age;
    }

consumer

    @Configuration
    public class Consumer {
    
        @KafkaListener(topics = "register")
        public void consume(String message) {
            System.out.println("接收到消息:" + message);
            User user = JSON.parseObject(message, User.class);
            System.out.println("正在为 " + user.getName() + " 办理注册业务...");
            System.out.println("注册成功");
        }
    }

此时启动springboot,然而报错了

    org.springframework.context.ApplicationContextException: 
        Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
    
    nested exception is java.lang.IllegalStateException:
         Topic(s) [register] is/are not present and missingTopicsFatal is true

为什么呢?

请检查zookeeper和kafka的cmd上有没有他们启动失败的消息,如果有就重新启动下,记得先开zookeeper再开kafka。

然后确认你的kafka上有没有"register"这个topic,此时我是没有的,而consumer又加了 @KafkaListener(topics = "register") 注解,于是他就启动失败了。

有两种解决方式:

1.比较傻X的方式,先将@KafkaListener注释掉,启动springboot后访问localhost:8001/register,他send的时候就会自行创建topic,再取消注释重新启动就可以了。

2.cmd方式:输入 kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic register

然后我们再启动,已经启动成功了。访问 localhost:8001/register?name=JamesBond&age=55

我们可以看到数据已经成功送到那里了。

然后来测试一下速度

    @RequestMapping("test")
    public String test() {
        System.out.println("发送开始" + System.currentTimeMillis() % 10000);
        for (int i = 0; i < 1000; i++) {
            kafka.send("test", JSON.toJSONString(new User((1289312+i)+"",
                    "User" + i, (int)(Math.random() * 100), info)));
        }
        System.out.println("发送结束" + System.currentTimeMillis() % 10000);
        return "OK";
    }
    
    @KafkaListener(topics = "test")
    public void test(String message) {
        System.out.println("--" + System.currentTimeMillis() % 10000 + "--");
    }

console:

发送开始1267
--1384--
--1384--
...
--1715--
--1715--
发送结束1715
--1715--
--1715--
...
--1734--

对比RabbitMQ:

发送开始5692
--5766--
--5766--
...
--5973--
--5974--
发送结束5976
--5977--
--5977--
...
--6456--

kafka:

发送结束 - 发送开始=448ms

接收结束 - 接收开始=350ms

整体耗时: 467ms

rabbit:

发送结束 - 发送开始=284ms

接收结束 - 接收开始=690ms

整体耗时: 764ms

OK既然我会用了 我就去学一下kafka基本知识了