Using Readers in Spring Batch, the Enterprise Batch Processing Framework
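Spring Batch was designed for enterprise workloads whose data logic is relatively simple and highly repetitive but whose data volumes are large. The range of Readers it provides reflects that, or at least that is how I understand it. It is best suited to jobs such as data cleansing, moving data after analysis, or scheduled data exchange with other systems.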
In the previous article, I used JdbcPagingItemReader to read data from an HSQLDB database:
<bean id="sysAppStoreMapper" class="net.dbatch.mapper.SysAppStoreMapper" />

<bean id="dbReader" class="org.springframework.batch.item.database.JdbcPagingItemReader">
    <property name="dataSource" ref="dataSource"/>
    <property name="rowMapper" ref="sysAppStoreMapper"/>
    <property name="queryProvider" ref="appQueryProvider"/>
</bean>

<bean id="appQueryProvider" class="org.springframework.batch.item.database.support.HsqlPagingQueryProvider">
    <property name="selectClause" value="a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ"/>
    <property name="fromClause" value="sys_appstore a"/>
    <property name="sortKey" value="SEQ"/>
</bean>
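In fact, Spring Batch provides many Readers. Any class that implements the org.springframework.batch.item.ItemReader interface can serve as a custom Reader, but in most cases you don't need to write one, because Spring Batch has already done the work for you. The 2.1.8 API ships ready-made Readers for the common sources: databases (Hibernate/iBATIS/JDBC), the file system, JMS messages, and more, as shown in the figure: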

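As a rough illustration of that contract, here is a minimal sketch of a custom reader. So that it compiles without Spring Batch on the classpath, it declares a local stand-in interface with the same read() shape as org.springframework.batch.item.ItemReader (the real interface declares more specific checked exceptions); InMemoryItemReader is a hypothetical name. The convention that matters is that read() returns one item per call and null once the input is exhausted. Spring Batch's own ListItemReader covers this in-memory case.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class CustomReaderSketch {

    // Local stand-in for org.springframework.batch.item.ItemReader<T>, declared here
    // only so the sketch compiles without Spring Batch on the classpath.
    interface ItemReader<T> {
        T read() throws Exception;
    }

    // Hypothetical reader over an in-memory list: one item per read() call,
    // then null to signal that the input is exhausted.
    static class InMemoryItemReader<T> implements ItemReader<T> {
        private final Iterator<T> iterator;

        InMemoryItemReader(List<T> items) {
            // Copy so later changes to the caller's list do not affect reading
            this.iterator = new ArrayList<>(items).iterator();
        }

        @Override
        public T read() {
            return iterator.hasNext() ? iterator.next() : null;
        }
    }

    public static void main(String[] args) throws Exception {
        ItemReader<String> reader = new InMemoryItemReader<>(List.of("User1", "User2", "User3"));
        String item;
        // Keep reading until the reader returns null
        while ((item = reader.read()) != null) {
            System.out.println(item);
        }
    }
}
```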
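Users who prefer Spring JDBC (I, for one, strongly dislike Hibernate) can use JdbcPagingItemReader and give it a queryProvider. A queryProvider encapsulates the paging SQL for a particular database, and ready-made queryProviders exist for the common databases, as shown in the figure: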

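To make the queryProvider idea more concrete, the following sketch builds the two kinds of query a paging reader needs: one for the first page, and one for each later page that resumes after the last sort-key value already read. The helper names and the generic LIMIT syntax are assumptions for illustration only; each real provider (HsqlPagingQueryProvider and the others) generates SQL in its own database's dialect.

```java
public class PagingQuerySketch {

    // Hypothetical helper: query for the first page, ordered by the sort key.
    // Generic LIMIT syntax is used for illustration; real providers use their own dialect.
    static String firstPageQuery(String select, String from, String sortKey, int pageSize) {
        return "SELECT " + select + " FROM " + from
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    // Hypothetical helper: every later page starts after the last sort-key value
    // read so far (bound to the ? parameter), instead of using a row offset.
    static String remainingPagesQuery(String select, String from, String sortKey, int pageSize) {
        return "SELECT " + select + " FROM " + from
                + " WHERE " + sortKey + " > ?"
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        String select = "a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ";
        String from = "sys_appstore a";
        System.out.println(firstPageQuery(select, from, "SEQ", 10));
        System.out.println(remainingPagesQuery(select, from, "SEQ", 10));
    }
}
```

Because later pages are located by the last sort-key value rather than by an offset, the sort key (SEQ in the configuration above) should be unique; otherwise rows sharing a value can be skipped at a page boundary.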
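If you cannot find a queryProvider for your database among those above, but you know its SQL, you can use JdbcCursorItemReader, which lets you set the SQL yourself. Instead of issuing one query per page, it runs a single query and reads the result rows through a JDBC cursor.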
The example above is just as easy to rewrite with JdbcCursorItemReader:
<bean id="dbReader" class="org.springframework.batch.item.database.JdbcCursorItemReader">
    <property name="dataSource" ref="dataSource" />
    <property name="sql" value="select a.APP_ID, a.PARENT_ID, a.APP_DESC, a.APP_URL, a.FOLDER, a.SEQ from sys_appstore a order by a.SEQ" />
    <property name="rowMapper" ref="sysAppStoreMapper" />
</bean>
It still works well, and the configuration is simpler.
What if my data comes from a file rather than a database?
Did you notice FlatFileItemReader among the Reader implementations shown earlier? It reads flat (text) files.
Suppose I want to analyze log data with this structure:
User1,20
User2,21
User3,22
User4,23
User5,24
User6,25
User7,26
User8,27
User9,28
User10,29
Because this is structured text, reading it is easy. The Spring configuration:
<bean id="delimitedLineTokenizer" class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer" />

<bean id="lineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <property name="lineTokenizer" ref="delimitedLineTokenizer" />
    <property name="fieldSetMapper">
        <bean class="net.dbatch.sample.UserMapper" />
    </property>
</bean>

<bean id="messageReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <property name="lineMapper" ref="lineMapper" />
    <property name="resource" value="classpath:/users.txt" />
</bean>
Next, write the corresponding FieldSetMapper:
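import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

// Maps one tokenized line (e.g. "User1,20") to a User object
public class UserMapper implements FieldSetMapper<User> {
    public User mapFieldSet(FieldSet fs) throws BindException {
        User u = new User();
        u.setName(fs.readString(0)); // first column: user name
        u.setAge(fs.readInt(1));     // second column: age
        return u;
    }
}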
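Conceptually, DelimitedLineTokenizer and UserMapper split each line on the comma (the tokenizer's default delimiter) and turn the resulting fields into a User. The plain-Java sketch below mimics that pipeline so it runs without Spring; the local User class and the tokenize, mapFields and readAll helpers are hypothetical stand-ins, and the real tokenizer's handling of quoted fields is omitted.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimitedLineSketch {

    // Minimal stand-in for the article's User bean (name and age only)
    static class User {
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Tokenizer step: split one line on commas; the -1 limit keeps empty trailing fields
    static String[] tokenize(String line) {
        return line.split(",", -1);
    }

    // Mapper step, mirroring UserMapper: field 0 is the name, field 1 the age
    static User mapFields(String[] fields) {
        return new User(fields[0].trim(), Integer.parseInt(fields[1].trim()));
    }

    // Applies tokenizer and mapper to every line; the real reader does one line per read()
    static List<User> readAll(List<String> lines) {
        List<User> users = new ArrayList<>();
        for (String line : lines) {
            users.add(mapFields(tokenize(line)));
        }
        return users;
    }

    public static void main(String[] args) {
        for (User u : readAll(List.of("User1,20", "User2,21", "User3,22"))) {
            System.out.println(u.name + " is " + u.age);
        }
    }
}
```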
Processor:
import org.springframework.batch.item.ItemProcessor;
import org.springframework.util.StringUtils;

public class MessagesItemProcessor implements ItemProcessor<User, Message> {
    public Message process(User user) throws Exception {
        // Reject users without a name
        if (!StringUtils.hasText(user.getName())) {
            throw new RuntimeException("The user name is required!");
        }
        Message m = new Message(); // Message is a simple wrapper around User
        m.setUser(user);
        m.setContent("Hello " + user.getName() + ",please pay promptly at end of this month.");
        return m;
    }
}
Writer:
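import java.util.List;
import org.springframework.batch.item.ItemWriter;

public class MessagesItemWriter implements ItemWriter<Message> {
    // Called once per chunk with all items processed in that chunk
    public void write(List<? extends Message> messages) throws Exception {
        System.out.println("write results");
        for (Message m : messages) {
            System.out.println(m.getContent()); // output only
        }
    }
}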
Test code:
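import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;

public class MessageJobMain { // class name is illustrative
    public static void main(String[] args) {
        ClassPathXmlApplicationContext c = new ClassPathXmlApplicationContext("localfile_job.xml");
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository((JobRepository) c.getBean("jobRepository"));
        launcher.setTaskExecutor(new SyncTaskExecutor()); // run the job in the calling thread
        try {
            // run.month matches the JobParameters shown in the log output
            JobExecution je = launcher.run((Job) c.getBean("messageJob"),
                    new JobParametersBuilder().addString("run.month", "2011-10").toJobParameters());
            System.out.println(je);
            System.out.println(je.getJobInstance());
            System.out.println(je.getStepExecutions());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}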
Output:
10-20 15:28:32 INFO [job.SimpleStepHandler] - <Executing step: [messageStep]>
write results
Hello User1,please pay promptly at end of this month.
Hello User2,please pay promptly at end of this month.
Hello User3,please pay promptly at end of this month.
Hello User4,please pay promptly at end of this month.
Hello User5,please pay promptly at end of this month.
write results
Hello User6,please pay promptly at end of this month.
Hello User7,please pay promptly at end of this month.
Hello User8,please pay promptly at end of this month.
Hello User9,please pay promptly at end of this month.
Hello User10,please pay promptly at end of this month.
10-20 15:28:32 INFO [support.SimpleJobLauncher] - <Job: [FlowJob: [name=messageJob]] completed with the following parameters: [{run.month=2011-10}] and the following status: [COMPLETED]>
JobExecution: id=0, version=2, startTime=Sat Oct 20 15:28:32 CST 2012, endTime=Sat Oct 20 15:28:32 CST 2012, lastUpdated=Sat Oct 20 15:28:32 CST 2012, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]]
JobInstance: id=0, version=0, JobParameters=[{run.month=2011-10}], Job=[messageJob]
[StepExecution: id=1, version=5, name=messageStep, status=COMPLETED, exitStatus=COMPLETED, readCount=10, filterCount=0, writeCount=10 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=3, rollbackCount=0, exitDescription=]
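The log clearly shows that each line is read and passed to the Processor individually, and that after every 5 reads the processed items are written in a single batch. That batch size is set by the commit-interval attribute of the chunk element. With 10 lines and commit-interval="5" the writer is called twice; the commitCount=3 in the step summary most likely also counts the final commit made after the reader reports that no input remains.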
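The grouping visible in the log can be reproduced with a simplified chunk loop: items are read and processed one at a time, and the accumulated results are handed to the writer once commitInterval items have been collected, with a final, shorter write if the input ends mid-chunk. This is a plain-Java sketch under that assumption, not Spring Batch's implementation; transactions, skips, retries and filtering are left out, and run is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class ChunkLoopSketch {

    // Simplified chunk-oriented loop: read and process one item at a time,
    // buffer the results, and "write" the buffer every commitInterval items.
    // Each inner list in the returned value corresponds to one writer call.
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int commitInterval) {
        List<List<O>> writes = new ArrayList<>();
        List<O> chunk = new ArrayList<>();
        while (reader.hasNext()) {
            chunk.add(processor.apply(reader.next())); // read + process a single item
            if (chunk.size() == commitInterval) {       // chunk full: write it
                writes.add(chunk);
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {                          // input ended mid-chunk
            writes.add(chunk);
        }
        return writes;
    }

    public static void main(String[] args) {
        List<String> users = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            users.add("User" + i);
        }
        for (List<String> written : run(users.iterator(), u -> "Hello " + u, 5)) {
            System.out.println("write results");
            written.forEach(System.out::println);
        }
    }
}
```

With 10 users and a commit interval of 5, main prints "write results" twice, each followed by five greetings, matching the grouping in the log above.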
The complete job configuration:
<batch:job id="messageJob" restartable="true">
    <batch:step id="messageStep">
        <batch:tasklet>
            <batch:chunk reader="messageReader" processor="messageProcessor" writer="messageWriter"
                         commit-interval="5" retry-limit="2">
                <batch:retryable-exception-classes>
                    <batch:include class="java.lang.RuntimeException" />
                </batch:retryable-exception-classes>
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>
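retry-limit together with retryable-exception-classes tells the step to retry processing or writing when one of the listed exceptions (here any RuntimeException) is thrown, instead of failing at once. The sketch below shows the general retry idea in plain Java; callWithRetry and maxAttempts are hypothetical names, and exactly how retry-limit maps to the number of attempts should be checked against the Spring Batch version in use.

```java
import java.util.function.Supplier;

public class RetrySketch {

    // Hypothetical helper: calls the action and, when it throws a RuntimeException
    // (the class listed under retryable-exception-classes), calls it again until
    // maxAttempts calls have been made; the last failure is then rethrown.
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e; // retryable: remember it and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first call, succeeds on the second
        String result = callWithRetry(() -> {
            calls[0]++;
            if (calls[0] == 1) {
                throw new IllegalStateException("transient failure");
            }
            return "processed";
        }, 2);
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Retrying only pays off for transient failures. The empty-name check in MessagesItemProcessor fails the same way on every attempt, so retrying it only delays the failure.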