上云无忧 > 文档中心 > 百度智能云Elasticsearch - 基于ELK构建日志分析系统
Elasticsearch
百度智能云Elasticsearch - 基于ELK构建日志分析系统

文档简介:
介绍: 日志是一个系统不可缺少的组成部分,日志记录系统产生的各种行为,基于日志信息可以对系统进行问题排查,性能优化,安全分析等工作。 使用ELK(Elasticsearch、Logstash、Kibana) 结合Filebeat与Kafka构建日志分析系统,可以方便的对日志数据进行采集解析存储,以及近实时的查询分析。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

介绍

日志是一个系统不可缺少的组成部分,日志记录系统产生的各种行为,基于日志信息可以对系统进行问题排查,性能优化,安全分析等工作。 使用ELK(Elasticsearch、Logstash、Kibana) 结合Filebeat与Kafka构建日志分析系统,可以方便的对日志数据进行采集解析存储,以及近实时的查询分析。

各组件在日志系统中的职责与关系如下所示:

  1. Filebeat:采集原始日志数据,输出数据到Kafka;
  2. Kafka:作为消息队列,存储原始日志数据;
  3. Logstash:消费Kafka中数据,解析过滤后,将其存储到Elasticsearch中;
  4. Elasticsearch:存储用于分析的日志数据,并提供查询、聚合的底层能力;
  5. Kibana:提供日志分析的可视化界面;

操作流程:

  1. 环境准备

    包括:创建百度Elasticsearch(简称BES)集群、Kibana服务,安装Filebeat、Logstash、Kafka。

  2. 配置Filebeat:

    配置Filebeat的input为系统日志,output为Kafka,将日志数据采集到Kafka的指定topic中。

  3. 配置Logstash:

    配置Logstash的input为Kafka,output为BES,使用Logstash消费topic中的数据并传输到BES中。

  4. 查看日志消费状态:

    在消息队列Kafka中查看日志数据的消费的状态,验证日志数据是否采集成功。

  5. 通过Kibana查看日志数据:

    在Kibana控制台的Discover页面,查询采集的日志数据。

环境准备

  1. 创建BES集群

    参考集群创建中的内容,完成BES集群的创建。

    本文档中创建的BES集群选择7.4.2版本,Filebeat组件、Logstash组件使用的也是7.4.2版本。

  2. 创建Kibana服务

    在完成BES集群创建后,除了已购买的BES节点,默认会附赠一个单节点的Kibana服务(Kibana节点支持按需自定义配置),如图所示:

    通过上图连接信息中所示的Kibana HTTP URL可以直接访问Kibana服务。

    Kibana用户名密码与BES服务的用户名密码相同,详见账号使用说明

  3. 安装Filebeat

    下载7.4.2的oss版本Filebeat;

    wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-oss-7.4.2-linux-x86_64.tar.gz

    解压安装Filebeat;

    tar -zxvf filebeat-oss-7.4.2-linux-x86_64.tar.gz
  4. 安装Logstash

    下载7.4.2的oss版本Logstash;

    wget https://artifacts.elastic.co/downloads/logstash/logstash-oss-7.4.2.tar.gz

    解压安装Logstash;

    tar -zxvf logstash-oss-7.4.2.tar.gz
  5. 安装、配置Kafka服务

    5.1 下载Zookeeper(Kafka服务依赖Zookeeper);

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

    5.2 解压安装Zookeeper;

    tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz

    5.3 配置Zookeeper;

    vi conf/zoo.cfg

    zoo.cfg内容如下所示:

    tickTime=2000
    dataDir=/your_path/apache-zookeeper-3.6.2-bin/data
    clientPort=2181
    initLimit=5
    syncLimit=2
    admin.serverPort=8888

    配置参数说明,如下所示:

    参数名称 参数说明
    tickTime 服务器之间或客户端与服务器之间维持心跳的时间间隔
    dataDir 数据文件目录
    clientPort 客户端连接端口
    initLimit Leader-Follower初始通信时限
    syncLimit Leader-Follower同步通信时限
    admin.serverPort 内置Jetty绑定的port

    5.4 启动Zookeeper;

    bin/zkServer.sh start

    5.5 下载Kafka;

    wget https://archive.apache.org/dist/kafka/2.0.1/kafka_2.12-2.0.1.tgz

    5.6 解压安装Kafka;

    tar -zxvf kafka_2.12-2.0.1.tgz

    5.7 配置Kafka;

    cd kafka_2.12-2.0.1
    vi config/server.properties

    server.properties内容如下所示:

    broker.id=0
    zookeeper.connect=localhost:2181

    配置参数说明,如下所示:

    参数名称 参数说明
    broker.id Kafka集群实例的唯一标识
    zookeeper.connect Zookeeper的连接地址

    5.8 启动Kafka;

    bin/kafka-server-start.sh config/server.properties

    5.9 创建名称为 logdata 的 topic,用于接收采集数据,命令示例如下所示:

    bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 2 --partitions 20 --topic logdata

    命令参数说明,如下所示:

    参数名称 参数说明
    zookeeper Zookeeper的连接地址
    topic topic的名称
    replication-factor topic的副本数
    partitions topic的分区数量

配置Filebeat

1.创建Filebeat采集配置文件;

cd filebeat-oss-7.4.2-linux-x86_64
 vi filebeat.kafka.yml

filebeat.kafka.yml内容如下:

filebeat.inputs:
   - type: log
     enabled: true
     paths:
         - /your_path/*.log
 
 output.kafka:
     hosts: ["**.**.**.**:9092"]
     topic: logdata
     version: 2.0.1

配置参数说明,如下所示:

参数名称 参数说明
type 输入类型,设置为log,表示输入源为日志
enabled input处理器的开关
paths 需要监控的日志文件的路径
hosts 消息队列Kafka的地址
topic 日志输出到消息队列Kafka的topic
version Kafka的版本

2.启动Filebeat;

./filebeat -e -c filebeat.kafka.yml

配置Logstash

1.创建配置文件logstash.conf;

cd logstash-oss-7.4.2
vi logstash.conf

logstash.conf内容如下:

input {
   kafka {
     bootstrap_servers => ["127.0.0.1:9092"]
     group_id => "log-data"
     topics => ["logdata"] 
     codec => json
  }
}
filter {
}
output {
  elasticsearch {
    hosts => "http://es_ip:es_http_port"
    user =>"your username"
    password =>"your password"
    index => "kafka‐%{+YYYY.MM.dd}"
  }
}

配置参数说明,如下所示:

参数名称 参数说明
bootstrap_servers Kafka的地址
group_id Kafka消费者组id
topics Kafka的topic名称
codec 数据格式
hosts BES服务地址
user BES账号
password BES账号的密码
index 数据写入的索引

2.启动Logstash;

bin/logstash -f logstash.conf

更多Logstash的使用说明,详见Logstash使用指南。

查看日志消费状态

使用如下命令查看消费状态;

bin/kafka-consumer-groups.sh  --bootstrap-server 127.0.0.1:9092 --group log-data --describe

命令参数说明,如下所示:

参数名称 参数说明
bootstrap_servers Kafka的地址
group Kafka消费者组

执行结果如下所示:

结果属性说明,如下所示:

参数名称 参数说明
topic Kafka的topic名称
partition topic的分区id
current-offset consumer消费的具体位移
log-end-offset partition的最高位移
lag 堆积量
consumer-id 消费者id

通过Kibana查看日志数据

完成以下的配置后,即可通过Kibana查看日志数据。

1.创建索引模式

在左侧导航栏,点击Management(下图中红框位置)后,在Kibana区域点击Index Patterns(下图中蓝框位置),然后再点击Create index pattern按钮(下图绿框位置)

在Index pattern处(下图红框位置)填入kafka*,然后点击点击Next Step按钮(下图中蓝框位置)

为Time Filter field name选择@timestamp字段(下图中红框位置),然后点击Create index pattern按钮(下图中蓝框位置)

2.查看日志数据

在左侧导航栏,点击Discover(下图中红框位置)后,即可以查询日志数据

更多Kibana的使用说明,详见Kibana使用指南中的内容。

相似文档
  • Elasticsearch系统常见问题: 如何查看Es安装了哪些插件? 线程池队列满导致错误? Too Many Open Files的错误? Es 中一个分片一般设置多大? 当集群为red或者yellow的时候怎么办? 如何cancel掉慢查询?
  • 本文档主要目的: 用来积累用户常见的问题。 文档中的操作可通过 Kibana 或在chrome浏览器中安装sense插件完成。用户也可以通过curl的方式自行实现。
  • 说明: 本文档主要介绍了通过elasticsearch-hadoop中的Spark访问ES时常见配置项意义。本文中的es-spark是elasticsearch-hadoop中和Spark相关联的包,用户通过自己的Spark集群读写ES集群,elasticsearch-hadoop基本上兼容了目前ES所有的版本。
  • 协议生效时间:2020 年 9 月 1 日。 本服务等级协议(Service Level Agreement,以下简称 “SLA”)规定了百度智能云向用户提供的Elasticsearch的服务可用性等级指标及赔偿方案。
  • 数据仓库 Palo 是百度智能云上提供的PB级别的MPP数据仓库服务,以较低的成本提供在大数据集上的高性能分析和报表查询功能。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部