Nodejs 實時監控文件內容的變化及按行讀取文本文件

jopen 9年前發布 | 49K 次閱讀 Node.js 開發 NodeJS

原文 http://segmentfault.com/a/1190000003902877

需求問題

在使用nodejs開發項目的過程中,有一個需求需要實時監控指定文件的變更并按行讀取最新的文件內容,在nodejs中有相應地API么?具體怎么使用呢~~~

fs.watchFile-實時監控文件變化

nodejs的fs模塊提供了watchFile方法,在文件內容每次發生變化時就觸發相應回調函數。回調函數提供2個參數-當前的文件狀態對象fs.state和文件發生變化時的文件狀態fs.state,比較2者的最后修改時間就能知道文件內容是否發生過變化。unwatchfile取消對指定文件的監控

fs.read/fs.createReadStream-讀取文件內容

在nodejs中讀取文件內容最基本的API為fs.read,也可以通過創建一個文件流,監聽data/end事件讀取文件內容

  • fs.read方法可以自由的控制要讀取多少內容從那個位置讀取,對于一個大文件來說需要反復調用才能獲取全部內容。

    </li> </ul>

    fs.read(fd, buffer, offset, length, position, callback)

    </div>

    fd為打開的文件描述符,也就數文件對象

    buffer指定此次read操作讀取的數據寫入的緩存區對象

    offset讀取的數據從緩沖區對象的哪個位置開始寫入

    length此次read操作想要讀取的數據長度

    position指定從fd的哪個位置開始讀取數據,指定為null,那么數據此文件的當前位置開始讀取

    callback 函數提供3個參數err、bytesRead,buffer-是否發生了錯誤,本地讀到的字節長度,保存讀到的數據的緩沖區

    </div>

    • fs.createReadStream 文件流的方式則用通過事件監聽的方式順序的讀取文件內容,無法從指定的位置開始讀取文件內容

      var fileReadStream=fs.createReadStream(filename,{
          encoding:'utf8'
      });
      var fileContent="";
      fileReadStream.on('data',functon(data){
          fileContent+=data;
      });
      fileReadStream.on('end',function(){
          console.log('file read ');
      });

    Readline模塊-按行讀取文件內容

    nodejs提供readline模塊按行讀取文件流數據內容的方法。通過創建一個readline的interface對象,監聽readline事件就可以獲取input輸入流的行數據。當輸入流讀取完畢后將觸發close事件

    var readline = require('readline');
    var rl = readline.createInterface({
      input: fs.createReadStream(filename,{
          enconding:'utf8'
      }),
      output: null
    });
    rl.on('readline',function(line){
        console.log('got line content:%s',line);
    });

    解決方案

    文件內容變化的監控用fs.watchFile方法實現,最新內容的讀取就只有fs.read方法可以使用了。

    因為無論是使用readline模塊還是使用fs.createReadStream文件流接口都無法從特定的位置(也就是上次文件內容的最后更新位置)開始讀取數據,只能從頭開始讀取文件內容,這個對于-文件監控并讀取最新的內容-這個需求來說是不合適也是完全沒有效率的。

    使用fs.read方法需要提供一種機制能夠按行解析出數據并且能夠觸發消息通知監聽者,有行數據讀取完成。并且在讀取到的數據長度不夠獲取沒有檢測到換行標識符(\r\n)時,保留這些數據。

    最終的方案是這樣的:

    1. fs.watchFile檢查到文件內容發生了變化

    2. 調用fs.read方法讀取指定的字節數據到到buffer緩沖對象中

    3. 將獲取的字節數據和上次read行解析后遺留的內容合并成一個新的buffer對象

    4. 將buffer對象通過換行符解析出行數據

    5. 每解析出一條行數據,就觸發一個line事件,通知監聽者已讀到新的行數據

    6. 保留行解析后遺留數據

    7. 如果本次read讀取到的實際數據長度小于buffer緩沖區長度,說明已經到達文件的末尾,沒有更多地數據能夠讀取到了。回到1 等到內容變化的通知

    8. 否則回到2,并從上次讀取的最后位置開始讀取

    9. 如果讀取行數據內容為===END===或者行解析后的遺留內容為===END===,那么將調用fs.unwatchFile停止文件內容的監控并不再調用fs.read

    代碼

    var fs=require('fs');
    var EM = require("events").EventEmitter;
    var util = require('util');
    var EventEmitter = require('events').EventEmitter;
    var newlines = [
        //13, // \r
        10  // \n
    ];
    function createLineReader(fileName) {
        if (!(this instanceof createLineReader)) return new createLineReader(fileName,monitorFlag);
        var self=this;
        var currentFileUpdateFlag=0;
        var fileOPFlag="a+";
        fs.open(fileName,fileOPFlag,function(error,fd){
            var buffer;
            var remainder = null;
               fs.watchFile(fileName,{
               persistent: true,
               interval: 1000
            },function(curr, prev){
               //console.log('the current mtime is: ' + curr.mtime);
               //console.log('the previous mtime was: ' + prev.mtime);
               if(curr.mtime>prev.mtime){
                   //文件內容有變化,那么通知相應的進程可以執行相關操作。例如讀物文件寫入數據庫等
                   continueReadData();
               }else{
                   //console.log('curr.mtime<=prev.mtime');
               }
               });
            //先讀取原來文件中內容
            continueReadData();
            function continueReadData(){
                //var fileUpdateFlag=fileUpdateFlagIn;
                buffer=new Buffer(2048);
                var start = 0,i=0,tmp;
                fs.read(fd,buffer,0,buffer.length,null,function(err, bytesRead, buffer){
                    var data=buffer.slice(0,bytesRead)
                    if(remainder != null){//append newly received data chunk
                        //console.log("remainder length:"+remainder.length);
                        tmp = new Buffer(remainder.length+bytesRead);
                        remainder.copy(tmp);
                        //data=buffer.slice(0,bytesRead);
                        data.copy(tmp,remainder.length)
                        data = tmp;
                    }
                    //console.log("data length:"+data.length);
                    for(i=0; i<data.length; i++){
                        if(newlines.indexOf(data[i]) >=0){ //\r \n new line
                            var line = data.slice(start,i);
                            self.emit("line", line);
                            start = i+1;
                        }
                    }
                    if(start<data.length){
                        remainder = data.slice(start);
                        if(remainder.toString()==='===END==='){
                            self.emit("end");
                            stopWatch();
                            return;
                        }
                    }else{
                        remainder = null;
                    }
                    if(bytesRead<buffer.length){
                        return;
                    }else{
                        //console.log('~~continue~~');
                        continueReadData();
                    }
                });
            }
            function stopWatch(){
                fs.unwatchFile(fileName);
            }
        });
    }
    util.inherits(createLineReader, EventEmitter);
    module.exports=createLineReader;
     本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
     轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
     本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!