Java Web應用中自動實時檢測資源文件內容變化
在Java Web應用中,我們經常需要配置文件來定制系統行為,這些配置文件可能包括:類路徑下的文件和文件夾、非類路徑下的絕對路徑和相對路徑的文件和文件夾,在分布式環境中,還需要通過HTTP從統一集中的Web服務器中獲得配置信息,如何對這些配置信息進行自動加載并實時檢測變化呢?
Java分布式中文分詞組件 - word分詞已經實現了這個功能,我們看看是如何實現的:package org.apdplat.word.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
/**
* 資源變化自動檢測
* @author 楊尚川
*/
public class AutoDetector {
private static final Logger LOGGER = LoggerFactory.getLogger(AutoDetector.class);
//已經被監控的文件
private static final Set<String> fileWatchers = new HashSet<>();
private static final Set<String> httpWatchers = new HashSet<>();
private static final Map<DirectoryWatcher, String> resources = new HashMap<>();
private static final Map<DirectoryWatcher, ResourceLoader> resourceLoaders = new HashMap<>();
private static final Map<DirectoryWatcher.WatcherCallback, DirectoryWatcher> watcherCallbacks = new HashMap<>();
/**
* 加載資源并自動檢測資源變化
* 當資源發生變化的時候重新自動加載
* @param resourceLoader 資源加載邏輯
* @param resourcePaths 多個資源路徑,用逗號分隔
*/
public static void loadAndWatch(ResourceLoader resourceLoader, String resourcePaths) {
LOGGER.info("開始加載資源");
LOGGER.info(resourcePaths);
long start = System.currentTimeMillis();
List<String> result = new ArrayList<>();
for(String resource : resourcePaths.split("[,,]")){
try{
resource = resource.trim();
if(resource.startsWith("classpath:")){
//處理類路徑資源
result.addAll(loadClasspathResource(resource.replace("classpath:", ""), resourceLoader, resourcePaths));
}else if(resource.startsWith("http:")){
//處理HTTP資源
result.addAll(loadHttpResource(resource, resourceLoader));
}else{
//處理非類路徑資源
result.addAll(loadNoneClasspathResource(resource, resourceLoader, resourcePaths));
}
}catch(Exception e){
LOGGER.error("加載資源失敗:"+resource, e);
}
}
LOGGER.info("加載資源 "+result.size()+" 行");
//調用自定義加載邏輯
resourceLoader.clear();
resourceLoader.load(result);
long cost = System.currentTimeMillis() - start;
LOGGER.info("完成加載資源,耗時"+cost+" 毫秒");
}
/**
* 加載類路徑資源
* @param resource 資源名稱
* @param resourceLoader 資源自定義加載邏輯
* @param resourcePaths 資源的所有路徑,用于資源監控
* @return 資源內容
* @throws IOException
*/
private static List<String> loadClasspathResource(String resource, ResourceLoader resourceLoader, String resourcePaths) throws IOException{
List<String> result = new ArrayList<>();
LOGGER.info("類路徑資源:"+resource);
Enumeration<URL> ps = AutoDetector.class.getClassLoader().getResources(resource);
while(ps.hasMoreElements()) {
URL url=ps.nextElement();
LOGGER.info("類路徑資源URL:"+url);
if(url.getFile().contains(".jar!")){
//加載jar資源
result.addAll(load("classpath:"+resource));
continue;
}
File file=new File(url.getFile());
boolean dir = file.isDirectory();
if(dir){
//處理目錄
result.addAll(loadAndWatchDir(file.toPath(), resourceLoader, resourcePaths));
}else{
//處理文件
result.addAll(load(file.getAbsolutePath()));
//監控文件
watchFile(file, resourceLoader, resourcePaths);
}
}
return result;
}
/**
* 加載HTTP資源
* @param resource 資源URL
* @param resourceLoader 資源自定義加載邏輯
* @return 資源內容
*/
private static List<String> loadHttpResource(String resource, ResourceLoader resourceLoader) throws MalformedURLException, IOException {
List<String> result = new ArrayList<>();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(new URL(resource).openConnection().getInputStream(), "utf-8"))) {
String line = null;
while((line = reader.readLine()) != null){
line = line.trim();
if("".equals(line) || line.startsWith("#")){
continue;
}
result.add(line);
}
}
watchHttp(resource, resourceLoader);
return result;
}
private static void watchHttp(String resource, final ResourceLoader resourceLoader){
String[] attrs = resource.split("/");
final String channel = attrs[attrs.length-1];
if(httpWatchers.contains(channel)){
return;
}
httpWatchers.add(channel);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
String host = WordConfTools.get("redis.host", "localhost");
int port = WordConfTools.getInt("redis.port", 6379);
String channel_add = channel+".add";
String channel_remove = channel+".remove";
LOGGER.info("redis服務器配置信息 host:" + host + ",port:" + port + ",channels:[" + channel_add + "," + channel_remove+"]");
while(true){
try{
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), host, port);
final Jedis jedis = jedisPool.getResource();
LOGGER.info("redis守護線程啟動");
jedis.subscribe(new HttpResourceChangeRedisListener(resourceLoader), new String[]{channel_add, channel_remove});
jedisPool.returnResource(jedis);
LOGGER.info("redis守護線程結束");
break;
}catch(Exception e){
LOGGER.info("redis未啟動,暫停一分鐘后重新連接");
try {
Thread.sleep(60000);
} catch (InterruptedException ex) {
LOGGER.error(ex.getMessage(), ex);
}
}
}
}
});
thread.setDaemon(true);
thread.setName("redis守護線程,用于動態監控資源:"+channel);
thread.start();
}
private static final class HttpResourceChangeRedisListener extends JedisPubSub {
private ResourceLoader resourceLoader;
public HttpResourceChangeRedisListener(ResourceLoader resourceLoader){
this.resourceLoader = resourceLoader;
}
@Override
public void onMessage(String channel, String message) {
LOGGER.debug("onMessage channel:" + channel + " and message:" + message);
if(channel.endsWith(".add")){
this.resourceLoader.add(message);
}else if(channel.endsWith(".remove")){
this.resourceLoader.remove(message);
}
}
@Override
public void onPMessage(String pattern, String channel, String message) {
LOGGER.debug("pattern:" + pattern + " and channel:" + channel + " and message:" + message);
onMessage(channel, message);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
LOGGER.debug("psubscribe pattern:" + pattern + " and subscribedChannels:" + subscribedChannels);
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
LOGGER.debug("punsubscribe pattern:" + pattern + " and subscribedChannels:" + subscribedChannels);
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
LOGGER.debug("subscribe channel:" + channel + " and subscribedChannels:" + subscribedChannels);
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
LOGGER.debug("unsubscribe channel:" + channel + " and subscribedChannels:" + subscribedChannels);
}
}
/**
* 加載非類路徑資源
* @param resource 資源路徑
* @param resourceLoader 資源自定義加載邏輯
* @param resourcePaths 資源的所有路徑,用于資源監控
* @return 資源內容
* @throws IOException
*/
private static List<String> loadNoneClasspathResource(String resource, ResourceLoader resourceLoader, String resourcePaths) throws IOException {
List<String> result = new ArrayList<>();
Path path = Paths.get(resource);
boolean exist = Files.exists(path);
if(!exist){
LOGGER.error("資源不存在:"+resource);
return result;
}
boolean isDir = Files.isDirectory(path);
if(isDir){
//處理目錄
result.addAll(loadAndWatchDir(path, resourceLoader, resourcePaths));
}else{
//處理文件
result.addAll(load(resource));
//監控文件
watchFile(path.toFile(), resourceLoader, resourcePaths);
}
return result;
}
/**
* 遞歸加載目錄下面的所有資源
* 并監控目錄變化
* @param path 目錄路徑
* @param resourceLoader 資源自定義加載邏輯
* @param resourcePaths 資源的所有路徑,用于資源監控
* @return 目錄所有資源內容
*/
private static List<String> loadAndWatchDir(Path path, ResourceLoader resourceLoader, String resourcePaths) {
final List<String> result = new ArrayList<>();
try {
Files.walkFileTree(path, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
result.addAll(load(file.toAbsolutePath().toString()));
return FileVisitResult.CONTINUE;
}
});
} catch (IOException ex) {
LOGGER.error("加載資源失敗:"+path, ex);
}
if(fileWatchers.contains(path.toString())){
//之前已經注冊過監控服務,此次忽略
return result;
}
fileWatchers.add(path.toString());
DirectoryWatcher.WatcherCallback watcherCallback = new DirectoryWatcher.WatcherCallback(){
private long lastExecute = System.currentTimeMillis();
@Override
public void execute(WatchEvent.Kind<?> kind, String path) {
//一秒內發生的多個相同事件認定為一次,防止短時間內多次加載資源
if(System.currentTimeMillis() - lastExecute > 1000){
lastExecute = System.currentTimeMillis();
LOGGER.info("事件:"+kind.name()+" ,路徑:"+path);
synchronized(AutoDetector.class){
DirectoryWatcher dw = watcherCallbacks.get(this);
String paths = resources.get(dw);
ResourceLoader loader = resourceLoaders.get(dw);
LOGGER.info("重新加載數據");
loadAndWatch(loader, paths);
}
}
}
};
DirectoryWatcher directoryWatcher = DirectoryWatcher.getDirectoryWatcher(watcherCallback,
StandardWatchEventKinds.ENTRY_CREATE,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_DELETE);
directoryWatcher.watchDirectoryTree(path);
watcherCallbacks.put(watcherCallback, directoryWatcher);
resources.put(directoryWatcher, resourcePaths);
resourceLoaders.put(directoryWatcher, resourceLoader);
return result;
}
/**
* 加載文件資源
* @param path 文件路徑
* @return 文件內容
*/
private static List<String> load(String path) {
List<String> result = new ArrayList<>();
try{
InputStream in = null;
LOGGER.info("加載資源:"+path);
if(path.startsWith("classpath:")){
in = AutoDetector.class.getClassLoader().getResourceAsStream(path.replace("classpath:", ""));
}else{
in = new FileInputStream(path);
}
try(BufferedReader reader = new BufferedReader(new InputStreamReader(in,"utf-8"))){
String line;
while((line = reader.readLine()) != null){
line = line.trim();
if("".equals(line) || line.startsWith("#")){
continue;
}
result.add(line);
}
}
}catch(Exception e){
LOGGER.error("加載資源失敗:"+path, e);
}
return result;
}
/**
* 監控文件變化
* @param file 文件
*/
private static void watchFile(final File file, ResourceLoader resourceLoader, String resourcePaths) {
if(fileWatchers.contains(file.toString())){
//之前已經注冊過監控服務,此次忽略
return;
}
fileWatchers.add(file.toString());
LOGGER.info("監控文件:"+file.toString());
DirectoryWatcher.WatcherCallback watcherCallback = new DirectoryWatcher.WatcherCallback(){
private long lastExecute = System.currentTimeMillis();
@Override
public void execute(WatchEvent.Kind<?> kind, String path) {
if(System.currentTimeMillis() - lastExecute > 1000){
lastExecute = System.currentTimeMillis();
if(!path.equals(file.toString())){
return;
}
LOGGER.info("事件:"+kind.name()+" ,路徑:"+path);
synchronized(AutoDetector.class){
DirectoryWatcher dw = watcherCallbacks.get(this);
String paths = resources.get(dw);
ResourceLoader loader = resourceLoaders.get(dw);
LOGGER.info("重新加載數據");
loadAndWatch(loader, paths);
}
}
}
};
DirectoryWatcher fileWatcher = DirectoryWatcher.getDirectoryWatcher(watcherCallback,
StandardWatchEventKinds.ENTRY_MODIFY,
StandardWatchEventKinds.ENTRY_DELETE);
fileWatcher.watchDirectory(file.getParent());
watcherCallbacks.put(watcherCallback, fileWatcher);
resources.put(fileWatcher, resourcePaths);
resourceLoaders.put(fileWatcher, resourceLoader);
}
public static void main(String[] args){
AutoDetector.loadAndWatch(new ResourceLoader(){
@Override
public void clear() {
System.out.println("清空資源");
}
@Override
public void load(List<String> lines) {
for(String line : lines){
System.out.println(line);
}
}
@Override
public void add(String line) {
System.out.println("add:"+line);
}
@Override
public void remove(String line) {
System.out.println("remove:"+line);
}
}, "d:/DIC, d:/DIC2, d:/dic.txt, classpath:dic2.txt,classpath:dic");
}
}
代碼地址