上云无忧 > 文档中心 > 腾讯云消息队列 CKafka 实战教程 - Flume 接入 CKafka
消息队列 CKafka
腾讯云消息队列 CKafka 实战教程 - Flume 接入 CKafka

文档简介:
Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠
Apache Flume 是一个分布式、可靠、高可用的日志收集系统,支持各种各样的数据来源(如 HTTP、Log 文件、JMS、监听端口数据等),能将这些数据源的海量日志数据进行高效收集、聚合、移动,最后存储到指定存储系统中(如 Kafka、分布式文件系统、Solr 搜索服务器等)。
Flume 基本结构如下:

Flume 以 agent 为最小的独立运行单位。一个 agent 就是一个 JVM,单个 agent 由 Source、Sink 和 Channel 三大组件构成。

Flume 与 Kafka
把数据存储到 HDFS 或者 HBase 等下游存储模块或者计算模块时需要考虑各种复杂的场景,例如并发写入的量以及系统承载压力、网络延迟等问题。Flume 作为灵活的分布式系统具有多种接口,同时提供可定制化的管道。 在生产处理环节中,当生产与处理速度不一致时,Kafka 可以充当缓存角色。Kafka 拥有 partition 结构以及采用 append 追加数据,使 Kafka 具有优秀的吞吐能力;同时其拥有 replication 结构,使 Kafka 具有很高的容错性。 所以将 Flume 和 Kafka 结合起来,可以满足生产环境中绝大多数要求。

Flume 接入开源 Kafka

准备工作

下载 Apache Flume (1.6.0以上版本兼容 Kafka)
下载 Kafka工具包 (0.9.x以上版本,0.8已经不支持)
确认 Kafka 的 Source、 Sink 组件已经在 Flume 中。

接入方式

Kafka 可作为 Source 或者 Sink 端对消息进行导入或者导出。
Kafka Source
Kafka Sink
配置 kafka 作为消息来源,即将自己作为消费者,从 Kafka 中拉取数据传入到指定 Sink 中。主要配置选项如下:
配置项
说明
channels
自己配置的 Channel
type
必须为:org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers
Kafka Broker 的服务器地址
kafka.consumer.group.id
作为 Kafka 消费端的 Group ID
kafka.topics
Kafka 中数据目标 Topic
batchSize
每次写入 Channel 的大小
batchDurationMillis
每次写入最大间隔时间
示例:
				
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
更多内容请参见 Apache Flume 官网
配置 Kafka 作为内容接收方,即将自己作为生产者,推到 Kafka Server 中等待后续操作。主要配置选项如下:
配置项
说明
channel
自己配置的 Channel
type
必须为:org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers
Kafka Broker 的服务器
kafka.topics
Kafka 中数据来源 Topic
kafka.flumeBatchSize
每次写入的 Bacth 大小
kafka.producer.acks
Kafka 生产者的生产策略
示例:
				
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
更多内容请参见 Apache Flume 官网

Flume 接入 CKafka

使用 CKafka 作为 Sink
使用 CKafka 作为 Source

步骤1:获取 CKafka 实例接入地址

1. 登录 CKafka 控制台
2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址。

步骤2:创建 Topic

1. 在实例基本信息页面,选择顶部 Topic 管理页签。
2. 在 Topic 管理页面,单击新建,创建一个名为 flume_test 的 Topic。

步骤3:配置 Flume

1. 下载 Apache Flume 工具包并解压
2. 编写配置文件 flume-kafka-sink.properties,以下是一个简单的 Java 语言 Demo(配置在解压目录的 conf 文件夹下),若无特殊要求则将自己的实例 IP 与 Topic 替换到配置文件当中即可。本例使用的 source 为 tail -F flume-test ,即文件中新增的信息。

代码示例如下:
				
# 以kafka作为sink的demo
agentckafka.source = exectail
agentckafka.channels = memoryChannel
agentckafka.sinks = kafkaSink
# 设置source类型,根据不同需求而设置。若有特殊source可自行配置,此处使用最简单的例子
agentckafka.sources.exectail.type = exec
agentckafka.sources.exetail.command = tail -F ./flume.test
agentckafka.sources.exectail.batchSize = 20
# 设置source channel
agentckafka.sources.exectail.channels = memoryChannel
# 设置sink类型,此处设置为kafka
agentckafka.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
# 此处设置ckafka提供的ip:port
agentckafka.sinks.kafkaSink.brokerList = 172.16.16.12:9092 # 配置实例IP
# 此处设置需要导入数据的topic,请先在控制台提前创建好topic
agentckafka.sinks.kafkaSink.topic = flume test #配置topic
# 设置sink channel
agentckafka.sinks.kafkaSink.channel = memoryChannel
# Channel使用默认配置
# Each channel's type is defined.
agentckafka.channels.memoryChannel.type = memory
agentckafka.channels.memoryChannel.keep-alive = 10
# Other config values specific to each type of channel(sink or source) can be defined as well
# In this case, it specifies the capacity of the memory channel
agentckafka.channels.memoryChannel.capacity = 1000
agentckafka.channels.memoryChannel.transactionCapacity = 1000
3. 执行如下命令启动 Flume。
				
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-sink.properties
4. 写入消息到 flume-test 文件中,此时消息将由 Flume 写入到 CKafka。

5. 启动 CKafka 客户端进行消费。
				
./kafka-console-consumer.sh --bootstrap-server xx.xx.xx.xx:xxxx --topic flume_test --from-beginning --new-consumer
说明
bootstrap-server 填写刚创建的 CKafka 实例的接入地址,topic 填写刚创建的 Topic 名称。
可以看到刚才的消息被消费出来。

步骤1:获取 CKafka 实例接入地址

1. 登录 CKafka 控制台
2. 在左侧导航栏选择实例列表,单击实例的“ID”,进入实例基本信息页面。
3. 在实例的基本信息页面的接入方式模块,可获取实例的接入地址。

步骤2:创建 Topic

1. 在实例基本信息页面,选择顶部 Topic 管理页签。
2. 在 Topic 管理页面,单击新建,创建一个名为 flume_test 的 Topic。

步骤3:配置 Flume

1. 下载 Apache Flume 工具包并解压
2. 编写配置文件 flume-kafka-source.properties,以下是一个简单的 Demo(配置在解压目录的 conf 文件夹下)。若无特殊要求则将自己的实例 IP 与 Topic 替换到配置文件当中即可。此处使用的 sink 为 logger。

3. 执行如下命令启动 Flume。
				
./bin/flume-ng agent -n agentckafka -c conf -f conf/flume-kafka-source.properties
4. 查看 logger 输出信息(默认路径为logs/flume.log)。

相似文档
  • Kafka Connect 目前支持两种执行模式:standalone 和 distributed。 以 standalone 模式启动 connect: 通过以下命令以 standalone 模式启动 connect:
  • Storm 是一个分布式实时计算框架,能够对数据进行流式处理和提供通用性分布式 RPC 调用,可以实现处理事件亚秒级的延迟,适用于对延迟要求比较高的实时数据处理场景。
  • Logstash 是一个开源的日志处理工具,可以从多个源头收集数据、过滤收集的数据并对数据进行存储作为其他用途。 Logstash 灵活性强,拥有强大的语法分析功能,插件丰富,支持多种输入和输出源。Logstash 作为水平可伸缩的数据管道,与 Elasticsearch 和 Kibana 配合,在日志收集检索方面功能强大。
  • Beats 平台 集合了多种单一用途数据采集器。这些采集器安装后可用作轻量型代理,从成百上千或成千上万台机器向目标发送采集数据。 Beats 有多种采集器,您可以根据自身的需求下载对应的采集器。本文以 Filebeat(轻量型日志采集器)为例,向您介绍 Filebeat 接入 CKafka 的操作方法,及接入后常见问题的解决方法。
  • CKafka 跨可用区部署: CKafka 专业版支持跨可用区部署,在拥有3个或3个以上可用区的地域购买 CKafka 实例时,可以最多选择四个可用区购买跨可用区实例。该实例分区副本会强制分布在各个可用区节点上,这种部署方式能够让您的实例在单个可用区不可用情况下仍能正常提供服务。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部