ELK其实就是Elasticsearch、Logstash、Kibana三者的简写。它的思路是将日志收集到es中,而不是只打印在控制台或者文件中,这样的好处是便于我们搜索和定位日志。当然这只是输出日志,还需要和traceId结合,将整体的链路串联起来,才能更好地发挥日志的用处。整体流程是:在各个服务器上搜集日志后传输给Logstash,然后由Logstash输出到es中存储起来
Elasticsearch是个开源分布式搜索引擎,主要是提供数据的存储以及搜索,检索性能比较高
Kibana是es的可视化界面,用于对es中的数据进行查询和操作
Logstash是用来搜集、分析、过滤日志的工具,支持大量的数据获取方式,负责日志的搜集、处理、转发等一系列操作
Logstash主要由三类插件组成:input、filter和output。输入插件就是input,input包含很多种,大概有好几十种,比如:stdin、file、kafka、beats、elasticsearch、jdbc等。今天我们主要讲集成kafka,具体的插件可以去官网上自行查询:https://www.elastic.co/guide/en/logstash/current/input-plugins.html
public abstract class BuildStrategy extends ContextAwareBase {
/**
* 消息发送者
*/
public static LogMessageSender sender;
/**
* 获取发送者
*
* @return
*/
public static LogMessageSender sender() {
return sender;
}
/**
* 构建 {@link LogMessageSender}
*
* @return MessageSender
*/
public abstract LogMessageSender build();
/**
 * Build strategy that creates a Kafka-backed {@link LogMessageSender}.
 *
 * <p>The sender is stored in {@link BuildStrategy#sender} and reused by later calls, so the
 * producer is only created the first time {@link #build()} runs.
 */
@Getter
@Setter
public class KafkaBuildStrategy extends BuildStrategy {

    /** Serializer used when none is configured; the sender always publishes JSON strings. */
    private static final String DEFAULT_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    private KafkaLogBackConfig config;

    /**
     * Returns the shared sender, creating it from {@link #config} on first use.
     *
     * @return the shared sender, or {@code null} when no sender exists yet and no configuration
     *         has been supplied
     */
    @Override
    public LogMessageSender build() {
        if (BuildStrategy.sender != null) {
            // Reuse the existing producer; no need to rebuild the properties.
            return BuildStrategy.sender;
        }
        if (config == null) {
            addError("kafka log sender not built : no config supplied");
            return null;
        }
        BuildStrategy.sender = LogKafkaSender.builder()
                .topic(config.getTopic())
                .producer(new KafkaProducer<>(producerProperties()))
                .build();
        return BuildStrategy.sender;
    }

    /** Translates {@link #config} into Kafka producer properties, skipping unset optional values. */
    private Properties producerProperties() {
        final Properties props = new Properties();
        putIfPresent(props, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        props.put(ProducerConfig.CLIENT_ID_CONFIG,
                StringUtils.isEmpty(config.getClientId()) ? ElkConstants.DEFAULT_CLIENT_ID : config.getClientId());
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // Bound how long a logging call may block while the producer waits for metadata/buffer space.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringUtils.isEmpty(config.getKeySerializer()) ? DEFAULT_SERIALIZER : config.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringUtils.isEmpty(config.getValueSerializer()) ? DEFAULT_SERIALIZER : config.getValueSerializer());
        putIfPresent(props, ProducerConfig.ACKS_CONFIG, config.getAcks());
        putIfPresent(props, ProducerConfig.BATCH_SIZE_CONFIG, config.getBatchSize());
        putIfPresent(props, ProducerConfig.BUFFER_MEMORY_CONFIG, config.getBufferMemory());
        // These two were configurable but previously never handed to the producer.
        putIfPresent(props, ProducerConfig.RETRIES_CONFIG, config.getRetries());
        if (!StringUtils.isEmpty(config.getCompressionType())) {
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getCompressionType());
        }
        return props;
    }

    /** Adds the entry only when a value is set; {@link Properties} rejects {@code null} values. */
    private static void putIfPresent(Properties props, String key, Object value) {
        if (value != null) {
            props.put(key, value);
        }
    }
}
@Getter
@Setter
public class KafkaLogBackConfig {

    // ---- destination ----

    /**
     * Kafka bootstrap server address list.
     */
    private String bootstrapServers;

    /**
     * Target topic; falls back to {@link ElkConstants#DEFAULT_TOPIC} when not set explicitly.
     */
    private String topic = ElkConstants.DEFAULT_TOPIC;

    /**
     * Producer client id.
     */
    private String clientId;

    // ---- serialization ----

    /**
     * Key serializer class name.
     */
    private String keySerializer;

    /**
     * Value serializer class name.
     */
    private String valueSerializer;

    // ---- delivery tuning ----

    /**
     * Producer acknowledgement setting.
     */
    private String acks;

    /**
     * Number of retries.
     */
    private Integer retries;

    /**
     * Producer batch size.
     */
    private Integer batchSize;

    /**
     * Producer buffer memory.
     */
    private Integer bufferMemory;

    /**
     * Compression type.
     */
    private String compressionType;

    /**
     * Transaction id prefix.
     */
    private String transactionIdPrefix;
}
/**
* 异步传输策略
*
* @Date 2022/8/11 10:06
* @Version 1.0
*/
public class AsynchronousDeliveryStrategy extends DeliveryStrategy {

    /**
     * Delivers the event through {@link LogMessageSender#sendAsync(LoggerEvent)}.
     *
     * @param sender sender that transmits the event
     * @param event  log event to deliver
     * @return whatever the sender's asynchronous send reports
     */
    @Override
    public boolean send(LogMessageSender sender, LogJSONEvent event) {
        final boolean accepted = sender.sendAsync(event);
        return accepted;
    }
}
/**
* 同步传输策略
*
* @Date 2022/8/11 10:28
* @Version 1.0
*/
public class BlockingDeliveryStrategy extends DeliveryStrategy {

    /**
     * Delivers the event through {@link LogMessageSender#send(LoggerEvent)}.
     *
     * @param sender sender that transmits the event
     * @param event  log event to deliver
     * @return whatever the sender's synchronous send reports
     */
    @Override
    public boolean send(LogMessageSender sender, LogJSONEvent event) {
        final boolean delivered = sender.send(event);
        return delivered;
    }
}
/**
* 日志传输策略
*
* @date 2022/8/11 10:07
*/
@Getter
@Setter
public abstract class DeliveryStrategy extends ContextAwareBase {

    /**
     * Delivers one log event through the given sender.
     *
     * @param sender sender that transmits the event
     * @param event  log event to deliver
     * @return the delivery result reported by the implementation
     */
    public abstract boolean send(LogMessageSender sender, LogJSONEvent event);
}

/**
 * Marker interface for log events that can be handed to a {@link LogMessageSender}.
 */
public interface LoggerEvent {
}
/**
* 日志 事件
*
* @Date 2022/8/11 10:11
* @Version 1.0
*/
@Getter
@Setter
@Builder
public class LogJSONEvent implements LoggerEvent {
/**
* Source application
*/
private String source;
/**
* IP address of the source application
*/
private String host;
/**
* Log message text
*/
private String message;
/**
* Timestamp; mapped to the JSON field name "@timestamp" by the annotation below
*/
@JSONField(name = "@timestamp")
private String timestamp;
/**
* Name of the logger that emitted the entry
*/
private String logger;
/**
* Log level
*/
private String level;
/**
* Thread that emitted the entry
*/
private String thread;
/**
* Exception stack trace
*/
private String exception;
/**
* traceId
*/
private String traceId;
}
/**
 * Kafka-backed {@link LogMessageSender}: serializes each event to JSON and publishes it to a topic.
 */
@Setter
@Getter
@Builder
public class LogKafkaSender extends ContextAwareBase implements LogMessageSender {

    /** Producer used for every send; closed by {@link #destroy()}. */
    private final KafkaProducer<String, String> producer;

    /** Topic used by the single-argument send methods. */
    private String topic;

    /**
     * Wait applied by the synchronous send, in milliseconds: positive waits at most that long,
     * zero waits without limit, negative does not wait for the broker at all.
     */
    private long timeout;

    /**
     * Synchronously sends the event to the configured topic.
     *
     * @param event event to publish
     * @return {@code true} if the send completed without error
     */
    @Override
    public boolean send(LoggerEvent event) {
        return send(effectiveTopic(), event);
    }

    /**
     * Asynchronously sends the event to the configured topic.
     *
     * @param event event to publish
     * @return {@code true} once the record has been handed to the producer
     */
    @Override
    public boolean sendAsync(LoggerEvent event) {
        return sendAsync(effectiveTopic(), event);
    }

    /**
     * Publishes the event and, depending on {@link #timeout}, waits for the result.
     *
     * @param topic target topic
     * @param event event to publish
     * @return {@code true} if the send completed without error; {@code false} if the wait was
     *         interrupted, timed out, or the send failed
     */
    @Override
    public boolean send(String topic, LoggerEvent event) {
        try {
            final Future<?> pending = producer.send(new ProducerRecord<>(topic, JSON.toJSONString(event)));
            if (timeout > 0L) {
                pending.get(timeout, TimeUnit.MILLISECONDS);
            } else if (timeout == 0L) {
                pending.get();
            }
            return true;
        } catch (InterruptedException e) {
            // Restore the flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            addError("kafka sync send log interrupted", e);
            return false;
        } catch (ExecutionException | TimeoutException e) {
            addError("kafka sync send log failed : " + e.getMessage(), e);
            return false;
        }
    }

    /**
     * Publishes the event without waiting; failures are reported through the status callback.
     *
     * @param topic target topic
     * @param event event to publish
     * @return {@code true} once the record has been handed to the producer
     */
    @Override
    public boolean sendAsync(String topic, LoggerEvent event) {
        producer.send(new ProducerRecord<>(topic, JSON.toJSONString(event)), (metadata, exception) -> {
            if (exception != null) {
                addError("kafka async send log failed : " + exception.getMessage(), exception);
            }
        });
        return true;
    }

    @Override
    public void init() {
        // Nothing to initialise: the producer is supplied ready to use.
    }

    /** Closes the producer, if one was supplied. */
    @Override
    public void destroy() {
        if (producer != null) {
            producer.close();
        }
    }

    /** Returns the configured topic, or the default topic when none is configured. */
    private String effectiveTopic() {
        return StringUtils.isEmpty(topic) ? ElkConstants.DEFAULT_TOPIC : topic;
    }
}
/**
 * Message sender: encapsulates how log events are transmitted.
 */
public interface LogMessageSender {

    // ---- lifecycle ----

    /**
     * Initialises the sender.
     */
    void init();

    /**
     * Releases the sender's resources.
     */
    void destroy();

    // ---- sending to the sender's own topic ----

    /**
     * Sends the event synchronously.
     *
     * @param event event to send
     * @return the send result reported by the implementation
     */
    boolean send(LoggerEvent event);

    /**
     * Sends the event asynchronously.
     *
     * @param event event to send
     * @return the send result reported by the implementation
     */
    boolean sendAsync(LoggerEvent event);

    // ---- sending to an explicit topic ----

    /**
     * Sends the event synchronously to the given topic.
     *
     * @param topic target topic
     * @param event event to send
     * @return the send result reported by the implementation
     */
    boolean send(String topic, LoggerEvent event);

    /**
     * Sends the event asynchronously to the given topic.
     *
     * @param topic target topic
     * @param event event to send
     * @return the send result reported by the implementation
     */
    boolean sendAsync(String topic, LoggerEvent event);
}
@Getter
@Setter
public class LogbackAppender extends LogbackAppenderConfig {
    // Logback appender that converts each logging event into a LogJSONEvent and hands it to a
    // LogMessageSender obtained from the configured BuildStrategy.

    /** Strategy that creates the sender when the appender starts. */
    private BuildStrategy buildStrategy;

    /**
     * Strategy used to hand events to the sender.
     * NOTE(review): typed as the asynchronous strategy, so a BlockingDeliveryStrategy cannot be
     * injected; consider widening to DeliveryStrategy if callers allow it.
     */
    private AsynchronousDeliveryStrategy deliveryStrategy = new AsynchronousDeliveryStrategy();

    /** Sender created in {@link #start()}; {@code null} while the appender is not running. */
    private LogMessageSender sender;

    /**
     * Source application of the log entries.
     */
    private String source;

    /**
     * Encoder.
     */
    private Encoder encoder;

    /**
     * Converts the logging event and delivers it; does nothing when no sender is available.
     *
     * @param event logging event to ship
     */
    @Override
    protected void append(ILoggingEvent event) {
        // Read the field once: stop() may clear it while another thread is appending.
        final LogMessageSender target = sender;
        if (target == null) {
            return;
        }
        final IThrowableProxy failure = event.getThrowableProxy();
        final LogJSONEvent payload = LogJSONEvent.builder()
                .host(this.getHost())
                .level(event.getLevel().toString())
                .source(this.getSource())
                .message(event.getFormattedMessage())
                .timestamp(formatTimestamp(event.getTimeStamp()))
                .logger(event.getLoggerName())
                .thread(event.getThreadName())
                .traceId(event.getMDCPropertyMap().get(Constants.TRACE_ID))
                .exception(failure == null ? null : ThrowableProxyUtil.asString(failure))
                .build();
        deliveryStrategy.send(target, payload);
    }

    /**
     * Builds and initialises the sender, then marks the appender as started. Without a build
     * strategy the appender reports an error and stays stopped.
     */
    @Override
    public void start() {
        addInfo("log back add kafka init begin");
        if (buildStrategy == null) {
            addError("log back add kafka init failed : no buildStrategy configured");
            return;
        }
        sender = buildStrategy.build();
        if (sender != null) {
            sender.init();
        }
        super.start();
    }

    /**
     * Stops the appender and releases the sender so buffered records are flushed and the
     * underlying client is closed. The shared reference in {@link BuildStrategy} is cleared as
     * well, so a later start builds a fresh sender instead of reusing a closed one.
     */
    @Override
    public void stop() {
        super.stop();
        final LogMessageSender current = sender;
        sender = null;
        if (current != null) {
            current.destroy();
            if (BuildStrategy.sender == current) {
                BuildStrategy.sender = null;
            }
        }
    }

    /**
     * Formats the event time with the configured date format. Synchronized because append() may
     * run on several threads at once and the configured formatter is shared between them.
     */
    private synchronized String formatTimestamp(long epochMillis) {
        return this.getDf().format(new Date(epochMillis));
    }
}
/**
 * Shared configuration for the log-shipping appender.
 *
 * <p>The supertype is parameterised with the logback event type so that subclasses can override
 * {@code append(ILoggingEvent)}; with the raw supertype that override does not compile.
 */
@Getter
@Setter
abstract class LogbackAppenderConfig extends UnsynchronizedAppenderBase<ch.qos.logback.classic.spi.ILoggingEvent> {
    /**
     * Date format applied to event timestamps. {@link SimpleDateFormat} is not thread-safe and
     * this base class does not serialise append calls, so users of this formatter must
     * synchronise access to it.
     */
    private DateFormat df = new SimpleDateFormat(ElkConstants.DEFAULT_DATE_FORMAT);
    /**
     * IP address of the application that produces the logs.
     */
    private String host = IpUtils.getHostIp();
}
<?xml version="1.0" encoding="UTF-8"?>
${log.pattern}
${log.path}/info.log
${log.path}/info.%d{yyyy-MM-dd}.log
60
${log.pattern}
INFO
ACCEPT
DENY
${log.path}/error.log
${log.path}/error.%d{yyyy-MM-dd}.log
60
${log.pattern}
ERROR
ACCEPT
DENY
LOGSTASH_KAFKA_TOPIC
${bootstrapServers}
${acks}
${batchSize}
${bufferMemory}
${keySerializer}
${valueSerializer}
${retries}
${applicationName}
# Pipeline: consume JSON log records from Kafka and index them into Elasticsearch,
# one index per source application and day.
input {
  kafka {
    group_id => "kafka-logstash-consumer-group"
    topics => ["LOGSTASH_KAFKA_TOPIC", "LOGSTASH_KAFKA_CUSTOM_TOPIC"]
    bootstrap_servers => "127.0.0.1:9092"
    # The json codec already decodes each Kafka record into event fields.
    codec => "json"
  }
}
filter {
  # After decoding, "message" holds the application's log text, which is usually not JSON.
  # Only expand it when it is valid JSON; otherwise leave the event untouched instead of
  # tagging every ordinary log line with _jsonparsefailure.
  json {
    source => "message"
    skip_on_invalid_json => true
  }
  mutate {
    remove_field => ["event"]
  }
  # Derive the index suffix from the source application, with a fallback bucket.
  if [source] {
    mutate {
      add_field => {
        "index_name" => "%{source}-%{+YYYY.MM.dd}"
      }
    }
  } else {
    mutate {
      add_field => {
        "index_name" => "non-source-%{+YYYY.MM.dd}"
      }
    }
  }
}
output {
  elasticsearch {
    action => "index"
    hosts => "localhost:9200"
    index => "clover-%{index_name}"
  }
}
spring:
  kafka:
    bootstrap-servers: 192.168.101.40:9092
    producer:
      retries: 10
      acks: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer