文章出處

說明
 
本文源碼基于Openfire4.0.2。
 
Openfire的啟動
 
    Openfire的啟動過程非常的簡單,通過一個入口初始化lib目錄下的openfire.jar包,并啟動一個XMPPServer實例。
 
下面就是ServerStarter.start方法的代碼片斷:
 
Class containerClass = loader.loadClass("org.jivesoftware.openfire.XMPPServer");
containerClass.newInstance();
這樣一個openfire實例就已經啟動了。
 
XMPPServer類
 
這個XmppServer類是單實例的對象,這樣在服務器調用時可以獲取一個實例。既然是個對象就會有構造的過程,XMPPServer在構造過程中會對服務進行初始化,這個過程包括:
  • 初始化配置參數
  • 檢查是否需要安裝
  • 初始化Module
  • 啟動統計模塊
  • 啟動plugin
 
基本就是這么簡單,還是非常簡潔明了。這里也可以大概知道在openfire里主要是module和plugin兩類模塊,一般情況下內部的模塊都用module,對于一些功能的擴展或者第三方的開發擴展使用Plugin。官方其實也會自己寫一個插件來擴展功能,說明插件還是比較靈活的。
 
提一提Module的加載過程
 
下面代碼是module的加載過程
if (!setupMode) {
    verifyDataSource();
    // First load all the modules so that modules may access other modules while
    // being initialized
    loadModules();
    // Initize all the modules
    initModules();
    // Start all the modules
    startModules();
}
可以看到,分了三個步驟:
加載模塊:是對模塊類的實例化過程,就是創建對象
初始化模塊:就是調用module.initialize(this);,其實就是調用模塊的初始化方法
啟動模塊:module.start();,同理就是調用啟動模塊
 
這是因為openfire規范了module的接口抽象Module,所有的模塊都要按照這個規范實現,看代碼:
public interface Module {
 
    /**
     * Returns the name of the module for display in administration interfaces.
     *
     * @return The name of the module.
     */
    String getName();
 
    /**
     * Initialize the module with the container.
     * Modules may be initialized and never started, so modules
     * should be prepared for a call to destroy() to follow initialize().
     *
     * @param server the server hosting this module.
     */
    void initialize(XMPPServer server);
 
    /**
     * Start the module (must return quickly). Any long running
     * operations should spawn a thread and allow the method to return
     * immediately.
     */
    void start();
 
    /**
     * Stop the module. The module should attempt to free up threads
     * and prepare for either another call to initialize (reconfigure the module)
     * or for destruction.
     */
    void stop();
 
    /**
     * Module should free all resources and prepare for deallocation.
     */
    void destroy();
}
這也標示了Module的生命周期,Openfire會管理這些Module的生命周期,以此來保證各個模塊的啟動與釋放。
 
Connection管理模塊
 
整個啟動過程有點奇怪,并沒有看到Openfire是如何監聽端口的,如果不監聽如何獲利客戶端連接呢?因為Openfire只通過Module來管理的,那么對應的網絡管理應該就在Module中。于是在XMPPServer.loadModules方法中看到下面的代碼:
// Load this module always last since we don't want to start listening for clients
// before the rest of the modules have been started
loadModule(ConnectionManagerImpl.class.getName());
 
ConnectionManagerImpl就是連接的管理模塊,這里有個注釋,就是在其他模塊啟動后之后再啟動監聽模塊。
 
在ConnectionManagerImpl中管理了主要的連接,都是以ConnectionListener的來管理,這個類用于包裝連接。我的理解就是一個連接抽象吧,這樣對于代碼來說寫起來比較統一。看下面代碼中Manager管理著哪些:
  private final ConnectionListener clientListener;
    private final ConnectionListener clientSslListener;
    private final ConnectionListener boshListener;
    private final ConnectionListener boshSslListener;
    private final ConnectionListener serverListener;
    private final ConnectionListener componentListener;
    private final ConnectionListener componentSslListener;
    private final ConnectionListener connectionManagerListener; // Also known as 'multiplexer'
    private final ConnectionListener connectionManagerSslListener; // Also known as 'multiplexer'
    private final ConnectionListener webAdminListener;
    private final ConnectionListener webAdminSslListener;
這里面除了server只有一個外,其他的都是兩個,其中一個是SSL的。它們主要是什么鏈接?
  • client:表示客戶端連接
  • bosh:就是HTTP綁定的連接
  • server:服務器到服務器的socket連接
  • component:組件到服務器的連接
  • connectionManager:是指通過connectionManager連接器過來的連接
  • webAdmin:是指web控制臺的連接
 
這里面bosh和webAdmin使用的是http協議,所以連接并不是長連接,其他的都是socket。
 
openfire里使用了Mina來實現socket網絡處理。只不過看代碼中對于S2S類型的連接使用的不是mina,如下代碼:
if ( getType() == ConnectionType.SOCKET_S2S )
{
    connectionAcceptor = new LegacyConnectionAcceptor( generateConnectionConfiguration() );
}
else
{
    connectionAcceptor = new MINAConnectionAcceptor( generateConnectionConfiguration() );
}
LegacyConnectionAcceptor是個廢棄的類,但不知道為什么s2s還要用這個呢?看了看實現,LegacyConnectionAcceptor就是起了一個線程,在線程里建了一個ServerSocket。可能以后還是會遷移這部分代碼吧。
 
在connectionAcceptor中會根據類型創建一個ConnectionHandler用于實現具體的業務功能,而ConnectionHandler都是基于org.apache.mina.core.service.IoHandlerAdapter派生的類,而IoHandlerAdapter又是IoHandler的適配接口,所以實質上就是IoHandler。下面是類繼承關系:
 
在這些Handler里完成的主要是每個連接打開、關閉和數據收發等操作的處理。而其中比較關鍵的一個步驟就是在sessionOpened中設置了StanzeHandler,而每種ConnectionHandler都有自己的StanzeHandler實現。以ClientConnectionHandler為例子,其中ClientConnectionHandler復寫了父類的createStanzaHandler方法,這里面
@Override
    StanzaHandler createStanzaHandler(NIOConnection connection) {
        return new ClientStanzaHandler(XMPPServer.getInstance().getPacketRouter(), connection);
    }
這里使用的是clientStanzaHandler,表示是客戶端的數據節處理者。而最終的createStanzaHandler調用是在父類ConnectionHandler的sessionOpened完成的,
@Override
public void sessionOpened(IoSession session) throws Exception {
    // Create a new XML parser for the new connection. The parser will be used by the XMPPDecoder filter.
    final XMLLightweightParser parser = new XMLLightweightParser(StandardCharsets.UTF_8);
    session.setAttribute(XML_PARSER, parser);
    // Create a new NIOConnection for the new session
    final NIOConnection connection = createNIOConnection(session);
    session.setAttribute(CONNECTION, connection);
    session.setAttribute(HANDLER, createStanzaHandler(connection));
    // Set the max time a connection can be idle before closing it. This amount of seconds
    // is divided in two, as Openfire will ping idle clients first (at 50% of the max idle time)
    // before disconnecting them (at 100% of the max idle time). This prevents Openfire from
    // removing connections without warning.
    final int idleTime = getMaxIdleTime() / 2;
    if (idleTime > 0) {
        session.getConfig().setIdleTime(IdleStatus.READER_IDLE, idleTime);
    }
}

這樣每一個session在打開時都會設置handler,而具體的handler由各個派生類創建返回。這里的StanzHandler就是Openfire里的數據包處理單元。和connection類型一樣,包處理也是對應的幾個類:

 
 
注:
關于openfire與mina的關系可以看看下面的文章,但是版本相對比較老些,代碼有些不同,只不過思路差不多:
 
Session模塊
 
對于Openfire來說一個比較重要的功能就是管理session,因為要與客戶端實時的進行數據通訊,所以必須保持著連接。在Openfire中對于Session的管理都集中在SessionManager模塊。但在前面說到連接管理時已經知道了IoSession的創建過程,但并沒有看到openfire是如何管理它的。接著ConnectionHandler和StanzaHandler就能知道其中有奧秘。
 
前面知道了ConnectionHandler是連接的處理者,這里會有連接的創建、關閉、數據收發的處理,回到ConnectionHandler這個抽象類中。對于創建時(sessionOpend)主要是創建了StanzaHandler,這樣就把數據包的處理委托給了StzanzHandler(派生類)。但是這個時候并沒有將session放入到openfire的session管理模塊中,而是在客戶端發送數據過來后才開始的。
 
先看看ConnectionHandler的messageReceived方法:
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
    // Get the stanza handler for this session
    StanzaHandler handler = (StanzaHandler) session.getAttribute(HANDLER);
    // Get the parser to use to process stanza. For optimization there is going
    // to be a parser for each running thread. Each Filter will be executed
    // by the Executor placed as the first Filter. So we can have a parser associated
    // to each Thread
    final XMPPPacketReader parser = PARSER_CACHE.get();
    // Update counter of read btyes
    updateReadBytesCounter(session);
    //System.out.println("RCVD: " + message);
    // Let the stanza handler process the received stanza
    try {
        handler.process((String) message, parser);
    } catch (Exception e) {
        Log.error("Closing connection due to error while processing message: " + message, e);
        final Connection connection = (Connection) session.getAttribute(CONNECTION);
        if ( connection != null ) {
            connection.close();
        }
 
    }
}

在接收到數據包后獲取到StanzaHandler,然后調用了它的process方法,也就是讓實際的包處理者去處理數據。這樣就回到了StanzeHanler,以ClientStanzaHandler為例子。只不過這個派生類中沒有重寫process方法,也就是說要看父類的實現:

public void process(String stanza, XMPPPacketReader reader) throws Exception {
 
    boolean initialStream = stanza.startsWith("<stream:stream") || stanza.startsWith("<flash:stream");
    if (!sessionCreated || initialStream) {
        if (!initialStream) {
..........
        // Found an stream:stream tag...
        if (!sessionCreated) {
            sessionCreated = true;
            MXParser parser = reader.getXPPParser();
            parser.setInput(new StringReader(stanza));
            createSession(parser);
        }
..........
        return;
    }
..........
}

由于代碼較多,我省略了一些代碼。看到這應該明白了吧,對于當前的連接沒有創建Openfire的session對象時,會進行創建過程createSession,對于不同的StanzeHandler會有些不一樣,這里ClientStanzaHandler的實現就是把創建好的session放到本地的LocalClientSession中:

@Override
boolean createSession(String namespace, String serverName, XmlPullParser xpp, Connection connection)
        throws XmlPullParserException {
    if ("jabber:client".equals(namespace)) {
        // The connected client is a regular client so create a ClientSession
        session = LocalClientSession.createSession(serverName, xpp, connection);
        return true;
    }
    return false;
}
到這一個session算是建立完成了。
 
集群下的session
 
之前一篇關于《Openfire集群源碼分析》提到了session的一些內容。其中也提到了session是不會向每一臺服務器進行同步復制的,這就有一個問題,如果A用戶先是連接了服務器1,但是接下來的操作又到服務器2,這不就會造成session無法找到嗎?同樣的問題,如果想要獲取到當前所有的client session怎么辦?
 
1、如何在集群中發消息
對于消息最終還是通過session來發送的,前后代碼太多,就直接看一下sessionManager中的getSession方法吧。
public ClientSession getSession(JID from) {
    // Return null if the JID is null or belongs to a foreign server. If the server is
    // shutting down then serverName will be null so answer null too in this case.
    if (from == null || serverName == null || !serverName.equals(from.getDomain())) {
        return null;
    }
 
    // Initially Check preAuthenticated Sessions
    if (from.getResource() != null) {
        ClientSession session = localSessionManager.getPreAuthenticatedSessions().get(from.getResource());
        if (session != null) {
            return session;
        }
    }
 
    if (from.getResource() == null || from.getNode() == null) {
        return null;
    }
 
    return routingTable.getClientRoute(from);
}


先是獲取本地的session,如果能找到直接返回,找不到則跳到routingTable里獲取客戶端的路由信息。

@Override
public ClientSession getClientRoute(JID jid) {
    // Check if this session is hosted by this cluster node
    ClientSession session = (ClientSession) localRoutingTable.getRoute(jid.toString());
    if (session == null) {
        // The session is not in this JVM so assume remote
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            // Check if the session is hosted by other cluster node
            ClientRoute route = usersCache.get(jid.toString());
            if (route == null) {
                route = anonymousUsersCache.get(jid.toString());
            }
            if (route != null) {
                session = locator.getClientSession(route.getNodeID().toByteArray(), jid);
            }
        }
    }
    return session;
}

這里更直接的可以看到,查找本地路由不null則會通過RemoteSessionLocator來完成。當然這里最大的奧秘其實是usersCache和anonymousUsersCache這兩個cache。之前寫的集群源碼分析中提過,最終openfire集群后會對緩存進行同步,這樣每臺服務器上都會有緩存的副本。所以usersCache是擁有所有用戶信息的,有了user的信息就有了jid的信息,這樣不管是哪臺服務器都可以對數據包處理并發送給客戶端。

 
這里的RemoteSessionLocator是由于適配不同的集群組件所抽象的接口,使得加入不同集群組件提供了透明處理。
 
2、如何獲利所有的在線用戶
 
對于獲取所有在線用戶這個功能思路也挺簡單,一樣是找本地所有的緩存。看getSessions的代碼:
public Collection<ClientSession> getSessions() {
        return routingTable.getClientsRoutes(false);
    }

其實就是訪問路由表,因為路由表里有所有的cache,和獲取單個的session不一樣,需要對所有的路由都遍歷返回。

@Override
public Collection<ClientSession> getClientsRoutes(boolean onlyLocal) {
    // Add sessions hosted by this cluster node
    Collection<ClientSession> sessions = new ArrayList<ClientSession>(localRoutingTable.getClientRoutes());
    if (!onlyLocal) {
        // Add sessions not hosted by this JVM
        RemoteSessionLocator locator = server.getRemoteSessionLocator();
        if (locator != null) {
            // Add sessions of non-anonymous users hosted by other cluster nodes
            for (Map.Entry<String, ClientRoute> entry : usersCache.entrySet()) {
                ClientRoute route = entry.getValue();
                if (!server.getNodeID().equals(route.getNodeID())) {
                    sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
                }
            }
            // Add sessions of anonymous users hosted by other cluster nodes
            for (Map.Entry<String, ClientRoute> entry : anonymousUsersCache.entrySet()) {
                ClientRoute route = entry.getValue();
                if (!server.getNodeID().equals(route.getNodeID())) {
                    sessions.add(locator.getClientSession(route.getNodeID().toByteArray(), new JID(entry.getKey())));
                }
            }
        }
    }
    return sessions;
}

 

 
總結
對于查看Openfire的源代碼學習了一些東西,特別是對于服務化系統的開發思路。而且在集群化上也有了一些認識,知道了多機部署后系統應該要解決哪些問題。
繼續學習吧。
 
注:此文章為原創,歡迎轉載,請在文章頁面明顯位置給出此文鏈接!
若您覺得這篇文章還不錯請點擊下右下角的推薦,非常感謝!
http://www.cnblogs.com/5207

文章列表


不含病毒。www.avast.com
arrow
arrow
    全站熱搜
    創作者介紹
    創作者 AutoPoster 的頭像
    AutoPoster

    互聯網 - 大數據

    AutoPoster 發表在 痞客邦 留言(0) 人氣()