项目的业务存在如下场景:系统启动并订阅对应的Kafka主题后,可能存在大量积压的消息需要处理。此时如果立即开放本系统的服务,由于系统本身正在进行高强度的数据处理,所提供的服务并不可靠。
基于此,需要一种机制,在尽可能处理完积压消息后(或系统处于相对稳定的状态时)再开放服务。
出于将业务与技术关注点分离的思路,则提炼出本文的所谓静默检查小框架。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Watches a stream of {@link #tick()} calls and fires a callback once the stream has gone
 * quiet ("silent").
 *
 * <p>A background task periodically checks the two most recent tick timestamps. The timer is
 * considered silent when either more than the configured span has elapsed since the last tick,
 * or the gap between the last two ticks was larger than the span. When that happens the
 * background executor is shut down and the callback registered through
 * {@link #setPassFun(Consumer)} is invoked exactly once for that run.
 *
 * <p>If {@link #tick()} has never been called when a check runs, the timer is treated as silent.
 *
 * <p>{@link #tick()} may be called from any thread; the timestamps are guarded by an internal
 * lock so the checking thread always sees a consistent pair.
 */
public class SilentTimer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SilentTimer.class);

    /** Delay before the first silence check, expressed in {@code silentTimeUnit}. */
    private static final long INITIAL_DELAY = 2;

    private final int silentSpan;
    private final TimeUnit silentTimeUnit;

    /** Callback fired when silence is detected; written by callers, read by the check thread. */
    private volatile Consumer<Long> passFun;

    /** Guards {@link #currentN1} and {@link #currentN2}, which are always updated as a pair. */
    private final Object tickLock = new Object();
    /** Timestamp (epoch millis) of the tick before the latest one; 0 until the first tick. */
    private long currentN1 = 0;
    /** Timestamp (epoch millis) of the latest tick; 0 until the first tick. */
    private long currentN2 = 0;

    private volatile ScheduledExecutorService silentCheckExe;

    /**
     * Creates a timer that is not yet running; call {@link #start()} to begin checking.
     *
     * @param silentSpan     length of the silence window, also used as the delay between checks
     * @param silentTimeUnit unit of {@code silentSpan}
     */
    public SilentTimer(int silentSpan, TimeUnit silentTimeUnit) {
        this.silentSpan = silentSpan;
        this.silentTimeUnit = silentTimeUnit;
    }

    /**
     * Returns the executor that runs the periodic silence check.
     *
     * @return the current executor, or {@code null} if {@link #start()} has never been called
     */
    public ScheduledExecutorService getSilentCheckExe() {
        return silentCheckExe;
    }

    /**
     * Starts the periodic silence check on a single daemon thread.
     *
     * <p>If a previous run is still active this logs an error and does nothing. Once a run has
     * fired (and therefore shut its executor down) the timer may be started again.
     */
    public void start() {
        if (silentCheckExe != null && !silentCheckExe.isShutdown()) {
            LOGGER.error("Silent check timer has been run");
            return;
        }
        // Keep a local reference so the task always stops the executor it is running on,
        // even if the field is replaced by a later restart.
        final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1,
                new ThreadFactoryBuilder().setNameFormat("silent-check-%d").setDaemon(true).build());
        silentCheckExe = executor;

        final long spanMillis = silentTimeUnit.toMillis(silentSpan);
        final long startedAt = System.currentTimeMillis();
        executor.scheduleWithFixedDelay(() -> {
            if (!isSilent(System.currentTimeMillis(), spanMillis)) {
                return;
            }
            // Stop further checks first so the callback runs at most once per start().
            executor.shutdown();
            final Consumer<Long> callback = passFun;
            if (callback != null) {
                callback.accept(System.currentTimeMillis() - startedAt);
            }
        }, INITIAL_DELAY, silentSpan, silentTimeUnit);
    }

    /**
     * Records that an event happened now.
     *
     * <p>The first call initialises both stored timestamps to the current time; every later call
     * shifts the latest timestamp into the previous slot and stores the current time as latest.
     */
    public void tick() {
        synchronized (tickLock) {
            final long now = System.currentTimeMillis();
            if (currentN1 == 0 || currentN2 == 0) {
                currentN1 = now;
                currentN2 = now;
                return;
            }
            currentN1 = currentN2;
            currentN2 = now;
        }
    }

    /**
     * Registers the callback to run when silence is detected.
     *
     * @param passFun receives the number of milliseconds elapsed between {@link #start()} and
     *                the moment silence was detected; may be {@code null} to disable the callback
     */
    public void setPassFun(Consumer<Long> passFun) {
        this.passFun = passFun;
    }

    /** Decides whether the tick stream is silent at {@code now}, given the span in milliseconds. */
    private boolean isSilent(long now, long spanMillis) {
        final long previousTick;
        final long latestTick;
        synchronized (tickLock) {
            previousTick = currentN1;
            latestTick = currentN2;
        }
        return now - latestTick > spanMillis || latestTick - previousTick > spanMillis;
    }
}
SilentTimer仅需要传入两个参数和一个方法,分别是:静默时长(silentSpan)、时长单位(silentTimeUnit),以及检测到静默后执行的回调方法(passFun)。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class SilentTimerTest {
@Test
public void testSilent() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean flag = new AtomicBoolean(true);
//静默检查周期6s,等待6s后启动
SilentTimer silentTimer = new SilentTimer(6, TimeUnit.SECONDS);
silentTimer.setPassFun(p -> {
flag.set(false);
latch.countDown();
});
silentTimer.start();
ScheduledExecutorService triggerExe = new ScheduledThreadPoolExecutor(
1, new ThreadFactoryBuilder().setNameFormat("silent-check2-%d")
.setDaemon(true).build());
triggerExe.scheduleAtFixedRate(silentTimer::tick, 0, 7, TimeUnit.SECONDS);
latch.await();
}
@Test
public void testSilent2() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean flag = new AtomicBoolean(true);
//静默检查周期6s,等待6s后启动
SilentTimer silentTimer = new SilentTimer(6, TimeUnit.SECONDS);
silentTimer.setPassFun(p -> {
flag.set(false);
latch.countDown();
});
silentTimer.start();
silentTimer.start();
ScheduledExecutorService triggerExe = new ScheduledThreadPoolExecutor(
1, new ThreadFactoryBuilder().setNameFormat("silent-check2-%d")
.setDaemon(true).build());
triggerExe.scheduleAtFixedRate(silentTimer::tick, 0, 7, TimeUnit.SECONDS);
latch.await();
silentTimer.start();
}
}
| 留言与评论(共有 0 条评论) “” |