Distributed Message Service for Kafka
CTYun Distributed Message Service for Kafka: Compiling and Running the Demo Java Project

Summary:
Compile and run the Demo Java project. Tip: the kafka-clients dependency (org.apache.kafka : kafka-clients) is required.

Compiling and Running the Demo Java Project

Tip: the project must declare a dependency on kafka-clients:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
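If the project uses Gradle instead of Maven, the equivalent declaration would look like the following; the version is a placeholder (the original does not specify one), so pick the release compatible with your CTYun Kafka instance:

```groovy
dependencies {
    // <version> is a placeholder; use the kafka-clients release that
    // matches the client version supported by your Kafka instance.
    implementation 'org.apache.kafka:kafka-clients:<version>'
}
```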
1. Obtain the following information from the console:

Connection address:

Topic name:

Consumer group name:

2. Substitute the values above into the sample code to produce messages.
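Rather than hard-coding the three console values, they can be kept in a small properties file and loaded at startup. This is a minimal sketch, not part of the CTYun demo; the file keys and the angle-bracket placeholders below are illustrative:

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Properties;

public class DemoConfig {

    // Loads the console-provided values from any Reader,
    // e.g. a FileReader over a demo.properties file.
    static Properties load(Reader source) throws IOException {
        Properties p = new Properties();
        p.load(source);
        return p;
    }

    public static void main(String[] args) throws IOException {
        // Inline sample standing in for a demo.properties file;
        // replace the placeholders with the values from the console.
        String sample = "bootstrap.servers=<connection-address>\n"
                + "topic=<topic-name>\n"
                + "group.id=<consumer-group-name>\n";
        Properties p = load(new StringReader(sample));
        System.out.println(p.getProperty("topic"));
    }
}
```

The loaded `Properties` object can then be passed straight into the `KafkaProducer`/`KafkaConsumer` constructors after adding the serializer settings.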


Click Run; the messages are sent successfully.

3. Likewise, substitute the values above into the sample code to consume messages.


Running the consumer code successfully retrieves the messages just sent.


KafkaDemo.java sample source code:


package com.ctg.kafka.mgr.test;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaDemo {

    /**
     * The producer is thread safe; sharing a single producer instance across
     * threads will generally be faster than having multiple instances.
     */
    public static void main(String[] args) throws Exception {
        testPlainProducer();
        // testPlainConsumer();
        // Download the following files from the management console:
        // System.setProperty("java.security.krb5.conf", "F:\\Downloads\\user\\krb5.conf");
        // System.setProperty("java.security.auth.login.config", "F:\\Downloads\\user\\kafka_client.conf");
        // System.setProperty("sun.security.krb5.debug", "false");
        // testProducer();
        // testConsumer();
    }

    private static void testPlainProducer() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.90.139:8090"); // Kafka connection address from the console
        props.put("acks", "all");
        props.put("retries", 1);
        props.put("batch.size", 1684);
        props.put("linger.ms", 0);
        props.put("buffer.memory", 33554432); // 32 MB send buffer
        props.put("request.timeout.ms", 1000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int index = 0;
        while (true) {
            String dvalue = "hello ";
            ProducerRecord<String, String> record = new ProducerRecord<>("pps", "pps" + index++, dvalue);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata == null) {
                        System.out.println("record metadata is null");
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("Message sent: topic " + metadata.topic()
                            + ", partition: " + metadata.partition());
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }


    private static void testPlainConsumer() throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.0.27:8090");
        properties.put("group.id", "topicGroup");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("topic2"));
        while (true) {
            // Poll for records with a 100 ms timeout.
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                String format = String.format("offset = %d, key = %s, value = %s",
                        record.offset(), record.key(), record.value());
                System.out.println(format);
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }

    private static void testProducer() throws Exception {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "GSSAPI");
        props.put("sasl.kerberos.service.name", "ctg_kafka");
        props.put("bootstrap.servers", "192.168.90.139:8092,192.168.90.41:8092,192.168.90.42:8092"); // secure access point addresses
        props.put("acks", "0");
        props.put("retries", 3);
        props.put("batch.size", 1684);
        props.put("linger.ms", 100);
        props.put("buffer.memory", 33554432); // 32 MB send buffer
        // props.put("request.timeout.ms", 1000);

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        int index = 0;

        Producer<String, String> producer = new KafkaProducer<>(props);
        TimeUnit.SECONDS.sleep(2);
        while (true) {
            String dvalue = "hello ";
            ProducerRecord<String, String> record = new ProducerRecord<>("ppx", "pps200" + index++, dvalue);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (metadata == null) {
                        System.out.println("record metadata is null");
                        exception.printStackTrace();
                        return;
                    }
                    System.out.println("Message sent: topic " + metadata.topic()
                            + ", partition: " + metadata.partition());
                }
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }

    private static void testConsumer() throws InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.90.139:8092,192.168.90.42:8092");
        properties.put("group.id", "ppxgroup");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.offset.reset", "earliest");
        // properties.put("max.poll.records", 1);
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "GSSAPI");
        properties.put("sasl.kerberos.service.name", "ctg_kafka");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("ppx"));
        while (true) {
            // Poll for records with a 100 ms timeout.
            ConsumerRecords<String, String> records = consumer.poll(100);
            records.forEach(record -> {
                String format = String.format("offset = %d, key = %s, value = %s",
                        record.offset(), record.key(), record.value());
                System.out.println(format);
            });
            TimeUnit.SECONDS.sleep(1);
        }
    }
}
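The commented-out lines in main() point java.security.auth.login.config at a kafka_client.conf downloaded from the console. For reference, a GSSAPI JAAS file for Kafka clients generally has the shape below; the keytab path, principal, and realm are placeholders, not the console-issued values, and only serviceName ("ctg_kafka") comes from the demo above:

```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/user.keytab"
    principal="<user>@<REALM>"
    serviceName="ctg_kafka";
};
```

The serviceName entry must match the sasl.kerberos.service.name property set in testProducer() and testConsumer().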
