上云无忧 > 文档中心 > 百度数据仓库 Palo 订阅Kafka日志
百度数据仓库 Palo Doris版
百度数据仓库 Palo 订阅Kafka日志

文档简介:
用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。 PALO 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。

PALO 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。

准备工作

开通百度消息服务

百度消息服务(BMS)基于 Kafka 在百度智能云提供托管服务,请先按照以下流程开通服务。

  1. 请根据 BMS 快速入门 文档开通消息服务
  2. 下载证书压缩包 kafka-key.zip 并解压,解压后将得到以下文件

    • ca.pem
    • client.key
    • client.keystore.jks
    • client.pem
    • client.properties
    • client.truststore.jks
  3. 上传证书文件到 HTTP 服务器。

    因为后续 PALO 需要从某个 HTTP 服务器上下载这些证书以供访问 Kafka。因此我们需要先将这些证书上传到 HTTP 服务器。这个 HTTP 服务器必须要能够被 PALO 的 Leader Node 节点所访问。

    如果您没有合适的 HTTP 服务器,可以参照以下方式借助百度对象存储(BOS)来完成:

    1. 根据 开始使用,创建Bucket 文档开通BOS服务并创建一个 Bucket。注意,Bucket所在地域必须和 PALO 集群所在地域相同
    2. 将以下三个文件上传到 Bucket

      • ca.pem
      • client.key
      • client.pem
    3. 在 BOS Bucket 文件列表页面,点击文件右侧的 文件信息,可以获取 HTTP 访问连接。请将 连接有效时间 设为 -1,即永久。

      注:请不要使用带有 cdn 加速的 http 下载地址。这个地址某些情况无法被 PALO 访问。

自建 Kafka 服务

如果使用自建 Kafka 服务,请确保 Kafka 服务和 PALO 集群在同一个 VPC 内,并且相互之间的网络能够互通。

订阅 Kafka 消息

订阅 Kafka 消息使用了 PALO 中的例行导入(Routine Load)功能。

用户首先需要创建一个例行导入作业。作业会通过例行调度,不断地发送一系列的任务,每个任务会消费一定数量 Kafka 中的消息。

请注意以下使用限制:

  1. 支持无认证的 Kafka 访问,以及通过 SSL 方式认证的 Kafka 集群。
  2. 支持的消息格式如下:

    • csv 文本格式。每一个 message 为一行,且行尾不包含换行符。
    • Json 格式,详见 导入 Json 格式数据。
  3. 仅支持 Kafka 0.10.0.0(含) 以上版本。

访问 SSL 认证的 Kafka 集群

例行导入功能支持无认证的 Kafka 集群,以及通过 SSL 认证的 Kafka 集群。

访问 SSL 认证的 Kafka 集群需要用户提供用于认证 Kafka Broker 公钥的证书文件(ca.pem)。如果 Kafka 集群同时开启了客户端认证,则还需提供客户端的公钥(client.pem)、密钥文件(client.key),以及密钥密码。这里所需的文件需要先通过 CREAE FILE 命令上传到 Plao 中,并且 catalog 名称为 kafka。CREATE FILE 命令的具体帮助可以参见 CREATE FILE 命令手册。这里给出示例:

  • 上传文件

    CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = 
  • "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", 
  • "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/
  • client.pem", "catalog" = "kafka");

上传完成后,可以通过 SHOW FILES 命令查看已上传的文件。

创建例行导入作业

创建例行导入任务的具体命令,请参阅 ROUTINE LOAD 命令手册。这里给出示例:

  1. 访问无认证的 Kafka 集群

    CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl COLUMNS TERMINATED BY "," PROPERTIES 
    • max_batch_interval/max_batch_rows/max_batch_size 用于控制一个子任务的运行周期。一个子任务的运行周期由最长运行时间、最多消费行数和最大消费数据量共同决定。
  2. ( "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", )
  3.  FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = 
  4. "my_topic", "property.group.id" = "xxx", "property.client.id" = "xxx", "property.kafka_default_offsets" 
  5. = "OFFSET_BEGINNING" );
  6. 访问 SSL 认证的 Kafka 集群

    CREATE ROUTINE LOAD example_db.my_first_job ON example_tbl COLUMNS TERMINATED BY ",",
    • 对于百度消息服务,property.ssl.key.password 属性可以在 client.properties 文件中获取。
  7.  PROPERTIES ( "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", )
  8.  FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", 
  9. "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.
  10. certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key",
  11.  "property.ssl.key.password" = "abcdefg" );

查看导入作业状态

查看作业状态的具体命令和示例请参阅 SHOW ROUTINE LOAD 命令文档。

查看某个作业的任务运行状态的具体命令和示例请参阅 SHOW ROUTINE LOAD TASK 命令文档。

只能查看当前正在运行中的任务,已结束和未开始的任务无法查看。

修改作业属性

用户可以修改已经创建的作业的部分属性。具体说明请参阅 ALTER ROUTINE LOAD 命令手册。

作业控制

用户可以通过 STOP/PAUSE/RESUME 三个命令来控制作业的停止,暂停和重启。

具体命令请参阅 STOP ROUTINE LOAD,PAUSE ROUTINE LOAD,RESUME ROUTINE LOAD 命令文档。

更多帮助

关于 ROUTINE LOAD 的更多详细语法和最佳实践,请参阅 ROUTINE LOAD 命令手册。

相似文档
  • 用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。 INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。 INSERT 语句支持以下两种语法: * INSERT INTO table SELECT ... * INSERT INTO table VALUES(...)
  • PALO 可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。
  • PALO 支持导入 JSON 格式的数据。本文档主要说明在进行JSON格式数据导入时的注意事项。 支持的导入方式: 目前只有以下导入方式支持 Json 格式的数据导入: 将本地 JSON 格式的文件通过 STREAM LOAD 方式导入。 通过 ROUNTINE LOAD 订阅并消费 Kafka 中的 JSON 格式消息。 暂不支持其他方式的 JSON 格式数据导入。
  • 导入原子性: PALO 中的所有导入操作都有原子性保证,即一个导入作业中的数据要么全部成功,要么全部失败。不会出现仅部分数据导入成功的情况。 在 BROKER LOAD 中我们也可以实现多多表的原子性导入。
  • PALO 支持丰富的列映射、转换和过滤操作。可以非常灵活的处理需要导入的原始数据。 本文档主要介绍如何在导入中使用这些功能。 总体介绍: PALO 在导入过程中对数据处理步骤分为以下几步: 数据按原始文件中的列的顺序读入到 PALO。 通过前置过滤条件(PRECEDING FILTER)对原始数据进行一次过滤。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部