上云无忧 > 文档中心 > 腾讯云消息队列 CKafka 实战教程 - Schema Registry 接入 CKafka
消息队列 CKafka
腾讯云消息队列 CKafka 实战教程 - Schema Registry 接入 CKafka

文档简介:
无论是使用传统的 Avro API 自定义序列化类与反序列化类,还是使用 Twitter 的 Bijection 类库实现 Avro 的序列化与反序列化,两种方法有相同的缺点:在每条 Kafka 记录里都嵌入了 Schema,从而导致记录的大小成倍增加。但是不管怎样,在读取记录时仍然需要用到整个 Schema,所以要先找到 Schema。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠
无论是使用传统的 Avro API 自定义序列化类与反序列化类,还是使用 Twitter 的 Bijection 类库实现 Avro 的序列化与反序列化,两种方法有相同的缺点:在每条 Kafka 记录里都嵌入了 Schema,从而导致记录的大小成倍增加。但是不管怎样,在读取记录时仍然需要用到整个 Schema,所以要先找到 Schema。 CKafka 提供了数据共用一个 Schema 的方法:将 Schema 中的内容注册到 Confluent Schema Registry,Kafka Producer 和 Kafka Consumer 通过识别 Confluent Schema Registry 中的 schema 内容进行序列化和反序列化。

前提条件

下载 Download JDK 8
下载 Confluent oss 4.1.1
创建实例

操作步骤

步骤1:获取实例接入地址并开启自动创建 Topic

1. 登录 CKafka 控制台
2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址。

4. 自动创建 Topic模块开启自动创建 Topic。
说明
启动 oss 会创建 schemas 主题,所以实例中需要开启自动创建主题。

步骤2:准备 Confluent 配置

1. 修改 oss 配置文件中的 server 地址等信息。 配置信息如下:
		
kafkastore.bootstrap.servers=PLAINTEXT://xxxx
kafkastore.topic=schemas
debug=true
说明
bootstrap.servers:接入网络,在 CKafka 控制台 的实例详情页面接入方式模块的网络列复制。

2. 执行如下命令启动 Schema Registry。
		
bin/schema-registry-start etc/schema-registry/schema-registry.properties
运行结果如下:

步骤3:收发消息

现有 schema 文件,其中内容如下:
		
{
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
1. 注册 schema 到对应 Topic(注册 Topic 名为 test)。 下面的脚本是直接在 Schema Registry 部署的环境中使用 curl 命令调用对应 API 实现注册的一个示例:

		
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \

--data '{"schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\":

\"id\", \"type\": \"int\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}"}' \

http://127.0.0.1:8081/subjects/test/versions

2. Kafka Producer 发送数据:
		
package schemaTest;
import java.util.Properties;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SchemaProduce {
public static final String USER_SCHEMA = "{\"type\": \"record\", \"name\": \"User\", " +
"\"fields\": [{\"name\": \"id\", \"type\": \"int\"}, " +
"{\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"age\", \"type\": \"int\"}]}";
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// 添加CKafka实例的接入地址
props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 使用 Confluent 实现的 KafkaAvroSerializer
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
// 添加 schema 服务的地址,用于获取 schema
props.put("schema.registry.url", "http://127.0.0.1:8081");
Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Random rand = new Random();
int id = 0;
while(id < 100) {
id++;
String name = "name" + id;
int age = rand.nextInt(40) + 1;
GenericRecord user = new GenericData.Record(schema);
user.put("id", id);
user.put("name", name);
user.put("age", age);
ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("test", user);
producer.send(record);
Thread.sleep(1000);
}
producer.close();
}
}
运行一段时间后,在 CKafka 控制台topic 管理页面,选择对应的 Topic ,单击更多 > 消息查询,查看刚刚发送的消息。

3. Kafka Consumer 消费数据:
		
package schemaTest;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SchemaProduce {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx"); //CKafka实例的接入地址
props.put("group.id", "schema");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 使用Confluent实现的KafkaAvroDeserializer
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
// 添加schema服务的地址,用于获取schema
props.put("schema.registry.url", "http://127.0.0.1:8081");
KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test"));
try {
while (true) {
ConsumerRecords<String, GenericRecord> records = consumer.poll(10);
for (ConsumerRecord<String, GenericRecord> record : records) {
GenericRecord user = record.value();
System.out.println("value = [user.id = " + user.get("id") + ", " + "user.name = "
+ user.get("name") + ", " + "user.age = " + user.get("age") + "], "
+ "partition = " + record.partition() + ", " + "offset = " + record.offset());
}
}
} finally {
consumer.close();
}
}
}
CKafka 控制台Consumer Group页面,选择 schema消费组名称,在主题名称输入 Topic 名称,单击查看消费者详情,查看消费详情。

启动消费者进行消费,下图为消费日志截图:

相似文档
  • Spark Streaming 是 Spark Core 的一个扩展,用于高吞吐且容错地处理持续性的数据,目前支持的外部输入有 Kafka、Flume、HDFS/S3、Kinesis、Twitter 和 TCP socket。
  • Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。
  • Kafka Connect 目前支持两种执行模式:standalone 和 distributed。 以 standalone 模式启动 connect: 通过以下命令以 standalone 模式启动 connect:
  • Storm 是一个分布式实时计算框架,能够对数据进行流式处理和提供通用性分布式 RPC 调用,可以实现处理事件亚秒级的延迟,适用于对延迟要求比较高的实时数据处理场景。
  • Logstash 是一个开源的日志处理工具,可以从多个源头收集数据、过滤收集的数据并对数据进行存储作为其他用途。 Logstash 灵活性强,拥有强大的语法分析功能,插件丰富,支持多种输入和输出源。Logstash 作为水平可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部