用mina做業務服之間的通信,實現業務負載均衡思路

jopen 10年前發布 | 29K 次閱讀 網絡工具包 MINA

 

學習mina目的還是搭建通信架構,學完mina我們了解了如何實現客戶端和服務端,也就是一個正常channel我們是知道怎么建立的

但是問題是,我們應用環境通信分為兩種

1.前后端通信

其實這個比較好實現,提供一個mina server端,供前端語言通過socket建連接就行,這個通信就算是ok了,編解碼等通信解析的細節這里不講了

以前的游戲服務端架構業務多用短連接,聊天用長連接,聊天的部分其實就是上面表述的情況

現在是長連接的天下,聊天依舊是長連接,業務也做成長連接,實現了真正意義上的長連接游戲架構,這其實就表述了一種當下典型架構,

就是后端提供兩個開放的通信端口【即兩個mina server】,供前端的socket連接,一個負責聊天,登錄,注冊,另一個負責其他業務,這樣就實現了協議通信的負載均衡

2.后端的業務服通信【這是本文的重點】

那么后端的業務就不需要負載均衡嗎?比如job,異步更新db,活動副本等

當然也是需要的,怎么做那,先拿1中的做個解釋

mainserevr[聊天,登錄,注冊]---nodeserver[其他業務]

這兩個mina sever端已經建立起來了,但是兩個server之間還不能通信,我們有兩個選擇,要么在mainserevr上起個mina client去連nodeserver,要么在nodeserver

上起個mina client去連mainserevr,思路肯定是這樣的,一旦這個通道建立了,其實互為server和client的,會有一個iosession被通道持有,只要有這個iosession,

就可以主動write,當然對于通道的另一端可以response,也可以通過取得iosession來主動寫

實現方式,我們在nodeserevr上提供一個mainserverClient這樣一個spring的bean去連接mainserver,這樣在nodeserver上就可以向mainserevr發消息了

3.帶著這個思路設計一下

我把游戲中的業務分為

 public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node
  public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server
  public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server
  public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 異步DB
  public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活動
  public static final String SERVER_TYPE_OTHER_STR = "other";// 其他
  public static final String SERVER_TYPE_GM_STR = "GM";//管理端

每次啟動一種server時,首先啟動一次mina serevr,然后啟動多個mina client去連接其他的mina server,

比如啟動nodeserevr 服務端,然后啟動多個client分別連接mainserevr,jobserevr等的服務端,這樣我就可以

在nodeserver上給其他業務serevr發請求了,具體啟動哪些client看需要

搞一個啟動server類型的方法

public static ClassPathXmlApplicationContext start(String serverTypeStr) {
        try {
    //關閉連接池的鉤子線程
  ProxoolFacade.disableShutdownHook();
    //spring 的核心配置文件
  String xmlFile = "applicationContext.xml";
  ....
  log.info("啟動 {} server................", serverTypeName);
  // 設置到系統環境變量
  System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");
  System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,
          serverTypeName);
  // final ClassPathXmlApplicationContext parent = new
  // ClassPathXmlApplicationContext(
  // xmlFile);
  String fileName = null;
    //這是把spring的住配置文件拆分了一部分內容出來,目前是只加載本server需要的bean
  if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {
      fileName = "wolf/app_nodeserver.xml";
  } else {
      fileName = "wolf/app_server.xml";
  }
  //手動啟動spring
  final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
          new String[] { xmlFile, fileName });
  if (context != null) {
      ServiceLocator.getInstance().setApplicationContext(context);
  }
  // 啟動socket server
  final WolfServer server = (WolfServer) ServiceLocator
          .getSpringBean("wolf_server");
  server.setServerType(serverType);
    //這個調用就是我們熟悉的啟動mina server端
  server.start();
  //這個動用做兩件事,選區需要的serevr類型建立mina client連接
  startClient(server);
    //鉤子線程用來監聽應用停止,為了做停止時的后續處理
  Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
      public void run() {
          _shutdown();
      }
  }, "shutdownHookThread"));
    //為了支持web,springMVC,內置一個web server
  if (NodeSessionMgr.SERVER_TYPE_MAIN_STR
          .equalsIgnoreCase(serverTypeStr)) {
      JettyServer jettyServer = (JettyServer) ServiceLocator
    .getSpringBean("jettyServer");
      jettyServer.start();
  }
  log.info("start {} end................", serverTypeName);
  return context;
        } catch (Exception e) {
  e.printStackTrace();
  shutdown();
        } finally {
        }
        return null;
    }

在看下startClient(server);
private static void startClient(WolfServer server) {
  // asyncdbServer只會被連接,不會主動連接其他server
    // 這部分目的是過濾那些不需要主動連比人的serevr,比武我這里的異步db,和活動服
  if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB
    || server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {
      return;
  }
  // 發送game Server ip port到mainserver
  Map<String, Object> params = new HashMap<String, Object>();
  params.put("nodeServerIp", server.getIp());
  params.put("nodeServerPort", server.getPort());
  params.put("serverType", server.getServerType());
  //我需要mainserevr的client,就弄個bean在本服
  final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator
    .getSpringBean("mainServerClient");
  //這個位置其實就是mina的client連server端
  mainServerClient.init();
  Object localAddress = mainServerClient.registerNode(params);
   //同上,需要jobserevr的client
  final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator
    .getSpringBean("jobServerClient");
  if (jobServerClient != null) {
      jobServerClient.init();
      Map<String, Object> params1 = new HashMap<String, Object>();
      params1.putAll(params);
      jobServerClient.registerNode(params1);
  }
  // }

  .....
    }

再看下WolfClientService.init()

public void init() {
    if (start)
      return;
    if (wolfClient == null) {
      log.error("wolf client is null");
      return;
    }
     //mina 的client 連接 mina server
    wolfClient.start();
    if (wolfClient.isConnected())
      start = true;
  }

再看下wolfclient.start()
/**
     * 連接一個服務器,并指定處理接收到的消息的處理方法
     * 
     */
    public void start() {
  // this.context.put("resultMgr", this.resultMgr);

  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
    .getString("WolfClient_9"), processorNum);
  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
    .getString("WolfClient_0"), corePoolSize);
  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
    .getString("WolfClient_4"), maxPoolSize);
  if (this.serverIp == null || this.serverIp.equals("")) {
      logger.error(clientName + "沒有配置serverIp,不啟動.........");
      return;
  }
  String threadPrefix = clientName + "[" + this.serverIp + ":"
    + this.serverPort + "]";
  // exector = Executors.newCachedThreadPool(new
  // NamingThreadFactory(threadPrefix));
  processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,
    processorNum);
  // connector = new NioSocketConnector((Executor) exector, processor);
  connector = new NioSocketConnector(processor);
  // connector.getSessionConfig().setReuseAddress(true);
  DefaultIoFilterChainBuilder chain = connector.getFilterChain();
  if (useLogFilter == 2) {
      chain.addLast("logging", new LoggingFilter());
  }
  // codec filter要放在ExecutorFilter前,因為讀寫同一個socket connection的socket
  // buf不能并發(事實上主要是讀,寫操作mina已經封裝成一個write Queue)
  chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 設置編碼過濾器
  // 添加心跳過濾器,客戶端只接受服務端的心跳請求,不發送心跳請求
  // connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);
  // 這里的KeepAliveFilter必須在codec之后,因為KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,則可以在codec之前
  // KeepAliveFilter到底在ExecutorFilter之前好還是之后好,我也不確定
  KeepAliveFilter filter = new KeepAliveFilter(
    new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),
    IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),
    keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,
    30);
  chain.addLast("ping", filter);
  // 添加執行線程池
  executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,
    keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(
      threadPrefix));
  // 這里是預先啟動corePoolSize個處理線程
  executor.prestartAllCoreThreads();
  chain.addLast("exec", new ExecutorFilter(executor,
    IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
    IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
    IoEventType.SESSION_OPENED));
  if (useWriteThreadPool) {
      executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,
        maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
        new NamingThreadFactory(threadPrefix + "write"));
      executorWrite.prestartAllCoreThreads();
      chain.addLast("execWrite", new ExecutorFilter(executorWrite,
        IoEventType.WRITE, IoEventType.MESSAGE_SENT));
  }
  // ,logger.isDebugEnabled() ? new
  // LoggingIoEventQueueHandler("execWrite") : nulls
  // 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log
  // 可以配置在ExecutorFilter之后:是為了在工作線程中打印log,不是在NioProcessor中打印
  if (useLogFilter == 1) {
      chain.addLast("logging", new LoggingFilter());
  }
  connector.setHandler(handler);
  connector.getSessionConfig().setReuseAddress(true);
  connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);
  logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
    .getString("WolfClient_1")
    + serverIp + ":" + serverPort);
  ConnectFuture cf = null;
  long start = System.currentTimeMillis();
  while (true) {
      //這地很關鍵,是個無線循環,每10秒連接一次,直到可以和服務端建立連接,否則一支循環下去
      cf = connector.connect(serverAddress);// 建立連接
      cf.awaitUninterruptibly(10000L);
      if (!cf.isConnected()) {
    if ((System.currentTimeMillis() - start) > timeout) {
        throw new RuntimeException(
          com.youxigu.dynasty2.i18n.MarkupMessages
            .getString("WolfClient_5")
            + serverIp + ":" + serverPort);
    }
    if (cf.getException() != null) {
        logger.error(com.youxigu.dynasty2.i18n.MarkupMessages
          .getString("WolfClient_6"), serverIp + ":"
          + serverPort, cf.getException().getMessage());
    }
    try {
        Thread.sleep(10000);
    } catch (Exception e) {
    }
    continue;
      }
      //這就是終極目標了,我們的目的就是在serevr的客戶端的bean里,可以拿到這個iosession
      this.setSession(cf.getSession());
      logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
        .getString("WolfClient_10")
        + serverIp + ":" + serverPort);
      shutDown = false;
      if (handler instanceof WolfMessageChain) {
    WolfMessageChain wmc = WolfMessageChain.class.cast(handler);
    wmc.init(context);
      }
      break;
  }
    }

這樣后端的業務通信網就可以輕松的建立起來,之后想怎么通信就看你的了

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!