上云无忧 > 文档中心 > 使用BSC(百度流式计算服务)将Kafka中的数据导入百度智能云Elasticsearch
Elasticsearch
使用BSC(百度流式计算服务)将Kafka中的数据导入百度智能云Elasticsearch

文档简介:
介绍: 本文主要介绍通过 BSC【百度流式计算服务】将数据从Kafka中导入到Es中。 创建集群: 在将数据导入Es之前需要再百度云上创建Es集群,假定创建的集群信息如下:
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

介绍

本文主要介绍通过 BSC【百度流式计算服务】将数据从Kafka中导入到Es中。

创建集群

在将数据导入Es之前需要再百度云上创建Es集群,假定创建的集群信息如下: 

这里主要记录以下信息:

  • 集群ID: 296245916518715392
  • 创建集群时的密码: bbs_2016

创建Kafka Topic

登录百度云管理控制台,进入kafka产品界面,创建topic,并向topic中灌入数据。示例中我们灌入的数据如下,包括两个json字段,样例数据如下: 

示例中创建topic:a15fdd9dd5154845b32f7c74ae155ae3__demo_test 并且确保该topic下有对应的证书,将证书下载到本地。

编辑BSC 作业

创建Kafka Source

进入BSC编辑作业界面,创建kafka source table, sql代码如下

CREATE table source_table_kafka(
stringtype STRING,
longtype LONG
) with(
    type = 'BKAFKA',
    topic = 'a15fdd9dd5154845b32f7c74ae155ae3__demo_test',
    kafka.bootstrap.servers = 'kafka.bj.baidubce.com:9091',
    sslFilePath = 'kafka_key.zip',
    encode = 'json'
);

其中sslFilePath = 'kafka-key.zip',为上一步下载到本地的kafka证书。

上传Kafka证书

点击高级设置,上传kafka证书

上传之后如下图

创建Es Sink Table

sql代码如下

create table sink_table_es(
    stringtype String,
    longtype Long
)with(
    type = 'ES',
    es.net.http.auth.user = 'superuser',
    es.net.http.auth.pass = 'bbs_2016',
    es.resource = 'bsc_test/doc_type',
    es.clusterId = '296245916518715392',
    es.region = 'bd',
    es.port = '8200',
    es.version = '6.5.3'
);

其中:

  • es.resource对应es的索引与类型,es会在bsc写入数据时自动创建指定索引
  • es.clusterId对应es的集群ID
  • es.region 表示 Es服务所在的地区的代码,可以参考 Es服务区域代码 中查询区域与代码的对应关系。

编写导入语句

sql语句如下:

insert into
    sink_table_es(stringtype, longtype) outputmode append
select
    stringtype,
    longtype
from
    source_table_kafka;

保存作业并发布运行作业

查看Es中的数据

相似文档
  • 创建带冷数据节点集群: 冷数据节点集群创建: 登录百度智能云Elasticsearch 控制台,选择【创建集群】。 开启冷数据节点,并选择对应的计算资源及磁盘配置进行创建。
  • Kibana 是一款开源的可视化数据分析工具,可以使用 Kibana 对 Elasticsearch 中的数据进行搜索、分析,可以很方便的利用图表、表格及地图对数据进行多样的可视化展现。
  • 简介: Logstash是一款开源的实时数据采集工具。通过配置多种类型的input、fliter、output插件,Logstash支持多类数据源的数据采集、转换,并存储到目的端。
  • 介绍: 日志是一个系统不可缺少的组成部分,日志记录系统产生的各种行为,基于日志信息可以对系统进行问题排查,性能优化,安全分析等工作。 使用ELK(Elasticsearch、Logstash、Kibana) 结合Filebeat与Kafka构建日志分析系统,可以方便的对日志数据进行采集解析存储,以及近实时的查询分析。
  • Elasticsearch系统常见问题: 如何查看Es安装了哪些插件? 线程池队列满导致错误? Too Many Open Files的错误? Es 中一个分片一般设置多大? 当集群为red或者yellow的时候怎么办? 如何cancel掉慢查询?
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部