背景
不管是 Flink On Yarn 还是 On k8s,如果任务正常运行,我们是可以通过 Flink Web UI 去查看 JobManager 和 TaskManager 日志,虽然日志量大的时候去不同的 TaskManager 找日志有点困难(如何快速知道日志在哪个 TaskManager 上;在 TaskManager 里面可能有多个滚动的日志文件,如何快速找到 root cause 异常;如果 TaskManager OOM 掉了该容器的日志就看不到了),但是起码给了一个可以看日志的途径。
熟悉 Flink On Yarn 的应该知道 Flink 任务运行结束/失败后,只能去 Yarn UI 看到任务的 Jobmanager 日志,对于 TaskManager 日志这些是看不到的,这对于有时候想排查下任务失败的原因日志会比较困难(不过大多数任务挂掉的原因日志都会在 Jobmanager 存在)。
熟悉 Flink On K8s 的那更能体验到查看日志的痛苦了,在任务运行失败和结束后,所有的 Pod 都会退出,如果没有收集这些运行日志,那几乎很难知道任务为啥会失败。
Flink History Server 不像 Spark History Server 一样可以看到任务所有运行的 Excutor 日志,所以对于故障定位 Flink 任务异常日志这个场景,Flink 自带的那些体验不是很友好。因此也有本文的出现,来讲述一下如何针对上面两种运行模式下 Flink 任务的日志收集,来解决我们不方便定位任务异常失败的需求。
当然了,我们收集到这些日志数据后,可以用来做异常日志告警提醒任务负责人作业异常信息(这个后面可以专门开篇文章来写),也可以收集起来存储到 ES,方便用户排查任务异常日志。
方案选择
常见的收集日志方案有下面两种:
1、统一 LogAgent 收集。不管是使用 Flink On Yarn 还是 Flink On K8s,日志都可以配置一个路径(路径有规则),然后每台计算节点机器专门部署一个 LogAgent (比如有 Filebeat)去收集这些运行日志。K8s 的话会比 Yarn 的日志要收集的话稍微会复杂一些,需要 Flink 任务挂载磁盘,这样日志文件数据路径比较固定,否则日志文件是在容器 Pod 内,会随着 Pod 的生命周期而消失。这种方式需要在每台机器都部署一个专门用来收集日志的 Agent,还要额外维护它的稳定性,不然可能会漏收集到任务的日志。
2、自定义 Kafka Appender。这种方式要根据日志框架进行自定义一个 Appender,将定义好的 Appender 打包后放到 Flink lib 目录,然后配置好 log4j 配置,任务启动后会自动加载这个依赖,运行过程中会自动实时将日志发送到 Kafka。这个 Appender 定义可以比较灵活(具体的可以看下文的代码实现),比如加入一些过滤条件:只收集 warn 级别以上的日志(因为任务多了的话收集所有的级别日志数据量会很大,但是对排查问题带来的作用有限)。这种方式和任务运行在 Yarn 和 K8s 无关,都可以正常收集日志,不用单独配置,也不用单独去维护什么组件的稳定性,唯一的缺点就是对已经在运行的任务如果想要收集日志需要重启一下即可,相比来说我个人觉得还是这种方式会比较合适。
整个架构如上,你理解图中的 Reporter 包含三个:第一个是自定义的 Kafka Appender,第二个是自定义的 Kafka Metrics Reporter,第三个是根据官方的 Prometheus PushGateway Metrics Reporter 做了内部改造的。前两个是本篇要讲解的,后两个后面也可以单独再开文章来讲。