静默检查

一、前言

项目的业务存在如下场景:当系统启动后,定阅对应的kafka主题后,可能会存在积压的大量消息需要处理,此时如果开放本系统的服务,由于本身正在进行剧烈的数据处理,此时提供的服务并不可靠。

基于些,则需要一种机制在尽可能处理完消息后(或系统处于一个相对稳定的状态)再开放服务。

二、实践

出于将业务与技术关注点分离的思路,则提炼出本文的所谓静默检查小框架。

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SilentTimer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SilentTimer.class);
    private Consumer passFun;
    private int silentSpan;
    private TimeUnit silentTimeUnit;

    public SilentTimer(int silentSpan, TimeUnit silentTimeUnit) {
        this.silentSpan = silentSpan;
        this.silentTimeUnit = silentTimeUnit;
    }

    private long currentN1 = 0;
    private long currentN2 = 0;

    public ScheduledExecutorService getSilentCheckExe() {
        return silentCheckExe;
    }

    private ScheduledExecutorService silentCheckExe;

    public void start() {
        if (silentCheckExe != null && !silentCheckExe.isShutdown()) {
            LOGGER.error("Silent check timer has been run");
            return;
        }
        silentCheckExe = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("silent-check-%d").setDaemon(true).build());
        long spanMillis = silentTimeUnit.toMillis(silentSpan);
        long current = System.currentTimeMillis();
        silentCheckExe.scheduleWithFixedDelay(() -> {
            if (System.currentTimeMillis() - currentN2 > spanMillis) {
                silentCheckExe.shutdown();
                if (passFun != null) {
                    passFun.accept(System.currentTimeMillis() - current);
                }
            }
            if (currentN2 - currentN1 > spanMillis) {
                silentCheckExe.shutdown();
                if (passFun != null) {
                    passFun.accept(System.currentTimeMillis() - current);
                }
            }
        }, 2,
        silentSpan, silentTimeUnit);
    }

    public void tick() {
        if (currentN1 == 0 || currentN2 == 0) {
            long timeMillis = System.currentTimeMillis();
            currentN1 = timeMillis;
            currentN2 = timeMillis;
            return;
        }
        currentN1 = currentN2;
        currentN2 = System.currentTimeMillis();
    }

    /**
     * 入参开启到检测到定时器总共时长ms
     *
     * @param passFun
     */
    public void setPassFun(Consumer passFun) {
        this.passFun = passFun;
    }
}


SilentTimer仅需要传入二个参数和一个方法,分别是:

  • 静默检查的周期时间
  • 静默检查通过后的回调逻辑
  • 触发静默重置方法

三、测试


import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


public class SilentTimerTest {

    @Test
    public void testSilent() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean flag = new AtomicBoolean(true);
        //静默检查周期6s,等待6s后启动
        SilentTimer silentTimer = new SilentTimer(6, TimeUnit.SECONDS);
        silentTimer.setPassFun(p -> {
            flag.set(false);
            latch.countDown();
        });
        silentTimer.start();
        ScheduledExecutorService triggerExe = new ScheduledThreadPoolExecutor(
                1, new ThreadFactoryBuilder().setNameFormat("silent-check2-%d")
                .setDaemon(true).build());
        triggerExe.scheduleAtFixedRate(silentTimer::tick, 0, 7, TimeUnit.SECONDS);
        latch.await();
    }

    @Test
    public void testSilent2() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicBoolean flag = new AtomicBoolean(true);
        //静默检查周期6s,等待6s后启动
        SilentTimer silentTimer = new SilentTimer(6, TimeUnit.SECONDS);
        silentTimer.setPassFun(p -> {
            flag.set(false);
            latch.countDown();
        });
        silentTimer.start();
        silentTimer.start();
        ScheduledExecutorService triggerExe = new ScheduledThreadPoolExecutor(
                1, new ThreadFactoryBuilder().setNameFormat("silent-check2-%d")
                .setDaemon(true).build());
        triggerExe.scheduleAtFixedRate(silentTimer::tick, 0, 7, TimeUnit.SECONDS);
        latch.await();
        silentTimer.start();
    }
}



发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章