Java多線程實現文件快速切分

TerriMabry 8年前發布 | 1K 次閱讀 Java

前段時間需要進行大批量數據導入,DBA提供的是CSV文件,但每個CSV文件都有好幾個GB大小,直接進行load不僅數據庫很慢,還會產生內存不足的問題。為了解決這個問題,寫了個快速切分文件的程序。

 

[Java]代碼    

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * Splits a large delimited text file into fixed-size part files and then
 * patches the end of each part with the first row of the following part.
 *
 * <p>The parts are cut at raw byte offsets, so a row will usually be cut in
 * two at a part boundary. After all parts are written, {@link #splitBySize}
 * appends the first row of part <i>N+1</i> to the end of part <i>N</i>.
 *
 * <p>NOTE(review): nothing in this class removes that first row from part
 * <i>N+1</i>, so every part after the first still starts with the fragment
 * (or row) that was appended to its predecessor. Presumably the consumer of
 * the parts is expected to cope with that - confirm against the loader.
 */
public class FileSplitUtil {

    private static final Logger log = LogManager.getLogger(FileSplitUtil.class);

    /** Files smaller than this many bytes (100 MiB) are not split by {@link #main}. */
    private static final long ORIGIN_FILE_SIZE = 1024 * 1024 * 100;

    /**
     * Part size in bytes used by {@link #main} (64 MiB).
     *
     * <p>Original author's note: must be a power of two to avoid garbled
     * Chinese text. NOTE(review): a power-of-two size by itself does not look
     * sufficient to keep a multi-byte character from being cut at a part
     * boundary when single-byte characters are mixed in - confirm.
     */
    private static final int BLOCK_FILE_SIZE = 1024 * 1024 * 64;

    /** Column separator of the delimited file. */
    private static final char CSV_SEPARATOR = '^';

    /** Character set used to decode and encode rows while merging. */
    private static final String CHARSET_NAME = "gbk";

    /** Upper bound for the in-memory copy buffer of a single split task (1 MiB). */
    private static final int COPY_BUFFER_SIZE = 1024 * 1024;

    /** File that {@link #main} splits when no path is given on the command line. */
    private static final String DEFAULT_SOURCE_FILE = "D:\\csvtest\\aa.csv";

    /**
     * Command-line entry point: splits one file and prints the part names and
     * the elapsed time.
     *
     * @param args optional; {@code args[0]} is the file to split. When absent,
     *             the built-in default path is used.
     */
    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        String fileName = (args != null && args.length > 0) ? args[0] : DEFAULT_SOURCE_FILE;
        try {
            File sourceFile = new File(fileName);
            if (sourceFile.length() >= ORIGIN_FILE_SIZE) {
                String csvFileName = fileName.replace('\\', '/');
                FileSplitUtil fileSplitUtil = new FileSplitUtil();
                List<String> parts = fileSplitUtil.splitBySize(csvFileName, BLOCK_FILE_SIZE);
                for (String part : parts) {
                    System.out.println("partName is:" + part);
                }
            }
            System.out.println("總文件長度" + sourceFile.length() + ",拆分文件耗時:" + (System.currentTimeMillis() - start) + "ms.");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so that callers further up can see it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while splitting " + fileName, e);
        } catch (Exception e) {
            log.error("Failed to split " + fileName, e);
        }
    }

    /**
     * Splits a file into consecutive parts of {@code byteSize} bytes (the last
     * part may be shorter), writing the parts concurrently, and then merges
     * the rows that were cut at the part boundaries.
     *
     * <p>Part files are created next to the source file and are named
     * {@code <source path>.<zero-padded index>.cvs}, with the index starting
     * at 1. NOTE(review): the {@code .cvs} extension looks like a typo for
     * {@code .csv}; it is kept because existing callers may rely on it.
     *
     * <p>A failure while writing an individual part is logged and does not
     * abort the split; the name of that part is still returned.
     *
     * @param fileName full name of the file to split
     * @param byteSize size of each part in bytes; must be positive
     * @return the names of the part files, in order; empty for an empty file
     * @throws IOException              if the source file cannot be opened
     * @throws InterruptedException     if the calling thread is interrupted
     *                                  while waiting for the parts to be written
     * @throws IllegalArgumentException if {@code byteSize} is not positive or
     *                                  would produce more than
     *                                  {@code Integer.MAX_VALUE} parts
     */
    public List<String> splitBySize(String fileName, int byteSize)
            throws IOException, InterruptedException {
        if (byteSize <= 0) {
            throw new IllegalArgumentException("byteSize must be positive: " + byteSize);
        }
        File file = new File(fileName);

        // Opening the file also reports a missing source as an IOException.
        long totalLen;
        RandomAccessFile raf = new RandomAccessFile(fileName, "r");
        try {
            totalLen = raf.length();
        } finally {
            raf.close();
        }

        List<String> parts = new ArrayList<String>();
        long partCount = (totalLen + byteSize - 1) / byteSize;
        if (partCount > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "byteSize " + byteSize + " is too small for a file of " + totalLen + " bytes");
        }
        int count = (int) partCount;
        if (count == 0) {
            return parts;
        }

        int countLen = String.valueOf(count).length();
        CountDownLatch latch = new CountDownLatch(count);
        int threads = Math.min(count, Math.max(1, Runtime.getRuntime().availableProcessors()));
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < count; i++) {
                String partFileName = file.getPath() + "."
                        + leftPad(String.valueOf(i + 1), countLen, '0') + ".cvs";
                long startPos = (long) i * byteSize;
                // The last part only gets whatever is left of the file.
                int readSize = (int) Math.min((long) byteSize, totalLen - startPos);
                pool.execute(new SplitRunnable(readSize, startPos, partFileName, file, latch));
                parts.add(partFileName);
            }
            latch.await(); // wait until every part has been written
        } finally {
            // Every task has counted down by now unless we were interrupted,
            // in which case the remaining work is cancelled.
            pool.shutdownNow();
        }

        // Cutting at byte offsets can split a row; patch the part files.
        mergeRow(parts);
        return parts;
    }

    /**
     * Task that copies one byte range of the source file into its own part file.
     *
     * @author supeidong
     */
    private static class SplitRunnable implements Runnable {
        private final int byteSize;
        private final String partFileName;
        private final File originFile;
        private final long startPos;
        private final CountDownLatch latch;

        /**
         * @param byteSize     number of bytes to copy
         * @param startPos     offset in the source file at which copying starts
         * @param partFileName name of the part file to create
         * @param originFile   the source file
         * @param latch        counted down once when the task ends, whether it
         *                     succeeded or not
         */
        public SplitRunnable(int byteSize, long startPos, String partFileName,
                             File originFile, CountDownLatch latch) {
            this.startPos = startPos;
            this.byteSize = byteSize;
            this.partFileName = partFileName;
            this.originFile = originFile;
            this.latch = latch;
        }

        public void run() {
            try {
                copyPart();
            } catch (IOException e) {
                log.error("Failed to write part file " + partFileName, e);
            } catch (RuntimeException e) {
                log.error("Unexpected error while writing part file " + partFileName, e);
            } finally {
                // Always release the waiter, otherwise a failed part would
                // block splitBySize forever.
                latch.countDown();
            }
        }

        /** Copies {@code byteSize} bytes starting at {@code startPos} into the part file. */
        private void copyPart() throws IOException {
            RandomAccessFile in = new RandomAccessFile(originFile, "r");
            try {
                in.seek(startPos); // move to the beginning of this part
                OutputStream out = new FileOutputStream(partFileName);
                try {
                    // Copy in bounded chunks: a single read() is not guaranteed
                    // to fill the buffer, and a small buffer keeps memory use
                    // low when several parts are written at the same time.
                    byte[] buffer = new byte[Math.min(byteSize, COPY_BUFFER_SIZE)];
                    int remaining = byteSize;
                    while (remaining > 0) {
                        int n = in.read(buffer, 0, Math.min(buffer.length, remaining));
                        if (n < 0) {
                            break; // source ended earlier than expected
                        }
                        out.write(buffer, 0, n);
                        remaining -= n;
                    }
                    out.flush();
                } finally {
                    out.close();
                }
            } finally {
                in.close();
            }
        }
    }

    /**
     * Appends the first row of every part to the end of the preceding part so
     * that a row cut at a part boundary is completed in the earlier part.
     *
     * <p>Whether the first row of a part is a complete row is decided by
     * counting separators: the separators of that first row plus those of the
     * previous part's last row are compared with the separators of the part's
     * second row. If the sum is larger, the first row is treated as complete
     * and is appended on a new line; otherwise it is appended directly to the
     * unfinished last row. When a part has no second row, the separator count
     * of the most recent part that had one is used instead.
     *
     * <p>Part files that do not exist are skipped. I/O errors are logged and
     * end the merge; they are not propagated.
     *
     * @param parts names of the part files, in order
     */
    private void mergeRow(List<String> parts) {
        List<PartFile> partFiles = new ArrayList<PartFile>();
        try {
            // Collect first row, last row and the "first row is complete" flag of every part.
            int expectedSeparators = 0;
            for (int i = 0; i < parts.size(); i++) {
                String partFileName = parts.get(i);
                File partOnDisk = new File(partFileName);
                if (!partOnDisk.exists()) {
                    continue;
                }

                String firstRow;
                String secondRow;
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(new FileInputStream(partOnDisk), CHARSET_NAME));
                try {
                    firstRow = reader.readLine();
                    secondRow = reader.readLine();
                } finally {
                    reader.close();
                }
                if (secondRow != null) {
                    expectedSeparators = getCharCount(secondRow);
                }

                PartFile partFile = new PartFile();
                partFile.setPartFileName(partFileName);
                partFile.setFirstRow(firstRow);
                partFile.setEndRow(readLastLine(partFileName));
                if (i >= 1) {
                    String preEndRow = readLastLine(parts.get(i - 1));
                    int joinedSeparators = getCharCount(firstRow) + getCharCount(preEndRow);
                    partFile.setFirstIsFull(joinedSeparators > expectedSeparators);
                }
                partFiles.add(partFile);
            }

            // Write the carried-over rows.
            for (int i = 0; i < partFiles.size() - 1; i++) {
                PartFile current = partFiles.get(i);
                PartFile next = partFiles.get(i + 1);
                String nextFirstRow = next.getFirstRow();
                if (nextFirstRow == null) {
                    continue; // the next part is empty, nothing to carry over
                }
                String tail = next.getFirstIsFull() ? "\r\n" + nextFirstRow : nextFirstRow;
                writeLastLine(current.getPartFileName(), tail);
            }
        } catch (IOException e) {
            log.error("Failed to merge rows cut at part boundaries", e);
        }
    }

    /**
     * Counts how often the column separator occurs in a string.
     *
     * @param s the text to scan; may be {@code null}
     * @return the number of separators, {@code 0} for {@code null}
     */
    private int getCharCount(String s) {
        if (s == null) {
            return 0;
        }
        int count = 0;
        for (int i = 0; i < s.length(); i++) {
            if (s.charAt(i) == CSV_SEPARATOR) {
                count++;
            }
        }
        return count;
    }

    /**
     * Counts the line-feed bytes ({@code '\n'}) in a file.
     *
     * @param filename name of the file to scan
     * @return the number of {@code '\n'} bytes in the file
     * @throws IOException if the file cannot be opened or read
     */
    public int getFileRow(String filename) throws IOException {
        InputStream is = new BufferedInputStream(new FileInputStream(filename));
        try {
            byte[] chunk = new byte[1024];
            int count = 0;
            int readBytes;
            while ((readBytes = is.read(chunk)) != -1) {
                for (int i = 0; i < readBytes; ++i) {
                    if (chunk[i] == '\n') {
                        ++count;
                    }
                }
            }
            return count;
        } finally {
            is.close();
        }
    }

    /**
     * Reads the last line of a file by scanning backwards for a line feed.
     *
     * <p>The very last byte of the file is not examined, so a file that ends
     * with a line feed yields the line in front of that line feed.
     *
     * @param filename name of the file to read
     * @return the last line, or an empty string if the file is empty or no
     *         line feed is found before its last byte
     * @throws IOException if the file cannot be opened or read
     */
    private String readLastLine(String filename) throws IOException {
        RandomAccessFile raf = new RandomAccessFile(filename, "r");
        try {
            String lastLine = "";
            long pos = raf.length() - 1;
            while (pos > 0) {
                pos--;
                raf.seek(pos);
                if (raf.readByte() == '\n') {
                    // readLine() maps each byte to one char, so recover the
                    // raw bytes first and then decode them properly.
                    String raw = raf.readLine();
                    lastLine = new String(raw.getBytes("8859_1"), CHARSET_NAME);
                    break;
                }
            }
            return lastLine;
        } finally {
            raf.close();
        }
    }

    /**
     * Appends text to the end of a file. I/O errors are logged, not thrown.
     *
     * @param fileName   name of the file to append to
     * @param lastString the text to append
     */
    private void writeLastLine(String fileName, String lastString) {
        try {
            RandomAccessFile randomFile = new RandomAccessFile(fileName, "rw");
            try {
                // Move the write position to the end of the file.
                randomFile.seek(randomFile.length());
                // Original author's note: must be encoded as gbk, otherwise
                // the appended text comes out garbled.
                randomFile.write(lastString.getBytes(CHARSET_NAME));
            } finally {
                randomFile.close();
            }
        } catch (IOException e) {
            log.error("Failed to append to part file " + fileName, e);
        }
    }

    /**
     * Pads a string on the left up to a given length.
     *
     * @param str    the string to pad
     * @param length the desired total length
     * @param ch     the padding character
     * @return {@code str} itself if it is already at least {@code length}
     *         characters long, otherwise {@code str} preceded by as many
     *         {@code ch} as needed to reach {@code length}
     */
    public static String leftPad(String str, int length, char ch) {
        if (str.length() >= length) {
            return str;
        }
        char[] chs = new char[length];
        Arrays.fill(chs, ch);
        char[] src = str.toCharArray();
        System.arraycopy(src, 0, chs, length - src.length, src.length);
        return new String(chs);
    }

    /**
     * Per-part data collected while merging rows.
     */
    class PartFile {
        /** Name of the part file. */
        private String partFileName;
        /** First line of the part file. */
        private String firstRow;
        /** Last line of the part file. */
        private String endRow;
        /** Whether the first line was judged to be a complete row. */
        private boolean firstIsFull;

        public String getPartFileName() {
            return partFileName;
        }

        public void setPartFileName(String partFileName) {
            this.partFileName = partFileName;
        }

        public String getFirstRow() {
            return firstRow;
        }

        public void setFirstRow(String firstRow) {
            this.firstRow = firstRow;
        }

        public String getEndRow() {
            return endRow;
        }

        public void setEndRow(String endRow) {
            this.endRow = endRow;
        }

        public boolean getFirstIsFull() {
            return firstIsFull;
        }

        public void setFirstIsFull(boolean firstIsFull) {
            this.firstIsFull = firstIsFull;
        }
    }

}
 本文由用戶 TerriMabry 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!