Boost::Asio庫詳解

jopen 9年前發布 | 14K 次閱讀 Boost C/C++開發

io_service

所有的異步操作:異步網絡讀寫,異步時鐘,都在io_service.run()時進行輪詢。有趣的是,io_service在線程利用方面下了很大的功夫,你可以在主線程建立它的實例,但是在多個線程里面run,io_service很擅長于將需要執行的回調函數分配到空閑線程當中。

io_service為了跨平臺,不得不在Linux下也實現Proactor模型。再說一遍,Asio的實現是Proactor模型。在Linux普遍是Reactor的情況下模擬出Proactor,作者也是很辛苦啊。run里面同時需要調度來自用戶post的Handler和來自操作系統的IO請求(或者說是 Servicespost的Handler)。

這里有一個Service的概念。Service是從一類IO操作中抽象出來的一些共同的地方。在你調用socket.async_read之類的操作之后,事實上它將真正跟操作系統打交道的操作托管給了basic_stream_socket_service。

io_service的調度有多種實現,在 epoll 實現里,作者很機智地把 "有空閑進程" 事件轉化為一個水平觸發(Level Trigger),這樣就可以在等待中醒來,進行調度了。

buffer

庫中提供了mutable_buffer和const_buffer兩種 單個緩沖 ,以及mutable_buffers_1和const_buffers_1兩種 緩沖序列 ,asio提供了一些操作:

  • buffer_cast<char*>(mb): 單個緩沖 轉成指針

    </li>

  • buffer_size(buf):取得緩沖區大小

    </li>

  • buffer_copy(bufs, bufd):緩沖區(包括單個和序列)之間復制 盡可能多的元素 ,它是安全的,不會出現溢出的情況

    </li> </ul>

    //buffer can wrap nearly everything
    vector<char> underlying_buffer(50);
    auto buf1 = buffer(underlying_buffer);

    char underlying_string[] = "Hello"; auto buf2 = buffer(underlying_string);

    buffer_copy(buf2, buf1); // returns 6

    std::string s(buffer_begin(buf1), buffer_end(buf1));</pre>

    streambuf

    streambuf是Asio能靈活地異步調控數據的關鍵。它能自動增長和回收consumed space。在使用的時候有這些要點:

    • streambuf分為input sequence和output sequence兩部分,這都是繼承自std::streambuf的理念。

      </li>

    • 用data()來獲取輸入序列(常緩沖),prepare(n)來獲取輸出序列(變緩沖)。

      </li>

    • 用commit(n)來將從輸出序列提交到輸入緩沖,用consume(n)來將輸入緩沖中的數據丟棄,代表已經處理完畢。

      </li>

    • read數據的過程:先prepare好固定大小的緩沖區,然后buffer_copy進去一些數據,拷貝了多少數據就commit多少數據。然后再從prepare開始,拷貝到手頭上的數據沒有了為止。

      </li>

    • streambuf不可拷貝,所以乖乖傳引用或者指針吧。

      </li> </ul>

      這里有一些文檔沒有說明但是需要了解的細節:

      • 自動增長的功能是通過reserve(n)函數管理的,這個函數在overflow和prepare處會調用,用于保證輸出緩沖區里有至少n字節的空間。

        </li>

      • 緩沖區的大小是沒有限制的,max_size一般是size_t能表示的最大數字,相當于內存的極限。

        </li>

      • std::istream(sb).ignore(n),sb.gbump(n),sb.consume(n)有相同的效果,但是ignore和consume有防下溢機制。

        </li> </ul>

        async_read_xxxx

        值得注意的是async_read_until(socket, streambuf, condition, handler)函數,給我們處理數據分段帶來了極大的方便。它內建了4種條件決定read到什么時候停止 :

        • 出現了某個字符

          </li>

        • 出現了某條字串

          </li>

        • 符合了某條 Regular Expression

          </li>

        • 符合了某條謂詞的條件

          </li> </ul>

          注意:這里的"出現"指的是streambuf的input sequence中出現,也就是如果原本streambuf中的內容已經符合條件,則async_read_until將立即呼叫回調。

          推論:某些庫的until是不包含結束符的,比如readLine沒有換行符。但是asio是包含的。

          </div>

          using boost::system::error_code;
          // this piece of code shows how to read Chunked Tranfer-Encoding
          streambuf sbuf;
          std::string content;
          void chunked_handler(const error_code& ec, std::size_t sz)
          {
              std::istream is(sbuf);
              std::size_t chunk_size;
              is >> std::hex >> chunk_size;
              std::assert(is.get() == '\r' && is.get() == '\n');
              async_read(socket, sbuf,
                  [chunk_size] (const error_code& ec, std::size_t) {
                      return chunk_size + 2 - sbuf.size();}, 
                  [chunk_size] (const error_code& ec, std::size_t) {
                      std::istream is(sbuf);
                      boost::scoped_array<char> cbuf(new char[chunk_size]);
                      is.read(cbuf, chunk_size);
                      content += std::string(cbuf, chunk_size);
                      std::assert(is.get() == '\r' && is.get() == '\n');
                      async_read_until(socket, sbuf,
                          std::string("\r\n"), chunked_handler); });
          }
          async_read_until(socket, sbuf,
              std::string("\r\n"), chunked_handler);

          Networking

          基本上都是ip::tcp里的東西了。acceptor相當于Java里的ServerSocket,沒有任何數據傳輸的功能,只有作為服務器監聽端口接收連接的功能。socket就是一般意義上的socket了,沒有什么特別的。iostream是一個很聰明的設計,你可以用operator >>和operator <<來進行數據傳輸了,不過是同步阻塞的。這是使用上的一些要點:

          • socket::read_some是非阻塞的,socket::async_read_some會立即回調。read some的意義是,有多少讀多少,沒有就不讀直接返回。write_some同理,如果網絡不暢導致內核緩沖區滿的話,返回0都是有可能的。

            </li>

          • async_***的話很多時候要靈活運用std::bind了。

            </li>

          • query中的服務參數service可以是端口或者服務名,定義在/etc/services中。

            </li>

          • socket::connect僅對一個endpoint進行連接,connect可對迭代器所指示的一系列endpoint進行連接,直到有其中一個成功連接為止。

            </li> </ul>

            using boost::system::error_code;
            // server: accept
            tcp::ip::socket sock(service);
            tcp::ip::acceptor acc(service);
            tcp::ip::endpoint ep(tcp::ip::v4(), 8080);
            acc.async_accpet(sock, eq, [sock] (const error_code&) {
                // new connection handling
            });
            //client: resolve + connect
            tcp::ip::resolver resolver(service);
            tcp::ip::resolver::query qurey("www.example.com", "http" /*"80"*/);
            resolver.async_resolve(query, [] (
                    const error_code& ec,
                    tcp::ip::resolver::iterator i) {
                tcp::ip::socket sock(service);
                tcp::ip::async_connect(i, [sock] (
                        const error_code& ec,
                        tcp::ip::resolver::iterator i) {
                    // new connection handling
                });
            });
            原文 http://segmentfault.com/a/1190000003706906

           本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
           轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
           本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!