前言
微服务架构中,前置检查功能项对于服务的可靠性有重要意义,使用场景如:
1、检查基础服务,如不正常则需要熔断
2、检查被依赖的服务,如不正常则需要熔断
3、本服务有较长的初始化逻辑,需要在初始化完成后,才能对外提供正常的REST功能
实践
import com.google.common.util.concurrent.ThreadFactoryBuilder;import lombok.extern.slf4j.Slf4j;import java.util.List;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.function.Consumer;@Slf4jpublic class ApplicationStartupChecker { private boolean checkSwitch = true; private ScheduledExecutorService scheduledExecutorService; private Consumer passConsumer; private AdvanceCheckRegister advanceCheckRegister = new AdvanceCheckRegisterImpl(); private String threadName; private int checkCycle; private TimeUnit timeUnit; private int checkCount = 0; public ApplicationStartupChecker(String threadName, int checkCycle, TimeUnit timeUnit) { scheduledExecutorService = new ScheduledThreadPoolExecutor( 1, new ThreadFactoryBuilder().setNameFormat(threadName + “-%d”).setDaemon(true).build()); this.threadName = threadName; this.checkCycle = checkCycle; this.timeUnit = timeUnit; } public ApplicationStartupChecker(String threadName) { this(threadName, 30, TimeUnit.SECONDS); } public void bootstrap() { if (scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) { scheduledExecutorService = new ScheduledThreadPoolExecutor( 1, new ThreadFactoryBuilder().setNameFormat(threadName + “-%d”).build()); } scheduledExecutorService.schedule(this::guidance, 0, TimeUnit.SECONDS); } private void guidance() { if (!checkSwitch) { scheduledExecutorService.shutdown(); log.info(“Not need check”); if (passConsumer != null) { passConsumer.accept(advanceCheckRegister.getCheckerNames()); } } checkCount++; log.info(“No.{} app startup checking”, checkCount); try { advanceCheckRegister.foreach(Checker::check); } catch (Throwable e) { log.error(“check with error.”, e); } boolean pass = advanceCheckRegister.pass(); if (pass) { scheduledExecutorService.shutdown(); log.info(“All check passed:{}”, String.join(“,”, advanceCheckRegister.getCheckerNames())); if (passConsumer != 
null) { passConsumer.accept(advanceCheckRegister.getCheckerNames()); } } else { scheduledExecutorService.schedule(this::guidance, this.checkCycle, this.timeUnit); } } public void setPassConsumer(Consumer passConsumer) { this.passConsumer = passConsumer; } public void addChecker(Checker kafkaHealthChecker) { advanceCheckRegister.registerEntity(kafkaHealthChecker); }}
配置用于保存检查项:
import java.util.List;
import java.util.function.Consumer;

/**
 * Registry of {@link Checker}s that can run them and report the overall result.
 */
public interface AdvanceCheckRegister {

    /**
     * Adds a checker to the registry.
     *
     * @param checkEntity the checker to register
     */
    void registerEntity(Checker checkEntity);

    /**
     * @return {@code true} when every registered checker reports {@code isPass()}
     */
    boolean pass();

    /**
     * Visits the registered checkers.
     *
     * @param consumer action associated with the visit
     */
    void foreach(Consumer<Checker> consumer);

    /**
     * @return the names of all registered checkers
     */
    List<String> getCheckerNames();
}

package com.zte.sdn.oscp.check;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * {@link AdvanceCheckRegister} backed by an {@link ArrayList}; checkers are
 * kept and evaluated in registration order.
 */
public class AdvanceCheckRegisterImpl implements AdvanceCheckRegister {

    private final List<Checker> checkEntityList = new ArrayList<>();

    /**
     * Registers a checker after validating it.
     *
     * @param entity the checker to register
     * @throws NullPointerException     if {@code entity} is {@code null}
     * @throws IllegalArgumentException if the checker's name is {@code null} or empty
     */
    @Override
    public void registerEntity(Checker entity) {
        Objects.requireNonNull(entity, "Register Checker can not be null.");
        String name = entity.getName();
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException(entity.getClass().getSimpleName() + " must have valid name");
        }
        checkEntityList.add(entity);
    }

    /**
     * @return {@code true} when all registered checkers have passed; also
     *         {@code true} when no checker is registered
     */
    @Override
    public boolean pass() {
        return this.checkEntityList.stream().allMatch(Checker::isPass);
    }

    /**
     * Runs {@code check()} on each checker in registration order, stores the
     * result through {@code setPass}, and stops at the first checker that fails.
     *
     * <p>NOTE(review): {@code consumer} is not invoked by this implementation;
     * the check call and result recording are done here directly.
     *
     * @param consumer currently unused
     */
    @Override
    public void foreach(Consumer<Checker> consumer) {
        for (Checker checkEntity : checkEntityList) {
            boolean check = checkEntity.check();
            checkEntity.setPass(check);
            if (!check) {
                // Later checkers are not evaluated in this round.
                return;
            }
        }
    }

    /**
     * @return a new list holding the names of all registered checkers
     */
    @Override
    public List<String> getCheckerNames() {
        return checkEntityList.stream().map(Checker::getName).collect(Collectors.toList());
    }
}
检测项接口
/**
 * A named check that can be executed and that remembers whether it passed.
 */
public interface Checker {

    /** @return the name identifying this checker */
    String getName();

    /**
     * Executes the check.
     *
     * @return {@code true} if the check succeeded
     */
    boolean check();

    /** @return the pass state last stored through {@link #setPass(boolean)} */
    boolean isPass();

    /**
     * Stores the pass state.
     *
     * @param pass the new pass state
     */
    void setPass(boolean pass);
}

package com.zte.sdn.oscp.check;

/**
 * Base class that holds the pass state, leaving {@code getName()} and
 * {@code check()} to subclasses.
 */
public abstract class AbstractChecker implements Checker {

    /** Last stored pass state; {@code false} until set. */
    private boolean passed;

    @Override
    public boolean isPass() {
        return passed;
    }

    @Override
    public void setPass(boolean pass) {
        passed = pass;
    }
}
检查项
新增Checker项时,只需继承AbstractChecker(或直接实现Checker接口),并实现检测逻辑即可
public class DataBaseHealthChecker extends AbstractChecker { @Override public String getName() { return “DataBaseHealthChecker”; } @Getter @VisibleForTesting private int count; @Override public boolean check() { count++; if (count >= 3) { return true; } return false; }}
测试
程序会循环检测直至全部检查项通过,通过后回调指定的方法;检测在单独的线程中执行,不会阻塞主线程
/**
 * Tests for ApplicationStartupChecker using KafkaHealthChecker and
 * DataBaseHealthChecker as sample checkers.
 *
 * NOTE(review): Assert is presumably org.junit.Assert; the assertions use the
 * three-argument (delta) overload on int values — confirm that is intended.
 */
public class CheckTest {
    /**
     * Registers both checkers, waits on a latch released by the pass callback,
     * then asserts the number of check calls made on each checker.
     */
    @Test public void checkTest() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker(“abc”);
        KafkaHealthChecker kafkaHealthChecker = new KafkaHealthChecker();
        DataBaseHealthChecker dataBaseHealthChecker = new DataBaseHealthChecker();
        applicationStartupChecker.addChecker(kafkaHealthChecker);
        applicationStartupChecker.addChecker(dataBaseHealthChecker);
        CountDownLatch latch = new CountDownLatch(1);
        // The callback releases the latch once checking has finished.
        applicationStartupChecker.setPassConsumer(t -> { latch.countDown(); });
        applicationStartupChecker.bootstrap();
        latch.await();
        int kafkaCheck = kafkaHealthChecker.getCount();
        int dbCheck = dataBaseHealthChecker.getCount();
        Assert.assertEquals(5, kafkaCheck, 0.0);
        Assert.assertEquals(3, dbCheck, 0.0);
    }
    /**
     * Bootstraps without any registered checker and waits for the pass callback.
     */
    @Test public void checkTestNOChecker() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker(“abc”);
        CountDownLatch latch = new CountDownLatch(1);
        applicationStartupChecker.setPassConsumer(t -> { latch.countDown(); });
        applicationStartupChecker.bootstrap();
        latch.await();
    }
    /**
     * Bootstraps without a pass callback and polls the database checker's
     * counter every 5 seconds until it reaches 3.
     */
    @Test public void checkTestNoConsumer() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker(“abc”);
        KafkaHealthChecker kafkaHealthChecker = new KafkaHealthChecker();
        DataBaseHealthChecker dataBaseHealthChecker = new DataBaseHealthChecker();
        applicationStartupChecker.addChecker(kafkaHealthChecker);
        applicationStartupChecker.addChecker(dataBaseHealthChecker);
        applicationStartupChecker.bootstrap();
        while (dataBaseHealthChecker.getCount() < 3) { TimeUnit.SECONDS.sleep(5); }
        Assert.assertEquals(5, kafkaHealthChecker.getCount(), 0.0);
        Assert.assertEquals(3, dataBaseHealthChecker.getCount(), 0.0);
    }
    /**
     * Uses a 2-second check cycle instead of the default.
     */
    @Test public void checkTestNoConsumerTimeOut() throws InterruptedException {
        ApplicationStartupChecker applicationStartupChecker = new ApplicationStartupChecker("abc", 2, TimeUnit.SECONDS);
        KafkaHealthChecker kafkaHealthChecker = new KafkaHealthChecker();
        DataBaseHealthChecker dataBaseHealthChecker = new DataBaseHealthChecker();
        applicationStartupChecker.addChecker(kafkaHealthChecker);
        applicationStartupChecker.addChecker(dataBaseHealthChecker);
        applicationStartupChecker.bootstrap();
        // NOTE(review): the text between "getCount()" and "{ latch.countDown(); });"
        // appears to be missing from this copy. Presumably it held the rest of this
        // test and the start of a further test that calls bootstrap() repeatedly —
        // restore it from the original before compiling.
        while (dataBaseHealthChecker.getCount() { latch.countDown(); });
        applicationStartupChecker.bootstrap();
        applicationStartupChecker.bootstrap();
        applicationStartupChecker.bootstrap();
        applicationStartupChecker.bootstrap();
        latch.await();
        int kafkaCheck = kafkaHealthChecker.getCount();
        int dbCheck = dataBaseHealthChecker.getCount();
        Assert.assertEquals(5, kafkaCheck, 0.0);
        Assert.assertEquals(3, dbCheck, 0.0);
    }
}
其它
如果想产生检查不通过,程序不进行下一步动作的强依赖,实际上也简单,只需要在ApplicationStartupChecker中增加
>private CountDownLatch wait=new CountDownLatch(1);
检查通过后调用countDown()将计数减1,主程序中调用await()阻塞等待即可。