文档简介:
编译运行Demo Java工程
Tips: 需要引入 kafka-clients 依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
1.从控制台获取以下信息:
连接地址:
Topic名称:
消费组名称:
2. 在示例代码中替换以上信息即可实现消息发送。
点击启动后成功发送消息。
3. 同样在示例代码中替换以上信息即可消费消息。
如下图运行代码成功获取刚才发送的消息。
KafkaDemo.java 示例源代码:
package com.ctg.kafka.mgr.test;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaDemo {
/**
 * Demo entry point. As shipped it only runs {@link #testPlainProducer()};
 * the other scenarios are left commented out and can be enabled by hand.
 *
 * <p>The producer is thread safe, and sharing a single producer instance
 * across threads will generally be faster than having multiple instances.
 *
 * @param args command-line arguments; not read by this demo
 * @throws Exception if the selected scenario fails
 */
public static void main(String[] args) throws Exception {
    // Default scenario: send messages.
    testPlainProducer();
    // Alternative scenario: consume messages.
    // testPlainConsumer();

    // Download the following files from the management console.
    // NOTE(review): the property names suggest these configure Kerberos/JAAS
    // login for testProducer()/testConsumer(), which are outside this view.
    // System.setProperty("java.security.krb5.conf", "F:\\Downloads\\user\\krb5.conf");
    // System.setProperty("java.security.auth.login.config", "F:\\Downloads\\user\\kafka_client.conf");
    // System.setProperty("sun.security.krb5.debug", "false");
    // testProducer();
    // testConsumer();
}
private static void testPlainProducer() throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.90.139:8090"); //kafka server ,192.168.1.159:8091
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 1684);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("request.timeout.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer
int index = 0;
while(true) {
String dvalue = "hello " ;
ProducerRecord record = new ProducerRecord<>("pps", "pps"+index++, dvalue);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
if (paramRecordMetadata == null) {
System.out.println("paramRecordMetadata is null ");
paramException.printStackTrace();
return;
}
System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
}
});
TimeUnit.SECONDS.sleep(1);
}
}
private static void testPlainConsumer() throws InterruptedException {
Properties properties = new Properties();
properties.put("bootstrap.servers", "192.168.0.27:8090");
properties.put("group.id", "topicGroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.offset.reset", "earliest");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer