服务注册:
服务发现:
服务注册,等对应的service的容器启动成功,针对微服务项目,一般是spring boot内置的tomcat启动成功,这个服务才可以使用,这个时候才可以将服务注册到zookeeper中。
那么如何知道tomcat容器启动成功了呢?
通过spring的事件监听机制,当tomcat启动成功后会发布一个事件(WebServerInitializedEvent),我们可以监听这个事件,在tomcat启动成功后做出响应(将服务注册到zookeeper)。
创建springboot项目order-service
org.springframework.boot
spring-boot-starter-parent
2.6.8
org.springframework.boot
spring-boot-starter
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.apache.curator
curator-recipes
5.2.1
server.ip=192.168.9.1
server.port=9090
# 自定义的配置信息
zk.service-name=order-service
zk.server=192.168.1.104:2181
监听spring web服务器已经初始化完成事件 WebServerInitializedEvent
/**
 * Registers this service instance in ZooKeeper once the embedded web server has started.
 *
 * <p>The listener is typed to {@link WebServerInitializedEvent} so that Spring only invokes it
 * for that event and so that {@code onApplicationEvent} really overrides the interface method.
 */
public class ZkApplicationListener implements ApplicationListener<WebServerInitializedEvent> {

    /**
     * Reads the registration settings from the environment and registers
     * {@code ip:port} under the configured service name.
     *
     * @param event the event published after the web server has been initialized
     */
    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        System.out.println("事件监听机制的回调...");
        // Read the custom settings from application.properties.
        Environment environment = event.getApplicationContext().getEnvironment();
        String serviceName = environment.getProperty("zk.service-name");
        String ip = environment.getProperty("server.ip");
        String zkServer = environment.getProperty("zk.server");
        // Ask the web server for the port it actually bound to: this is still correct when
        // server.port is not set or is 0 (random port), where the property would be null/"0".
        String port = String.valueOf(event.getWebServer().getPort());
        // Service registration.
        ServiceRegistry zookeeperServiceRegistry = new ZookeeperServiceRegistry(serviceName, ip, port, zkServer);
        zookeeperServiceRegistry.register();
    }
}
# Application Listeners
org.springframework.context.ApplicationListener=\
com.zk.serviceregistry.orderservice.listener.ZkApplicationListener
// 参照 spring cloud 团队提供的服务注册接口(ServiceRegistry),自定义一个服务注册接口
public interface ServiceRegistry {
/** Registers the current service instance with the registry. */
void register();
}public class ZookeeperServiceRegistry implements ServiceRegistry {
private CuratorFramework curatorFramework;
private final String ip;
private final String port;
private final String serviceName;
private final String basePath = "/zk-registry";
/**
 * Stores the identity of the service instance and starts a Curator client for the registry.
 *
 * @param serviceName logical service name; becomes the parent node under {@code /zk-registry}
 * @param ip          address advertised for this instance
 * @param port        port advertised for this instance
 * @param zkServer    ZooKeeper connect string, e.g. {@code host:2181}
 */
public ZookeeperServiceRegistry(String serviceName, String ip, String port, String zkServer) {
    // 20 s connection timeout; retry with exponential backoff (1000 ms base sleep, 3 retries).
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkServer)
            .connectionTimeoutMs(20000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    this.curatorFramework = client;
    this.serviceName = serviceName;
    this.ip = ip;
    this.port = port;
    client.start();
}
/**
 * Registers this instance as an ephemeral node at
 * {@code /zk-registry/<serviceName>/<ip>:<port>}.
 *
 * <p>The persistent parent node is created on demand as part of the same create call, so two
 * instances starting at the same time cannot fail each other with a check-then-create race.
 * Failures are reported on stderr; they are not propagated to the caller.
 */
@Override
public void register() {
    String instancePath = basePath + "/" + serviceName + "/" + ip + ":" + port;
    try {
        String urlNode = createInstanceNode(instancePath);
        System.out.println("服务 " + urlNode + " 成功注册到zookeeper server...");
    } catch (InterruptedException e) {
        // Preserve the interrupt status for whoever owns this thread.
        Thread.currentThread().interrupt();
        System.err.println("Interrupted while registering " + instancePath + " to zookeeper server");
    } catch (Exception e) {
        System.err.println("Failed to register " + instancePath + " to zookeeper server");
        e.printStackTrace();
    }
}

/** Creates the ephemeral instance node, replacing a stale node left by a previous session. */
private String createInstanceNode(String instancePath) throws Exception {
    try {
        // Missing parents (the service-name node) are created as persistent nodes.
        return curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath(instancePath);
    } catch (org.apache.zookeeper.KeeperException.NodeExistsException stale) {
        // After a quick restart the node of the old session may still exist until that
        // session expires; it would then vanish and leave this instance unregistered.
        curatorFramework.delete().forPath(instancePath);
        return curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(instancePath);
    }
}
}
会发现服务注册已经生效,日志中打印服务 /zk-registry/order-service/192.168.9.1:9090 成功注册到zookeeper server
查看zookeeper,发现创建了新的节点
启动多个服务192.168.9.1:9091,192.168.9.1:9092,192.168.9.1:9093,192.168.9.1:9094,新的服务ip:port也会被依次注册到zookeeper中
停掉某个服务比如192.168.9.1:9094去模拟某个服务宕机的情况,当zookeeper server在一定时间内(默认30s)没有收到来自192.168.9.1:9094服务的反馈时,就会认为此服务已经挂了,会将此服务从zookeeper节点中删除
创建springboot项目user-service
org.springframework.boot
spring-boot-starter-parent
2.6.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.apache.curator
curator-recipes
5.2.1
server.port=9999
zk.server=192.168.1.104:2181
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.zk.servicediscovery.userservice.config.ZookeeperDiscoveryAutoConfiguration
@Configuration
public class ZookeeperDiscoveryAutoConfiguration {
@Resource
private Environment environment;
/**
 * Exposes the ZooKeeper-backed service discovery as a bean, connected to the server
 * configured under {@code zk.server}.
 */
@Bean
public ServiceDiscoveryImpl serviceDiscovery() {
    String zkServer = environment.getProperty("zk.server");
    return new ServiceDiscoveryImpl(zkServer);
}
}public interface ServiceDiscovery {
/**
 * Service discovery: returns all child nodes of the service, i.e. the addresses of all
 * currently available instances.
 *
 * @param serviceName logical service name
 * @return the available instance addresses
 */
List<String> discovery(String serviceName);

/**
 * Registers a watch: a change of the child nodes (an instance was added or went down)
 * triggers the listener so the address list can be refreshed.
 *
 * @param serviceNamePath full path of the service node to watch
 */
void registerWatch(String serviceNamePath);
} public class ServiceDiscoveryImpl implements ServiceDiscovery {
private final CuratorFramework curatorFramework;
private final String basePath = "/zk-registry";
/**
 * Builds and starts the Curator client used for discovery.
 *
 * @param zkServer ZooKeeper connect string, e.g. {@code host:2181}
 */
public ServiceDiscoveryImpl(String zkServer) {
    // 20 s connection timeout; retry with exponential backoff (1000 ms base sleep, 3 retries).
    CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString(zkServer)
            .connectionTimeoutMs(20000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();
    this.curatorFramework = client;
    client.start();
}
/**
 * Lists the instance addresses registered under {@code /zk-registry/<serviceName>}.
 *
 * @param serviceName logical service name, e.g. {@code order-service}
 * @return the child node names ({@code ip:port}); an empty list, never {@code null}, when the
 *         service node does not exist or ZooKeeper could not be queried
 */
@Override
public List<String> discovery(String serviceName) {
    // e.g. /zk-registry/order-service
    String serviceNamePath = basePath + "/" + serviceName;
    try {
        if (this.curatorFramework.checkExists().forPath(serviceNamePath) != null) {
            return this.curatorFramework.getChildren().forPath(serviceNamePath);
        }
    } catch (Exception e) {
        System.err.println("Failed to list service instances under " + serviceNamePath);
        e.printStackTrace();
    }
    // Returning an empty list spares every caller a null check.
    return java.util.Collections.emptyList();
}
/** Paths that already have a running cache; used to make {@code registerWatch} idempotent. */
private final java.util.Set<String> watchedPaths = java.util.concurrent.ConcurrentHashMap.newKeySet();

/**
 * Starts a permanent watch on the children of {@code serviceNamePath} and prints the current
 * address list whenever they change.
 *
 * <p>Only the first call per path starts a cache. Callers may invoke this repeatedly (for
 * example once per request) without piling up caches and listeners that are never closed.
 *
 * @param serviceNamePath full path of the service node, e.g. {@code /zk-registry/order-service}
 */
@Override
public void registerWatch(String serviceNamePath) {
    if (!watchedPaths.add(serviceNamePath)) {
        return; // already being watched
    }
    CuratorCache curatorCache = CuratorCache.build(curatorFramework, serviceNamePath);
    CuratorCacheListener listener = CuratorCacheListener.builder()
            .forPathChildrenCache(serviceNamePath, curatorFramework, (client, event) -> {
                // Pull mode: re-read the children when notified of a change.
                System.out.println("最新的urls为: " + curatorFramework.getChildren().forPath(serviceNamePath));
            })
            .build();
    curatorCache.listenable().addListener(listener);
    curatorCache.start();
}
} public interface LoadBalance {
/**
 * Selects one address from the available service addresses.
 *
 * @param urls candidate addresses in {@code ip:port} form
 * @return the selected address
 */
String select(List<String> urls);
} public class RandomLoadBalance implements LoadBalance{
/**
 * Picks one address uniformly at random.
 *
 * @param urls candidate addresses; every element is expected to be a {@code String}
 * @return the selected address
 * @throws IllegalArgumentException if {@code urls} is {@code null} or empty
 */
@Override
public String select(List urls) {
    if (urls == null || urls.isEmpty()) {
        // Fail with a clear message instead of an NPE or "bound must be positive".
        throw new IllegalArgumentException("urls must contain at least one service address");
    }
    // ThreadLocalRandom avoids allocating and seeding a new Random on every call.
    int index = java.util.concurrent.ThreadLocalRandom.current().nextInt(urls.size());
    // The parameter is an untyped List, so the element has to be cast explicitly.
    return (String) urls.get(index);
}
} @RestController
@RequestMapping("/user")
public class UserController {
@Autowired
private ServiceDiscovery serviceDiscovery;
/**
 * Looks up the available order-service instances, picks one at random, calls its
 * {@code /order/query} endpoint and then registers a watch on the service node.
 *
 * @throws IllegalStateException if no order-service instance is currently registered
 */
@RequestMapping("/discovery")
public void discovery() throws IOException {
    List<String> urls = this.serviceDiscovery.discovery("order-service");
    if (urls == null || urls.isEmpty()) {
        // Without this guard the load balancer fails with an NPE / opaque exception.
        throw new IllegalStateException("No available order-service instance registered in zookeeper");
    }
    LoadBalance loadBalance = new RandomLoadBalance();
    String url = loadBalance.select(urls);
    System.out.println("获取可用的order-service服务节点路径为: " + url);
    String response = new RestTemplate().getForObject("http://" + url + "/order/query", String.class);
    System.out.println("order-service response: " + response);
    // Watch the order-service node for instance changes.
    this.serviceDiscovery.registerWatch("/zk-registry/order-service");
}
} 访问 http://192.168.9.1:9999/user/discovery 测试
停掉order-service某个服务节点,不需要重启,再次访问user-service
创建spring-cloud-zookeeper的spring boot项目,Spring Boot版本为2.6.8
org.springframework.boot
spring-boot-starter-parent
2.6.8
org.springframework.boot
spring-boot-starter-web
org.springframework.boot
spring-boot-starter-test
test
org.springframework.cloud
spring-cloud-starter-zookeeper-config
org.springframework.cloud
spring-cloud-starter-bootstrap
org.springframework.cloud
spring-cloud-starter-zookeeper-discovery
org.springframework.cloud
spring-cloud-dependencies
2021.0.3
import
pom
application.properties
server.port=9091
spring.cloud.zookeeper.connect-string=192.168.1.104:2181
spring.cloud.zookeeper.discovery.root=/services/registries
spring.application.name=spring-cloud-zookeeper
或者application.yml
spring:
  cloud:
    zookeeper:
      connect-string: 192.168.1.104:2181
      discovery:
        root: /services/registries
  application:
    name: spring-cloud-zookeeper
server:
  port: 9091

spring:
  profiles:
    active: dev
  application:
    name: spring-cloud-zookeeper # 找哪一个ZNode节点 spring-cloud-zookeeper-dev
  cloud:
    zookeeper:
      config:
        root: config # 相当于 /config/spring-cloud-zookeeper-dev
        profile-separator: "-"
        enabled: true
      connect-string: 192.168.1.104:2181
可以发现zookeeper server上自动创建了对应的节点
@RestController
public class SpringCloudZkDiscoveryController {
// 1.注入服务发现客户端接口
@Autowired
private DiscoveryClient discoveryClient;
/**
 * Returns all available instances of {@code spring-cloud-zookeeper} and prints the URI of
 * the first one, if any.
 */
@RequestMapping("/sc-zk-discovery")
public List<org.springframework.cloud.client.ServiceInstance> serviceUrl() {
    // 2. getInstances returns every available instance of the service.
    List<org.springframework.cloud.client.ServiceInstance> instances =
            discoveryClient.getInstances("spring-cloud-zookeeper");
    if (!instances.isEmpty()) {
        // Guarded: get(0) on an empty list would throw IndexOutOfBoundsException.
        String url = instances.get(0).getUri().toString();
        System.out.println("url=" + url);
    }
    // Return the list already fetched instead of querying the registry a second time.
    return instances;
}
} 访问测试
类似于我们手动实现服务注册,Spring Cloud也自定义了一个监听器 AbstractAutoServiceRegistration 去监听 web服务器启动事件 WebServerInitializedEvent
org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent(WebServerInitializedEvent)源代码片段:public abstract class AbstractAutoServiceRegistration implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener {
private final ServiceRegistry serviceRegistry;
// .................
// Callback for the web-server-initialized event; delegates to bind(event).
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
// Records the web server port and starts registration, unless the event comes from a
// web server application context whose server namespace is "management".
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
// Only set the port if it is still 0, then trigger registration.
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
}
// Performs the registration once: does nothing when disabled or already running.
// Publishes InstancePreRegisteredEvent, calls register() (plus registerManagement() when
// shouldRegisterManagement() is true), publishes InstanceRegisteredEvent, then flips the
// running flag to true.
public void start() {
if (!this.isEnabled()) {
// Empty branch in this excerpt: nothing is done when registration is disabled.
} else {
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
this.register();
if (this.shouldRegisterManagement()) {
this.registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
} getRegistration() 获取具体注册实现类
org.springframework.cloud.zookeeper.serviceregistry.ZookeeperAutoServiceRegistration
serviceRegistry.register(registration) 具体服务注册实现
public class ZookeeperServiceRegistry implements ServiceRegistry, SmartInitializingSingleton, Closeable {
protected CuratorFramework curator;
protected ZookeeperDiscoveryProperties properties;
private ServiceDiscovery serviceDiscovery;
// Registers the service instance carried by the registration through the service discovery
// obtained from getServiceDiscovery(); any exception is rethrown as a runtime exception via
// ReflectionUtils.
public void register(ZookeeperRegistration registration) {
try {
this.getServiceDiscovery().registerService(registration.getServiceInstance());
} catch (Exception var3) {
ReflectionUtils.rethrowRuntimeException(var3);
}
}
} 服务发现实现类:创建zookeeper节点,创建节点监听
public class ServiceDiscoveryImpl implements ServiceDiscovery {
// Registers a service instance: keeps one Entry per service id in this.services (an existing
// entry wins over the new one), and under that entry's lock creates the node cache for a
// first-time entry and then creates the ZooKeeper node.
public void registerService(ServiceInstance service) throws Exception {
ServiceDiscoveryImpl.Entry newEntry = new ServiceDiscoveryImpl.Entry(service);
ServiceDiscoveryImpl.Entry oldEntry = (ServiceDiscoveryImpl.Entry)this.services.putIfAbsent(service.getId(), newEntry);
ServiceDiscoveryImpl.Entry useEntry = oldEntry != null ? oldEntry : newEntry;
synchronized(useEntry) {
if (useEntry == newEntry) {
// Create the node watch (only for an entry that was just added).
useEntry.cache = this.makeNodeCache(service);
}
// Create the ZooKeeper node.
this.internalRegisterService(service);
}
}
// Creates the node watch for an instance: returns null when watchInstances is false;
// otherwise builds a cache on the instance's path, attaches a listener that keeps the stored
// entry in sync with the node data, starts the cache and returns it.
private CuratorCacheBridge makeNodeCache(ServiceInstance instance) {
if (!this.watchInstances) {
return null;
} else {
CuratorCacheBridge cache = CuratorCache.bridgeBuilder(this.client, this.pathForInstance(instance.getName(), instance.getId())).withOptions(new Options[]{Options.SINGLE_NODE_CACHE}).withDataNotCached().build();
CuratorCacheListener listener = CuratorCacheListener.builder().afterInitialized().forAll((__, ___, data) -> {
if (data != null) {
try {
// Deserialize the node data and replace the service held by the matching entry.
ServiceInstance newInstance = this.serializer.deserialize(data.getData());
ServiceDiscoveryImpl.Entry entry = (ServiceDiscoveryImpl.Entry)this.services.get(newInstance.getId());
if (entry != null) {
synchronized(entry) {
entry.service = newInstance;
}
}
} catch (Exception var10) {
this.log.debug("Could not deserialize: " + data.getPath());
}
} else {
// No data in the event: logged as the instance data having been deleted.
this.log.warn("Instance data has been deleted for: " + instance);
}
}).build();
cache.listenable().addListener(listener);
cache.start();
return cache;
}
}
/**
 * Creates the ZooKeeper node for a service instance.
 *
 * <p>The node holds the serialized instance and its create mode follows the service type:
 * {@code DYNAMIC} is ephemeral, {@code DYNAMIC_SEQUENTIAL} is ephemeral-sequential, and
 * anything else is persistent. If the node already exists it is deleted and creation is
 * attempted once more, for at most {@code MAX_TRIES} attempts in total.
 *
 * @param service the instance to register
 * @throws Exception on any ZooKeeper or serialization error other than "node exists"
 */
@VisibleForTesting
protected void internalRegisterService(ServiceInstance service) throws Exception {
    byte[] bytes = this.serializer.serialize(service);
    String path = this.pathForInstance(service.getName(), service.getId());

    // The decompiled listing showed "int MAX_TRIES = true;" with the bound inlined as a
    // literal 2; this restores the constant and uses it in the loop.
    final int MAX_TRIES = 2;
    boolean isDone = false;
    for (int i = 0; !isDone && i < MAX_TRIES; ++i) {
        try {
            CreateMode mode;
            switch (service.getServiceType()) {
                case DYNAMIC:
                    mode = CreateMode.EPHEMERAL;
                    break;
                case DYNAMIC_SEQUENTIAL:
                    mode = CreateMode.EPHEMERAL_SEQUENTIAL;
                    break;
                default:
                    mode = CreateMode.PERSISTENT;
                    break;
            }
            this.client.create().creatingParentContainersIfNeeded().withMode(mode).forPath(path, bytes);
            isDone = true;
        } catch (NodeExistsException var8) {
            // Remove the existing node so the next attempt can re-create it.
            this.client.delete().forPath(path);
        }
    }
}
} SpringBoot项目启动 -> webServer启动 -> 监听器监听服务启动事件执行流程:
SpringApplication.run(args) -> refreshContext(context) -> refresh(context) -> ServletWebServerApplicationContext.refresh() -> AbstractApplicationContext.refresh() -> finishRefresh() -> DefaultLifecycleProcessor.onRefresh() -> startBeans(true) -> DefaultLifecycleProcessor$LifecycleGroup.start() -> doStart() -> WebServerStartStopLifecycle.start() -> AbstractApplicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)) -> SimpleApplicationEventMulticaster.multicastEvent(applicationEvent, eventType) -> invokeListener(listener, event) -> doInvokeListener(listener, event) -> listener.onApplicationEvent(event);堆栈信息:
onApplicationEvent:12, ZkApplicationListener (com.zk.serviceregistry.orderservice.listener)
doInvokeListener:176, SimpleApplicationEventMulticaster (org.springframework.context.event)
invokeListener:169, SimpleApplicationEventMulticaster (org.springframework.context.event)
multicastEvent:143, SimpleApplicationEventMulticaster (org.springframework.context.event)
publishEvent:421, AbstractApplicationContext (org.springframework.context.support)
publishEvent:378, AbstractApplicationContext (org.springframework.context.support)
start:46, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
accept:-1, 1643565953 (org.springframework.context.support.DefaultLifecycleProcessor$Lambda$541)
forEach:75, Iterable (java.lang)
startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:935, AbstractApplicationContext (org.springframework.context.support)
refresh:586, AbstractApplicationContext (org.springframework.context.support)
refresh:145, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
refresh:745, SpringApplication (org.springframework.boot)
refreshContext:420, SpringApplication (org.springframework.boot)
run:307, SpringApplication (org.springframework.boot)
run:1317, SpringApplication (org.springframework.boot)
run:1306, SpringApplication (org.springframework.boot)
main:10, OrderServiceApplication (com.zk.serviceregistry.orderservice)
1) SpringApplication.run()
public class SpringApplication {
// Step 1 of the call chain: boots the application. Creates the bootstrap context, notifies
// the run listeners, prepares the environment, prints the banner, creates and prepares the
// application context, refreshes it, then reports started/ready to the listeners and returns
// the context. Any failure is passed to handleRunFailure and rethrown as IllegalStateException.
public ConfigurableApplicationContext run(String... args) {
long startTime = System.nanoTime();
DefaultBootstrapContext bootstrapContext = this.createBootstrapContext();
ConfigurableApplicationContext context = null;
this.configureHeadlessProperty();
SpringApplicationRunListeners listeners = this.getRunListeners(args);
listeners.starting(bootstrapContext, this.mainApplicationClass);
try {
ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
ConfigurableEnvironment environment = this.prepareEnvironment(listeners, bootstrapContext, applicationArguments);
this.configureIgnoreBeanInfo(environment);
Banner printedBanner = this.printBanner(environment);
context = this.createApplicationContext();
context.setApplicationStartup(this.applicationStartup);
this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
// Marker "1": the context refresh is the call this walkthrough follows next.
this.refreshContext(context); // 1
this.afterRefresh(context, applicationArguments);
Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
if (this.logStartupInfo) {
(new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), timeTakenToStartup);
}
listeners.started(context, timeTakenToStartup);
this.callRunners(context, applicationArguments);
} catch (Throwable var12) {
this.handleRunFailure(context, var12, listeners);
throw new IllegalStateException(var12);
}
try {
Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
listeners.ready(context, timeTakenToReady);
return context;
} catch (Throwable var11) {
this.handleRunFailure(context, var11, (SpringApplicationRunListeners)null);
throw new IllegalStateException(var11);
}
}
}2) refreshContext(context)
// Step 2: registers the context with the shutdown hook when registerShutdownHook is set,
// then refreshes the context.
private void refreshContext(ConfigurableApplicationContext context) {
if (this.registerShutdownHook) {
shutdownHook.registerApplicationContext(context);
}
this.refresh(context); // 2
}3) AbstractApplicationContext.refresh()
// Step 3: refreshes the application context while holding startupShutdownMonitor.
// Prepares the bean factory, runs the factory/bean post-processor phases, initializes the
// message source and the event multicaster, calls onRefresh(), registers listeners,
// finishes bean factory initialization and finally calls finishRefresh().
// On BeansException the beans are destroyed, the refresh is cancelled and the exception is
// rethrown; common caches are always reset in the finally block.
public void refresh() throws BeansException, IllegalStateException {
synchronized(this.startupShutdownMonitor) {
StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
this.prepareRefresh();
ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
this.prepareBeanFactory(beanFactory);
try {
this.postProcessBeanFactory(beanFactory);
StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
this.invokeBeanFactoryPostProcessors(beanFactory);
this.registerBeanPostProcessors(beanFactory);
beanPostProcess.end();
this.initMessageSource();
this.initApplicationEventMulticaster();
this.onRefresh();
this.registerListeners();
this.finishBeanFactoryInitialization(beanFactory);
// Marker "3": finishRefresh() is the call this walkthrough follows next.
this.finishRefresh(); // 3
} catch (BeansException var10) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var10);
}
this.destroyBeans();
this.cancelRefresh(var10);
throw var10;
} finally {
this.resetCommonCaches();
contextRefresh.end();
}
}
}4) finishRefresh()
// Step 4: clears resource caches, initializes the lifecycle processor and calls its
// onRefresh(), then publishes ContextRefreshedEvent. Outside a native image the context is
// also registered with LiveBeansView.
protected void finishRefresh() {
this.clearResourceCaches();
this.initLifecycleProcessor();
this.getLifecycleProcessor().onRefresh(); // 4
this.publishEvent((ApplicationEvent)(new ContextRefreshedEvent(this)));
if (!NativeDetector.inNativeImage()) {
LiveBeansView.registerApplicationContext(this);
}
}5) DefaultLifecycleProcessor.onRefresh()
// Step 5: starts the lifecycle beans with autoStartupOnly = true and marks the processor
// as running.
public void onRefresh() {
this.startBeans(true); // 5
this.running = true;
}6) startBeans(true)
// Step 6: groups the lifecycle beans by phase and starts every group.
// When autoStartupOnly is true, only SmartLifecycle beans whose isAutoStartup() returns true
// are included. Groups are kept in a TreeMap keyed by phase, so values() iterates them in
// ascending phase order.
private void startBeans(boolean autoStartupOnly) {
Map lifecycleBeans = this.getLifecycleBeans();
Map phases = new TreeMap();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()) {
int phase = this.getPhase(bean);
// Create the group for this phase on first use, then add the bean to it.
((DefaultLifecycleProcessor.LifecycleGroup)phases.computeIfAbsent(phase, (p) -> {
return new DefaultLifecycleProcessor.LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
})).add(beanName, bean);
}
});
if (!phases.isEmpty()) {
phases.values().forEach(DefaultLifecycleProcessor.LifecycleGroup::start); // 6
}
} 7) DefaultLifecycleProcessor$LifecycleGroup.start()
// Step 7: starts all members of this lifecycle group. Does nothing for an empty group;
// otherwise sorts the members and calls doStart(...) for each of them in that order.
public void start() {
if (!this.members.isEmpty()) {
if (DefaultLifecycleProcessor.this.logger.isDebugEnabled()) {
DefaultLifecycleProcessor.this.logger.debug("Starting beans in phase " + this.phase);
}
Collections.sort(this.members);
Iterator var1 = this.members.iterator();
while(var1.hasNext()) {
DefaultLifecycleProcessor.LifecycleGroupMember member = (DefaultLifecycleProcessor.LifecycleGroupMember)var1.next();
DefaultLifecycleProcessor.this.doStart(this.lifecycleBeans, member.name, this.autoStartupOnly); // 7
}
}
}8) DefaultLifecycleProcessor.doStart()
// Step 8: starts a single lifecycle bean. The bean is removed from the map first, so it is
// handled at most once; its dependencies are started recursively before it. The bean itself
// is started only if it is not already running and passes the auto-startup check. A failure
// in start() is wrapped in ApplicationContextException.
private void doStart(Map lifecycleBeans, String beanName, boolean autoStartupOnly) {
Lifecycle bean = (Lifecycle)lifecycleBeans.remove(beanName);
if (bean != null && bean != this) {
String[] dependenciesForBean = this.getBeanFactory().getDependenciesForBean(beanName);
String[] var6 = dependenciesForBean;
int var7 = dependenciesForBean.length;
// Start every bean this one depends on before starting it.
for(int var8 = 0; var8 < var7; ++var8) {
String dependency = var6[var8];
this.doStart(lifecycleBeans, dependency, autoStartupOnly);
}
if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle)bean).isAutoStartup())) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
}
try {
bean.start(); // 8
} catch (Throwable var10) {
throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
}
}
}
} 9) 发布web服务启动完成事件
事件 ServletWebServerInitializedEvent extends WebServerInitializedEvent
// Step 9: starts the web server, marks this lifecycle as running, and then publishes
// ServletWebServerInitializedEvent carrying the web server and the application context.
public void start() {
this.webServer.start();
this.running = true;
this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)); // 9
}10) 发布事件 AbstractApplicationContext.publishEvent()
// Step 10: publishes an event. While earlyApplicationEvents is non-null the event is only
// queued there; otherwise it is multicast to the listeners. The event is then also published
// to the parent context, if there is one.
// NOTE: this is an abridged excerpt; the declaration of applicationEvent is not shown here.
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else {
this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType); // 10
}
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}
}11) 广播器发布事件 SimpleApplicationEventMulticaster.multicastEvent(event, eventType)
// Step 11: delivers the event to every listener returned by getApplicationListeners for the
// event and its resolved type. With a task executor configured each listener is invoked
// through executor.execute(...); otherwise it is invoked directly in the calling thread.
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
Executor executor = this.getTaskExecutor();
Iterator var5 = this.getApplicationListeners(event, type).iterator();
while(var5.hasNext()) {
ApplicationListener<?> listener = (ApplicationListener)var5.next();
if (executor != null) {
executor.execute(() -> {
this.invokeListener(listener, event);
});
} else {
this.invokeListener(listener, event); // 11
}
}
}12) 调用监听器 invokeListener(listener, event)
// Step 12: invokes one listener. With an error handler configured, any Throwable from the
// listener is handed to it; without one, the exception propagates to the caller.
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = this.getErrorHandler();
if (errorHandler != null) {
try {
this.doInvokeListener(listener, event);
} catch (Throwable var5) {
errorHandler.handleError(var5);
}
} else {
this.doInvokeListener(listener, event); // 12
}
}
// Step 13: calls the listener's onApplicationEvent callback.
// NOTE: in this excerpt the ClassCastException handler is shown with an empty body.
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event); // 13
} catch (ClassCastException var6) {}
}13) 调用监听器事件回调方法
public abstract class AbstractAutoServiceRegistration implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener {
// Listener callback reached at the end of the call chain; the body is elided in this excerpt.
public void onApplicationEvent(WebServerInitializedEvent event) {
// TODO ..................
}
} | 留言与评论(共有 0 条评论) “” |