上云无忧 > 文档中心 > Spark访问百度智能云Elasticsearch常见问题
Elasticsearch
Spark访问百度智能云Elasticsearch常见问题

文档简介:
说明: 本文档主要介绍了通过elasticsearch-hadoop中的Spark访问ES时常见配置项意义。本文中的es-spark是elasticsearch-hadoop中和Spark相关联的包,用户通过自己的Spark集群读写ES集群,elasticsearch-hadoop基本上兼容了目前ES所有的版本。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

说明

本文档主要介绍了通过elasticsearch-hadoop中的Spark访问ES时常见配置项意义。本文中的es-spark是elasticsearch-hadoop中和Spark相关联的包,用户通过自己的Spark集群读写ES集群,elasticsearch-hadoop基本上兼容了目前ES所有的版本

版本号检测异常

es-spark 运行时通常会自动检测ES集群的版本号,获取的版本号主要是用来对不同集群的版本做API的兼容处理

一般情况下用户不用关注ES版本号,但是在云上有时候自动检测集群的版本号会发生一些莫名其妙的检测不到的错误,可以通过配置解决:

配置项

es.internal.es.version:"6.5.3"

在一些较新版本的es-spark包中同样需要配置:

es.internal.es.cluster.name:"Your Cluter Name"

实现原理

配置完以后,es-spark不会请求 / 目录,解析version,会直接使用用户配置的version:

INTERNAL_ES_VERSION = "es.internal.es.version"
INTERNAL_ES_CLUSTER_NAME = "es.internal.es.cluster.name"

public static EsMajorVersion discoverEsVersion(Settings settings, Log log) {
      return discoverClusterInfo(settings, log).getMajorVersion();
}

// 不同版本的elasticsearch-hadoop可能会有差异
public static ClusterInfo discoverClusterInfo(Settings settings, Log log) {
        ClusterName remoteClusterName = null;
        EsMajorVersion remoteVersion = null;
        // 尝试从配置中获取集群名字
        String clusterName = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_NAME);
        // 尝试从配置中获取集群UUID
        String clusterUUID = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_CLUSTER_UUID);
        // 尝试从配置中获取ES version
        String version = settings.getProperty(InternalConfigurationOptions.INTERNAL_ES_VERSION);
        // 如果集群名字和版本号没有从配置文件中拿到,则发起网络请求(请求根目录)
        if (StringUtils.hasText(clusterName) && StringUtils.hasText(version)) {
            if (log.isDebugEnabled()) {
                log.debug(String.format("Elasticsearch cluster [NAME:%s][UUID:%s][VERSION:%s] already
 present in configuration; skipping discovery",
                        clusterName, clusterUUID, version));
            }
            remoteClusterName = new ClusterName(clusterName, clusterUUID);
            remoteVersion = EsMajorVersion.parse(version);
            return new ClusterInfo(remoteClusterName, remoteVersion);
        }
      ....
}

如果启用集群名、版本号自动检测功能,需要保证分配给es-spark的用户有访问根目录/的GET权限

GET /

数据节点发现

配置项:

es.nodes.wan.only: false  默认为false 
es.nodes.discovery: true   默认为true

百度云ES集群前面有一个BLB负载均衡,配置es.nodes的时候写这个BLB的地址即可,需要保证es-spark可以访问BLB的地址

  • es.nodes.wan.only: false,es.nodes.discovery: true: Spark会通过访问es.nodes中指定的host(可以为多个) 得到ES集群所有开启HTTP服务节点的ip和port,后续对数据的访问会直接访问分片数据所在的节点上(需要保证ES集群所有节点都能够被Spark集群访问到)
  • es.nodes.wan.only: true,es.nodes.discovery: false或不设置:Spark发送给ES的所有请求都需要通过这个节点进行转发,效率相对比较低

具体代码逻辑:

ES_NODES_DISCOVERY = "es.nodes.discovery"
ES_NODES_WAN_ONLY = "es.nodes.wan.only"
ES_NODES_WAN_ONLY_DEFAULT = "false"

InitializationUtils#discoverNodesIfNeeded
    public static List<NodeInfo> discoverNodesIfNeeded(Settings settings, Log log) {
        if (settings.getNodesDiscovery()) { // 需要读取配置项
            RestClient bootstrap = new RestClient(settings);

            try {
                List<NodeInfo> discoveredNodes = bootstrap.getHttpNodes(false);
                if (log.isDebugEnabled()) {
                    log.debug(String.format("Nodes discovery enabled - found %s", discoveredNodes));
                }

                SettingsUtils.addDiscoveredNodes(settings, discoveredNodes);
                return discoveredNodes;
            } finally {
                bootstrap.close();
            }
        }

        return null;
    }
 
public boolean getNodesDiscovery() {
        // by default, if not set, return a value compatible with the WAN setting
        // otherwise return the user value.
        // this helps validate the configuration
        return Booleans.parseBoolean(getProperty(ES_NODES_DISCOVERY), !getNodesWANOnly());
 //默认值是!getNodesWANOnly() 
    }
    
public boolean getNodesWANOnly() {
        return Booleans.parseBoolean(getProperty(ES_NODES_WAN_ONLY, ES_NODES_WAN_ONLY_DEFAULT));
    }

用户权限问题

如果启动节点发现功能,需要保证es-spark使用的用户有访问

GET /_nodes/http
GET /{index}/_search_shards

的权限,否则会导致整个Job失败

配置Bulk导入有错误的处理方式

Spark 写ES的时候发生错误且经过几次尝试后会中断job,默认的情况下会直接中断当前Job,导致整个任务失败,如果只是想把导入失败的文档打印到日志中,可以通过如下配置解决:

配置错误发生错误的处理机制

es.write.rest.error.handlers = log 
es.write.rest.error.handler.log.logger.name: es_error_handler

设置后当出现bulk写错误的时候,Job不会中断,会以log的形式输出到日志中,日志前缀为es_error_handler

如何拿到除_source外的其他文档元数据

正常情况下,我们通过调用ES的_search API返回的每条数据如下:

{
            "_index": "d_index",
            "_type": "doc",
            "_id": "51rrB2sBaX4YjyPY-2EG",
            "_score": 1,
            "_source": {
               "A": "field A value",
               "B": "field B value"
            }
         }

但是用Spark去读ES的时候,默认是不读除_source字段意外的其他字段,如_id _version, 在一些场景下,业务可能需要拿到_id,可以通过如下配置:

es.read.metadata:true  //这个配置默认是false
es.read.metadata.field: "_id" // 配置需要读取的元数据字段
es.read.metadata.version: 默认false 读取es的版本号

文档的元数据字段信息会放在一个_metadata的字段里面

导入的时候指定id、version的方法

做数据迁移的时候,比如从一个低版本的ES集群迁移到高版的ES集群,我们可以用es-spark边度边写,如果需要指定_id, _rouring, _version 这些信息,可以设置:

es.mapping.id:”_meta._id”  指定json中id的路径
es.mapping.routing:”_meta._rouring”  指定json中id的路径
es.mapping.version:”_meta._version”  指定json中id的路径

_meta.xxx是所需的字段在Json文档中的路径

spark默认写ES的时候refresh的会在每次bulk结束的时候调用

我们建议设置为false,有ES内部index的 refresh_interval来控制refresh,否则ES集群会有大量的线程在refesh会带来很大的CPU和磁盘压力

es.batch.write.refresh: false 默认是true

控制每次Bulk写入的量

es.batch.size.bytes:1mb 每次bulk写入文档的大小,默认1mb
es.batch.size.entries:1000 每次bulk写入的文档数,默认是1000

用户可根据ES集群套餐合理设置

读相关设置

es.scroll.size: 50, 默认是50 这个值相对来说比较小,可以适当增大至1000-10000
es.input.use.sliced.partitions:  true  为了提高并发,es会进行scroll-slice进行切分
es.input.max.docs.per.partition: 100000 根据这个值进行切分slice

es-spark 读取 ES集群数据的时候,会按照每个分片的总数进行切分做scroll-slice处理:

int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);

numDocs为单个分片的文档总数,如果文档有5千万,这时候会切分成500个sclice,会对后端的线上ES集群造成巨大的CPU压力,所以一般建议关闭scroll-slice,以避免影响在线业务

建议的参数:

es.scroll.size: 2000 //尽量根据文档的大小来选择
es.input.use.sliced.partitions: false

控制需要写入的文档字段

有时候业务导入数据的时候,希望有一些字段不被写如ES,可以设置:

es.mapping.exclude:默认是none
// 多个字段用逗号进行分割,直接 . * 表达
es.mapping.exclude = *.description
es.mapping.include = u*, foo.*

执行upsert操作

示例:

String update_params = "parmas:update_time";
String update_script = "ctx._source.update_time = params.update_time";
// 设置sparkConfig
SparkConf sparkConf = new SparkConf()
        .setAppName("YourAppName”)
        .set("es.net.http.auth.user", user)
        .set("es.net.http.auth.pass", pwd)
        .set("es.nodes", nodes)
        .set("es.port", port)
        .set("es.batch.size.entries", "50")
        .set("es.http.timeout","5m")
        .set("es.read.metadata.field", "_id")
        .set("es.write.operation","upsert")
        .set("es.update.script.params", update_params)
        .set("es.update.script.inline", update_script)
        .set("es.nodes.wan.only", "true");

注:

es.update.script.params 为执行更新需要的参数列表
es.update.script.inline 为执行update使用script脚本
相似文档
  • 协议生效时间:2020 年 9 月 1 日。 本服务等级协议(Service Level Agreement,以下简称 “SLA”)规定了百度智能云向用户提供的Elasticsearch的服务可用性等级指标及赔偿方案。
  • 数据仓库 Palo 是百度智能云上提供的PB级别的MPP数据仓库服务,以较低的成本提供在大数据集上的高性能分析和报表查询功能。
  • 2020-12-15:查询性能优化(Join Reorder),多表关联查询性能提升100+倍,内存消耗减少 5~10 倍。 2020-11-25:UNIQUE表VALUE列查询下推,查询性能提升2-100倍。 UNIQUE单版本和多版本读取加速,读取性能较之前有 20%-40% 提升。
  • MySQL协议兼容: PALO提供兼容MySQL协议的连接接口,使得用户不必再单独部署新的客户端库或者工具,可以直接使用MySQL的相关库或者工具;由于提供了MySQL接口,也容易与上层应用兼容;用户学习曲线降低,方便用户上手使用。
  • PALO 拥有非常简洁优雅的系统架构。其核心组件仅包括两类有状态的组件和一个可选的无状态组件。而在使用百度智能云托管的PALO服务时,额外搭配一个前端控制台用于进行一些集群管理操作。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部