Spring Boot之Kafka的安装使用配置与测试
使用Spring Boot 集成Kafka的时候需要安装kafka,那时安装的时候忘记了记录,也忙,今天刚好又安装了一遍kafka,特此记录下。
安装
官方安装文档:http://kafka.apache.org/quickstart
1、kafka 需要java环境;
2、kafka 最新版本内置了 zookeeper,所以不需要安装zookeeper;
3、下载kafka最新版本,点击前往下载,因为下载的是tgz文件,所以不需要安装,解压到相应的地方就可以了。
4、bin 目录下放的是启动kafka的文件,conf目录下放的是kafka的各种配置文件。
运行
简单单节点搭建测试,不需要修改任何配置文件,只需要知道 zookeeper的默认端口是2181,生产者的默认端口是9092。在kafka的目录运行。例如:D:\kafka\kafka_2.12-2.2.0
启动ZooKeeper服务
1
| bin\windows\zookeeper-server-start.bat config\zookeeper.properties
|
启动Kafka server
1
| bin\windows\kafka-server-start.bat config\server.properties
|
测试
创建个订阅
1
| bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
|
发送消息
1
| bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
|
测试代码
测试先启动生产者创建订阅,然后再启动消费者消费订阅
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;
public class ProducerTest {
private void execMsgSend() throws Exception { Properties props = new Properties(); props.put("bootstrap.servers", "10.187.227.244:9092"); props.put("acks", "1"); props.put("retries", 0); props.put("batch.size", 16384); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> procuder = new KafkaProducer<String, String>(props);
String topic = "test"; for (int i = 1; i <= 10; i++) { String value = " this is another message_" + i; ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, i + "", value); procuder.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset()); } }); System.out.println(i + " ---- success"); Thread.sleep(1000); } System.out.println("send message over."); procuder.close(); }
public static void main(String[] args) throws Exception { ProducerTest test1 = new ProducerTest(); test1.execMsgSend(); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ConsumerTest {
@SuppressWarnings("resource") public static void main(String[] s) { Properties props = new Properties(); props.put("bootstrap.servers", "10.187.227.244:9092"); props.put("group.id", "commonLogList"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { System.out.println("poll start..."); ConsumerRecords<String, String> records = consumer.poll(100); int count = records.count(); System.out.println("the numbers of topic:" + count); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value()); } } }
|
远程配置
解决办法
- 这里需要注意的是,因为是远程连接服务器,所以要看服务器的防火墙是否针对端口9092(默认端口)打开的,刚开始弄了很长时间,我一直没弄好的原因是因为中午我重启了服务器,导致防火墙又打开了。
- 如果防火墙是正常的,就需要改变Kafka的配置:在
/config/service.properties
中,添加上一句host.name=10.74.0.152
这主要是因为,kafka默认是监听localhost的端口,如果不配置新端口名的话,就解析监听不到消息。
运行命令处理
为了启动方便,编写下面运行命令,方便启动
Start ZooKeeper Server
1 2 3 4 5 6
| title ZooKeeper服务 2181 @ echo. echo 开始启动ZooKeeper服务... call bin\windows\zookeeper-server-start.bat config\zookeeper.properties @ echo. echo 启动ZooKeeper服务完成
|
Start Kafka Server
1 2 3 4 5 6
| title Kafka服务 9092 @ echo. echo 开始启动Kafka服务... call bin\windows\kafka-server-start.bat config\server.properties @ echo. echo 启动Kafka服务完成
|
Test Kafka
1 2 3 4 5 6 7 8 9 10 11 12
| title 测试Kafka服务状态 @ echo. echo 创建Kafka订阅服务... call bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test @ echo. echo 创建Kafka订阅服务完成 @ echo. echo 测试订阅消息... call bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092 @ echo. echo 测试订阅消息完成 pause
|
资料
个人微信公众号 | 技术交流QQ群 |
|
|