Integrating Kafka with Logstash for Log Collection in ELK

1. Introduction to ELK

ELK is short for Elasticsearch, Logstash, and Kibana. The idea is to collect logs into Elasticsearch rather than leaving them in the console or in files, which makes searching and pinpointing log entries far easier. Shipping the logs is only half the story, though: to really get value out of them you also need to combine them with a traceId so the whole call chain can be stitched together. The flow itself is straightforward: logs are collected on each server and shipped to Logstash, and Logstash writes them into Elasticsearch for storage.

Elasticsearch is an open-source distributed search engine that provides data storage and search with high retrieval performance.

Kibana is the visualization front end for Elasticsearch, used to explore and work with the data stored in it.

Logstash is a tool for collecting, parsing, and filtering logs. It supports a large number of input sources and handles the whole pipeline of collecting, processing, and forwarding log data.

2. ELK integration

Logstash is composed of three kinds of plugins: input, filter, and output. The input plugin is the entry point for data, and there are several dozen input types to choose from, such as stdin, file, kafka, beats, elasticsearch, and jdbc. This article focuses on the kafka input; the full plugin list is in the official documentation: https://www.elastic.co/guide/en/logstash/current/input-plugins.html

3. Collecting logs through the Kafka integration

Logback XML configuration (the full template is shown in section 4)


The ELK Kafka appender plugin

Directory structure: the plugin is made up of the classes below.

BuildStrategy

public abstract class BuildStrategy extends ContextAwareBase {

    /**
     * Message sender (shared singleton)
     */
    public static LogMessageSender sender;

    /**
     * Get the sender
     *
     * @return the shared LogMessageSender
     */
    public static LogMessageSender sender() {
        return sender;
    }

    /**
     * Build the {@link LogMessageSender}
     *
     * @return MessageSender
     */
    public abstract LogMessageSender build();
}

KafkaBuildStrategy

/**
 * Kafka build strategy
 */
@Getter
@Setter
public class KafkaBuildStrategy extends BuildStrategy {

    private KafkaLogBackConfig config;


    @Override
    public LogMessageSender build() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, StringUtils.isEmpty(this.getConfig().getClientId()) ? ElkConstants.DEFAULT_CLIENT_ID :
                this.getConfig().getClientId());
        // Cap broker request time and producer blocking time so logging cannot hang the app
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getKeySerializer());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getValueSerializer());
        props.put(ProducerConfig.ACKS_CONFIG, this.config.getAcks());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, this.config.getBatchSize());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, this.config.getBufferMemory());
        if (BuildStrategy.sender == null) {
            BuildStrategy.sender = LogKafkaSender.builder()
                    .topic(this.getConfig().getTopic())
                    .producer(new KafkaProducer<>(props))
                    .build();
        }
        return BuildStrategy.sender;
    }
}
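To make the wiring concrete, here is a minimal sketch of building a sender by hand, outside of Logback; every value shown is illustrative:

// A minimal sketch: wiring KafkaBuildStrategy by hand (all values illustrative).
KafkaLogBackConfig config = new KafkaLogBackConfig();
config.setBootstrapServers("127.0.0.1:9092");
config.setTopic("LOGSTASH_KAFKA_TOPIC");
config.setAcks("1");
config.setBatchSize(16384);
config.setBufferMemory(33554432);
config.setKeySerializer("org.apache.kafka.common.serialization.StringSerializer");
config.setValueSerializer("org.apache.kafka.common.serialization.StringSerializer");

KafkaBuildStrategy strategy = new KafkaBuildStrategy();
strategy.setConfig(config);
LogMessageSender sender = strategy.build();

Also note the hard-coded MAX_BLOCK_MS_CONFIG of 5000 ms in build(): if the brokers are unreachable, a send blocks for at most five seconds instead of stalling application threads indefinitely.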

KafkaLogBackConfig

@Getter
@Setter
public class KafkaLogBackConfig {
    /**
     * Topic
     */
    private String topic = ElkConstants.DEFAULT_TOPIC;
    /**
     * Kafka bootstrap server addresses
     */
    private String bootstrapServers;
    /**
     * acks setting (e.g. "0", "1", "all")
     */
    private String acks;
    /** Producer batch size in bytes */
    private Integer batchSize;
    /** Producer buffer memory in bytes */
    private Integer bufferMemory;
    /** Client id reported to the brokers */
    private String clientId;
    /** Compression type (none, gzip, snappy, lz4, zstd) */
    private String compressionType;
    /** Key serializer class name */
    private String keySerializer;
    /** Value serializer class name */
    private String valueSerializer;
    /** Number of send retries */
    private Integer retries;
    /** Transactional id prefix */
    private String transactionIdPrefix;
}
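One caveat: build() in KafkaBuildStrategy forwards only part of this class to the producer. compressionType, retries, and transactionIdPrefix are declared but never copied into the Properties, so setting them in the XML has no effect unless build() is extended with something like the following (a hypothetical addition, not in the original):

// Hypothetical extension to build(): forward the fields it currently ignores.
if (config.getCompressionType() != null) {
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getCompressionType());
}
if (config.getRetries() != null) {
    props.put(ProducerConfig.RETRIES_CONFIG, config.getRetries());
}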

AsynchronousDeliveryStrategy

/**
 * Asynchronous delivery strategy
 *
 * @Date 2022/8/11 10:06
 * @Version 1.0
 */
public class AsynchronousDeliveryStrategy extends DeliveryStrategy {

    @Override
    public boolean send(LogMessageSender sender, LogJSONEvent event) {
        return sender.sendAsync(event);
    }
}

BlockingDeliveryStrategy

/**
 * Synchronous (blocking) delivery strategy
 *
 * @Date 2022/8/11 10:28
 * @Version 1.0
 */
public class BlockingDeliveryStrategy extends DeliveryStrategy {

    @Override
    public boolean send(LogMessageSender sender, LogJSONEvent event) {
        return sender.send(event);
    }
}
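The trade-off between the two is the usual one: the asynchronous strategy hands the record to the producer's background thread and returns immediately, so logging never blocks business threads, but the last few events can be lost on a crash; the blocking strategy waits for the broker acknowledgement, trading latency for a stronger delivery guarantee. LogbackAppender below defaults to the asynchronous strategy.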

DeliveryStrategy

/**
 * Log delivery strategy
 *
 * @date 2022/8/11 10:07
 */
@Getter
@Setter
public abstract class DeliveryStrategy extends ContextAwareBase {


    /**
     * Send a log event
     *
     * @param sender the sender to deliver through
     * @param event  the event to send
     * @return true if the event was handed off successfully
     */
    public abstract boolean send(LogMessageSender sender, LogJSONEvent event);

}

LoggerEvent

/**
 * Marker interface for log events
 */
public interface LoggerEvent {
}

LogJSONEvent

/**
 * Log event serialized to JSON
 *
 * @Date 2022/8/11 10:11
 * @Version 1.0
 */
@Getter
@Setter
@Builder
public class LogJSONEvent implements LoggerEvent {
    /**
     * Source application
     */
    private String source;
    /**
     * Source application IP address
     */
    private String host;

    /**
     * Log message text
     */
    private String message;

    /**
     * Timestamp
     */
    @JSONField(name = "@timestamp")
    private String timestamp;

    /**
     * Name of the logger that produced the event
     */
    private String logger;

    /**
     * Log level
     */
    private String level;

    /**
     * Thread that produced the event
     */
    private String thread;

    /**
     * Exception stack trace
     */
    private String exception;
    /**
     * traceId
     */
    private String traceId;
}
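With the @JSONField mapping, the message that lands on the Kafka topic looks roughly like this (field values are illustrative; the actual timestamp format comes from ElkConstants.DEFAULT_DATE_FORMAT):

{
  "@timestamp": "2022-08-11 10:11:00.000",
  "source": "order-service",
  "host": "192.168.101.40",
  "level": "INFO",
  "logger": "com.example.demo.OrderService",
  "thread": "http-nio-8080-exec-1",
  "traceId": "8f3a1c2b9d4e",
  "message": "order created"
}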

LogKafkaSender

/**
 * Kafka message sender
 */
@Setter
@Getter
@Builder
public class LogKafkaSender extends ContextAwareBase implements LogMessageSender {

    private final KafkaProducer<String, String> producer;

    private String topic;

    private long timeout;

    @Override
    public boolean send(LoggerEvent event) {
        return send(topic, event);
    }

    @Override
    public boolean sendAsync(LoggerEvent event) {
        if (StringUtils.isEmpty(topic)) {
            topic = ElkConstants.DEFAULT_TOPIC;
        }
        return sendAsync(topic, event);
    }

    @Override
    public boolean send(String topic, LoggerEvent event) {
        try {
            final Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, JSON.toJSONString(event)));
            // timeout > 0: wait at most `timeout` ms; timeout == 0: wait indefinitely; timeout < 0: fire and forget
            if (timeout > 0L) future.get(timeout, TimeUnit.MILLISECONDS);
            else if (timeout == 0) future.get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            addError("kafka sync send log failed : " + e.getMessage());
            return false;
        }
    }

    @Override
    public boolean sendAsync(String topic, LoggerEvent event) {
        producer.send(new ProducerRecord<>(topic, JSON.toJSONString(event)), (metadata, exception) -> {
            if (exception != null) {
                addError("kafka async send log failed : " + exception.getMessage());
            }
        });
        return true;
    }

    @Override
    public void init() {
        // nothing to initialize; the producer is injected ready to use
    }

    @Override
    public void destroy() {
        producer.close();
    }
}
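Two details worth noting: timeout only affects the synchronous path (a positive value waits up to that many milliseconds for the acknowledgement, zero waits indefinitely, and a negative value skips waiting entirely), and destroy() closes the producer, which also flushes any records still sitting in its buffer; the stop() sketch after LogbackAppender below relies on exactly that.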

LogMessageSender

/**
 * Message sender; encapsulates the send logic
 */
public interface LogMessageSender {

    /**
     * Send synchronously to the default topic
     *
     * @param event the event to send
     * @return true if the send succeeded
     */
    boolean send(LoggerEvent event);

    /**
     * Send asynchronously to the default topic
     *
     * @param event the event to send
     * @return true if the event was handed off
     */
    boolean sendAsync(LoggerEvent event);

    /**
     * Send synchronously
     *
     * @param topic the target topic
     * @param event the event to send
     * @return true if the send succeeded
     */
    boolean send(String topic, LoggerEvent event);

    /**
     * Send asynchronously
     *
     * @param topic the target topic
     * @param event the event to send
     * @return true if the event was handed off
     */
    boolean sendAsync(String topic, LoggerEvent event);

    /**
     * Initialize
     */
    void init();

    /**
     * Destroy (release resources)
     */
    void destroy();
}

LogbackAppender

@Getter
@Setter
public class LogbackAppender extends LogbackAppenderConfig {
    private BuildStrategy buildStrategy;

    private DeliveryStrategy deliveryStrategy = new AsynchronousDeliveryStrategy();
    private LogMessageSender sender;
    /**
     * Source application of the log
     */
    private String source;
    /**
     * Encoder
     */
    private Encoder encoder;

    @Override
    protected void append(ILoggingEvent event) {
        if (sender != null) {
            IThrowableProxy tp = event.getThrowableProxy();
            deliveryStrategy.send(sender,
                    LogJSONEvent.builder().host(this.getHost())
                            .level(event.getLevel().toString())
                            .source(this.getSource())
                            .message(event.getFormattedMessage())
                            .timestamp(this.getDf().format(new Date(event.getTimeStamp())))
                            .logger(event.getLoggerName())
                            .thread(event.getThreadName())
                            .traceId(event.getMDCPropertyMap().get(Constants.TRACE_ID))
                            .exception(tp == null ? null : ThrowableProxyUtil.asString(tp)).build());
        }
    }

    @Override
    public void start() {
        addInfo("logback kafka appender init begin");
        sender = buildStrategy.build();
        if (sender != null) {
            sender.init();
        }
        super.start();
    }
}
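One thing the appender never does is release the producer when logging shuts down. A stop() override along these lines (a sketch, not part of the original class) would close it cleanly:

// Hypothetical addition to LogbackAppender: release the producer on shutdown.
@Override
public void stop() {
    if (sender != null) {
        sender.destroy(); // closes the KafkaProducer and flushes buffered records
    }
    super.stop();
}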

LogbackAppenderConfig

@Getter
@Setter
abstract class LogbackAppenderConfig extends UnsynchronizedAppenderBase {
    /**
     * Date formatter for the event timestamp
     */
    private DateFormat df = new SimpleDateFormat(ElkConstants.DEFAULT_DATE_FORMAT);

    /**
     * IP address of the source application
     */
    private String host = IpUtils.getHostIp();

}
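A caveat here: UnsynchronizedAppenderBase can be entered by many threads at once and SimpleDateFormat is not thread-safe, so timestamps can come out corrupted under load. A thread-safe java.time formatter is a drop-in alternative; a sketch, assuming the pattern matches ElkConstants.DEFAULT_DATE_FORMAT:

// Thread-safe replacement for the SimpleDateFormat field (pattern is an assumption).
private final DateTimeFormatter df =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
                .withZone(ZoneId.systemDefault());

// In append(): df.format(Instant.ofEpochMilli(event.getTimeStamp()))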

4. Logback configuration template

A logback-spring.xml along the following lines ties everything together. Treat it as a sketch: the appender package names (com.example.elk.*), the log pattern, and the <springProperty> wiring are assumptions to adapt to your own project.

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Basic properties: log directory and line pattern (values are placeholders) -->
    <property name="log.path" value="./logs"/>
    <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/>

    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
    </appender>

    <!-- INFO file output -->
    <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/info.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- Daily rollover file name -->
            <fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
            <!-- Keep 60 days of history -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>INFO</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- ERROR file output -->
    <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}/error.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>60</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>${log.pattern}</pattern>
        </encoder>
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- Pull the Kafka producer settings and application name from the Spring environment -->
    <springProperty scope="context" name="applicationName" source="spring.application.name"/>
    <springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers"/>
    <springProperty scope="context" name="acks" source="spring.kafka.producer.acks"/>
    <springProperty scope="context" name="batchSize" source="spring.kafka.producer.batch-size"/>
    <springProperty scope="context" name="bufferMemory" source="spring.kafka.producer.buffer-memory"/>
    <springProperty scope="context" name="keySerializer" source="spring.kafka.producer.key-serializer"/>
    <springProperty scope="context" name="valueSerializer" source="spring.kafka.producer.value-serializer"/>
    <springProperty scope="context" name="retries" source="spring.kafka.producer.retries"/>

    <!-- The Kafka appender built in section 3 (package names are placeholders) -->
    <appender name="kafka" class="com.example.elk.LogbackAppender">
        <buildStrategy class="com.example.elk.KafkaBuildStrategy">
            <config class="com.example.elk.KafkaLogBackConfig">
                <topic>LOGSTASH_KAFKA_TOPIC</topic>
                <bootstrapServers>${bootstrapServers}</bootstrapServers>
                <acks>${acks}</acks>
                <batchSize>${batchSize}</batchSize>
                <bufferMemory>${bufferMemory}</bufferMemory>
                <keySerializer>${keySerializer}</keySerializer>
                <valueSerializer>${valueSerializer}</valueSerializer>
                <retries>${retries}</retries>
            </config>
        </buildStrategy>
        <!-- Source application name carried in every event -->
        <source>${applicationName}</source>
    </appender>

    <root level="INFO">
        <appender-ref ref="console"/>
        <appender-ref ref="file_info"/>
        <appender-ref ref="file_error"/>
        <appender-ref ref="kafka"/>
    </root>
</configuration>

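When Logback parses this file, its Joran configurator instantiates each nested element from its class attribute and injects child values through the matching setters (setBuildStrategy, setConfig, setTopic, and so on), which is why the plugin classes above need nothing beyond Lombok's @Getter/@Setter to be configurable from XML.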
5. Logstash pipeline configuration

logstash.conf

input {
	kafka {
		group_id => "kafka-logstash-consumer-group"
		topics => ["LOGSTASH_KAFKA_TOPIC", "LOGSTASH_KAFKA_CUSTOM_TOPIC"]
		bootstrap_servers => "127.0.0.1:9092"
		codec => "json"
	}
}
filter {
	json {
		source => "message"
	}
	mutate {
		remove_field => ["event"]
	}

	if [source] {
		mutate {
			add_field => {
				"index_name" => "%{source}-%{+YYYY.MM.dd}"
			}
		}
	} else {
		mutate {
			add_field => {
				"index_name" => "non-source-%{+YYYY.MM.dd}"
			}
		}
	}
}
output {
	elasticsearch {
		action => "index"
		hosts => "localhost:9200"
		index => "clover-%{index_name}"
	}
}
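The filter block derives index_name from the event's source field: an application whose source is, say, order-service gets its logs indexed into clover-order-service-2022.08.11, one index per application per day, which keeps retention and deletion simple. Events arriving without a source fall back to a shared clover-non-source-* index.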

6. Kafka producer configuration

spring: 
  kafka:
    bootstrap-servers: 192.168.101.40:9092
    producer:
      retries: 10
      acks: 1
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
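Assuming the <springProperty> wiring from section 4, these spring.kafka.producer.* values are exactly what the Logback template reads, so the logging producer and any KafkaTemplate the application itself uses share a single set of settings. The bootstrap address here must be reachable both by the application (to produce) and by Logstash (to consume).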