Kubernetes集群监控-Kafka对接EL(Fluentd)K构建超强日志体系

简介

Kafka 在日志体系中的核心作用

在当今复杂的分布式系统环境下,日志管理变得愈发重要,而 Kafka 在其中扮演着极为关键的角色。对于大型系统,特别是基于微服务设计的分布式系统而言,日志来源众多,数据量庞大。如果直接将大量日志通过传统方式收集输出至存储或分析工具,很有可能会导致诸如 Elasticsearch 等工具崩溃,这时候就凸显出 Kafka 的价值了。

Kafka 具有出色的分布式扩容能力,能够轻松应对不断增长的日志数据量。在高并发场景下,它的优势更是明显,例如在面对大量微服务同时产生日志并需要处理的情况时,依然可以稳定地接收和暂存这些日志信息,起到了很好的缓冲作用。而且,它采用发布 - 订阅的消息模式,消息生产者(也就是各个产生日志的应用、服务等)将日志消息发布到 Kafka 的主题 (Topic) 中,多个消息消费者(比如后续对接的用于日志处理、分析等的工具)可以同时订阅这些主题来消费消息,实现了一对多的数据流转模式,方便不同环节的组件对日志进行相应处理,为整个日志体系搭建起了可靠的数据流转基础。

架构图

EL (Fluentd) K 的组合优势

高效的日志收集与处理

Fluentd 在日志收集阶段发挥重要作用。Fluentd 具有丰富的插件系统,能够轻松地从各种数据源(如服务器、容器等)收集日志。无论是文本文件、系统日志还是网络数据,Fluentd 都能有效地进行采集,确保日志数据不会遗漏。

强大的日志缓冲能力

Kafka 在图中处于日志缓冲的位置。Kafka 是一个高吞吐量、分布式的消息队列系统。它能够在短时间内处理大量的日志数据,并将其进行缓存。这一特性使得在日志高峰期,系统不会因为数据量过大而出现堵塞或者数据丢失的情况,保证了日志数据的完整性和及时性。

灵活的日志转换

Logstash 在图中负责日志转换。Logstash 可以对从 Kafka 接收到的日志数据进行灵活的转换操作。例如,它可以对日志格式进行标准化处理,将不同格式的日志转换为统一的、便于存储和分析的格式。同时,还可以进行数据过滤、字段提取等操作,让日志数据更加符合实际分析需求。

可靠的日志存储

Elasticsearch 作为日志存储组件,具有出色的分布式存储和搜索能力。它能够将经过处理的日志数据进行可靠的存储,并且提供快速的全文搜索功能。无论是从海量的日志数据中查找特定事件,还是进行复杂的数据分析,Elasticsearch 都能够高效地满足需求。

直观的日志呈现

Kibana 在图中的最后环节负责日志呈现。Kibana 可以与 Elasticsearch 紧密集成,通过直观的可视化界面将存储在 Elasticsearch 中的日志数据进行展示。用户可以轻松地创建各种图表、仪表盘,从不同维度对日志数据进行分析和监控,快速发现系统中的潜在问题。

综上所述,EL (Fluentd) K 的组合通过各个组件的协同工作,形成了一个从日志收集、缓冲、转换、存储到呈现的完整、高效且可靠的日志处理体系。

搭建 Kafka 集群

Kafka 最初由 Linkedin 公司开发,是一款分布式、分区化、多副本且多订阅者的系统,依托 zookeeper 协调运作,既可用作分布式日志系统,也能当作 MQ 系统,在 web/nginx 日志处理、访问日志管理以及消息服务等领域应用广泛。2010 年,Linkedin 将其贡献给 Apache 基金会,使之成为顶级开源项目。其主要应用场景集中在日志收集系统与消息系统两大方面。

安装 jdk 依赖

首先,我们需要安装 jdk 依赖。前往 jdk 下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html 下载合适版本的 jdk。

在服务器上执行以下操作:

1
2
3
4
5
6
7
8
[root@kafka-server ~]# cd /server/tools
# 安装 jdk 软件包,此处以 jdk-8u431-linux-x64.rpm 为例
[root@k8s-master tools]# rpm -ivh jdk-8u431-linux-x64.rpm
# 检查 jdk 安装是否成功
[root@k8s-master tools]# java -version
java version "1.8.0_431"
Java(TM) SE Runtime Environment (build 1.8.0_431-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.431-b11, mixed mode)

安装 Kafka

完成 jdk 安装后,开始安装 Kafka。

1
2
3
4
5
6
7
[root@kafka-server ~]# cd /server/tools
# 下载 Kafka 安装包,这里以 kafka_2.12-3.6.2.tgz 为例
[root@kafka-server tools]# wget https://archive.apache.org/dist/kafka/3.6.2/kafka_2.12-3.6.2.tgz
# 解压 Kafka 安装包到 /opt/ 目录
[root@kafka-server tools]# tar xf kafka_2.12-3.6.2.tgz.tgz -C /opt/
# 创建软链接,方便后续操作
[root@kafka-server tools]# ln -s /opt/kafka_2.12-3.6.2.tgz /opt/kafka

配置 Kafka

安装完成后,对 Kafka 进行配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@kafka-server kafka]# vim /opt/kafka/config/server.properties 
# 设置 Kafka 日志存储目录
log.dirs=/data/kafka/logs
# 指定 zookeeper 连接地址
zookeeper.connect=localhost:2181
# 设置日志刷新消息间隔
log.flush.interval.messages=10000
# 设置日志刷新时间间隔
log.flush.interval.ms=1000
# 启用删除主题功能
delete.topic.enable=true
# 设置 Kafka 服务器主机名
host.name=kafka-server.boysec.cn

[root@kafka-server kafka]# mkdir -p /data/kafka/logs

启动 Kafka

最后,启动 Kafka 服务。

1
2
3
4
5
[root@k8s-slave kafka]# /opt/kafka/bin/zookeeper-server-start.sh -daemon /opt/kafka/config/zookeeper.properties
[root@k8s-slave kafka]# /opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
# 检查 Kafka 服务是否成功启动,查看 9092 端口是否监听
[root@k8s-slave kafka]# netstat -lntup |grep 9092
tcp6 0 0 10.1.1.130:9092 :::* LISTEN 103655/java

通过以上步骤,我们完成了 Kafka 的安装、配置与启动,为后续构建强大的日志体系奠定了坚实的基础。在实际应用中,可根据具体需求进一步优化 Kafka 的各项参数设置,以实现更高效稳定的运行。

验证Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 创建topic
/opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 10.1.1.130:9092 --replication-factor 1 --partitions 3 --topic cola

# 查看topic
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server 10.1.1.130:9092

# 本地测试--发送消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list 10.1.1.130:9092 --topic cola

# 本地测试---查看接收消息
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.1.1.130:9092 --topic cola --from-beginning

# 查看特定主题详细信息
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 10.1.1.130:9092 --describe --topic cola

Kafka-Manager(可选安装)

Kafka-Manager 是雅虎开源的一款用于管理 Apache Kafka 集群的工具。

Github 地址https://github.com/yahoo/CMAK。

功能特点

  • 集群管理
    • 支持对多个集群进行管理。
    • 能够轻松查看集群状态,包括主题、消费者、偏移量、代理(brokers)、副本分布、分区分布等信息。

方法一(废弃)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
FROM hseeberger/scala-sbt

ENV ZK_HOSTS=10.1.1.130:2181 \
KM_VERSION=2.0.0.2

RUN mkdir -p /tmp && \
cd /tmp && \
wget https://github.com/yahoo/kafka-manager/archive/${KM_VERSION}.tar.gz && \
tar xf ${KM_VERSION}.tar.gz && \
cd /tmp/CMAK-${KM_VERSION} && \
sbt clean dist && \
unzip -d / ./target/universal/kafka-manager-${KM_VERSION}.zip && \
rm -fr /tmp/${KM_VERSION} /tmp/kafka-manager-${KM_VERSION}

WORKDIR /kafka-manager-${KM_VERSION}

EXPOSE 9000
ENTRYPOINT ["./bin/kafka-manager","-Dconfig.file=conf/application.conf"]

构建镜像: docker build . -t harbor.od.com/infra/kafka-manager:v2.0.0.2

方法二

注意事项:
在此方法中,我们可直接下载镜像来部署 Kafka-Manager。值得一提的是,本人已制作了 3.0.0.53.0.0.6 版本的 kafaka-manager 镜像,这些镜像均可直接用于部署操作,极大地简化了部署流程。

资源配置清单相关操作:
首先,我们需要在服务器上创建用于存放 Kafka-Manager 的 Kubernetes 资源配置清单的目录,执行以下命令:

1
mkdir -p /var/k8s-yaml/kafka-manager

此目录将用于存放后续创建 Kubernetes 资源(如 Deployment、Service 等)所需的 yaml 文件,以便更好地组织和管理与 Kafka-Manager 部署相关的资源配置信息,使整个部署过程更加清晰、有序且易于维护。

vim /var/k8s-yaml/kafka-manager/deployment.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
kind: Deployment
apiVersion: apps/v1
metadata:
name: kafka-manager
namespace: logging
labels:
name: kafka-manager
spec:
replicas: 1
selector:
matchLabels:
app: kafka-manager
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 1
revisionHistoryLimit: 7
progressDeadlineSeconds: 600
template:
metadata:
labels:
app: kafka-manager
spec:
containers:
- name: kafka-manager
image: wangxiansen/kafka-manager:debian-v3.0.0.6
imagePullPolicy: IfNotPresent
ports:
- containerPort: 9000
protocol: TCP
env:
- name: ZK_HOSTS
value: 10.1.1.10:2181
- name: APPLICATION_SECRET
value: letmein
terminationGracePeriodSeconds: 30
securityContext:
runAsUser: 0

vim /var/k8s-yaml/kafka-manager/svc.yaml

1
2
3
4
5
6
7
8
9
10
11
12
kind: Service
apiVersion: v1
metadata:
name: kafka-manager
namespace: logging
spec:
ports:
- protocol: TCP
port: 9000
targetPort: 9000
selector:
app: kafka-manager

vim /var/k8s-yaml/kafka-manager/ingress.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRoute
metadata:
name: kafka-manager-web
namespace: logging
spec:
entryPoints:
- web
routes:
- match: Host(`km.od.com`)
kind: Rule
services:
- name: kafka-manager
port: 9000

应用配置清单:

1
2
3
kubectl apply -f /var/k8s-yaml/kafka-manager/deployment.yaml
kubectl apply -f /var/k8s-yaml/kafka-manager/svc.yaml
kubectl apply -f /var/k8s-yaml/kafka-manager/ingress.yaml

访问:http://km.od.com

添加集群

集群状态

Fluentd 配置 Kafka

现在有了 Kafka,我们就可以将 Fluentd 的日志数据输出到 Kafka 了,只需要将 Fluentd 配置中的 <match> 更改为使用 Kafka 插件即可,但是在 Fluentd 中输出到 Kafka,需要使用到 fluent-plugin-kafka 插件,所以需要我们自定义下 Docker 镜像,最简单的做法就是在上面 Fluentd 镜像的基础上新增 kafka 插件即可,Dockerfile 文件如下所示:

1
2
3
FROM quay.io/fluentd_elasticsearch/fluentd:v5
RUN echo "source 'https://mirrors.tuna.tsinghua.edu.cn/rubygems/'" > Gemfile && gem install bundler
RUN gem install fluent-plugin-kafka --no-document

使用上面的 Dockerfile 文件构建一个 Docker 镜像即可,我这里构建过后的镜像名为 wangxiansen/fluentd_kafka:v5

vim /var/k8s-yaml/fluentd/configmap.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
kind: ConfigMap
apiVersion: v1
metadata:
name: fluentd-conf
namespace: logging
data:
# 容器日志
containers.input.conf: |-
<source>
@id fluentd-containers.log
@type tail # Fluentd 内置的输入方式,其原理是不停地从源文件中获取新的日志
path /var/log/containers/*.log # Docker 容器日志路径
pos_file /var/log/containers.log.pos # 记录读取的位置
tag raw.kubernetes.* # 设置日志标签
read_from_head true # 从头读取
<parse> # 多行格式化成JSON
# 可以使用我们介绍过的 multiline 插件实现多行日志
@type multi_format # 使用 multi-format-parser 解析器插件
<pattern>
format json # JSON解析器
time_key time # 指定事件时间的时间字段
time_format %Y-%m-%dT%H:%M:%S.%NZ # 时间格式
</pattern>
<pattern>
format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
time_format %Y-%m-%dT%H:%M:%S.%N%:z
</pattern>
</parse>
</source>

# 在日志输出中检测异常(多行日志),并将其作为一条日志转发
# https://github.com/GoogleCloudPlatform/fluent-plugin-detect-exceptions
<match raw.kubernetes.**> # 匹配tag为raw.kubernetes.**日志信息
@id raw.kubernetes
@type detect_exceptions # 使用detect-exceptions插件处理异常栈信息
remove_tag_prefix raw # 移除 raw 前缀
message log
stream stream
multiline_flush_interval 5
</match>

<filter **> # 拼接日志
@id filter_concat
@type concat # Fluentd Filter 插件,用于连接多个日志中分隔的多行日志
key message
multiline_end_regexp /\n$/ # 以换行符“\n”拼接
separator ""
</filter>

# 添加 Kubernetes metadata 数据
<filter kubernetes.**>
@id filter_kubernetes_metadata
@type kubernetes_metadata
</filter>

# 修复 ES 中的 JSON 字段
# 插件地址:https://github.com/repeatedly/fluent-plugin-multi-format-parser
<filter kubernetes.**>
@id filter_parser
@type parser # multi-format-parser多格式解析器插件
key_name log # 在要解析的日志中指定字段名称
reserve_data true # 在解析结果中保留原始键值对
remove_key_name_field true # key_name 解析成功后删除字段
<parse>
@type multi_format
<pattern>
format json
</pattern>
<pattern>
format none
</pattern>
</parse>
</filter>

# 删除一些多余的属性
<filter kubernetes.**>
@type record_transformer
remove_keys $.docker.container_id,$.kubernetes.container_image_id,$.kubernetes.pod_id,$.kubernetes.namespace_id,$.kubernetes.master_url,$.kubernetes.labels.pod-template-hash
</filter>

# 只保留具有logging=true标签的Pod日志
<filter kubernetes.**>
@id filter_log
@type grep
<regexp>
key $.kubernetes.labels.logging
pattern ^true$
</regexp>
</filter>
<filter **>
@type stdout
</filter>

###### 监听配置,一般用于日志聚合用 ######
forward.input.conf: |-
# 监听通过TCP发送的消息
<source>
@id forward
@type forward
</source>

output.conf: |-
<match **>
@id kafka
@type kafka
@log_level info

# list of seed brokers
brokers "#{ENV['FLUENT_KAFKA_BROKERS'] || 'localhost:9092'}"
use_event_time true

# topic settings 默认topic是messages
topic_key k8slog
default_topic "#{ENV['FLUENT_KAFKA_TOPIC'] || 'messages'}"

# data type settings
<format>
@type json
</format>

# producer settings
required_acks -1
compression_codec gzip
</match>

vim /var/k8s-yaml/fluentd/daemonset.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd
namespace: logging
labels:
app: fluentd
kubernetes.io/cluster-service: 'true'
spec:
selector:
matchLabels:
app: fluentd
template:
metadata:
labels:
app: fluentd
kubernetes.io/cluster-service: 'true'
spec:
tolerations:
- operator: Exists
serviceAccountName: fluentd-es
containers:
- name: fluentd
image: wangxiansen/fluentd_kafka:v5
env:
- name: K8S_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: FLUENT_KAFKA_BROKERS
value: "10.1.1.10:9092"
- name: FLUENT_ELASTICSEARCH_LOGSTASH_PREFIX
value: "k8s"
resources:
limits:
cpu: 100m
memory: 400Mi
requests:
cpu: 50m
memory: 300Mi
volumeMounts:
- name: fluentconfig
mountPath: /etc/fluent/config.d
- name: varlog
mountPath: /var/log
- name: dockerlog
mountPath: /var/log/containers/
# mountPath: /data/docker/containers
volumes:
- name: fluentconfig
configMap:
name: fluentd-conf
- name: varlog
hostPath:
path: /var/log
- name: dockerlog
hostPath:
path: /var/log/containers/
# path: /data/docker/containers

vim /var/k8s-yaml/fluentd/rbac.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
apiVersion: v1
kind: ServiceAccount
metadata:
name: fluentd-es
namespace: logging
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: 'true'
addonmanager.kubernetes.io/mode: Reconcile
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: fluentd-es
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: 'true'
addonmanager.kubernetes.io/mode: Reconcile
rules:
- apiGroups:
- ''
resources:
- 'namespaces'
- 'pods'
verbs:
- 'get'
- 'watch'
- 'list'
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: fluentd-es
labels:
k8s-app: fluentd-es
kubernetes.io/cluster-service: 'true'
addonmanager.kubernetes.io/mode: Reconcile
subjects:
- kind: ServiceAccount
name: fluentd-es
namespace: logging
apiGroup: ''
roleRef:
kind: ClusterRole
name: fluentd-es
apiGroup: ''

应用配置清单:

1
2
3
kubectl apply -f /var/k8s-yaml/fluentd/rbac.yaml
kubectl apply -f /var/k8s-yaml/fluentd/configmap.yaml
kubectl apply -f /var/k8s-yaml/fluentd/daemonset.yaml

Logstash 精细化数据加工与中转

虽然数据从 Kafka 到 Elasticsearch 的方式多种多样,比如可以使用 Kafka Connect Elasticsearch Connector 来实现,我们这里还是采用更加流行的 Logstash 方案,上面我们已经将日志从 Fluentd 采集输出到 Kafka 中去了,接下来我们使用 Logstash 来连接 Kafka 与 Elasticsearch 间的日志数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
---
apiVersion: storage.k8s.io/v1
kind: StorageClass
# 创建存储动态供给
metadata:
name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer
---
apiVersion: v1
kind: PersistentVolume
# 创建pv
metadata:
name: logstash-local
labels:
pv-type: logstash-local-pv
spec:
accessModes:
- ReadWriteOnce
capacity:
storage: 5Gi
storageClassName: local-storage
local:
path: /data/nfs-volume/logstash
nodeAffinity:
required:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/hostname
operator: In
values:
- k8s-node01.local
---
apiVersion: logstash.k8s.elastic.co/v1alpha1
kind: Logstash
metadata:
name: logstash
namespace: logging
spec:
version: 8.15.1
count: 1
elasticsearchRefs:
- clusterName: k8s
name: elasticsearch
namespace: logging
volumeClaimTemplates:
- metadata:
name : logstash-data
spec:
storageClassName: local-storage
accessModes:
- ReadWriteOnce
selector:
matchLabels:
pv-type: logstash-local-pv
resources:
requests:
storage: 2G
pipelines:
- pipeline.id: main
config.string: |
input {
kafka {
bootstrap_servers => "10.1.1.10:9092"
consumer_threads => 4
group_id => "test_log"
client_id => "test_log" # 注意: 多台logstash实例消费同一个topics时; client_id需要指定不同的名字
topics_pattern => "messages" # 要与fluentd中配置的topic一直
}
}
filter {
json {
source => "message"
}
}
output {
elasticsearch {
hosts => [ "${ELASTICSEARCH_HOSTS}" ]
user => "${ELASTICSEARCH_USER}"
password => "${ELASTICSEARCH_PASSWORD}"
ssl_verification_mode => 'none'
# 索引是根据k8s的namespace区分的
index => "k8s-logstash-%{[kubernetes][namespace_name]}-%{+YYYY.MM.dd}"
}
# 日志输出的标准输出显示
stdout {
codec => rubydebug
}
}

podTemplate:
spec:
nodeSelector:
logstash: "true"
containers:
- name: logstash
env:
- name: ELASTICSEARCH_HOSTS
value: "https://elasticsearch-es-http.logging:9200"
- name: ELASTICSEARCH_SSL_AUTHORITY
value: "false"
- name: ELASTICSEARCH_USER
valueFrom:
secretKeyRef:
key: username
name: elastic-auth
- name: ELASTICSEARCH_PASSWORD
valueFrom:
secretKeyRef:
key: password
name: elastic-auth
- name: LS_JAVA_OPTS
value: "-Xmx512m -Xms512m"
resources:
requests:
memory: 1Gi
cpu: "500m"
limits:
memory: 1Gi
cpu: "700m"

安装 Kibana

参考这个安装:Kubernetes集群监控-使用EL(Fluentd)K实现日志监控和分析

浏览器访问 https://kibana.od.com/ 同样也是用户elastic 密码:admin123

验证EL (Fluentd) K集群

下面我们部署一个简单的测试应用, 新建 counter.yaml 文件,文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
apiVersion: v1
kind: Pod
metadata:
name: counter
labels:
logging: 'true' # 一定要具有该标签才会被采集
spec:
containers:
- name: count
image: busybox
args:
[
/bin/sh,
-c,
'i=0; while true; do echo "$i: $(date)"; i=$((i+1)); sleep 1; done',
]

该 Pod 只是简单将日志信息打印到 stdout,所以正常来说 Fluentd 会收集到这个日志数据,在 Kibana 中也就可以找到对应的日志数据了。回到 Kibana Dashboard 页面,点击左侧最下面的 Management -> Stack Management,进入管理页面,点击左侧 数据 下面的 索引管理 就会发现索引数据:

kibana日志索引

Discover