上云无忧 > 文档中心 > 百度智能云MapReduce开源组件介绍 - Spark
百度智能云MapReduce开源组件介绍 - Spark

文档简介:
Spark简介: 本文以分析Web日志统计每天的PV和UV为例,介绍如何在百度智能云平台使用Spark。 Spark是开源的大规模数据处理引擎。Spark的先进的DAG执行引擎支持周期性数据流和内存计算,在内存中的运算速度是MapReduce的100倍以上,在硬盘中的运算速度是MapReduce的10倍以上。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

Spark简介

本文以分析Web日志统计每天的PV和UV为例,介绍如何在百度智能云平台使用Spark。

Spark是开源的大规模数据处理引擎。Spark的先进的DAG执行引擎支持周期性数据流和内存计算,在内存中的运算速度是MapReduce的100倍以上,在硬盘中的运算速度是MapReduce的10倍以上。Spark提供了Java、Scala、Python和R语言的高水平API,同时Spark已无缝融合了丰富的工具:Spark SQL(SQL)、MLlib(机器学习)、GraphX(图形处理)、Spark Streaming(流式处理)。Spark可访问存储在HDFS、HBase、Cassandra、本地文件系统等上的数据,支持文本文件、序列文件、以及任何Hadoop的输入文件。

Spark提供端到端的服务:

  1. Spark的Driver包含您的作业程序,完成作业程序的解析和生成;
  2. Driver向集群的Master节点申请运行作业所需的资源;
  3. Master节点为作业分配满足要求的Core节点,并在该节点按要求创建Executor;
  4. Driver将Spark作业的代码和文件传送给分配的Executor;
  5. Executor运行作业,将结果返回给Driver或写入指定的输出位置。

集群准备

  1. 准备数据,请参考数据准备。
  2. 百度智能云环境准备。
  3. 登录控制台,选择“产品服务->MapReduce BMR”,点击“创建集群”,进入集群创建页,并做如下配置:

    • 设置集群名称
    • 设置管理员密码
    • 关闭日志开关
    • 选择镜像版本“BMR 1.0.0(hadoop 2.7)”
    • 选择内置模板“spark”
  4. 请保持集群的其他默认配置不变,点击“完成”可在集群列表页可查看已创建的集群,当集群状态由“初始化中”变为“空闲中”时,集群创建成功。

Spark Java

程序准备

百度智能云提供的Spark样例程序的代码已上传至:https://github.com/BCEBIGDATA/bmr-sample-java,您可通过GitHub克隆代码至本地设计自己的程序,并上传到对象存储BOS(具体操作详见对象存储BOS入门指南)。

/**
 * Analyze log with Spark.
 */ public class AccessLogAnalyzer { private static final SimpleDateFormat 
logDateFormat = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.US); 
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
 private static String fetchDate(String gmsTime) { String date; try { date = simpleDateFormat.
format(logDateFormat.parse(gmsTime)); } catch (ParseException e) { date = null; } return date; 
} private static final Pattern LOGPATTERN = Pattern.compile( "(\\S+)\\s+-\\s+\\[
(.*?)\\]\\s+\"(.*?)\"\\s+(\\d{3})\\s+(\\S+)\\s+" + "\"(.*?)\"\\s+\"(.*?)\"\\s+
(.*?)\\s+\"(.*?)\"\\s+(\\S+)\\s+(\\S+)\\s+(\\S+)"); private static Tuple2<String, String> 
extractKey(String line) { Matcher m = LOGPATTERN.matcher(line); if (m.find())
 { String ipAddr = m.group(1); String date = fetchDate(m.group(2)); return new 
Tuple2<>(date, ipAddr); } return new Tuple2<>(null, null); } public static void main(String[]
 args) { if (args.length != 3) { System.err.println("usage: spark-submit com.baidu.
cloud.bmr.spark.AccessLogAnalyzer <input> <pv> <uv>"); System.exit(1); } SparkConf conf 
= new SparkConf().setAppName("AccessLogAnalyzer"); JavaSparkContext sc = new 
JavaSparkContext(conf); // Parses the log to log records and caches the result. 
JavaPairRDD<String, String> distFile = sc.textFile(args[0]).mapToPair
( new PairFunction<String, String, String>() { @Override public Tuple2<String, String>
 call(String s) { return extractKey(s); } }); distFile.cache(); // Changes the log
 info to (date, 1) format, and caculates the page view. JavaPairRDD<String, Integer> 
pv = distFile.mapToPair( new PairFunction<Tuple2<String, String>, String, Integer>()
 { @Override public Tuple2<String, Integer> call(Tuple2<String, String> tuple)
 { return new Tuple2<>(tuple._1(), 1); } }).reduceByKey(new Function2<Integer,
 Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) 
{ return i1 + i2; } }); // Coalesces to 1 partition and saves as file. Notice
 that this is for demo purpose only. pv.coalesce(1, true).saveAsTextFile(args[1]);
 // Changes the log info to (date, remoteAddr) and caculates the unique visitors.
 JavaPairRDD<String, Integer> uv = distFile.groupByKey().mapToPair( new PairFunction
<Tuple2<String, Iterable<String>>, String, Integer>() { @Override public Tuple2<String, Integer>
 call(Tuple2<String, Iterable<String>> tuple) { int size = new HashSet((Collection<?>)
 tuple._2()).size(); return new Tuple2<>(tuple._1(), size); } }); // Coalesces to 1
 partition and saves as file. Notice that this is for demo purpose only.
 uv.coalesce(1, true).saveAsTextFile(args[2]); } }

运行Spark作业

  1. 在“产品服务>MapReduce>MapReduce-作业列表”页中,点击“创建作业”,进入创建作业页。
  2. 配置Spark作业参数,具体如下:

    • 作业类型:选择“Spark作业”。
    • 作业名称:输入作业名称,长度不可超过255个字符。
    • 应用程序位置:若使用您自行编译的程序,请上传程序jar包至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,路径如下: - 华北-北京区域的集群对应的样例程序路径:bos://bmr-public-bj/sample/spark-1.0-SNAPSHOT.jar。 - 华南-广州区域的集群对应的样例程序路径:bos://bmr-public-gz/sample/spark-1.0-SNAPSHOT.jar。
    • 失败后操作:继续。
    • Spark-submit:--class com.baidu.cloud.bmr.spark.AccessLogAnalyzer
    • 应用程序参数:指定输入数据的路径、结果输出的路径(可选BOS或HDFS),其中输出路径必须具有写权限且该路径不能已存在。以样例日志作为输入数据,BOS作为输出路径为例,输入如下: - 华北-北京区域的BMR集群对应的参数:bos://bmr-public-bj/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv。 - 华南-广州区域的BMR集群对应的参数:bos://bmr-public-gz/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv。
  3. 在“集群适配”区,选择适配的集群。
  4. 点击“完成”,则作业创建完成;运行中的作业状态会由“等待中”更新为“运行中”,当作业运行完毕后状态会更新为“已完成”,便可查看到查询结果了。

查看结果

请到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:

如果使用系统提供的输入数据和程序,可以在bos://{your-bucket}/output/pv和bos://{your-bucket}/output/uv路径下查看输出结果如下:

------PV------
20151003    139
20151005    372
20151006    114
20151004    375
------UV------
20151003    111
20151005    212
20151006    97
20151004    247

Spark Scala

程序准备

您可以直接使用样例程序。也可设计自己的程序,并在命令行下cd到程序代码的根目录下,执行mvn package生成jar文件,并上传到对象存储BOS(具体操作详见对象存储BOS入门指南)。

运行Spark作业

  1. 在“产品服务>MapReduce>MapReduce-作业列表”页中,点击“创建作业”,进入创建作业页。
  2. 配置Spark作业参数,具体如下:

    • 作业类型:选择“Spark作业”。
    • 作业名称:输入作业名称,长度不可超过255个字符。
    • 应用程序位置:若使用您自行编译的程序,请上传程序jar包至BOS或者您本地的HDFS中,并在此输入程序路径;您也可直接使用百度智能云提供的样例程序,仅华北-北京区域的集群可使用,路径为bos://bmr-public-data/apps/spark/bmr-spark-scala-samples-1.0-SNAPSHOT-jar-with-dependencies.jar。
    • 失败后操作:继续。
    • Spark-submit:--class com.baidubce.bmr.sample.AccessLogStatsScalaSample
    • 应用程序参数:指定输入数据的路径、结果输出的路径(可选BOS或HDFS),其中输出路径必须具有写权限且该路径不能已存在。以样例日志作为输入数据,BOS作为输出路径为例,输入如下: - 华北-北京区域的BMR集群对应的参数:bos://bmr-public-bj/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv。 - 华南-广州区域的BMR集群对应的参数:bos://bmr-public-gz/data/log/accesslog-1k.log bos://{your-bucket}/output/pv bos://{your-bucket}/output/uv。
  3. 在“集群适配”区,选择适配的集群。
  4. 点击“完成”,则作业创建完成;运行中的作业状态会由“等待中”更新为“运行中”,当作业运行完毕后状态会更新为“已完成”,便可查看到查询结果了。

查看结果

请到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:

如果使用系统提供的输入数据和程序,可以在bos://{your-bucket}/output/pv和bos://{your-bucket}/output/uv路径下查看输出结果如下:

-----PV-----
(20151003,139)
(20151005,372)
(20151006,114)
(20151004,375)
-----UV-----
(20151003,111)
(20151005,212)
(20151006,97)
(20151004,247)

Spark SQL

配置

所有配置与[Spark Scala](#Spark Scala)一致,只需修改Spark-submit为 --class com.baidubce.bmr.sample.AccessLogStatsSQLSample

查看结果

请到您所选的存储系统(BOS或HDFS)中查看输出结果,以下是在BOS中查看输出结果的说明:

如果使用系统提供的输入数据和程序,可以在bos://{your-bucket}/output/pv和bos://{your-bucket}/output/uv路径下查看输出结果如下:

------PV------
+--------+---+
|    date| pv|
+--------+---+
|20151003|139|
|20151004|375|
|20151005|372|
|20151006|114|
+--------+---+

------UV------
+--------+---+
|    date| uv|
+--------+---+
|20151003|111|
|20151004|247|
|20151005|212|
|20151006| 97|
+--------+---+

相似文档
  • Hive简介: 本文以分析Web日志统计用户访问次数最多的前5个小时段为例,介绍如何在百度智能云平台使用Hive。MapReduce提供了两种使用Hive的方式: 通过Hue使用Hive,请参考Hue。 控制台中提交Hive作业。
  • HBase简介: 本文以分析Web日志统计每天的PV和UV为例,介绍如何在百度智能云平台使用HBase。 HBase是运行在Hadoop上的NoSQL数据库,它是一个分布式的和可扩展的大数据仓库,能够利用HDFS的分布式处理模式和Hadoop的MapReduce程序模型。HBase融合key/value存储模式带来实时查询的能力,以及通过MapReduce进行离线处理或者批处理的能力。
  • Sqoop简介: 本样例场景是:通过Sqoop将RDS上的数据导入Hive,Hive中的数据表的location为BOS路径,Hive数据表的partition为dt(string),根据dt指定日期,区分每一天的导入数据。
  • Pig简介: 本文以分析Web日志统计每天的PV和UV为例,介绍如何在百度智能云平台使用Pig。 Pig是基于Hadoop的大规模数据分析平台,把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。
  • Hue简介: 本文以网站日志分析来介绍可Web访问的Hue服务。开发者可以在Web界面中通过SQL语句就能分析海量日志,大大降低了使用门槛。 Hue为Hadoop数据分析提供了图形界面系统,仅使用浏览器便能够在Hadoop平台上导入数据、处理数据以及分析数据。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部