python模塊介紹-asynchat 異步socket命令/響應處理器

jopen 11年前發布 | 33K 次閱讀 Python開發 Python

該模塊基于asyncore簡化了異步客戶端和服務器,并使其更容易元素處理由任意的字符串結束,或者是可變長度的的協議。它提供了抽象類async_chat,提供collect_incoming_data()和found_terminator()方法。循環和 asyncore的一樣,有2種信道:asyncore.dispatcher和asynchat.async_chat,可以自由混合信道。通常 asyncore.dispatcher服務器通道在接收到連接請求時產生新的asynchat.async_chat通道對象

 

接口文檔

  • class asynchat.async_chat:這個類是asyncore.dispatcher的抽象子類。一般使用其collect_incoming_data()和found_terminator()方法,asyncore.dispatcher的方法都可以使用,盡管不是所有在消息/響應上下文都有意義。

    • async_chat.close_when_done()推None到生產者FIFO。當此None彈出的fifo時,通道將關閉。

    • async_chat.collect_incoming_data(data):數據到達。默認引發NotImplementedError異常。使用時必須重載。

    • async_chat.discard_buffers():在緊急情況下方法將丟棄在輸入和/或輸出緩沖器和生產者FIFO持有數據。

    • async_chat.found_terminator():當輸入的數據流相匹配set_terminator()中設定的終止條件時調用。默認引發NotImplementedError異常。使用時必須重載。緩沖輸入的數據在實例屬性是可用的。

    • async_chat.get_terminator():返回當前通道的結束符。

    • async_chat.push(data):往通道的FIFO中添加數據,當然可以使用自己制作的更復雜的方案來實現加密和分塊之類的。

    • async_chat.push_with_producer(producer):往通道的FIFO中添加數據生產者FIFO中添加生產者。當所有當前生產者隊列用盡之后將調用其more()方法并發送數據到遠端。

    • async_chat.set_terminator(term) 設定終止符號。類型如下:

    • string: Will call found_terminator() when the string is found in the input stream。

    • integer: Will call found_terminator() when the indicated number of characters have been received。

    • None: The channel continues to collect data forever。

    • ac_in_buffer_size異步輸入緩沖區的大小(默認4096)。

    • ac_out_buffer_size異步輸出緩沖區的大小(默認4096)。

    • 像asyncore.dispatcher, async_chat定義了一組由select()調用后的socket條件分析生成的事件。輪詢循環開始后async_chat對象的方法被事件處理框架調用,不需要程序員干預。。兩個類的屬性可以修改,以提高性能,甚至節省內存。

    • 不像asyncore.dispatcher, async_chat允許您定義生產者的先進先出隊列(FIFO)。生產者有唯一的方法中more(),返回通道上要傳輸的數據。生產者沒有數據時返回空字符串。這時async_chat對象從FIFO移除生產者并開始使用下一個生產者,如果有的話。當生產者FIFO為空時handle_write()什么都不會做。通道對象的set_terminator()方法來識別遠端的結束,重要的斷點,輸入。

    • 為了建立一個有效的async_chat子類,方法collect_incoming_data()和found_terminator()必須處理該通道異步接收數據。相關方法如下:

  • class asynchat.fifo([list=None]): 輔助類asynchat.fifo的方法如下:

    • is_empty(): Returns True if and only if the fifo is empty.

    • first(): Returns the least-recently push()ed item from the fifo.

    • push(data): Adds the given data (which may be a string or a producer object) to the producer fifo.

    • pop(): If the fifo is not empty, returns True, first(), deleting the popped item. Returns False, None for an empty fifo.

HTTP實例

下例說明如何通過async_chat讀取HTTP請求。 Web服務器可以為每個傳入客戶端連接創建一個http_request_handler對象。請注意, 初始的信道終結符設置為HTTP頭的末尾空行和一個表示正在讀取頭的標志。

報頭讀完后,如果請求的類型是POST的(這表明還有其他數據),則內容長度:頭用來設置數值終止符。

數據整理完畢, 設置terminator為None,確保web客戶端不會讀取多余的數據之后調用handle_request()進行數據處理。

class http_request_handler(asynchat.async_chat):
    def __init__(self, sock, addr, sessions, log):
        asynchat.async_chat.__init__(self, sock=sock)
        self.addr = addr        self.sessions = sessions        self.ibuffer = []
        self.obuffer = ""
        self.set_terminator("\r\n\r\n")
        self.reading_headers = True
        self.handling = False
        self.cgi_data = None
        self.log = log    def collect_incoming_data(self, data):
        """Buffer the data"""
        self.ibuffer.append(data)
    def found_terminator(self):
        if self.reading_headers:
            self.reading_headers = False
            self.parse_headers("".join(self.ibuffer))
            self.ibuffer = []
            if self.op.upper() == "POST":
                clen = self.headers.getheader("content-length")
                self.set_terminator(int(clen))
            else:
                self.handling = True
                self.set_terminator(None)
                self.handle_request()
        elif not self.handling:
            self.set_terminator(None) # browsers sometimes over-send
            self.cgi_data = parse(self.headers, "".join(self.ibuffer))
            self.handling = True
            self.ibuffer = []
            self.handle_request()

注意:這并不是一個不添加代碼就可以實際運行的實例。

ECHO實例

下面實例來源于《The Python Standard Library by Example 2011》,更詳細。

asynchat_echo_server.py:

import asyncoreimport loggingimport socketfrom asynchat_echo_handler import EchoHandlerclass EchoServer(asyncore.dispatcher):
    """Receives connections and establishes handlers for each client.
    """
    
    def __init__(self, address):
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(address)
        self.address = self.socket.getsockname()
        self.listen(1)
        return

    def handle_accept(self):
        # Called when a client connects to our socket
        client_info = self.accept()
        EchoHandler(sock=client_info[0])
        # Only deal with one client at a time,
        # so close as soon as the handler is set up.
        # Under normal conditions, the server
        # would run forever or until it received
        # instructions to stop.
        self.handle_close()
        return
    
    def handle_close(self):
        self.close()

asynchat_echo_handler.py: EchoHandler繼承asynchat.async_chat,收發可以自動處理,不過需要處理如下東東:

  • 處理輸入數據,重載handle_incoming_data()

  • 認識輸入數據的結束符,通過set_terminator()

  • 收到完整信息后的處理:found_terminator()

  • 發送數據push()

該示例應用程序有兩種工作模式。它要么等待像"ECHO length\n"之類的命令,要么等待要顯示的數據,通過實例變量process_data切換模式。一旦找到完整的命令,處理程序切換到消息處理模式,并等待接收的完整文本。當所有的數據可用時,它被推入到輸出信道。數據被發送處理程序被設置為關閉。

import asynchatimport loggingclass EchoHandler(asynchat.async_chat):
    """Handles echoing messages from a single client.
    """

    # Artificially reduce buffer sizes to illustrate
    # sending and receiving partial messages.
    ac_in_buffer_size = 128
    ac_out_buffer_size = 128
    
    def __init__(self, sock):
        self.received_data = []
        self.logger = logging.getLogger('EchoHandler')
        asynchat.async_chat.__init__(self, sock)
        # Start looking for the ECHO command
        self.process_data = self._process_command        self.set_terminator('\n')
        return

    def collect_incoming_data(self, data):
        """Read an incoming message from the client
        and put it into the outgoing queue.
        """
        self.logger.debug(
            'collect_incoming_data() -> (%d bytes) %r',
            len(data), data)
        self.received_data.append(data)

    def found_terminator(self):
        """The end of a command or message has been seen."""
        self.logger.debug('found_terminator()')
        self.process_data()
    
    def _process_command(self):        
        """Have the full ECHO command"""
        command = ''.join(self.received_data)
        self.logger.debug('_process_command() %r', command)
        command_verb, command_arg = command.strip().split(' ')
        expected_data_len = int(command_arg)
        self.set_terminator(expected_data_len)
        self.process_data = self._process_message        self.received_data = []
    
    def _process_message(self):
        """Have read the entire message."""
        to_echo = ''.join(self.received_data)
        self.logger.debug('_process_message() echoing %r',
                          to_echo)
        self.push(to_echo)
        # Disconnect after sending the entire response
        # since we only want to do one thing at a time
        self.close_when_done()

客戶端和處理器類似。要發送的消息先傳遞給客戶端的構造函數。當socket連接建立后,調用handle_connect()發送命令和消息數據。

命令是直接push,消息文本則使用生產者類。生產者有輪詢機制把數據塊發送到網絡。當生產者返回空字符串,寫停止。

import asynchatimport loggingimport socketclass EchoClient(asynchat.async_chat):
    """Sends messages to the server and receives responses.
    """

    # Artificially reduce buffer sizes to show
    # sending and receiving partial messages.
    ac_in_buffer_size = 128
    ac_out_buffer_size = 128
    
    def __init__(self, host, port, message):
        self.message = message        self.received_data = []
        self.logger = logging.getLogger('EchoClient')
        asynchat.async_chat.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.logger.debug('connecting to %s', (host, port))
        self.connect((host, port))
        return
        
    def handle_connect(self):
        self.logger.debug('handle_connect()')
        # Send the command
        self.push('ECHO %d\n' % len(self.message))
        # Send the data
        self.push_with_producer(
            EchoProducer(self.message,
                         buffer_size=self.ac_out_buffer_size)
            )
        # We expect the data to come back as-is, 
        # so set a length-based terminator
        self.set_terminator(len(self.message))
    
    def collect_incoming_data(self, data):
        """Read an incoming message from the client
        and add it to the outgoing queue.
        """
        self.logger.debug(
            'collect_incoming_data() -> (%d) %r',
            len(data), data)
        self.received_data.append(data)

    def found_terminator(self):
        self.logger.debug('found_terminator()')
        received_message = ''.join(self.received_data)
        if received_message == self.message:
            self.logger.debug('RECEIVED COPY OF MESSAGE')
        else:
            self.logger.debug('ERROR IN TRANSMISSION')
            self.logger.debug('EXPECTED %r', self.message)
            self.logger.debug('RECEIVED %r', received_message)
        returnclass EchoProducer(asynchat.simple_producer):

    logger = logging.getLogger('EchoProducer')

    def more(self):
        response = asynchat.simple_producer.more(self)
        self.logger.debug('more() -> (%s bytes) %r',
                          len(response), response)
        return response

主程序:

import asyncoreimport loggingimport socketfrom asynchat_echo_server import EchoServerfrom asynchat_echo_client import EchoClient

logging.basicConfig(level=logging.DEBUG,
                    format='%(name)-11s: %(message)s',
                    )address = ('localhost', 0) # let the kernel give us a portserver = EchoServer(address)ip, port = server.address # find out what port we were givenmessage_data = open('lorem.txt', 'r').read()client = EchoClient(ip, port, message=message_data)asyncore.loop()

輔助文本:

# vi lorem.txt 
Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec
egestas, enim et consectetuer ullamcorper, lectus ligula rutrum
leo, a elementum elit tortor eu quam.

執行結果:

# ./asynchat_echo_main.py 
EchoClient : connecting to ('127.0.0.1', 58974)EchoClient : handle_connect()EchoHandler: collect_incoming_data() -> (8 bytes) 'ECHO 166'EchoHandler: found_terminator()EchoHandler: _process_command() 'ECHO 166'EchoProducer: more() -> (128 bytes) 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\n'EchoHandler: collect_incoming_data() -> (128 bytes) 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\n'EchoProducer: more() -> (38 bytes) 'leo, a elementum elit tortor eu quam.\n'EchoHandler: collect_incoming_data() -> (38 bytes) 'leo, a elementum elit tortor eu quam.\n'EchoHandler: found_terminator()EchoHandler: _process_message() echoing 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\nleo, a elementum elit tortor eu quam.\n'EchoProducer: more() -> (0 bytes) ''EchoClient : collect_incoming_data() -> (128) 'Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Donec\negestas, enim et consectetuer ullamcorper, lectus ligula rutrum\n'EchoClient : collect_incoming_data() -> (38) 'leo, a elementum elit tortor eu quam.\n'EchoClient : found_terminator()EchoClient : RECEIVED COPY OF MESSAGE

參考資料

本文地址

 本文由用戶 jopen 自行上傳分享,僅供網友學習交流。所有權歸原作者,若您的權利被侵害,請聯系管理員。
 轉載本站原創文章,請注明出處,并保留原始鏈接、圖片水印。
 本站是一個以用戶分享為主的開源技術平臺,歡迎各類分享!