如何使用Spring開發和監控線程池服務
線程池對執行同步或異步的任務很重要。本文展示如何利用Spring開發并監控線程池服務。創建線程池的其他兩種方法已講解過。
使用技術
- JDK 1.6.0_21
- Spring 3.0.5
- Maven 3.0.2 </ul>
第1步:創建Maven工程
下面是一個maven工程。(可以使用Maven或IDE的插件創建)。
第2步:添加依賴庫
將Spring的依賴添加到Maven的pom.xml文件中。
<!-- Spring 3 dependencies --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency>
使用下面的插件創建可執行jar包。
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.3.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.otv.exe.Application</mainClass> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>META-INF/spring.schemas</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
第3步:創建任務類
創建一個實現Runnable接口的新TestTask類。這個類表示要執行的任務。
package com.otv.task;import org.apache.log4j.Logger;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
@version 1.0.0 / public class TestTask implements Runnable {
private static Logger log = Logger.getLogger(TestTask.class); String taskName;
public TestTask() { }
public TestTask(String taskName) {
this.taskName = taskName;
}
public void run() {
try { log.debug(this.taskName + " : is started."); Thread.sleep(10000); log.debug(this.taskName + " : is completed."); } catch (InterruptedException e) { log.error(this.taskName + " : is not completed!"); e.printStackTrace(); }
}
@Override public String toString() {
return (getTaskName());
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
} }</pre>
第4步:創建TestRejectedExecutionHandler類
TestRejectedExecutionHandler類實現了RejectedExecutionHandler接口。如果沒有空閑線程并且隊列超出限制,任務會被拒絕。這個類處理被拒絕的任務。
package com.otv.handler;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
@version 1.0.0 / public class TestRejectedExecutionHandler implements RejectedExecutionHandler {
private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
log.debug(runnable.toString() + " : has been rejected");
} }</pre>
第5步:創建ITestThreadPoolExecutorService接口
創建ITestThreadPoolExecutorService接口。(譯者注:這個接口的主要功能是通過設置的參數創建一個線程池)
package com.otv.srv;
import java.util.concurrent.ThreadPoolExecutor;
import com.otv.handler.TestRejectedExecutionHandler;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
- @version 1.0.0
*/ public interface ITestThreadPoolExecutorService {
public ThreadPoolExecutor createNewThreadPool();
public int getCorePoolSize();
public void setCorePoolSize(int corePoolSize);
public int getMaxPoolSize();
public void setMaxPoolSize(int maximumPoolSize);
public long getKeepAliveTime();
public void setKeepAliveTime(long keepAliveTime);
public int getQueueCapacity();
public void setQueueCapacity(int queueCapacity);
public TestRejectedExecutionHandler getTestRejectedExecutionHandler();
public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler);
}</pre>
第6步:創建TestThreadPoolExecutorService類
TestThreadPoolExecutorService類實現了ITestThreadPoolExecutorService接口(上一步創建的接口)。這個類可以創建一個新的線程池。
package com.otv.srv;import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;
import com.otv.handler.TestRejectedExecutionHandler;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
@version 1.0.0 / public class TestThreadPoolExecutorService implements ITestThreadPoolExecutorService {
private int corePoolSize; private int maxPoolSize; private long keepAliveTime; private int queueCapacity; TestRejectedExecutionHandler testRejectedExecutionHandler;
public ThreadPoolExecutor createNewThreadPool() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(getCorePoolSize(), getMaxPoolSize(), getKeepAliveTime(), TimeUnit.SECONDS, new ArrayBlockingQueue(getQueueCapacity()), getTestRejectedExecutionHandler()); return executor;
}
public int getCorePoolSize() {
return corePoolSize;
}
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
public int getMaxPoolSize() {
return maxPoolSize;
}
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
public long getKeepAliveTime() {
return keepAliveTime;
}
public void setKeepAliveTime(long keepAliveTime) {
this.keepAliveTime = keepAliveTime;
}
public int getQueueCapacity() {
return queueCapacity;
}
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
public TestRejectedExecutionHandler getTestRejectedExecutionHandler() {
return testRejectedExecutionHandler;
}
public void setTestRejectedExecutionHandler(TestRejectedExecutionHandler testRejectedExecutionHandler) {
this.testRejectedExecutionHandler = testRejectedExecutionHandler;
} }</pre>
第7步: 創建IThreadPoolMonitorService接口
創建IThreadPoolMonitorService接口
package com.otv.monitor.srv;
import java.util.concurrent.ThreadPoolExecutor;
public interface IThreadPoolMonitorService extends Runnable {
public void monitorThreadPool();
public ThreadPoolExecutor getExecutor();
public void setExecutor(ThreadPoolExecutor executor);
}</pre>
第8步:創建ThreadPoolMonitorService類
ThreadPoolMonitorService類實現了IThreadPoolMonitorService接口。這個類用來監控已創建的線程池。
package com.otv.monitor.srv;import java.util.concurrent.ThreadPoolExecutor; import org.apache.log4j.Logger;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
@version 1.0.0 / public class ThreadPoolMonitorService implements IThreadPoolMonitorService {
private static Logger log = Logger.getLogger(ThreadPoolMonitorService.class); ThreadPoolExecutor executor; private long monitoringPeriod;
public void run() {
try { while (true){ monitorThreadPool(); Thread.sleep(monitoringPeriod*1000); } } catch (Exception e) { log.error(e.getMessage()); }
}
public void monitorThreadPool() {
StringBuffer strBuff = new StringBuffer(); strBuff.append("CurrentPoolSize : ").append(executor.getPoolSize()); strBuff.append(" - CorePoolSize : ").append(executor.getCorePoolSize()); strBuff.append(" - MaximumPoolSize : ").append(executor.getMaximumPoolSize()); strBuff.append(" - ActiveTaskCount : ").append(executor.getActiveCount()); strBuff.append(" - CompletedTaskCount : ").append(executor.getCompletedTaskCount()); strBuff.append(" - TotalTaskCount : ").append(executor.getTaskCount()); strBuff.append(" - isTerminated : ").append(executor.isTerminated()); log.debug(strBuff.toString());
}
public ThreadPoolExecutor getExecutor() {
return executor;
}
public void setExecutor(ThreadPoolExecutor executor) {
this.executor = executor;
}
public long getMonitoringPeriod() {
return monitoringPeriod;
}
public void setMonitoringPeriod(long monitoringPeriod) {
this.monitoringPeriod = monitoringPeriod;
} }</pre>
第9步:創建Starter類
(譯者注:這個類內部維護了一個線程池服務(testThreadPoolExecutorService)和一個監控服務(threadPoolMonitorService),然后創建線程池、啟動一個單獨的線程執行監控服務、通過線程池執行任務)
package com.otv.start;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
import com.otv.handler.TestRejectedExecutionHandler; import com.otv.monitor.srv.IThreadPoolMonitorService; import com.otv.monitor.srv.ThreadPoolMonitorService; import com.otv.srv.ITestThreadPoolExecutorService; import com.otv.srv.TestThreadPoolExecutorService; import com.otv.task.TestTask;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
@version 1.0.0 / public class Starter {
private static Logger log = Logger.getLogger(TestRejectedExecutionHandler.class);
IThreadPoolMonitorService threadPoolMonitorService; ITestThreadPoolExecutorService testThreadPoolExecutorService;
public void start() {
// A new thread pool is created... ThreadPoolExecutor executor = testThreadPoolExecutorService.createNewThreadPool(); executor.allowCoreThreadTimeOut(true); // Created executor is set to ThreadPoolMonitorService... threadPoolMonitorService.setExecutor(executor); // ThreadPoolMonitorService is started... Thread monitor = new Thread(threadPoolMonitorService); monitor.start(); // New tasks are executed... for(int i=1;i<10;i++) { executor.execute(new TestTask("Task"+i)); } try { Thread.sleep(40000); } catch (Exception e) { log.error(e.getMessage()); } for(int i=10;i<19;i++) { executor.execute(new TestTask("Task"+i)); } // executor is shutdown... executor.shutdown();
}
public IThreadPoolMonitorService getThreadPoolMonitorService() {
return threadPoolMonitorService;
}
public void setThreadPoolMonitorService(IThreadPoolMonitorService threadPoolMonitorService) {
this.threadPoolMonitorService = threadPoolMonitorService;
}
public ITestThreadPoolExecutorService getTestThreadPoolExecutorService() {
return testThreadPoolExecutorService;
}
public void setTestThreadPoolExecutorService(ITestThreadPoolExecutorService testThreadPoolExecutorService) {
this.testThreadPoolExecutorService = testThreadPoolExecutorService;
} }</pre>
第10步:創建Application類
創建Application類。這個類運行應用程序。
package com.otv.start;
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
- @author onlinetechvision.com
- @since 17 Oct 2011
@version 1.0.0 / public class Application {
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); Starter starter = (Starter) context.getBean("Starter"); starter.start();
} }</pre>
第11步:創建applicationContext.xml文件
(譯者注:在Spring中注冊了上面所創建的類,并提前設置了部分相應的參數,比如將監控服務的監控周期設為5)
<beans xmlns="http://www.springframework.org/schema/beans
<!-- Beans Declaration -->
<bean id="TestTask" class="com.otv.task.TestTask"></bean>
<bean id="ThreadPoolMonitorService" class="com.otv.monitor.srv.ThreadPoolMonitorService">
<property name="monitoringPeriod" value="5" />
</bean>
<bean id="TestRejectedExecutionHandler" class="com.otv.handler.TestRejectedExecutionHandler"></bean>
<bean id="TestThreadPoolExecutorService" class="com.otv.srv.TestThreadPoolExecutorService">
<property name="corePoolSize" value="1" />
<property name="maxPoolSize" value="3" />
<property name="keepAliveTime" value="10" />
<property name="queueCapacity" value="3" />
<property name="testRejectedExecutionHandler" ref="TestRejectedExecutionHandler" />
</bean>
<bean id="Starter" class="com.otv.start.Starter">
<property name="threadPoolMonitorService" ref="ThreadPoolMonitorService" />
<property name="testThreadPoolExecutorService" ref="TestThreadPoolExecutorService" />
</bean>
</beans></pre>
第12步:創建線程池的另一方法
Spring提供的ThreadPoolTaskExecutor類也可以創建線程池。
(譯者注:上面通過我們自己創建的TestThreadPoolExecutorService類來設置線程池的各項參數并創建線程池,但實際上Spring也提供了功能類似的類,就是ThreadPoolTaskExecutor。所以也可以使用這種方式創建線程池)
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="1" /> <property name="maxPoolSize" value="3" /> <property name="queueCapacity" value="3" /> </bean><bean id="testTaskExecutor" class="TestTaskExecutor"> <constructor-arg ref="threadPoolTaskExecutor" /> </bean></pre>
第13步:構建項目
OTV_Spring_ThreadPool工程被build后,就會產生一個OTV_Spring_ThreadPool-0.0.1-SNAPSHOT.jar包。
第14步:運行工程
OTV_Spring_ThreadPool-0.0.1-SNAPSHOT.jar運行后,輸出日志如下:
18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task7 : has been rejected 18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task8 : has been rejected 18.10.2011 20:08:48 DEBUG (TestRejectedExecutionHandler.java:19) - Task9 : has been rejected 18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task1 : is started. 18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task6 : is started. 18.10.2011 20:08:48 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 2 - CompletedTaskCount : 0 - TotalTaskCount : 5 - isTerminated : false 18.10.2011 20:08:48 DEBUG (TestTask.java:25) - Task5 : is started. 18.10.2011 20:08:53 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 0 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task6 : is completed. 18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task1 : is completed. 18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task3 : is started. 18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task2 : is started. 18.10.2011 20:08:58 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 2 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:08:58 DEBUG (TestTask.java:27) - Task5 : is completed. 18.10.2011 20:08:58 DEBUG (TestTask.java:25) - Task4 : is started. 18.10.2011 20:09:03 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 3 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task2 : is completed. 18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task3 : is completed. 18.10.2011 20:09:08 DEBUG (TestTask.java:27) - Task4 : is completed. 18.10.2011 20:09:08 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:13 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:18 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:23 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 6 - TotalTaskCount : 6 - isTerminated : false 18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task10 : is started. 18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task16 : has been rejected 18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task17 : has been rejected 18.10.2011 20:09:28 DEBUG (TestRejectedExecutionHandler.java:19) - Task18 : has been rejected 18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task14 : is started. 18.10.2011 20:09:28 DEBUG (TestTask.java:25) - Task15 : is started. 18.10.2011 20:09:28 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 6 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:33 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 6 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task10 : is completed. 18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task11 : is started. 18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task14 : is completed. 18.10.2011 20:09:38 DEBUG (TestTask.java:27) - Task15 : is completed. 18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task12 : is started. 18.10.2011 20:09:38 DEBUG (TestTask.java:25) - Task13 : is started. 18.10.2011 20:09:38 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 9 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:43 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 3 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 3 - CompletedTaskCount : 9 - TotalTaskCount : 12 - isTerminated : false 18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task11 : is completed. 18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task13 : is completed. 18.10.2011 20:09:48 DEBUG (TestTask.java:27) - Task12 : is completed. 18.10.2011 20:09:48 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 12 - TotalTaskCount : 12 - isTerminated : true 18.10.2011 20:09:53 DEBUG (ThreadPoolMonitorService.java:39) - CurrentPoolSize : 0 - CorePoolSize : 1 - MaximumPoolSize : 3 - ActiveTaskCount : 0 - CompletedTaskCount : 12 - TotalTaskCount : 12 - isTerminated : true第15步:下載