介绍注册中心的功能的小节,我们曾经说过,注册中心要有持久化的操作,将一些服务的审核信息放到硬盘上,这样做的原因就是因为我们所有的服务信息都是放在内存里面的,如果注册中心的实例宕掉,或者服务器因为某种原因停止的时候,这样某些服务的审核记录就无法找回,为了避免这样的问题,我们需要做的事情就是把这些服务审核信息定时刷盘,把这些信息保存到硬盘上去,然后每个注册中心服务启动的时候,去硬盘上去恢复这些信息,这样就可以规避这样的问题了
其实这个操作与RPC之间的联系不大,要解决的问题其实很简单,就是把信息,我们可以把这个信息序列化json字符串,然后根据给定的指定路径,指定的文件名,把json字符串保存的文件里面,注册中心每次启动的时候,读取文件中的字符串信息,然后序列化成对象,再保存到内存,这样就可以避免上述的问题了
好了,那么问题就变得很简单了,我们只要写一个持久化的工具类问题就大体解决了
下面的代码基本上来自于RocketMQ
package org.laopopo.common.utils; import java.io.File; import java.io.FileInputStream; import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; /** * * @author BazingaLyn * @description 持久化工具 * @time 2016年9月1日 * @modifytime */ public class PersistUtils { /** * 将json数据存到某个文件中 * @param str * @param fileName * @throws IOException */ public static final void string2File(final String str, final String fileName) throws IOException { String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); String bakFile = fileName + ".bak"; String prevContent = file2String(fileName); if (prevContent != null) { string2FileNotSafe(prevContent, bakFile); } File file = new File(fileName); file.delete(); file = new File(tmpFile); file.renameTo(new File(fileName)); } public static final void string2FileNotSafe(final String str, final String fileName) throws IOException { File file = new File(fileName); File fileParent = file.getParentFile(); if (fileParent != null) { fileParent.mkdirs(); } FileWriter fileWriter = null; try { fileWriter = new FileWriter(file); fileWriter.write(str); } catch (IOException e) { throw e; } finally { if (fileWriter != null) { try { fileWriter.close(); } catch (IOException e) { throw e; } } } } public static final String file2String(final String fileName) { // 读取txt内容为字符串 StringBuffer txtContent = new StringBuffer(); // 每次读取的byte数 byte[] b = new byte[8 * 1024]; InputStream in = null; try { // 文件输入流 in = new FileInputStream(fileName); while (in.read(b) != -1) { // 字符串拼接 txtContent.append(new String(b)); } // 关闭流 in.close(); } catch (Exception e) { return null; } finally { if (in != null) { try { in.close(); } catch (IOException e) { } } } return txtContent.toString(); } }
其实还是比较简单的,有了这个工具类,我们再做一个定时任务,每隔一段时间去把内存中的数据刷到硬盘中:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 延迟60秒,每隔一段时间将一些服务信息持久化到硬盘上 try { DefaultRegistryServer.this.getProviderManager().persistServiceInfo(); } catch (Exception e) { logger.warn("schedule persist failed [{}]",e.getMessage()); } } }, 60, this.registryServerConfig.getPersistTime(), TimeUnit.SECONDS);
/** * 持久化操作 * 原则:1)首先优先从globalRegisterInfoMap中持久化到库中 * 2)如果globalRegisterInfoMap中没有信息,则从老版本中的historyRecords中的信息重新保存到硬盘中去,这样做的好处就是不需要多维护一个historyRecords这个全局变量的信息有效性 * * 这样做的原因是因为,只要有服务注册到注册中心,在注册的处理的时候,已经从历史中获取到以前审核和负载的情况,所以globalRegisterInfoMap中的信息是最新的 * 如果有些服务以前注册过,但这次重启之后没有注册,所以就需要重新将其更新一下合并记录 * @throws IOException */ public void persistServiceInfo() throws IOException { Map<String,RegistryPersistRecord> persistMap = new HashMap<String, RegistryPersistRecord>(); ConcurrentMap<String, ConcurrentMap<Address, RegisterMeta>> _globalRegisterInfoMap = this.globalRegisterInfoMap; //_stack copy ConcurrentMap<String, LoadBalanceStrategy> _globalServiceLoadBalance = this.globalServiceLoadBalance; //_stack copy ConcurrentMap<String, RegistryPersistRecord> _historyRecords = this.historyRecords; //globalRegisterInfoMap 中保存 if(_globalRegisterInfoMap.keySet() != null){ for(String serviceName : _globalRegisterInfoMap.keySet()){ RegistryPersistRecord persistRecord = new RegistryPersistRecord(); persistRecord.setServiceName(serviceName); persistRecord.setBalanceStrategy(_globalServiceLoadBalance.get(serviceName)); List<PersistProviderInfo> providerInfos = new ArrayList<PersistProviderInfo>(); ConcurrentMap<Address, RegisterMeta> serviceMap = _globalRegisterInfoMap.get(serviceName); for(Address address : serviceMap.keySet()){ PersistProviderInfo info = new PersistProviderInfo(); info.setAddress(address); info.setIsReviewed(serviceMap.get(address).getIsReviewed()); providerInfos.add(info); } persistRecord.setProviderInfos(providerInfos); persistMap.put(serviceName, persistRecord); } } if(null != _historyRecords.keySet()){ for(String serviceName :_historyRecords.keySet()){ //不包含的时候 if(!persistMap.keySet().contains(serviceName)){ persistMap.put(serviceName, _historyRecords.get(serviceName)); }else{ //负载策略不需要合并更新,需要更新的是existRecord中没有的provider的信息 List<PersistProviderInfo> providerInfos = new ArrayList<PersistProviderInfo>(); RegistryPersistRecord existRecord = persistMap.get(serviceName); providerInfos.addAll(existRecord.getProviderInfos()); //可能需要合并的信息,合并原则,如果同地址的审核策略以globalRegisterInfoMap为准,如果不同地址,则合并信息 RegistryPersistRecord possibleMergeRecord = _historyRecords.get(serviceName); List<PersistProviderInfo> possibleProviderInfos = possibleMergeRecord.getProviderInfos(); for(PersistProviderInfo eachPossibleInfo : possibleProviderInfos){ Address address = eachPossibleInfo.getAddress(); boolean exist = false; for(PersistProviderInfo existProviderInfo : providerInfos){ if(existProviderInfo.getAddress().equals(address)){ exist = true; break; } } if(!exist){ providerInfos.add(eachPossibleInfo); } } existRecord.setProviderInfos(providerInfos); persistMap.put(serviceName, existRecord); } } if(null != persistMap.values() && !persistMap.values().isEmpty()){ String jsonString = JSON.toJSONString(persistMap.values()); if(jsonString != null){ PersistUtils.string2File(jsonString, this.defaultRegistryServer.getRegistryServerConfig().getStorePathRootDir()); } } } }
每次注册中心实例启动的时候,再从硬盘上恢复:
/** * 从硬盘上恢复一些服务的审核负载算法的信息 */ private void recoverServiceInfoFromDisk() { String persistString = PersistUtils.file2String(this.registryServerConfig.getStorePathRootDir()); if (null != persistString) { List<RegistryPersistRecord> registryPersistRecords = JSON.parseArray(persistString.trim(), RegistryPersistRecord.class); if (null != registryPersistRecords) { for (RegistryPersistRecord metricsReporter : registryPersistRecords) { String serviceName = metricsReporter.getServiceName(); this.getProviderManager().getHistoryRecords().put(serviceName, metricsReporter); } } } }
基本上的代码思路就是这样了,整个注册中心需要做的事情大体是就是如此的,思路和需要完成的功能还是比较清晰的,实现起来难度不是很大,当然可能会有bug,如果有做错欢迎大家指出来,我改正,注册中心模块的简短说明到此为止了,详细的具体代码可以查看Github