用mina做業務服之間的通信,實現業務負載均衡思路
學習mina目的還是搭建通信架構,學完mina我們了解了如何實現客戶端和服務端,也就是一個正常channel我們是知道怎么建立的
但是問題是,我們應用環境通信分為兩種
1.前后端通信
其實這個比較好實現,提供一個mina server端,供前端語言通過socket建連接就行,這個通信就算是ok了,編解碼等通信解析的細節這里不講了
以前的游戲服務端架構業務多用短連接,聊天用長連接,聊天的部分其實就是上面表述的情況
現在是長連接的天下,聊天依舊是長連接,業務也做成長連接,實現了真正意義上的長連接游戲架構,這其實就表述了一種當下典型架構,
就是后端提供兩個開放的通信端口【即兩個mina server】,供前端的socket連接,一個負責聊天,登錄,注冊,另一個負責其他業務,這樣就實現了協議通信的負載均衡
2.后端的業務服通信【這是本文的重點】
那么后端的業務就不需要負載均衡嗎?比如job,異步更新db,活動副本等
當然也是需要的,怎么做那,先拿1中的做個解釋
mainserevr[聊天,登錄,注冊]---nodeserver[其他業務]
這兩個mina sever端已經建立起來了,但是兩個server之間還不能通信,我們有兩個選擇,要么在mainserevr上起個mina client去連nodeserver,要么在nodeserver
上起個mina client去連mainserevr,思路肯定是這樣的,一旦這個通道建立了,其實互為server和client的,會有一個iosession被通道持有,只要有這個iosession,
就可以主動write,當然對于通道的另一端可以response,也可以通過取得iosession來主動寫
實現方式,我們在nodeserevr上提供一個mainserverClient這樣一個spring的bean去連接mainserver,這樣在nodeserver上就可以向mainserevr發消息了
3.帶著這個思路設計一下
我把游戲中的業務分為
public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 異步DB public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活動 public static final String SERVER_TYPE_OTHER_STR = "other";// 其他 public static final String SERVER_TYPE_GM_STR = "GM";//管理端
每次啟動一種server時,首先啟動一次mina serevr,然后啟動多個mina client去連接其他的mina server,
比如啟動nodeserevr 服務端,然后啟動多個client分別連接mainserevr,jobserevr等的服務端,這樣我就可以
在nodeserver上給其他業務serevr發請求了,具體啟動哪些client看需要
搞一個啟動server類型的方法
public static ClassPathXmlApplicationContext start(String serverTypeStr) {
try {
//關閉連接池的鉤子線程
ProxoolFacade.disableShutdownHook();
//spring 的核心配置文件
String xmlFile = "applicationContext.xml";
....
log.info("啟動 {} server................", serverTypeName);
// 設置到系統環境變量
System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");
System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,
serverTypeName);
// final ClassPathXmlApplicationContext parent = new
// ClassPathXmlApplicationContext(
// xmlFile);
String fileName = null;
//這是把spring的住配置文件拆分了一部分內容出來,目前是只加載本server需要的bean
if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {
fileName = "wolf/app_nodeserver.xml";
} else {
fileName = "wolf/app_server.xml";
}
//手動啟動spring
final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
new String[] { xmlFile, fileName });
if (context != null) {
ServiceLocator.getInstance().setApplicationContext(context);
}
// 啟動socket server
final WolfServer server = (WolfServer) ServiceLocator
.getSpringBean("wolf_server");
server.setServerType(serverType);
//這個調用就是我們熟悉的啟動mina server端
server.start();
//這個動用做兩件事,選區需要的serevr類型建立mina client連接
startClient(server);
//鉤子線程用來監聽應用停止,為了做停止時的后續處理
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
_shutdown();
}
}, "shutdownHookThread"));
//為了支持web,springMVC,內置一個web server
if (NodeSessionMgr.SERVER_TYPE_MAIN_STR
.equalsIgnoreCase(serverTypeStr)) {
JettyServer jettyServer = (JettyServer) ServiceLocator
.getSpringBean("jettyServer");
jettyServer.start();
}
log.info("start {} end................", serverTypeName);
return context;
} catch (Exception e) {
e.printStackTrace();
shutdown();
} finally {
}
return null;
}在看下startClient(server);
private static void startClient(WolfServer server) {
// asyncdbServer只會被連接,不會主動連接其他server
// 這部分目的是過濾那些不需要主動連比人的serevr,比武我這里的異步db,和活動服
if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB
|| server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {
return;
}
// 發送game Server ip port到mainserver
Map<String, Object> params = new HashMap<String, Object>();
params.put("nodeServerIp", server.getIp());
params.put("nodeServerPort", server.getPort());
params.put("serverType", server.getServerType());
//我需要mainserevr的client,就弄個bean在本服
final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator
.getSpringBean("mainServerClient");
//這個位置其實就是mina的client連server端
mainServerClient.init();
Object localAddress = mainServerClient.registerNode(params);
//同上,需要jobserevr的client
final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator
.getSpringBean("jobServerClient");
if (jobServerClient != null) {
jobServerClient.init();
Map<String, Object> params1 = new HashMap<String, Object>();
params1.putAll(params);
jobServerClient.registerNode(params1);
}
// }
.....
}再看下WolfClientService.init()
public void init() {
if (start)
return;
if (wolfClient == null) {
log.error("wolf client is null");
return;
}
//mina 的client 連接 mina server
wolfClient.start();
if (wolfClient.isConnected())
start = true;
}再看下wolfclient.start()
/**
* 連接一個服務器,并指定處理接收到的消息的處理方法
*
*/
public void start() {
// this.context.put("resultMgr", this.resultMgr);
logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_9"), processorNum);
logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_0"), corePoolSize);
logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_4"), maxPoolSize);
if (this.serverIp == null || this.serverIp.equals("")) {
logger.error(clientName + "沒有配置serverIp,不啟動.........");
return;
}
String threadPrefix = clientName + "[" + this.serverIp + ":"
+ this.serverPort + "]";
// exector = Executors.newCachedThreadPool(new
// NamingThreadFactory(threadPrefix));
processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,
processorNum);
// connector = new NioSocketConnector((Executor) exector, processor);
connector = new NioSocketConnector(processor);
// connector.getSessionConfig().setReuseAddress(true);
DefaultIoFilterChainBuilder chain = connector.getFilterChain();
if (useLogFilter == 2) {
chain.addLast("logging", new LoggingFilter());
}
// codec filter要放在ExecutorFilter前,因為讀寫同一個socket connection的socket
// buf不能并發(事實上主要是讀,寫操作mina已經封裝成一個write Queue)
chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 設置編碼過濾器
// 添加心跳過濾器,客戶端只接受服務端的心跳請求,不發送心跳請求
// connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);
// 這里的KeepAliveFilter必須在codec之后,因為KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,則可以在codec之前
// KeepAliveFilter到底在ExecutorFilter之前好還是之后好,我也不確定
KeepAliveFilter filter = new KeepAliveFilter(
new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),
IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),
keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,
30);
chain.addLast("ping", filter);
// 添加執行線程池
executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(
threadPrefix));
// 這里是預先啟動corePoolSize個處理線程
executor.prestartAllCoreThreads();
chain.addLast("exec", new ExecutorFilter(executor,
IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED));
if (useWriteThreadPool) {
executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,
maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
new NamingThreadFactory(threadPrefix + "write"));
executorWrite.prestartAllCoreThreads();
chain.addLast("execWrite", new ExecutorFilter(executorWrite,
IoEventType.WRITE, IoEventType.MESSAGE_SENT));
}
// ,logger.isDebugEnabled() ? new
// LoggingIoEventQueueHandler("execWrite") : nulls
// 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log
// 可以配置在ExecutorFilter之后:是為了在工作線程中打印log,不是在NioProcessor中打印
if (useLogFilter == 1) {
chain.addLast("logging", new LoggingFilter());
}
connector.setHandler(handler);
connector.getSessionConfig().setReuseAddress(true);
connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);
logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_1")
+ serverIp + ":" + serverPort);
ConnectFuture cf = null;
long start = System.currentTimeMillis();
while (true) {
//這地很關鍵,是個無線循環,每10秒連接一次,直到可以和服務端建立連接,否則一支循環下去
cf = connector.connect(serverAddress);// 建立連接
cf.awaitUninterruptibly(10000L);
if (!cf.isConnected()) {
if ((System.currentTimeMillis() - start) > timeout) {
throw new RuntimeException(
com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_5")
+ serverIp + ":" + serverPort);
}
if (cf.getException() != null) {
logger.error(com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_6"), serverIp + ":"
+ serverPort, cf.getException().getMessage());
}
try {
Thread.sleep(10000);
} catch (Exception e) {
}
continue;
}
//這就是終極目標了,我們的目的就是在serevr的客戶端的bean里,可以拿到這個iosession
this.setSession(cf.getSession());
logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
.getString("WolfClient_10")
+ serverIp + ":" + serverPort);
shutDown = false;
if (handler instanceof WolfMessageChain) {
WolfMessageChain wmc = WolfMessageChain.class.cast(handler);
wmc.init(context);
}
break;
}
}這樣后端的業務通信網就可以輕松的建立起來,之后想怎么通信就看你的了