不灭的焱

革命尚未成功,同志仍须努力下载JDK17

作者:Albert.Wen  添加时间:2018-11-04 19:59:04  修改时间:2024-05-17 03:54:54  分类:Java基础  编辑

介绍注册中心的功能的小节,我们曾经说过,注册中心要有持久化的操作,将一些服务的审核信息放到硬盘上,这样做的原因就是因为我们所有的服务信息都是放在内存里面的,如果注册中心的实例宕掉,或者服务器因为某种原因停止的时候,这样某些服务的审核记录就无法找回,为了避免这样的问题,我们需要做的事情就是把这些服务审核信息定时刷盘,把这些信息保存到硬盘上去,然后每个注册中心服务启动的时候,去硬盘上去恢复这些信息,这样就可以规避这样的问题了

 

其实这个操作与RPC之间的联系不大,要解决的问题其实很简单,就是把信息,我们可以把这个信息序列化json字符串,然后根据给定的指定路径,指定的文件名,把json字符串保存的文件里面,注册中心每次启动的时候,读取文件中的字符串信息,然后序列化成对象,再保存到内存,这样就可以避免上述的问题了

 

好了,那么问题就变得很简单了,我们只要写一个持久化的工具类问题就大体解决了

下面的代码基本上来自于RocketMQ

 

package org.laopopo.common.utils;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;

/**
 * 
 * @author BazingaLyn
 * @description 持久化工具
 * @time 2016年9月1日
 * @modifytime
 */
public class PersistUtils {
	
	
	/**
	 * 将json数据存到某个文件中
	 * @param str
	 * @param fileName
	 * @throws IOException
	 */
	public static final void string2File(final String str, final String fileName) throws IOException {
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);

        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }

        File file = new File(fileName);
        file.delete();

        file = new File(tmpFile);
        file.renameTo(new File(fileName));
    }
	
	public static final void string2FileNotSafe(final String str, final String fileName) throws IOException {
        File file = new File(fileName);
        File fileParent = file.getParentFile();
        if (fileParent != null) {
            fileParent.mkdirs();
        }
        FileWriter fileWriter = null;

        try {
            fileWriter = new FileWriter(file);
            fileWriter.write(str);
        }
        catch (IOException e) {
            throw e;
        }
        finally {
            if (fileWriter != null) {
                try {
                    fileWriter.close();
                }
                catch (IOException e) {
                    throw e;
                }
            }
        }
    }
	
	public static final String file2String(final String fileName) {
		// 读取txt内容为字符串
		StringBuffer txtContent = new StringBuffer();
		// 每次读取的byte数
		byte[] b = new byte[8 * 1024];
		InputStream in = null;
		try {
			// 文件输入流
			in = new FileInputStream(fileName);
			while (in.read(b) != -1) {
				// 字符串拼接
				txtContent.append(new String(b));
			}
			// 关闭流
			in.close();
		} catch (Exception e) {
			return null;
		} finally {
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
				}
			}
		}
		return txtContent.toString();
	}

}

 

其实还是比较简单的,有了这个工具类,我们再做一个定时任务,每隔一段时间去把内存中的数据刷到硬盘中:

 

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

			@Override
			public void run() {
				// 延迟60秒,每隔一段时间将一些服务信息持久化到硬盘上
				try {
					DefaultRegistryServer.this.getProviderManager().persistServiceInfo();
				} catch (Exception e) {
					logger.warn("schedule persist failed [{}]",e.getMessage());
				} 
			}
}, 60, this.registryServerConfig.getPersistTime(), TimeUnit.SECONDS);

 

/**
 * 持久化操作
 * 原则:1)首先优先从globalRegisterInfoMap中持久化到库中
 *      2)如果globalRegisterInfoMap中没有信息,则从老版本中的historyRecords中的信息重新保存到硬盘中去,这样做的好处就是不需要多维护一个historyRecords这个全局变量的信息有效性
 *      
 * 这样做的原因是因为,只要有服务注册到注册中心,在注册的处理的时候,已经从历史中获取到以前审核和负载的情况,所以globalRegisterInfoMap中的信息是最新的
 * 如果有些服务以前注册过,但这次重启之后没有注册,所以就需要重新将其更新一下合并记录
 * @throws IOException
 */
public void persistServiceInfo() throws IOException {
	
	Map<String,RegistryPersistRecord> persistMap = new HashMap<String, RegistryPersistRecord>();
	ConcurrentMap<String, ConcurrentMap<Address, RegisterMeta>> _globalRegisterInfoMap = this.globalRegisterInfoMap; //_stack copy
	ConcurrentMap<String, LoadBalanceStrategy> _globalServiceLoadBalance = this.globalServiceLoadBalance; //_stack copy
	ConcurrentMap<String, RegistryPersistRecord> _historyRecords = this.historyRecords;
	
	//globalRegisterInfoMap 中保存
	if(_globalRegisterInfoMap.keySet() != null){
		
		for(String serviceName : _globalRegisterInfoMap.keySet()){
			
			RegistryPersistRecord persistRecord = new RegistryPersistRecord();
			persistRecord.setServiceName(serviceName);
			persistRecord.setBalanceStrategy(_globalServiceLoadBalance.get(serviceName));
			
			List<PersistProviderInfo> providerInfos = new ArrayList<PersistProviderInfo>();
			ConcurrentMap<Address, RegisterMeta> serviceMap = _globalRegisterInfoMap.get(serviceName);
			for(Address address : serviceMap.keySet()){
				PersistProviderInfo info = new PersistProviderInfo();
				info.setAddress(address);
				info.setIsReviewed(serviceMap.get(address).getIsReviewed());
				providerInfos.add(info);
			}
			persistRecord.setProviderInfos(providerInfos);
			persistMap.put(serviceName, persistRecord);
		}
	}
	
	
	if(null != _historyRecords.keySet()){
		
		for(String serviceName :_historyRecords.keySet()){
			
			//不包含的时候
			if(!persistMap.keySet().contains(serviceName)){
				persistMap.put(serviceName, _historyRecords.get(serviceName));
			}else{
				
				//负载策略不需要合并更新,需要更新的是existRecord中没有的provider的信息
				List<PersistProviderInfo> providerInfos = new ArrayList<PersistProviderInfo>();
				RegistryPersistRecord existRecord = persistMap.get(serviceName);
				providerInfos.addAll(existRecord.getProviderInfos());
				
				//可能需要合并的信息,合并原则,如果同地址的审核策略以globalRegisterInfoMap为准,如果不同地址,则合并信息
				RegistryPersistRecord possibleMergeRecord = _historyRecords.get(serviceName);
				List<PersistProviderInfo> possibleProviderInfos = possibleMergeRecord.getProviderInfos();
				
				for(PersistProviderInfo eachPossibleInfo : possibleProviderInfos){
					
					Address address = eachPossibleInfo.getAddress();
					
					boolean exist = false;
					for(PersistProviderInfo existProviderInfo : providerInfos){
						if(existProviderInfo.getAddress().equals(address)){
							exist = true;
							break;
						}
					}
					if(!exist){
						providerInfos.add(eachPossibleInfo);
					}
				}
				existRecord.setProviderInfos(providerInfos);
				persistMap.put(serviceName, existRecord);
			}
		}
		
		if(null != persistMap.values() && !persistMap.values().isEmpty()){
			
			String jsonString = JSON.toJSONString(persistMap.values());
			
			if(jsonString != null){
				PersistUtils.string2File(jsonString, this.defaultRegistryServer.getRegistryServerConfig().getStorePathRootDir());
			}
		}
	}
}

 

每次注册中心实例启动的时候,再从硬盘上恢复:

 

/**
 * 从硬盘上恢复一些服务的审核负载算法的信息
 */
private void recoverServiceInfoFromDisk() {
	
	String persistString = PersistUtils.file2String(this.registryServerConfig.getStorePathRootDir());
	
	if (null != persistString) {
		List<RegistryPersistRecord> registryPersistRecords = JSON.parseArray(persistString.trim(), RegistryPersistRecord.class);
		
		if (null != registryPersistRecords) {
			for (RegistryPersistRecord metricsReporter : registryPersistRecords) {
				
			     String serviceName = metricsReporter.getServiceName();
			     this.getProviderManager().getHistoryRecords().put(serviceName, metricsReporter);
			     
			}
		}
	}
	
}

 

基本上的代码思路就是这样了,整个注册中心需要做的事情大体是就是如此的,思路和需要完成的功能还是比较清晰的,实现起来难度不是很大,当然可能会有bug,如果有做错欢迎大家指出来,我改正,注册中心模块的简短说明到此为止了,详细的具体代码可以查看Github

 

 

摘自:https://blog.csdn.net/linuu/article/details/52774477