上云无忧 > 文档中心 > 百度数据仓库 Palo 导入本地数据
百度数据仓库 Palo Doris版
百度数据仓库 Palo 导入本地数据

文档简介:
Stream Load 用于将本地文件导入到 PALO 中。 不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 PALO 进行连接交互的。 该方式中涉及 HOST:PORT 应为 HTTP 协议端口。 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

Stream Load 用于将本地文件导入到 PALO 中。

不同于其他命令的提交方式,Stream Load 是通过 HTTP 协议与 PALO 进行连接交互的。

该方式中涉及 HOST:PORT 应为 HTTP 协议端口。

  • 公有云用户必须使用 Compute Node(BE)的 HTTP 协议端口,默认为 8040。
  • 私有化部署用户可以使用 Leader Node(FE)的 HTTP 协议端口,默认为 8030。但须保证客户端所在机器网络能够联通 Compute Node 所在机器。

本文文档我们以 curl 命令为例演示如何进行数据导入。

文档最后,我们给出一个使用 Java 导入数据的代码示例。

导入数据

Stream Load 的请求体如下:

PUT /api/{db}/{table}/_stream_load
  1. 创建一张表

    通过 CREATE TABLE 命令创建一张表用于存储待导入的数据。具体的导入方式请查阅 CREATE TABLE 命令手册。示例如下:

    CREATE TABLE IF NOT EXISTS load_test ( id INT, name VARCHAR(128) ) DISTRIBUTED BY HASH(id) BUCKETS 8;
  2. 导入数据

    执行以下 curl 命令导入本地文件:

    curl -u user:passwd -H "label:example_label_1" -T /path/to/local/your_file.txt http://host:port/api/example
    • user:passwd 为在 PALO 中创建的用户。初始用户为 admin,密码为创建 PALO 集群时设置的密码。
    • host:port 为 Compute Node 的 HTTP 协议端口,默认是 8040,可以在智能云 PALO 集群详情页面查看。
    • label: 可以在 Header 中指定 Label 唯一标识这个导入任务。

    关于 Stream Load 命令的更多高级操作,请参阅 Stream Load 命令文档。

  3. _db/load_test/_stream_load
  4. 等待导入结果

    Stream Load 命令是同步命令,返回成功即表示导入成功。如果导入数据较大,可能需要较长的等待时间。示例如下:

    { "TxnId": 1003, "Label": "example_label_1", "Status": "Success", "Message": "OK", "NumberTotalRows":
    • Status 字段状态为 Success 即表示导入成功。
    • 其他字段的详细介绍,请参阅 Stream Load 命令文档。
  5.  1000000, "NumberLoadedRows": 1000000, "NumberFilteredRows": 1, "NumberUnselectedRows": 0, 
  6. "LoadBytes": 40888898, "LoadTimeMs": 2144, "BeginTxnTimeMs": 1, "StreamLoadPutTimeMs": 2, 
  7. "ReadDataTimeMs": 325, "WriteDataTimeMs": 1933, "CommitAndPublishTimeMs": 106, "ErrorURL": 
  8. "http://192.168.1.1:8042/api/_load_error_log?file=__shard_0/error_log_insert_stmt_db18266d4d9b4ee5
  9. -abb00ddd64bdf005_db18266d4d9b4ee5_abb00ddd64bdf005" }

使用建议

  • Stream Load 只能导入本地文件。
  • 建议一个导入请求的数据量控制在 1 GB 以内。如果有大量本地文件,可以分批并发提交。

Java 代码示例

这里通过一个简单的 JAVA 示例来执行 Stream Load:

(感谢 hf200012 提供示例)

package demo.doris; import org.apache.commons.codec.binary.Base64; import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.
HttpPut; import org.apache.http.entity.FileEntity; import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.DefaultRedirectStrategy; import org.apache.http.impl.client.Http
ClientBuilder; import org.apache.http.impl.client.HttpClients; import org.apache.http.util.EntityUtils;
 import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; /*
这是一个 PALO Stream Load 示例,需要依赖
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
</dependency>
 */ public class PALOStreamLoader { // 1. 对于公有云公户,这里填写 Compute Node 
地址以及 HTTP 协议访问端口(8040)。 // 2. 对于开源用户,可以选择填写 FE 地址以及 FE 的 
http_port,但须保证客户端和 BE 节点的连通性。 private final static String HOST = "your_host"; private
 final static int PORT = 8040; private final static String DATABASE = "db1"; // 要导入的数据库 private 
final static String TABLE = "tbl1"; // 要导入的表 private final static String USER = "root"; // PALO 
用户名 private final static String PASSWD = ""; // PALO 密码 private final static String LOAD_FILE_NAME 
= "/path/to/1.txt"; // 要导入的本地文件路径 private final static String loadUrl = String.format
("http://%s:%s/api/%s/%s/_stream_load", HOST, PORT, DATABASE, TABLE); private final static 
HttpClientBuilder httpClientBuilder = HttpClients .custom() .setRedirectStrategy(new DefaultRedirectStrategy()
 { @Override protected boolean isRedirectable(String method) { // 如果连接目标是 FE,则需要处理 307 redirect。
 return true; } }); public void load(File file) throws Exception { try (CloseableHttpClient client = 
httpClientBuilder.build()) { HttpPut put = new HttpPut(loadUrl); put.setHeader(HttpHeaders.EXPECT, 
"100-continue"); put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD)); 
// 可以在 Header 中设置 stream load 相关属性,这里我们设置 label 和 column_separator。 put.setHeader
("label","label1"); put.setHeader("column_separator",","); // 设置导入文件。 // 这里也可以使用
 StringEntity 来传输任意数据。 FileEntity entity = new FileEntity(file); put.setEntity(entity); 
try (CloseableHttpResponse response = client.execute(put)) { String loadResult = ""; if (response.getEntity
() != null) { loadResult = EntityUtils.toString(response.getEntity()); } final int statusCode = response.
getStatusLine().getStatusCode(); if (statusCode != 200) { throw new IOException( String.format("Stream 
load failed. status: %s load result: %s", statusCode, loadResult)); } System.out.println("Get load result: 
" + loadResult); } } } private String basicAuthHeader(String username, String password) { final String 
tobeEncode = username + ":" + password; byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes
(StandardCharsets.UTF_8)); return "Basic " + new String(encoded); } public static void main(String[] args) 
throws Exception{ PALOStreamLoader loader = new PALOStreamLoader(); File file = new File(LOAD_FILE_NAME);
 loader.load(file); } }

相似文档
  • 本文档主要介绍如何导入 BOS 中存储的数据。 准备工作: 请先通过以下步骤,在百度对象存储(Baidu Object Storage,BOS)上存放需导入到PALO中的数据。 开通 BOS 服务: 请参阅 开始使用BOS。 创建 Bucket: 请参阅 创建Bucket。 注意:Bucket 所属地域必须和 PALO 集群所属地域相同。PALO 地域通常可以在 PALO 控制台页面左上角查看。
  • 用户可以通过提交例行导入作业,直接订阅Kafka中的消息数据,以近实时的方式进行数据同步。 PALO 自身能够保证不丢不重的订阅 Kafka 中的消息,即 Exactly-Once 消费语义。
  • 用户可以通过 JDBC 协议,使用 INSERT 语句进行数据导入。 INSERT 语句的使用方式和 MySQL 等数据库中 INSERT 语句的使用方式类似。 INSERT 语句支持以下两种语法: * INSERT INTO table SELECT ... * INSERT INTO table VALUES(...)
  • PALO 可以创建通过 ODBC 协议访问的外部表。创建完成后,可以通过 SELECT 语句直接查询外部表的数据,也可以通过 INSERT INTO SELECT 的方式导入外部表的数据。
  • PALO 支持导入 JSON 格式的数据。本文档主要说明在进行JSON格式数据导入时的注意事项。 支持的导入方式: 目前只有以下导入方式支持 Json 格式的数据导入: 将本地 JSON 格式的文件通过 STREAM LOAD 方式导入。 通过 ROUNTINE LOAD 订阅并消费 Kafka 中的 JSON 格式消息。 暂不支持其他方式的 JSON 格式数据导入。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部