上云无忧 > 文档中心 > 百度智能云MapReduce开源组件介绍 - Flink
百度智能云MapReduce开源组件介绍 - Flink

文档简介:
Flink简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

Flink简介

Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。

创建集群

登录百度智能云控制台,选择“产品服务->MapReduce BMR”,点击“创建集群”,进入集群创建页。BMR2.0.0及以上版本已支持 Flink 组件集成,购置集群时勾选 Flink 组件即可, 如下图所示:

使用简介

  1. 远程登录到创建好的集群中

    ssh hdfs@$public_ip
    使用创建集群时输入的密码

  2. 运行WordCount作业 (未开启Kerberos认证)
  • 先上传一份文件到HDFS中

    hdfs dfs -put /etc/hadoop/conf/core-site.xml /tmp

  • 执行如下命令,在Yarn上提交作业作业:

    flink run --jobmanager yarn-cluster \
    -yn 1 \
    -ytm 1024 \
    -yjm 1024 \
    /opt/bmr/flink/examples/batch/WordCount.jar \
    --input hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/core-site.xml \
    --output hdfs://$ACTIVE_NAMENODE_HOSTNAME:$PORT/tmp/out
  • 作业成功提交后,最终的运行结果如下:

实时流计算(Scala)

数据计算过程通过BMR的Flink消费百度消息服务BMS。本文以使用Scala为例,Flink版本1.8.2,线上Kafka版本2.1。具体步骤如下:

第一步 创建Topic并下载百度消息服务的证书

(本步骤详情请参考文档 Spark流式应用场景)

下载证书:

第二步 编写业务代码

package com.baidu.inf.flink
 
import java.util.Properties
 
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
 
object CloudFlinkConsumeKafkaDemo {
  private val logger = LoggerFactory.getLogger(this.getClass)
 
  def main(args: Array[String]): Unit = {
    logger.info("************ Flink Consume Kafka Demo start **************")
    if (args.length < 7) {
      logger.error(" Parameters Are Missing , " +
        "Needs : <topic> " +
        "<groupId> " +
        "<brokerHosts> " +
        "<truststore_location> " +
        "<truststore_pass> " +
        "<keystore_location> " +
        "<keystore_pass>")
      System.exit(-1)
    }
    val Array(topic, groupId, brokerHosts,
    truststore_location, truststore_pass,
    keystore_location, keystore_pass, _*) = args
 
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    env.getConfig.disableSysoutLogging
 
    val kafkaProperties = new Properties()
    kafkaProperties.setProperty("bootstrap.servers", brokerHosts)
    kafkaProperties.setProperty("key.deserializer", classOf[StringDeserializer].getName)
    kafkaProperties.setProperty("value.deserializer", classOf[StringDeserializer].getName)
    kafkaProperties.setProperty("group.id", groupId)
    kafkaProperties.setProperty("auto.offset.reset", "latest")
    kafkaProperties.setProperty("serializer.class", "kafka.serializer.StringEncoder")
    kafkaProperties.setProperty("security.protocol", "SSL")
    kafkaProperties.setProperty("ssl.truststore.location", truststore_location)
    kafkaProperties.setProperty("ssl.truststore.password", truststore_pass)
    kafkaProperties.setProperty("ssl.keystore.location", keystore_location)
    kafkaProperties.setProperty("ssl.keystore.password", keystore_pass)
    kafkaProperties.setProperty("enable.auto.commit", "true")
 
    val ds = env.addSource(
      new FlinkKafkaConsumer[String](topic,
        new SimpleStringSchema(),
        kafkaProperties))
 
    ds.print()
    env.execute()
  }
}

第三步 编译代码,打成可执行Jar文件,上传到服务器上

(注:要保证第一步下载的证书文件在集群每个节点上相同的路径下都存在)

运行作业示例:
flink run --jobmanager yarn-cluster -yn 1 -ytm 1024 -yjm 1024 /root/flink-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar "676c4bb9b72c49c7bd3b089c181af9ec__demo02" "group1" "kafka.fsh.baidubce.com:9091" "/tmp/client.truststore.jks" "kafka" "/tmp/client.keystore.jks" "0yw0ckrt"

第四步 消息队列中生产一些消息,在Flink作业监控页面上查看对应输出

通过Tunnel登录到集群的Yarn页面上(通过SSH-Tunnel访问集群)

在yarn console找到对应作业的application的单击application名称,进入作业详情页面:
(在Flink的原生页面上,点击TaskManagers > Stdout,查看作业运行情况)

参考

  1. Flink应用场景
  2. Release Notes - Flink 1.8
相似文档
  • Druid简介: Druid是一个高性能的实时数据分析系统,由MetaMarkets公司在2012开源,专门为OLAP场景而设计。Druid遵从Lambda架构,支持批量和实时两种方式导入数据,并提供高性能的数据查询。
  • Impala简介: Impala是Cloudera公司主导开发的MPP架构的查询系统,它提供SQL语义,能够快速的查询存储在HDFS、HBASE中的数据。此外Impala使用与Hive相同的元数据、SQL语法、ODBC驱动。
  • ClickHouse简介: ClickHouse是一个用于联机分析(OLAP)的列式数据库管理系统(DBMS)。它是由俄罗斯搜索引擎公司Yandex开发,并于2016年6月发布的开源DBMS,与Hadoop,Spark相比,ClickHouse轻量很多。
  • 分布式概述: Kafka是一个分布式,多分区,多副本的流处理消息中间件,具备高吞吐量、高可扩展性、可持久化等特性,广泛应用于实时数据传输、日志收集、实时监控数据聚合等实际应用场景中。BMR中的Kafka是基于开源社区的Kafka,提供一种可快速部署,用户独享的Kafka集群。
  • 1.什么是Alluxio? Alluxio 是世界上第一个面向基于云的数据分析和人工智能的开源的数据编排技术。 它为数据驱动型应用和存储系统构建了桥梁, 将数据从存 储层移动到距离数据驱动型应用更近的位置从而能够更容易被访问。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部