使用 Log4j、ActiveMQ 和 Spring 實現異步日志
我的團隊和我正在創建一個由一組RESTful JSON服務組成的服務平臺,該平臺中的每個服務在平臺中的作用就是分別提供一些獨特的功能和/或數據。由于平臺中產生的日志四散各處,所以我們想,要是能將這些日志集中化處理一下,并提供一個能夠讓我們查看、過濾、排序和搜索我們所有的日志的基本型的日常查看工具就好了。我們還想讓我們的日志是異步式的,因為我們可不想在寫日志的時候(比方說,可能會將日志直接寫入數據庫),讓我們提供的服務因為寫日志而暫時被阻擋住。
實現這個目標的策略非常簡單明了。
- 安裝ActiveMQ
- 創建一個log4j的日志追加器,將日志寫入隊列(log4j自帶了一個這樣的追加器,不過現在讓我們自己來寫一個吧。)
- 寫一個消息偵聽器,從MQ服務器上所設置的JMS隊列中讀取日志并將日志持久化 </ol>
- JMSQueue appender is a log4j appender that writes LoggingEvent to a queue.
- @author faheem / public class JMSQueueAppender extends AppenderSkeleton implements Appender{
- Logging Event Wraps a log4j LoggingEvent object. Wrapping is required by some information is lost
- when the LoggingEvent is serialized. The idea is to extract all information required from the LoggingEvent
- object, place it in the wrapper and then serialize the LoggingEventWrapper. This way all required data remains
- available to us.
- @author faheem /
下面讓我們分步來看這個策略是如何得以實現的。
安裝ActiveMQ
安裝一個外部的ActiveMQ服務器簡單極了。這個鏈接http://servicebus.blogspot.com/2011/02/installing-apache-active-mq-on-ubuntu.html是在Ubuntu上安裝ActiveMQ的一個非常棒的指南。你還可以選擇在你的應用中嵌入一個消息代理,采用Spring就可以非常輕松實現。 我們將在后文中詳談具體的實現方法。
創建一個Lo4j的JMS日志追加器
首先,我們來創建一個log4j的JMS日志追加器。log4j自帶了一個這樣的追加器(該追加器沒有將日志寫入一個隊列,而是寫給了一個話題)
import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session;import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Appender; import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.log4j.spi.LoggingEvent;
/**
private static Logger logger = Logger.getLogger("JMSQueueAppender");
private String brokerUri; private String queueName;
@Override public void close() {
}
@Override public boolean requiresLayout() { return false; }
@Override protected synchronized void append(LoggingEvent event) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
this.brokerUri);
// Create a Connection
javax.jms.Connection connection = connectionFactory.createConnection();
connection.start();np
// Create a Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = session.createQueue(this.queueName);
// Create a MessageProducer from the Session to the Topic or Queue
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
ObjectMessage message = session.createObjectMessage(new LoggingEventWrapper(event));
// Tell the producer to send the message
producer.send(message);
// Clean up
session.close();
connection.close();
} catch (Exception e) { e.printStackTrace(); } }
public void setBrokerUri(String brokerUri) { this.brokerUri = brokerUri; }
public String getBrokerUri() { return brokerUri; }
public void setQueueName(String queueName) { this.queueName = queueName; }
public String getQueueName() { return queueName; } }</pre>
下面讓我們看看這里面發生了什么事情。
第19行:We我們實現了的Log4J日志追加器接口,該接口要求我們實現三個方法:requiresLayout, close和append。我們將暫時簡化處理過程,實現所需的append方法。在對logger進行調用時這個方法就會被調用。
第37行: log4j將一個LoggingEvent對象作為參數對append方法進行調用,這個LoggingEvent對象表示了對logger的一次調用,它封裝了每一個日志項的所有信息。
第41和42行:將指向JMS的uri作為參數,創建一個連接工廠對象,在我們的情況下,該uri指向的是我們的ActiveMQ服務器。
第45, 46和49行: 我們同JMS服務器建立一個連接和會話。會話有多種打開模式。在Auto_Acknowledge模式的會話中,消息的應答會自動發生。Client_Acknowledge 模式下,客戶端需要對消息的接收和/或處理進行顯式地應答。另外還有兩種其它的模式。有關細節,請參考文檔http://download.oracle.com/javaee/1.4/api/javax/jms/Session.html
第52行: 創建一個隊列。將隊列的名字作為參數發送給連接
第56行: 我們將發送模式設置為Non_Persistent。另一個可選的模式是Persistent ,在這種模式下,消息會持久化到一個持久性存儲系統中。持久化模式會降低系統速度,但能增加了消息傳遞的可靠性。
第58行: 這行我們做了很多事。首先我將一個LoggingEvent對象封裝到了一個LoggingEventWrapper對象之中。這么做是因為LoggingEvent對象有一些屬性不支持序列化,另外還有一個原因是我想記錄一些額外的信息,比如IP地址和主機名。接下來,使用JMS的會話對象,我們把一個對象(LoggingEventWrapper對象)做好了發送前的準備。
第61行: 我將該對象發送到了隊列中。
下面所示是LoggingEventWrapper的代碼。
import java.io.Serializable; import java.net.InetAddress; import java.net.UnknownHostException;import org.apache.log4j.EnhancedPatternLayout; import org.apache.log4j.spi.LoggingEvent;
/**
public class LoggingEventWrapper implements Serializable{
private static final String ENHANCED_PATTERN_LAYOUT = "%throwable";
private static final long serialVersionUID = 3281981073249085474L;
private LoggingEvent loggingEvent;
private Long timeStamp;
private String level;
private String logger;
private String message;
private String detail;
private String ipAddress;
private String hostName;
public LoggingEventWrapper(LoggingEvent loggingEvent){
this.loggingEvent = loggingEvent;
//Format event and set detail field
EnhancedPatternLayout layout = new EnhancedPatternLayout();
layout.setConversionPattern(ENHANCED_PATTERN_LAYOUT);
this.detail = layout.format(this.loggingEvent);
}
public Long getTimeStamp() {
return this.loggingEvent.timeStamp;
}
public String getLevel() {
return this.loggingEvent.getLevel().toString();
}
public String getLogger() {
return this.loggingEvent.getLoggerName();
}
public String getMessage() {
return this.loggingEvent.getRenderedMessage();
}
public String getDetail() {
return this.detail;
}
public LoggingEvent getLoggingEvent() {
return loggingEvent;
}
public String getIpAddress() {
try {
return InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
return "Could not determine IP";
}
}
public String getHostName() {
try {
return InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
return "Could not determine Host Name";
}
}
}</pre>
消息偵聽器
消息偵聽器會對隊列(或話題)進行“偵聽”。一旦有新消息添加到了隊列中,onMessage 方法就會得到調用。
import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage;import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class LogQueueListener implements MessageListener { public static Logger logger = Logger.getLogger(LogQueueListener.class);
@Autowired private ILoggingService loggingService; public void onMessage( final Message message ) { if ( message instanceof ObjectMessage ) { try{ final LoggingEventWrapper loggingEventWrapper = (LoggingEventWrapper)((ObjectMessage) message).getObject(); loggingService.saveLog(loggingEventWrapper); } catch (final JMSException e) { logger.error(e.getMessage(), e); } catch (Exception e) { logger.error(e.getMessage(),e); } } }
}</pre>
第23行: 檢查從隊列中拿到的對象是否是ObjectMessage的實例
第26行: 從消息中提取出LoggingEventWrapper對象
第27行: 調用服務方法將日志持久化Spring配置
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd"> <!-- lets create an embedded ActiveMQ Broker --> <!-- uncomment the tag below only if you need to create an embedded broker --> <!-- amq:broker useJmx="false" persistent="false"> <amq:transportConnectors> <amq:transportConnector uri="tcp://localhost:61616" /> </amq:transportConnectors> </amq:broker--> <!-- ActiveMQ destinations to use --> <amq:queue id="destination" physicalName="logQueue" /> <!-- JMS ConnectionFactory to use, configuring the embedded broker using XML --> <amq:connectionFactory id="jmsFactory" brokerURL="tcp://localhost:61616" /> <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <constructor-arg ref="jmsFactory" /> <property name="exceptionListener" ref="JMSExceptionListener" /> <property name="sessionCacheSize" value="100" /> </bean> <!-- Spring JMS Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="connectionFactory" /> </bean> <!-- listener container definition using the jms namespace, concurrency is the max number of concurrent listeners that can be started --> <jms:listener-container concurrency="10"> <jms:listener id="QueueListener" destination="logQueue" ref="logQueueListener" /> </jms:listener-container> </beans>第5到9行: 使用代理標簽建立一個嵌入式消息代理。既然我用的是外部消息代理,所以我就不需要它了。
第12行: 給出你想要連接的隊列的名字
第14行: 代理服務器的URI
第15到19行: 連接工廠的設置
第26到28行: 消息偵聽器的設置,這里可以指定用于從隊列中讀取消息的并發現線程的個數當然,上面的例子做不到讓你能夠拿來就用。你還需要包含所有的JMS依賴庫并實現完成日志持久化任務的服務。但是,我希望本文能夠為你提供一個相當不錯的思路。