Spring Boot之Kafka的安装使用配置与测试

  |  

使用Spring Boot 集成Kafka的时候需要安装kafka,那时安装的时候忘记了记录,也忙,今天刚好又安装了一遍kafka,特此记录下。

安装

官方安装文档:http://kafka.apache.org/quickstart

1、kafka 需要java环境;
2、kafka 最新版本内置了 zookeeper,所以不需要安装zookeeper;
3、下载kafka最新版本,点击前往下载,因为下载的是tgz文件,所以不需要安装,解压到相应的地方就可以了。
4、bin 目录下放的是启动kafka的文件,conf目录下放的是kafka的各种配置文件。

运行

简单单节点搭建测试,不需要修改任何配置文件,只需要知道 zookeeper的默认端口是2181,生产者的默认端口是9092。在kafka的目录运行。例如:D:\kafka\kafka_2.12-2.2.0

启动ZooKeeper服务

1
bin\windows\zookeeper-server-start.bat config\zookeeper.properties

启动Kafka server

1
bin\windows\kafka-server-start.bat config\server.properties

测试

创建个订阅

1
bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

发送消息

1
bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

测试代码

测试先启动生产者创建订阅,然后再启动消费者消费订阅

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerTest {

private void execMsgSend() throws Exception {
Properties props = new Properties();
// props.put("bootstrap.servers", "10.74.0.174:9092");
props.put("bootstrap.servers", "10.187.227.244:9092");
props.put("acks", "1");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> procuder = new KafkaProducer<String, String>(props);

String topic = "test";
for (int i = 1; i <= 10; i++) {
String value = " this is another message_" + i;
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, i + "", value);
procuder.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
}
});
System.out.println(i + " ---- success");
Thread.sleep(1000);
}
System.out.println("send message over.");
procuder.close();
}

public static void main(String[] args) throws Exception {
ProducerTest test1 = new ProducerTest();
test1.execMsgSend();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTest {

@SuppressWarnings("resource")
public static void main(String[] s) {
Properties props = new Properties();
// props.put("bootstrap.servers", "10.74.0.174:9092");
props.put("bootstrap.servers", "10.187.227.244:9092");
props.put("group.id", "commonLogList");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
System.out.println("poll start...");
ConsumerRecords<String, String> records = consumer.poll(100);
int count = records.count();
System.out.println("the numbers of topic:" + count);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
}
}

远程配置

解决办法

  • 这里需要注意的是,因为是远程连接服务器,所以要看服务器的防火墙是否针对端口9092(默认端口)打开的,刚开始弄了很长时间,我一直没弄好的原因是因为中午我重启了服务器,导致防火墙又打开了。
  • 如果防火墙是正常的,就需要改变Kafka的配置:在/config/service.properties中,添加上一句host.name=10.74.0.152

这主要是因为,kafka默认是监听localhost的端口,如果不配置新端口名的话,就解析监听不到消息。

运行命令处理

为了启动方便,编写下面运行命令,方便启动

Start ZooKeeper Server

1
2
3
4
5
6
title ZooKeeper服务 2181
@ echo.
echo 开始启动ZooKeeper服务...
call bin\windows\zookeeper-server-start.bat config\zookeeper.properties
@ echo.
echo 启动ZooKeeper服务完成

Start Kafka Server

1
2
3
4
5
6
title Kafka服务 9092
@ echo.
echo 开始启动Kafka服务...
call bin\windows\kafka-server-start.bat config\server.properties
@ echo.
echo 启动Kafka服务完成

Test Kafka

1
2
3
4
5
6
7
8
9
10
11
12
title 测试Kafka服务状态
@ echo.
echo 创建Kafka订阅服务...
call bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
@ echo.
echo 创建Kafka订阅服务完成
@ echo.
echo 测试订阅消息...
call bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
@ echo.
echo 测试订阅消息完成
pause

资料

个人微信公众号技术交流QQ群
文章目录
  1. 1. 安装
  2. 2. 运行
    1. 2.1. 启动ZooKeeper服务
    2. 2.2. 启动Kafka server
  3. 3. 测试
    1. 3.1. 创建个订阅
    2. 3.2. 发送消息
  4. 4. 测试代码
    1. 4.1. 生产者
    2. 4.2. 消费者
  5. 5. 远程配置
  6. 6. 运行命令处理
    1. 6.1. Start ZooKeeper Server
    2. 6.2. Start Kafka Server
    3. 6.3. Test Kafka
  7. 7. 资料