# 第一章 ElasticStack
## 1 概述
ElasticStack 表示的是一套技术栈,包括但不限于elasticsearch,Logstash,kibana,beats(filebeat,metricsbeat),filebeat采集数据,elastic存储数据,kibana展示数据
| 名字 | 说明 |
| ————– | ——————– |
| ElasticSrearch | 数据存储,检索 |
| Filebeat | 采集数据 |
| logstash | 数据拉取,转换,处理 |
| kibana | 数据展示 |
| kafka | 消息队列 |
## 2 ElasticSearch单机
“`sh
1 下载
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.17.26-amd64.deb
2 安装
dpkg -i elasticsearch-7.17.26-amd64.deb
3 配置
egrep ^[a-z] /etc/elasticsearch/elasticsearch.yml
cluster.name: ysl-forest
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
discovery.type: single-node
4 相关参数说明:
cluster.name
指定ES集群的名称
path.data
数据存储目录。
path.logs
表示日志存储目录。
network.host
暴露ES的IP地址。
discovery.type
部署ES的类型,此处表示的是单点。
5 启动服务
systemctl start elasticsearch.service
6 查看端口
ss -ntl | egrep “9[2|3]00”
7 端口说明:
9200:
对外提供服务的端口,支持http|https协议。
9300:
ES集群数据传输,master选举的端口,使用tcp协议。
8 访问查看
curl -X GET 10.0.0.91:9200/_cat/nodes
“`
## 3 ElasticSearch集群
“`sh
1 配置
grep ^[a-z] /etc/elasticsearch/elasticsearch.yml
cluster.name: ysl-forest
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
discovery.seed_hosts: [“10.0.0.91”, “10.0.0.92”,”10.0.0.93″]
cluster.initial_master_nodes: [“10.0.0.91”, “10.0.0.92”,”10.0.0.93″]
2 相关参数说明:
discovery.seed_hosts
当前集群的主机列表。
cluster.initial_master_nodes:
第一次启动时,可以选举为master的主机列表。
3 3个节点都配置
4 启动
systemctl start elasticsearch
5 访问每个节点
curl 10.0.0.91:9200
curl 10.0.0.92:9200
curl 10.0.0.93:9200
6 查看集群状态 每个节点均可执行
curl 10.0.0.93:9200/_cat/nodes?v
“`
## 4 脑裂快速解决方案
“`sh
1.停止所有节点的服务
systemctl stop elasticsearch.service
2.检查配置文件是否正确,每个节点执行,或者掉了哪个节点就在哪个节点执行
grep ^[a-z] /etc/elasticsearch/elasticsearch.yml
cluster.name: ysl-forest
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
discovery.seed_hosts: [“10.0.0.91”, “10.0.0.92”,”10.0.0.93″]
cluster.initial_master_nodes: [“10.0.0.91”, “10.0.0.92”,”10.0.0.93″]
3.清空有问题的数据
rm -rf /var/{lib,log}/elasticsearch/* /tmp/*
4.所有节点重启启动ES服务
systemctl start elasticsearch.service
5.验证测试
curl 10.0.0.93:9200/_cat/nodes?v
ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
10.0.0.91 45 96 0 0.08 0.02 0.01 cdfhilmrstw * elk91
10.0.0.93 17 96 0 0.05 0.01 0.00 cdfhilmrstw – elk93
10.0.0.92 23 97 0 0.08 0.02 0.01 cdfhilmrstw – elk92
“`
## 5 ES常见术语
“`sh
– ES集群的常见术语
– 索引: index
索引是ES数据的存储逻辑单元,用于客户端的数据读写。
– 分片: shard
一个所有可以有1个或多个分片,可以实现数据的分布式存储。
– 副本: replica
副本分片,可以对分片数据进行备份,可以实现数据读取的负载均衡。
主分片负责数据的读写,而副本分片只能实现数据读取。
– 文档: document
是用户的实际存储数据。
文档存储在对应的分片中的。
文档中分为元数据和源数据。
元数据:用于描述源数据的数据。
源数据: 用户实际存储的数据。
面试题: ES集群的颜色有哪些?分别代表什么含义?
– GREEN:
表示健康状态,所有的主分片和副本分片均能正常访问。
– YELLOW:
表示亚健康状态,表示有部分副本分片无法访问。
– RED:
表示不健康状态,表示有部分主分片无法访问。
“`
## 6 部署kibana
“`sh
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.17.26-amd64.deb
dpkg -i kibana-7.17.26-amd64.deb
grep ^[a-z] kibana.yml
server.port: 5601
server.host: “0.0.0.0”
elasticsearch.hosts: [“http://elk01:9200″,”http://elk02:9200″,”http://elk03:9200”]
i18n.locale: “zh-CN”
相关参数说明:
server.port
暴露的端口号。
server.host
监听的IP地址。
elasticsearch.hosts
ES集群的地址。
i18n.locale
指定语言。
systemctl start kibana
root@elk01:/etc/kibana# ss -ntl | grep 5601
“`
## 7 filebeat
“`sh
部署
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.26-amd64.deb
dpkg -i filebeat-7.17.26-amd64.deb
配置
cat /etc/filebeat/config/01-stdin-to-console.yaml
filebeat.inputs:
– type: stdin
output.console:
pretty: true
cat /etc/filebeat/config/01-stdin-to-console.yaml
filebeat.inputs:
– type: stdin
output.console:
pretty: true
[root@elk92 ~]# filebeat -e -c /etc/filebeat/config/01-stdin-to-console.yaml
“`
### 1 filebeat采集本地文件
“`sh
– filebeat采集本地文件案例
1.编写filebeat的配置文件
cat /etc/filebeat/config/02-log-to-console.yaml
filebeat.inputs:
– type: log
paths:
– /tmp/stu*.txt
output.console:
pretty: true
温馨提示:
– 1.filebeat默认按行读取数据;
– 2.如果filebeat数据目录不存在,则每次启动filebeat时会自动创建;
– 3.如果filebeat数据目录存在,则filebeat重启时并不会重复采集数据,如果想要从头采集数据,可以删除filebeat的数据目录
“`
### 2 filebeat采集数据到ES集群
“`sh
– filebeat采集数据到ES集群
1.编写配置文件
cat /etc/filebeat/config/03-log-to-es.yaml
filebeat.inputs:
– type: log
paths:
– /tmp/stu*.txt
output.elasticsearch:
hosts: [“http://10.0.0.91:9200″,”http://10.0.0.92:9200″,”http://10.0.0.93:9200”]
rm -rf /var/lib/filebeat/
filebeat -e -c /etc/filebeat/config/03-log-to-es.yaml
“`
kibana查看数据



### 3 filebeat自定义索引和分片副本
“`sh
1.编写filebeat的配置文件
cat /etc/filebeat/config/04-log-to-es_custom_index.yaml
filebeat.inputs:
– type: log
paths:
– /tmp/stu*.txt
output.elasticsearch:
hosts:
– “http://10.0.0.91:9200”
– “http://10.0.0.92:9200”
– “http://10.0.0.93:9200”
index: “ysl-forest-filebeat-%{+yyyy.MM.dd}”
# 禁用索引生命周期,若启用则自定义索引失效。
setup.ilm.enabled: false
# 索引模板的名称,可以自定义
setup.template.name: ysl-forest
# 定义索引的匹配模式
setup.template.pattern: “ysl-forest-filebeat*”
# 配置索引模板
setup.template.settings:
# 自定义索引的分片数量
index.number_of_shards: 3
# 自定义副本的数量
index.number_of_replicas: 0
# 如果索引模板已经存在,则覆盖已经存在的模板,默认值为false
# 生产环境中建议设置false,因为改为true会降低性能.
setup.template.overwrite: true
2.启动filebeat实例
filebeat -e -c /etc/filebeat/config/04-log-to-es_custom_index.yaml
“`
kibana查看







### 4 filebeat采集nginx
“`sh
cat 05-nginx-to-es.yaml
filebeat.inputs:
– type: log
paths:
– /root/access.log*
output.elasticsearch:
hosts: [“http://10.0.0.91:9200″,”http://10.0.0.92:9200″,”http://10.0.0.93:9200”]
index: “ysl-nginx-accesslog-%{+yyyy.MM.dd}”
setup.ilm.enabled: false
setup.template.name: ysl-nginx
setup.template.pattern: ysl-nginx*
setup.template.settings:
index.number_of_shards: 3
index.number_of_replicas: 0
setup.template.overwrite: false
“`
### 5 多个filebeat输入源写入不同的es索引
“`sh
打标签tags
cat 06-multiple-input-to-es.yaml
filebeat.inputs:
– type: log
paths:
– /var/log/syslog*
tags: syslog
– type: log
paths:
– /var/log/kerne*
tags: messagelog
output.elasticsearch:
hosts:
– “http://10.0.0.91:9200”
– “http://10.0.0.92:9200”
– “http://10.0.0.93:9200”
indices:
– index: “ysl-syslog-%{+yyyy.MM.dd}”
when.contains:
tags: “syslog”
– index: “ysl-message-%{+yyyy.MM.dd}”
when.contains:
tags: “messagelog”
setup.ilm.enabled: false
setup.template.name: ysl
setup.template.pattern: “ysl*”
setup.template.settings:
index.number_of_shards: 3
index.number_of_replicas: 0
setup.template.overwrite: false
“`
## 8 elastic下载和查看文档
### 1 elastic search
https://elastic.co






### 2 kibana




### 3 filebeat



## 9 filebeat模块管理
“`sh
1 查看
filebeat modules list
结果中的
Enabled:
Disabled:
表示启用和禁用的模块
2 启用和禁用
filebeat modules enable + 模块名
filebeat modules enable nginx tomcat
Enabled nginx
Enabled tomcat
filebeat modules disable nginx
Disabled nginx
3 模块管理的地城逻辑
root@elk02:/etc/filebeat/modules.d# ll
total 308
drwxr-xr-x 2 root root 4096 Feb 24 17:05 ./
drwxr-xr-x 4 root root 4096 Feb 21 12:17 ../
-rw-r–r– 1 root root 484 Nov 13 23:20 activemq.yml.disabled
-rw-r–r– 1 root root 476 Nov 13 23:20 apache.yml.disabled
-rw-r–r– 1 root root 281 Nov 13 23:20 auditd.yml.disabled
其实就是修改文件
4 filebeat使用nginx模块案例
filebeat modules enable nginx
Enabled nginx
egrep -v “^.*#|^$” /etc/filebeat/modules.d/nginx.yml
– module: nginx
access:
enabled: true
var.paths: [“/root/access.log*”]
error:
enabled: true
ingress_controller:
enabled: false
5 编写filebeat的配置文件写入es集群
cat 07-modules-to-es.yaml
filebeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
output.elasticsearch:
hosts:
– “http://10.0.0.91:9200”
index: “ysl-forest-modules-nginx-accesslog-%{+yyyy.MM.dd}”
setup.ilm.enabled: false
setup.template.name: ysl-forest-modules-nginx
setup.template.pattern: “ysl-forest-modules-nginx*”
setup.template.settings:
index.number_of_shards: 1
index.number_of_replicas: 0
setup.template.overwrite: true
“`
## 10 kibana展示

### 1 PV
页面访问量,一条对一个PV






### 2 IP统计




### 3 带宽统计









### 4 地图



### 5 仪表盘





## 11 filestream模块

官网上说在7.16版本的时候log模块已经弃用,推荐使用filestream
“`sh
cat 08-filestream-to-es.yaml
filebeat.inputs:
– type: filestream
paths:
– /tmp/test.log
output.elasticsearch:
hosts:
– “http://10.0.0.91:9200”
index: “ysl-filestream-%{+yyyy.MM.dd}”
setup.ilm.enabled: false
setup.template.name: ysl-filestream
setup.template.pattern: “ysl-filestream*”
setup.template.settings:
index.number_of_shards: 1
index.number_of_replicas: 0
setup.template.overwrite: false
“`
## 12 filebeat多行匹配-固定行数匹配
“`dh
测试数据
cat >>/tmp/test_count.log<
基于配置文件启动logstash
cat /etc/logstash/conf.d/01-stdin-to-stdout.conf
# 表示数据从哪里来
input {
# 表示从标准输入来
stdin {
type => stdin
}
}
# 表示数据到哪里去
output {
# 表示从标准输出去
stdout {
codec => rubydebug
# codec => json
}
}
logstash -f /etc/logstash/conf.d/01-stdin-to-stdout.conf
如果想要从头采集数据,可以删除Logstash的数据目录。
rm -rf /usr/share/logstash/data/
对接文件
cat 02-file-to-stdout.conf
input {
file {
# 首次采集文件时从哪开始读取数据
start_position => “beginning”
# 指定文件的路径
path => “/tmp/testlogstash.log”
}
}
output {
stdout {}
}
echo testlogstash >> /tmp/testlogstash.log
logstash -f /etc/logstash/conf.d/02-file-to-stdout.conf
“`
## 16 ELFK架构案例-filebeat对接logstash
“`sh
logstash配置
cat 03-beats-to-es.conf
input {
beats {
port => 5044
}
}
output {
stdout {}
}
logstash -r -f /etc/logstash/conf.d/03-beats-to-es.conf
-f 指定配置文件
-r 热加载,修改配置文件后不需要重启logstash
ss -ntl | grep 5044
LISTEN 0 4096 *:5044 *:*
此时5044已被监听
filebeat配置
ss -ntl | grep 9000
LISTEN 0 4096 *:9000 *:*
cat /etc/filebeat/config/12-tcp-to-logstash.yaml
filebeat.inputs:
– type: tcp
# 数据从本地的9000端口来
host: “0.0.0.0:9000”
# 数据到Logstash去
output.logstash:
hosts: [“10.0.0.91:5044”]
filebeat -e -c /etc/filebeat/config/12-tcp-to-logstash.yaml
echo www.yangsenlin.top | nc 127.0.0.1 9000
最终数据会到logstash上去
“`

“`sh
将数据发送到elasticsearch,然后再kibana上展示
cat 03-beats-to-es.conf
input {
beats {
port => 5044
}
}
# 可以对event事件进行过滤处理。
filter {
mutate {
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”]
}
}
output {
stdout {}
elasticsearch {
# 指定ES集群的地址,多个地址用[]和逗号
hosts => “10.0.0.91:9200”
# 指定ES的索引
index => “ysl-elfk-logstash”
}
}
echo www.yangsenlin.top | nc 127.0.0.1 9000
“`

到kibana上查看

## 17 ELK分析数据






## 18 filebeat采集docker日志
“`sh
安装docker导入nginx镜像启动docker容器
docker run -d –name myweb -p 81:80 nginx:1.24-alpine
在宿主机上访问几次
for i in `seq 3`;do curl 10.0.0.91:81;done
查看日志
docker logs -f myweb
“`
### 1 第一种方法
“`sh
cat /etc/filebeat/config/13-docker-nginx-to-es.yaml
filebeat.inputs:
– type: log
paths:
– /var/lib/docker/containers/*.log
output.elasticsearch:
hosts: [“http://10.0.0.91:9200”]
index: “ysl-docker-nginx-accesslog-%{+yyyy.MM.dd}”
setup.ilm.enabled: false
setup.template.name: ysl-docker-nginx
setup.template.pattern: ysl-docker-nginx*
setup.template.settings:
index.number_of_shards: 3
index.number_of_replicas: 0
setup.template.overwrite: false
“`
### 2 第二种方法
https://www.elastic.co/guide/en/beats/filebeat/7.17/filebeat-input-docker.html

“`sh
cat 13-docker-nginx-to-es.yaml
filebeat.inputs:
– type: docker
#容器相关参数
containers:
##指定容器id
ids:
– ‘*’
#容器日志存储路径,默认值“/var/lib/docker/containers”
path: “/var/lib/docker/containers”
#指定采集的数据流类型,支持stderr,stdout,all(default)
stream: “stdout”
output.elasticsearch:
hosts: [“http://10.0.0.91:9200”]
index: “ysl-docker-nginx-accesslog-%{+yyyy.MM.dd}”
setup.ilm.enabled: false
setup.template.name: ysl-docker-nginx
setup.template.pattern: ysl-docker-nginx*
setup.template.settings:
index.number_of_shards: 1
index.number_of_replicas: 1
setup.template.overwrite: false
“`

### 3 第三种方法
“`sh
使用filebeat模块
filebeat modules list
filebeat modules enable docker
Module docker doesn’t exist!
没有docker模块
使用container模块
“`

“`sh
cat 14-container-nginx-to-es.yaml
filebeat.inputs:
– type: container
paths:
– ‘/var/lib/docker/containers/*/*.log’
output.elasticsearch:
hosts: [“http://10.0.0.91:9200”]
index: “ysl-docker-nginx-accesslog-%{+yyyy.MM.dd}”
setup.ilm.enabled: false
setup.template.name: ysl-docker-nginx
setup.template.pattern: ysl-docker-nginx*
setup.template.settings:
index.number_of_shards: 1
index.number_of_replicas: 1
setup.template.overwrite: false
“`

## 19 filebeat多实例
“`sh
第一个实例
filebeat -e -c /etc/filebeat/config/13-docker-nginx-to-es.yaml
第二个实例
filebeat -e -c /etc/filebeat/config/14-container-nginx-to-es.yaml –path.data /test/filebeat-container
“`
## 20 filebeat mutate插件
“`sh
准备测试数据
cat >>generate_log.py<
}
}
# 可以对event事件进行过滤处理。
filter {
mutate {
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”]
}
}
output {
stdout {}
# elasticsearch {
# # 指定ES集群的地址
# hosts => “10.0.0.91:9200”
#
# # 指定ES的索引
# index => “ysl-elfk-logstash”
# }
}
logstash -rf /etc/logstash/conf.d/04-beats-apps-mutate-es.conf
浏览以下网址查找strip
https://www.elastic.co/guide/en/logstash/7.17/plugins-filters-mutate.html
cat 04-beats-apps-mutate-es.conf
input {
beats {
port => 5001
}
}
# 可以对event事件进行过滤处理。
filter {
mutate {
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”]
}
mutate {
split => { “message” => “|” }
}
}
output {
stdout {}
# elasticsearch {
# # 指定ES集群的地址
# hosts => “10.0.0.91:9200”
#
# # 指定ES的索引
# index => “ysl-elfk-logstash”
# }
}
“`

以上是将字段分隔
“`sh
以下是增加字段
cat 04-beats-apps-mutate-es.conf
input {
beats {
port => 5001
}
}
# 可以对event事件进行过滤处理。
filter {
mutate {
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”]
}
mutate {
split => { “message” => “|” }
add_field => {
“other” => “%{[message][0]}”
“userid” => “%{[message][1]}”
“action” => “%{[message][2]}”
“svip” => “%{[message][3]}”
“price” => “%{[message][4]}”
}
}
}
output {
stdout {}
# elasticsearch {
# # 指定ES集群的地址
# hosts => “10.0.0.91:9200”
#
# # 指定ES的索引
# index => “ysl-elfk-logstash”
# }
}
“`

“`sh
完整处理流程
input {
beats {
port => 5001
}
}
filter {
mutate {
split => { “message” => “|” }
add_field => {
“other” => “%{[message][0]}”
“userid” => “%{[message][1]}”
“action” => “%{[message][2]}”
“svip” => “%{[message][3]}”
“price” => “%{[message][4]}”
}
}
mutate {
split => { “other” => ” “}
add_field => {
“dt” => “%{[other][1]} %{[other][2]}”
}
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”,”message”,”other”]
}
}
#output {
# stdout {}
elasticsearch {
# 指定ES集群的地址
hosts => [“10.0.0.91:9200”]
# 指定ES的索引
index => “ysl-elfk-apps-mutate”
}
}
“`


## 21 filebeat date插件
““sh
cat 05-beats-apps-date-es.conf
input {
beats {
port => 5001
}
}
filter {
mutate {
split => { “message” => “|” }
add_field => {
“other” => “%{[message][0]}”
“userid” => “%{[message][1]}”
“action” => “%{[message][2]}”
“svip” => “%{[message][3]}”
“price” => “%{[message][4]}”
}
}
mutate {
split => { “other” => ” “}
add_field => {
“dt” => “%{[other][1]} %{[other][2]}”
}
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”,”message”,”other”]
}
date {
match => [ “dt”, “yyyy-MM-dd HH:mm:ss” ]
# 如果不定义,则默认会覆盖”@timestamp”字段
target => “ysl-datetime”
}
}
output {
stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-elfk-apps-date-%{+yyyy.MM.dd}”
}
}
““



## 22 filebeat grok geoip 插件
“`sh
https://www.elastic.co/guide/en/logstash/7.17/introduction.html
使用grok groip处理nginx、tomcat日志
会使用到以下的模块,查看模块内容可以得到是怎么回事
find /usr/share/logstash/ -name httpd
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/httpd
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/ecs-v1/httpd
cat /usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-patterns-core-4.3.4/patterns/legacy/httpd
HTTPDUSER %{EMAILADDRESS}|%{USER}
HTTPDERROR_DATE %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}
# Log formats
HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] “(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})” (?:-|%{NUMBER:response}) (?:-|%{NUMBER:bytes})
HTTPD_COMBINEDLOG %{HTTPD_COMMONLOG} %{QS:referrer} %{QS:agent}
# Error logs
HTTPD20_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[%{LOGLEVEL:loglevel}\] (?:\[client %{IPORHOST:clientip}\] ){0,1}%{GREEDYDATA:message}
HTTPD24_ERRORLOG \[%{HTTPDERROR_DATE:timestamp}\] \[(?:%{WORD:module})?:%{LOGLEVEL:loglevel}\] \[pid %{POSINT:pid}(:tid %{NUMBER:tid})?\]( \(%{POSINT:proxy_errorcode}\)%{DATA:proxy_message}:)?( \[client %{IPORHOST:clientip}:%{POSINT:clientport}\])?( %{DATA:errorcode}:)? %{GREEDYDATA:message}
HTTPD_ERRORLOG %{HTTPD20_ERRORLOG}|%{HTTPD24_ERRORLOG}
# Deprecated
COMMONAPACHELOG %{HTTPD_COMMONLOG}
COMBINEDAPACHELOG %{HTTPD_COMBINEDLOG}
“`

“`sh
“`
“`sh
cat 16-nginx-to-logstash.yaml
filebeat.inputs:
– type: filestream
paths:
– /root/access.log*
output.logstash:
hosts: [“10.0.0.91:5044”]
cat 06-nginx-grok_geoip_useragent_date-es.conf
input {
beats {
port => 5044
}
}
filter {
mutate {
remove_field => [ “agent”,”ecs”,”@version”,”host”,”log”,”input”,”tags”]
}
grok {
match => {
“message” => “%{HTTPD_COMMONLOG}”
}
}
useragent {
source => “message”
target => “ysl-useragent”
}
geoip {
source => “clientip”
}
date {
# “04/Jan/2025:10:53:54 +0800”
match => [ “timestamp”, “dd/MMM/yyyy:HH:mm:ss Z” ]
}
}
output {
stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-elfk-nginx-%{+yyyy-MM-dd}”
}
}
“`
## 23 ElasticStack故障排查思路
“`sh
Kibana如果查询不到数据,可能是由什么原因呢?
– Filebeat端存在问题的可能性:
– filebeat挂掉无法采集数据;
– 配置文件和实际采集的数据不对应;
– 源数据文件为空,未能写入;
– 数据已经采集过了,本地缓存offset未清空;
– logstash和Filebeat同理,也会存在类似的问题。
– ES集群挂掉,导致kibana无法查询数据;
– kibana的时间选择有问题,也会查询不到数据;
– kibana做了KQL数据过滤,也可能导致数据查询不到;
– kibana的索引被删除,索引模式不生效;
“`
## 24 logstash 多实例
“`sh
第一个logstash正常启动
第二个logstash如下
logstash -rf /etc/logstash/conf.d/06-nginx-grok_geoip_useragent_date-es.conf –path.data /tmp/logstash
修改5044,添加–path.data /tmp/logstahs(根据实际修改)
“`
## 25 logstash if多分支语句
“`sh
cat /etc/logstash/conf.d/07-if-tcp.conf
input {
tcp {
port => 6666
type => “xixi”
}
tcp {
port => 7777
type => “haha”
}
tcp {
port => 8888
type => “heihei”
}
}
filter {
mutate {
remove_field => [ “@version”,”port” ]
}
if [type] == “xixi” {
mutate {
add_field => {
“school” => “ysl”
}
}
} else if [type] == “haha” {
mutate {
add_field => {
“class” => “forest”
}
}
} else {
mutate {
add_field => {
“address” => “哈哈”
}
}
}
}
output {
stdout {}
if [type] == “xixi” {
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-elfk-logstash-xixi”
}
} else if [type] == “haha” {
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-elfk-logstash-haha”
}
}else {
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-elfk-logstash-heihei”
}
}
}
“`
## 26 logstash pipeline
“`sh
所谓的pipeline就是Logstash的input,filter,output为一个整体的描述,我们称之为pipeline,默认Logstash仅有一个main的pipeline。
pipeline核心思想就是”拆“
以上多分支语句可以写成如下
cat > /etc/logstash/conf.d/08-pipeline-xixi.conf <
type => “xixi”
}
}
filter {
mutate {
remove_field => [ “@version”,”port” ]
add_field => {
“school” => “ysl”
}
}
}
output {
# stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-forest-elfk-logstash-xixi”
}
}
EOF
cat > /etc/logstash/conf.d/09-pipeline-haha.conf <
type => “haha”
}
}
filter {
mutate {
add_field => {
“class” => “forest”
}
remove_field => [ “@version”,”port” ]
}
}
output {
# stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-forest-elfk-logstash-haha”
}
}
EOF
cat > /etc/logstash/conf.d/10-pipeline-heihei.conf <
type => “heihei”
}
}
filter {
mutate {
add_field => {
“address” => “嘿嘿”
}
remove_field => [ “@version”,”port” ]
}
}
output {
# stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-forest-elfk-logstash-heihei”
}
}
EOF
单独使用logstash -r也是起logstash,只不过会用/etc/logstash/pipelines.yml配置文件,所以需要将配置文件都写入pipeline.yml里即可
tail -6 /etc/logstash/pipelines.yml
– pipeline.id: xixi
path.config: “/etc/logstash/conf.d/08*.conf”
– pipeline.id: haha
path.config: “/etc/logstash/conf.d/09*.conf”
– pipeline.id: heihei
path.config: “/etc/logstash/conf.d/10*.conf”
logstash -r默认会加载/usr/share/logstash/config/pipelines.yml,直接起会报错,需要建目录和软连接
mkdir -pv /usr/share/logstash/config/
ln -svf /etc/logstash/pipelines.yml /usr/share/logstash/config/pipelines.yml
logstash -r
kibana查看数据
“`
## 27 elasticsearch原理
“`sh
数据写到es集群其实就是往索引写入,索引将数据存储到对应的分片
curl 10.0.0.91:9200/_cat/nodes?v
ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
10.0.0.91 54 21 6 0.77 1.47 0.79 cdfhilmrstw * elk01
其中”cdfhilmrstw”代表ES支持的各种角色,其中m表示master,d代表数据节点,c代表的是协调节点。
ES集群半数以上节点宕机,则集群服务不可用!
– ES的原理之文档的写入流程
1.客户端提交向ES索引写入请求;
2.将写请求转发给master节点;
3.master节点会根据文档ID(系统自动生成,也可以自定义但不推荐)计算出对应的写入分片;
4.对应的primary shard开始写入数据,replica shard开始同步写入;
写入成功策略: (primary numbers + replica )/ 2 + 1
5.写入成功后提交master结果;
6.返回给客户端写入成功;
– ES的分片数量:
推荐生产环境设置为10个分片,1个副本。
– ES的原理之单个文档的读取流程:
1.客户端根据文档的ID去协调节点查询数据;
2.根据文档的路由ID计算数据存储的编号信息;
hash(routing) % primary_number_shards —> 存储的分片编号
3.根据查询出来的文档编号找本地的”cluster_state”信息查询分片所在节点;
4.去对应节点拉取数据返回给客户端;
– ES的原理之多个文档的读取流程:
1.客户端发起DSL查询语句给协调节点;
2.协调节点查询本地的”cluster_state”的映射信息,找到该索引的所有分片;
3.进入query(搜索):
– 3.1 各节点的分片查询本地的数据并将查询结果放在一个队列中;
– 3.2 各个分片仅将文档的_id及相关性评分返回;
4.fetch(取回):
1.根据query节点各分片返回的数据进行全局排序,决定要取回的文档ID;
2.根据文档ID取回真实数据;
5.最终将数据返回给客户端;
这张图展示了 Elasticsearch (ES) 的数据写入和存储机制,包括客户端请求、内存区域、操作系统内核区域和磁盘区域的交互过程。下面是对图中各部分的详细解释:
### 客户端请求
– 客户端通过 HTTP 或 TCP 协议向 Elasticsearch 发送请求。图中展示了一个 POST 请求,发送 JSON 格式的数据到 `10.0.0.91:9200/ysl_linux/_doc` 地址。
### ES 主进程
– 请求被发送到 Elasticsearch 主进程,主进程负责处理请求并将其路由到相应的分片(shard)。
### 内存区域
– **分片(Shard)**:Elasticsearch 将数据分割成多个分片,每个分片是一个独立的 Lucene 索引。图中展示了一个名为 `index(BKWwEsPkRKK60ZtzJ0Nc7Q)` 的索引,它被分割成多个主分片(primary shards)。
– **倒排索引(Segments)**:每个分片包含多个倒排索引段(segments),每个段包含文档的倒排索引和删除标记(.del 文件)。
– **Searchable 和 Unsearchable**:图中用不同颜色区分了可查询(searchable)和不可查询(unsearchable)的段。可查询的段可以被外界查询,而不可查询的段则不能。
### WAL(Write Ahead Log)
– **预写日志(WAL)**:在数据写入内存区域之前,Elasticsearch 会先将数据写入预写日志(translog),以确保数据的持久性和一致性。
– **Translog 文件**:图中展示了多个 translog 文件(如 translog-4.tlog、translog.ckp 等),这些文件记录了数据写入的事件。
### 操作系统内核区域
– **flush 阶段**:将 Elasticsearch 的段文件提交到操作系统的缓冲区(OS buffer)。
– **reflush 阶段**:将操作系统缓冲区中的数据同步到磁盘。
### 磁盘区域
– **段文件(Segments)**:最终,数据被持久化存储到磁盘上,形成多个段文件。这些段文件包含倒排索引和删除标记。
### 数据写入流程
1. **WAL(Write Ahead Log)**:记录本次的事件,确保数据的持久性。
2. **flush 阶段**:将 Elasticsearch 的段文件提交到操作系统的缓冲区。
3. **reflush 阶段**:将操作系统缓冲区中的数据同步到磁盘。
4. **merge**:定期合并段文件,整理被删除的文件,进行物理删除。
### 总结
这张图展示了 Elasticsearch 的数据写入和存储机制,包括客户端请求、内存区域、操作系统内核区域和磁盘区域的交互过程。通过预写日志(WAL)、flush 和 reflush 阶段,Elasticsearch 确保了数据的持久性和一致性,并将数据持久化存储到磁盘上。同时,通过定期的 merge 操作,Elasticsearch 优化了存储空间和查询性能。
“`
### 1 正排索引
“`sh
以MySQL为例:
如果我们想要查询Linux,则会将blog表的所有content字段全部进行全量查询。性能极差!
MySQL存储上限理论值是64T,但经过实际考察,比如zabbix监控800台服务器,数据量达到1TB,发现性能就极差了!
如果数据量达到100PB如何解决呢?—-》 而ES很轻松就解决了。
“`
### 2 倒排索引
“`sh
所谓的倒排索引,其实就是会多出一张倒排表,用户查询时并不会直接查询数据,而是先去查倒排表。
I : [1001,…]
love: [1001,…]
linux: [1001,1002,…]
…
先去倒排表查询数据,如果能查到再去原表查询,如果查不到,则直接结束查询。
“`
## 28 es集群的api
“`sh
curl -s 10.0.0.91:9200/_cluster/health | jq
{
“cluster_name”: “my-application”,
“status”: “yellow”,
“timed_out”: false,
“number_of_nodes”: 1,
“number_of_data_nodes”: 1,
“active_primary_shards”: 11,
“active_shards”: 11,
“relocating_shards”: 0,
“initializing_shards”: 0,
“unassigned_shards”: 1,
“delayed_unassigned_shards”: 0,
“number_of_pending_tasks”: 0,
“number_of_in_flight_fetch”: 0,
“task_max_waiting_in_queue_millis”: 0,
“active_shards_percent_as_number”: 91.66666666666666
}
curl -s 10.0.0.91:9200/_cluster/health | jq “.status”
“yellow”
相关性参数说明:
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/cluster-health.html#cluster-health-api-response-body
推荐阅读:
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/cluster.html
“`
## 29 es和logstash的jvm优化
“`sh
es的jvm优化
egrep -v “^.*#|^$” /etc/elasticsearch/jvm.options | grep -i xm
-Xms256m
-Xmx256m
es集群每台都修改,修改完成后重启es集群
生产环境JVM优化
– 1.推荐是物理内存的一半;
– 2.建议JVM的heap堆内存大小不应该超过32GB,官方建议是26GB是比较安全的阈值;
– 3.关于ES集群扩容建议是1GB对应20个分片,如果物理机有32GB,最多该节点以后640个分片就应该考虑扩容了;
推荐阅读:
https://www.elastic.co/guide/en/elasticsearch/reference/7.17/advanced-configuration.html
logstash的jvm优化
egrep -v “^.*#|^$” /etc/logstash/jvm.options | grep -i xm
-Xms256m
-Xmx256m
重启logstash
对于Logstash而言,如果数据量较大,建议调大内存,生产环境建议是物理机的一半。
“`
## 30 meticbeat
“`sh
mericbeat监控各个插件的服务,通过kibana展示,数据存储在es集群
下载
wget https://artifacts.elastic.co/downloads/beats/metricbeat/metricbeat-7.17.26-amd64.deb
安装
dpkt -i metricbeat-7.17.26-amd64.deb
配置
egrep -v “^.*#|^$” /etc/metricbeat/metricbeat.yml
metricbeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
index.codec: best_compression
setup.kibana:
host: “10.0.0.91:5601”
output.elasticsearch:
hosts: [“10.0.0.91:9200”]
processors:
– add_host_metadata: ~
– add_cloud_metadata: ~
– add_docker_metadata: ~
– add_kubernetes_metadata: ~
启动
systemctl restart metricbeat
kibana查看数据
“`

“`sh
metribeat模块
metricbeat modules list
metricbeat modules enable nginx elasticsearch
ls -l /etc/metricbeat/modules.d/*.yml
-rw-r–r– 1 root root 284 Nov 13 23:20 /etc/metricbeat/modules.d/elasticsearch.yml
-rw-r–r– 1 root root 348 Nov 13 23:20 /etc/metricbeat/modules.d/nginx.yml
-rw-r–r– 1 root root 956 Nov 13 23:20 /etc/metricbeat/modules.d/system.yml
metricbeat采集nginx配置
egrep -v “^.*#|^$” /etc/metricbeat/modules.d/nginx.yml
– module: nginx
enabled: true
period: 10s
hosts: [“http://10.0.0.91:81”]
server_status_path: “status”
metricbeat采集elasticsearch配置
egrep -v “^.*#|^$” /etc/metricbeat/modules.d/elasticsearch.yml
– module: elasticsearch
metricsets:
– node
– node_stats
period: 10s
hosts: [“http://10.0.0.91:9200”]
修改metricbeat配置
egrep -v “^.*#|^$” /etc/metricbeat/metricbeat.yml
metricbeat.config.modules:
path: ${path.config}/modules.d/*.yml
reload.enabled: false
setup.template.settings:
index.number_of_shards: 1
index.codec: best_compression
setup.dashboards.enabled: true
setup.kibana:
host: “10.0.0.91:5601”
output.elasticsearch:
hosts: [“10.0.0.91:9200”]
processors:
– add_host_metadata: ~
– add_cloud_metadata: ~
– add_docker_metadata: ~
– add_kubernetes_metadata: ~
重启metricbeat
kibana查看数据
“`

系统



## 31 heartbeat
“`sh
下载
wget https://artifacts.elastic.co/downloads/beats/heartbeat/heartbeat-7.17.26-amd64.deb
安装
dpkg -i heartbeat-7.17.26-amd64.deb
配置
cat heartbeat.yml
heartbeat.config.monitors:
path: ${path.config}/monitors.d/*.yml
reload.enabled: false
reload.period: 5s
heartbeat.monitors:
– type: http
enabled: true
id: my-heartbeat-http
name: my-heartbeat-http
urls: [“http://10.0.0.91:9200”]
schedule: ‘@every 10s’
– type: tcp
enabled: true
id: my-heartbeat-tcp
name: my-heartbeat-tcp
urls: [“10.0.0.91:80”]
schedule: ‘@every 10s’
– type: icmp
enabled: true
id: my-heartbeat-icmp
name: my-heartbeat-icmp
urls: [“10.0.0.91”]
schedule: ‘@every 10s’
setup.template.settings:
index.number_of_shards: 1
index.codec: best_compression
setup.kibana:
output.elasticsearch:
hosts: [“10.0.0.91:9200”]
processors:
– add_observer_metadata:
启动
systemctl restart heartbeat-elastic.service
kibana查看数据
“`

## 32 es集群加密
“`sh
1 生产证书文件
/usr/share/elasticsearch/bin/elasticsearch-certutil cert -out /etc/elasticsearch/elastic-certificates.p12 -pass “”
2 如果是集群,将证书文件拷贝到其他es节点
chmod 640 /etc/elasticsearch/elastic-certificates.p12
scp -p /etc/elasticsearch/elastic-certificates.p12 root@10.0.0.92:/etc/elasticsearch/
3 修改es集群的配置文件
cat >>/etc/elasticsearch/elasticsearch.yml<
}
}
output {
elasticsearch {
hosts => [“10.0.0.91:9200”]
index => “ysl-logstash-tls-es”
user => elastic
password => “123456”
}
}
2 启动
logstash -rf /etc/logstash/conf.d/11-tcp-to-es_tls.conf
3发送测试数据
echo test_logstash_es_tls | nc 10.0.0.91 8888
4 kibana查看
“`

35 基于kibana实现RBAC
“`sh
r role
b basic
a access
c ctrol
基于访问角色控制
“`


“`sh
rbac是根据要求创建角色,再创建用户,权限管理
“`
## 35 filebeat对接redis
“`sh
1 部署redis
root@elk01:~# docker load -i redis-7.2.5.tar.gz
9853575bc4f9: Loading layer 77.83MB/77.83MB
15ba19fd0afe: Loading layer 10.75kB/10.75kB
98723f5366da: Loading layer 10.75kB/10.75kB
811293dc7f13: Loading layer 4.143MB/4.143MB
7879a25fefe5: Loading layer 37.57MB/37.57MB
643b046a00e2: Loading layer 1.536kB/1.536kB
5f70bf18a086: Loading layer 1.024kB/1.024kB
b3a84b16d771: Loading layer 4.096kB/4.096kB
Loaded image: redis:7.2.5
root@elk01:~#
2 启动redis
docker run -d –network host –name redis-server –restart always redis:7.2.5
docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9a741a1c37bf redis:7.2.5 “docker-entrypoint.s…” 15 seconds ago Up 15 seconds redis-server
ss -ntl | grep 6379
LISTEN 0 511 0.0.0.0:6379 0.0.0.0:*
LISTEN 0 511 [::]:6379 [::]:*
root@elk01:~#
3 编写filebeat配置文件
cat >>/etc/filebeat/19-tcp-to-redis.yaml<
filebeat-redis
127.0.0.1:6379[10]> type filebeat-redis
list
127.0.0.1:6379[10]> LRANGE filebeat-redis 0 -1
{“@timestamp”:”2025-03-19T10:03:36.732Z”,”@metadata”:{“beat”:”filebeat”,”type”:”_doc”,”version”:”7.17.26″},”host”:{“name”:”elk01″},”agent”:{“type”:”filebeat”,”version”:”7.17.26″,”hostname”:”elk01″,”ephemeral_id”:”df54f4e0-0123-446e-a7bb-e235576119a8″,”id”:”ab0eb895-d64b-425b-b2f3-14d6a4509397″,”name”:”elk01″},”message”:”filebeat-to-redis”,”log”:{“source”:{“address”:”10.0.0.91:59310″}},”input”:{“type”:”tcp”},”ecs”:{“version”:”1.12.0″}}
“`
## 36 logstash对接redis
“`sh
1 编写配置文件
cat >>/etc/logstash/conf.d/13-redis-to-es.conf<
port => 6379
db => 10
data_type => “list”
key => “filebeat-redis”
}
}
filter {
mutate {
remove_field => [ “agent”,”input”,”log”,”@version”,”ecs” ]
}
}
output {
# stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200″,”10.0.0.92:9200″,”10.0.0.93:9200”]
index => “ysl-logstash-elfk-redis”
user => elastic
password => “123456”
}
}
EOF
2 启动logstash
logstash -rf /etc/logstash/conf.d/13-redis-to-es.conf
3 kibana查看数据
4 logstash消费数据redis数据后,redis数据删除
原因:节约资源
root@elk01:~# docker exec -it redis-server redis-cli -n 10 –raw
127.0.0.1:6379[10]> keys *
127.0.0.1:6379[10]>
“`

## 37 elasticstach对于kafka和redis如何选择
“`sh
功能上:
两者都可以用来临时缓存数据。
区别:
– 1.性能上甚至Redis速度回更快,毕竟使用的是内存,kafka稍微逊色,因为数据写入磁盘。
– 2.数据量较大时,Redis不太合适,因为数据都得积压到内存中,效率就会降低,存储成本也比较高,因为建议选择kafka;
– 3.Logstash一旦消费了redis数据就会将数据删除,是一对一消费模式(消费者消费数据后数据删除),而kafka是一对多消费模式(消费者消费数据后数据不删除);
“`
## 38 filebeat基于api-key写入数据到es集群
“`sh
1.为什么要启用api-key
为了安全性,使用用户名和密码的方式进行认证会暴露用户信息。
ElasticSearch也支持api-key的方式进行认证。这样就可以保证安全性。api-key是不能用于登录kibana,安全性得到保障。
而且可以基于api-key实现权限控制。
2.ES启用api-key
vim /etc/elasticsearch/elasticsearch.yml
…
# 添加如下配置
# 启用api_key功能
xpack.security.authc.api_key.enabled: true
# 指定API密钥加密算法
xpack.security.authc.api_key.hashing.algorithm: pbkdf2
# 缓存的API密钥时间
xpack.security.authc.api_key.cache.ttl: 1d
# API密钥保存数量的上限
xpack.security.authc.api_key.cache.max_keys: 10000
# 用于内存中缓存的API密钥凭据的哈希算法
xpack.security.authc.api_key.cache.hash_algo: ssha256
3 拷贝配置文件到其他节点
scp /etc/elasticsearch/elasticsearch.yml 10.0.0.92:/etc/elasticsearch
scp /etc/elasticsearch/elasticsearch.yml 10.0.0.93:/etc/elasticsearch
4 重启es集群
systemctl restart elasticsearch
5 创建api
6 基于api-key解析
echo cV9oSVNaUUJwQUVOWVQyYV9wSjI6akt0d0RXQlJUaS1HLXNZdWhJbXVfZw==| base64 -d | more
q_hISZQBpAENYT2a_pJ2:jKtwDWBRTi-G-sYuhImu_g
7 编写配置文件
cat >>/etc/filebeat/config/20-tcp-to-es.yaml<
}
}
output {
elasticsearch {
hosts => [“10.0.0.91:9200″,”10.0.0.92:9200″,”10.0.0.93:9200”]
index => “ysl-logstash-elfk-apikey-tls”
# user => elastic
# password => “123456”
# 指定api-key的方式认证
api_key => “sZ8uSpQB8kqe7kUTd5OH:VHHFJfqqQ4iE9ZZFnw75CQ”
# 使用api-key则必须启动ssl
ssl => true
# 跳过ssl证书验证
ssl_certificate_verification => false
}
}
[root@elk93 ~]#
[root@elk93 ~]# logstash -rf /etc/logstash/conf.d/14-tcp-to-es_apikey.conf
3.访问测试
[root@elk91 ~]# echo 88888888888888888888888 | nc 10.0.0.93 7777
[root@elk91 ~]# echo 999999999999999999999999 | nc 10.0.0.93 7777
“`
## 41 es7和es8区别
“`sh
– ES7和ES8的区别:
– ES8默认启用了https认证;
– kibana8功能也更加强大;
“`
# 第二章 kafka和zookeeper
## 1 zookeeper
### 1 zookeeper和kafka的关系
“`sh
– Kafka和ZooKeeper的关系
ZooKeeper 是一个分布式协调服务,常用于管理配置、命名和同步服务。
长期以来,Kafka 使用 ZooKeeper 负责管理集群元数据、控制器选举和消费者组协调等任务理,包括主题、分区信息、ACL(访问控制列表)等。
ZooKeeper 为 Kafka 提供了选主(leader election)、集群成员管理等核心功能,为 Kafka提供了一个可靠的分布式协调服务,使得 Kafka能够在多个节点之间进行有效的通信和管理。
然而,随着 Kafka的发展,其对 ZooKeeper的依赖逐渐显露出一些问题,这些问题也是下面 Kafka去除 Zookeeper的原因。
– kafka 2.8+ 为什么要移除zookeeper组件呢?
1.复杂性增加
ZooKeeper 是独立于 Kafka 的外部组件,需要单独部署和维护,因此,使用 ZooKeeper 使得 Kafka的运维复杂度大幅提升。
运维团队必须同时管理两个分布式系统(Kafka和 ZooKeeper),这不仅增加了管理成本,也要求运维人员具备更高的技术能力。
2. 性能瓶颈
作为一个协调服务,ZooKeeper 并非专门为高负载场景设计, 因此,随着集群规模扩大,ZooKeeper在处理元数据时的性能问题日益突出。
例如,当分区数量增加时,ZooKeeper需要存储更多的信息,这导致了监听延迟增加,从而影响Kafka的整体性能。
在高负载情况下,ZooKeeper可能成为系统的瓶颈,限制了Kafka的扩展能力。
3. 一致性问题
Kafka 内部的分布式一致性模型与 ZooKeeper 的一致性模型有所不同。由于 ZooKeeper和 Kafka控制器之间的数据同步机制不够高效,可能导致状态不一致,特别是在处理集群扩展或不可用情景时,这种不一致性会影响消息传递的可靠性和系统稳定性。
4.发展自己的生态
Kafka 抛弃 ZooKeeper,我个人觉得最核心的原因是:Kafka生态强大了,需要自立门户,这样就不会被别人卡脖子。
纵观国内外,有很多这样鲜活的例子,当自己弱小时,会先选择使用别家的产品,当自己羽翼丰满时,再选择自建完善自己的生态圈。
– KAFKA 2.8+引入Kraft模式抛弃ZooKeeper
kafka2.8.0版本引入了基于Raft共识协议的新特性,它允许kafka集群在没有ZooKeeper的情况下运行。
为了剥离和去除ZooKeeper,Kafka引入了自己的亲儿子KRaft(Kafka Raft Metadata Mode)。
KRaft是一个新的元数据管理架构,基于Raft一致性算法实现的一种内置元数据管理方式,旨在替代ZooKeeper的元数据管理功能。
KRaft的优势有以下几点:
简化部署:
Kafka 集群不再依赖外部的 ZooKeeper 集群,简化了部署和运维的复杂性。
KRaft 将所有协调服务嵌入 Kafka 自身,不再依赖外部系统,这样大大简化了部署和管理,因为管理员只需关注 Kafka 集群。
高效的一致性协议:
Raft 是一种简洁且易于理解的一致性算法,易于调试和实现。KRaft 利用 Raft 协议实现了强一致性的元数据管理,优化了复制机制。
提高性能:
由于元数据管理不再依赖 ZooKeeper,Kafka 集群的性能得到了提升,尤其是在元数据读写方面。
增强可扩展性:
KRaft 模式支持更大的集群规模,可以有效地扩展到数百万个分区。
提高元数据操作的扩展性:新的架构允许更多的并发操作,并减少了因为扩展性问题导致的瓶颈,特别是在高负载场景中。
更快的控制器故障转移:
控制器(Controller)的选举和故障转移速度更快,提高了集群的稳定性。
消除 ZooKeeper 作为中间层之后,Kafka 的延迟性能有望得到改善,特别是在涉及选主和元数据更新的场景中。
KRaft模式下,kafka集群中的一些节点被指定为控制器(Controller),它们负责集群的元数据管理和共识服务,所有的元数据都存储在kafka内部的主题中,
而不是ZooKeeper,控制器通过KRaft协议来确保元数据在集群中的准确复制,这种模式使用了基于时间的存储模型,通过定期快照来保证元数据日志不会无限增长。
完全自主:
因为是自家产品,所以产品的架构设计,代码开发都可以自己说了算,未来架构走向完全控制在自己手上。
控制器(Controller)节点的去中心化:
KRaft 模式中,控制器节点由一组 Kafka 服务进程代替,而不是一个独立的 ZooKeeper 集群。
这些节点共同负责管理集群的元数据,通过 Raft 实现数据的一致性。
日志复制和恢复机制:
利用 Raft 的日志复制和状态机应用机制,KRaft 实现了对元数据变更的强一致性支持,这意味着所有控制器节点都能够就集群状态达成共识。
动态集群管理:
KRaft允许动态地向集群中添加或移除节点,而无需手动去ZooKeeper中更新配置,这使得集群管理更为便捷。
“`
### 2 zookeeper集群部署
“`sh
– zookeeper集群部署
1.什么是zookeeper
ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。所有这些类型的服务都以某种形式被分布式应用程序使用。
生成环境中当读取的数据量少于75%,且每秒的QPS少于6w,则zookeeper集群规模建议是3个。
推荐阅读:
https://zookeeper.apache.org/doc/current/zookeeperOver.html
1 下载
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
2 解压到指定目录
mkdir -pv /software
tar xf apache-zookeeper-3.8.4-bin.tar.gz -C /software
ln -s /software/apache-zookeeper-3.8.4-bin/ /software/zookeeper
3 配置环境变量
cat >/etc/profile.d/zk.sh<
echo 92 > /zk/data/zk/myid
echo 93 > /zk/data/zk/myid
7 修改hosts文件
cat >>/etc/hosts<
端口是8099
“`

## 2 kafka
### 1 kafka单点部署
“`sh
1 下载
wget https://dlcdn.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
2 解压
tar xf kafka_2.13-3.9.0.tgz -C /software/
ln -s /software/kafka_2.13-3.9.0 /software/kafka
3 配置
mkdir /kafka/data/kafka -pv
# 表示kafka的唯一标识
broker.id=91
# 数据的存储路径
log.dirs=/ysl/data/kafka
# 连接zookeeper集群的地址
zookeeper.connect=10.0.0.91:2181,10.0.0.92:2181,10.0.0.93:2181/test-kafka390
4 配置环境变量
cat /etc/profile.d/kafka.sh
#!/bin/bash
export KAFKA_HOME=/software/kafka
export PATH=$PATH:$KAFKA_HOME/bin
5 启动
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
6 停止
kafka-server-stop.sh
“`
### 2 kafka jvm调优
“`sh
egrep -v “^.*#|^$” /software/kafka/bin/kafka-server-start.sh | grep -i xm
export KAFKA_HEAP_OPTS=”-Xmx256m -Xms256m”
– 1.生产环境中,建议kafka的堆内存大小不宜过大,推荐设置5~6GB即可。因为其数据存储在磁盘;
– 2.生产环境停止kafka可能时间较长,可以多次执行”kafka-server-stop.sh “脚本,耐心等待,不要使用kill -9会造成数据丢失;
“`
### 3 kafka集群部署
“`sh
1 拷贝程序到其他节点(刚装好单节点的机器执行)
scp -r /software/kafka_2.13-3.9.0/ 10.0.0.92:`pwd`
scp -r /software/kafka_2.13-3.9.0/ 10.0.0.93:`pwd`
scp /etc/profile.d/kafka.sh 10.0.0.92:/etc/profile.d/
scp /etc/profile.d/kafka.sh 10.0.0.93:/etc/profile.d/
2 修改节点的broker.id(各节点执行)
92节点执行
sed -i ‘/^broker/s@91@92@’ /software/kafka_2.13-3.9.0/config/server.properties
grep ^broker.id /software/kafka_2.13-3.9.0/config/server.properties
ln -s /software/kafka_2.13-3.9.0/ /software/kafka
source /etc/profile.d/kafka.sh
93节点执行
sed -i ‘/^broker/s@91@93@’ /software/kafka_2.13-3.9.0/config/server.properties
grep ^broker.id /software/kafka_2.13-3.9.0/config/server.properties
ln -s /software/kafka_2.13-3.9.0/ /software/kafka
source /etc/profile.d/kafka.sh
3 启动,每个节点执行
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
4 zkWeb查看
“`


### 4 生产者和消费者
“`sh
1 生产者产生数据
root@elk03:~# kafka-console-producer.sh –bootstrap-server 10.0.0.92:9092 –topic test-topic
>111111111
>2
>22222222222222
>3333333333333
2 消费者消费数据
kafka-console-consumer.sh –bootstrap-server 10.0.0.91:9092 –topic test-topic –from-beginning
111111111
2
22222222222222
3333333333333
3 再次查看topic列表
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –list
__consumer_offsets
test-topic
root@elk01:/software#
“`
### 5 topic基本管理
“`sh
1 查看topic列表
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –list
__consumer_offsets
test-topic
2 创建topic
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –topic test-topic1 –create –partitions 3 –replication-factor 1
3 查看所有topic详细信息
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –describe
Topic: test-topic TopicId: n9jlbGyJQw2_PDj0M2bMVw PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: test-topic1 TopicId: Gw7WcOb6TPqph0yC8AgL4Q PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test-topic1 Partition: 0 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
Topic: test-topic1 Partition: 1 Leader: 93 Replicas: 93 Isr: 93 Elr: N/A LastKnownElr: N/A
Topic: test-topic1 Partition: 2 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: __consumer_offsets TopicId: 38K_-RGrTIiTROrveDf-eQ PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: __consumer_offsets Partition: 0 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
Topic: __consumer_offsets Partition: 1 Leader: 93 Replicas: 93 Isr: 93 Elr: N/A LastKnownElr: N/A
Topic: __consumer_offsets Partition: 2 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: __consumer_offsets Partition: 3 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
…..
4 查看指定topic详细信息
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –describe –topic test-topic1
Topic: test-topic1 TopicId: Gw7WcOb6TPqph0yC8AgL4Q PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test-topic1 Partition: 0 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
Topic: test-topic1 Partition: 1 Leader: 93 Replicas: 93 Isr: 93 Elr: N/A LastKnownElr: N/A
Topic: test-topic1 Partition: 2 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
5 修改topic分区数量(只能增大不能减小,减小可能会丢失数据)
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –describe –topic test-topic
Topic: test-topic TopicId: n9jlbGyJQw2_PDj0M2bMVw PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
test-topic主题只有一个分区,将test-topic分区数量增加到3
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –describe –topic test-topic
Topic: test-topic TopicId: n9jlbGyJQw2_PDj0M2bMVw PartitionCount: 3 ReplicationFactor: 1 Configs:
Topic: test-topic Partition: 0 Leader: 91 Replicas: 91 Isr: 91 Elr: N/A LastKnownElr: N/A
Topic: test-topic Partition: 1 Leader: 92 Replicas: 92 Isr: 92 Elr: N/A LastKnownElr: N/A
Topic: test-topic Partition: 2 Leader: 93 Replicas: 93 Isr: 93 Elr: N/A LastKnownElr: N/A
6 修改副本数量(生产不建议)
https://www.cnblogs.com/yinzhengjie/p/9808125.html
7 删除topic
kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –topic test-topic1 –delete
root@elk01:/software# kafka-topics.sh –bootstrap-server 10.0.0.92:9092 –list
__consumer_offsets
test-topic
数据删除后并不会立刻删除,而是需要等待一段时间(log.segment.delete.delay.ms,kafka 3.9.0版本中默认是1min)后再删除,有延迟时间。
“`
### 6 消费者
“`sh
– 1.同一个消费者组的消费者不能同时拉取同一个topic的分区;
– 2.当消费者组的消费者数量发生变化时就会触发重平衡;
– 3.由于没有消费者进行消费,因此可能会出现数据延迟的情况。
– 4.当有该消费者组的消费者上线后,会继续消费对应分区的offset之后的数据。
– 5.如果想要重新消费topic的数据,可以重新换一个消费者组即可,只要消费者组在kafka集群中没有记录即可;
“`
### 7 kafka术语
“`sh
– topic
逻辑概念,主题,针对生产者和消费者进行读写的单元。
– partition:
一个topic可以有1个或多个分区,从而实现数据的分布式存储。
– replica:
每个分区最少有一个或多个副本,从而实现数据高可用性。
– producer:
向kafka集群写入数据的一方。
– consumer:
从kafka读取数据的一方。
– offset
分区中记录event数据的偏移量,每条消息对应一个offset记录。
– consumer group
任何消费者都隶属于某一个消费者组,一个消费者组可以有多个消费者。
– rebalance
当消费者组的消费者数量发生变化时就会触发重平衡,即重新为消费者分片分区的过程。
– 为什么kafka存在丢失数据的风险
– isr :
和leader数据同步的副本集合。
– osr:
和leader数据不同步的副本集合。
– ar:
isr + osr,表示所有副本。
– leo:
最后一个偏移量。
– hw:
高水位线,ISR列表中最小的leo就是hw。
对于消费者组而言,仅能看到HW之前的数据,无法消费到HW之后的数据。
“`
### 8 kafka数据一致性问题
“`sh
方案1:
kafka使用一个partition就能保证数据顺序一致性
缺点:
无法充分利用集群的资源
方案2:
kafka使用多个partition则需要对生产者数据进行编号,消费者取出数据后基于编号排序
缺点:
增加额外工作
“`
### 9 kafka参数优化
“`sh
vi /software/kafka_2.13-3.9.0/config/server.properties
broker.id
唯一标识kafka节点。
log.dirs
可以定义多个路径,使用逗号分割,实现数据I/O并发读写以提高性能。
log.retention.hours
数据的保留周期,默认保留7天(168h)
zookeeper.connect
建议配置集群,且设置chroot
listeners = PLAINTEXT://your.host.name:9092
配置监听地址,若不指定则默认为主机名,需要配置hosts解析。
auto.create.topics.enable
自动创建topic,建议关闭。将其设置为false,默认为true。
如果设置为true,当topic不存在时,会自动创建,集群的可用性增强,但维护性较差!
delete.topic.enable:
删除topic时,数据并不会删除,因此不会释放磁盘空间。
num.io.threads
服务器用于处理请求的线程数,其中可能包括磁盘I/O,建议设置为cpu核心数即可。
num.network.threads:
服务器用于从网络接收请求并向网络发送响应的线程数,建议设置为cpu核心数即可。
参考链接:
https://kafka.apache.org/documentation/#brokerconfigs
– 客户端(Producer | consumer)
acks: 0
参考链接:
https://kafka.apache.org/documentation/#producerconfigs
– 客户端(Producer | consumer)
acks: 0
group.id
auto.offset.reset
参考链接:
https://kafka.apache.org/documentation/#consumerconfigs
– kafka集群压测
推荐阅读:
https://www.cnblogs.com/yinzhengjie/p/9953212.html
“`
### 10 ElasticStack架构升级及MQ对比
“`sh
为了减轻Logstash压力以及Logstash和filebeat的耦合性,我们可以考虑在Logstash前面加一套MQ集群。
所谓的MQ,指的是Message Queue,即消息队列。但是这种架构无疑是给系统增加了负担:
– 1.MQ不能存在单点问题;
– 2.MQ具有很强的处理数据能力;
– 3.增加了集群的整体复杂性,运维和开发的同学都得增加学习成本;
也就是说,这意味消息队列要提供以下特性:
– 1.MQ集群吞吐量大,能够承担数据的读写; 5台32core,32GB读取处理消息数量23w/s,写速度可以达到220m/s。
– 2.MQ集群要提供非常强的高可用性,不能是单点的故障;
– 3.文档丰富,社区资源丰富;
市面上有很多MQ产品,典型代表有:
– RocketMQ【阿里巴巴,有社区版本(功能较差,文档不够丰富,仅支持Java相关的API)和SAAS版本(功能强,需要花钱),性能很好,单机每秒能够处理10w+/s】
– ActiveMQ【老牌系统,文档相对丰富,性能一般,单机每秒处理1w+/s】
– Kafka【日志收集,大数据分析,性能非常好,单机每秒处理10w+/s,存在丢失数据的风险,但可以忽略不计,API文档非常丰富,基于Java和Scala语言研发。二次开发比较方能,社区完善了Golang,Python等API】
– RabbitMQ【金融公司,文档丰富,性能较好,单机每秒处理1w+/s,有丰富的WebUI,可以做到数据不丢失。API开发相对来说不太友好。基于Erlang语言研发,国内并不流行,因此二次开发招人比较困难。】
“`
### 11 filebeat采集数据写入kafka
“`sh
1 kafka创建topic
kafka-topics.sh –bootstrap-server 10.0.0.92:9092,10.0.0.91:9092,10.0.0.93:9092 –partitions 3 –replication-factor 1 –topic ysl-elfk –create
Created topic ysl-elfk.
2 配置filebeat输出到kafka
cat >>/etc/filebeat/config/18-tcp-to-kafka.yaml<
topics => [“ysl-elfk”]
group_id => “ysl-logstash004”
auto_offset_reset => “earliest”
}
}
filter {
json {
source => “message”
remove_field => [ “agent”,”input”,”log”,”@version”,”ecs” ]
}
}
output {
# stdout {}
elasticsearch {
hosts => [“10.0.0.91:9200″,”10.0.0.92:9200″,”10.0.0.93:9200”]
index => “ysl-logstash-elfk-kafka”
user => elastic
password => “123456”
}
}
EOF
2 启动logstash
logstash -rf /etc/logstash/conf.d/12-kafka-to-es.conf
3 kibana查看数据
“`
