You are here:  Home » Python » 传输和协议 – 异步I / O(Python教程)(参考资料)

传输和协议

前言

传输和协议由低级事件loopAPIs使用,例如loop.create_connection()。它们使用基于回调的编程风格,并实现网络或IPC协议(例如HTTP)的高性能实现.

本质上,传输和协议应仅用于库和框架,而不应用于高级异步应用.

本文档页面涵盖了传输和协议.

简介

在最高级别,传输涉及how字节传输,而协议确定which字节传输(在某种程度上是时间).

说同样的事情的另一种方式:传输是套接字(或类似的I / O端点)的抽象,而协议是抽象的从传输的角度看应用程序.

另一种观点是传输和协议接口共同定义了一个使用网络I / O和进程I / O的抽象接口.

传输和协议对象之间始终存在1:1的关系:协议调用传输方法发送数据,而传输调用协议方法传递已接收的数据.

大多数面向连接的事件循环方法(例如loop.create_connection())通常接受一个protocol_factory参数,用于创建一个Protocol对象,用于接受连接,用Transport对象表示。这种方法通常返回一个(transport, protocol).

的元组内容

本文档页面包含以下部分:

  • 传输部分记录了asyncio BaseTransport,ReadTransport, WriteTransport, Transport,DatagramTransportSubprocessTransport类。
  • 协议部分记录了asyncio BaseProtocol,Protocol, BufferedProtocol,DatagramProtocolSubprocessProtocol classes.
  • 示例部分展示了如何使用传输,协议和低级事件loop APIs

Transports

Transports是由asyncio提供的类,以抽象各种通信渠道.

运输对象总是由anref实例化: asyncio event loop< asyncio-event-loop> .

asyncio实现TCPUDP,SSL和子进程管道的传输。传输上可用的方法取决于传输的种类.

运输类是不是线程安全的.

运输层次结构

class asyncio.BaseTransport

所有运输的基类。包含allasyncio传输共享的方法.

class asyncio.WriteTransportBaseTransport

用于只写连接的基本传输.

的实例WriteTransport上课从返回loop.connect_write_pipe()事件循环方法,也可用于与子进程相关的方法,如loop.subprocess_exec().

class asyncio.ReadTransportBaseTransport

只读连接的基本传输.

的实例ReadTransportclass是从loop.connect_read_pipe()事件循环方法返回的,也可以用于子进程相关的方法,如loop.subprocess_exec().

class asyncio.TransportWriteTransport, ReadTransport

表示双向传输的接口,如aTCP连接.

用户不直接实例化传输;他们称之为autility功能,传递协议工厂和创建传输和协议所需的其他信息.

的实例Transport类是从loop.create_connection(),loop.create_unix_connection(),loop.create_server(), loop.sendfile()等等

class asyncio.DatagramTransportBaseTransport

数据报(UDP)连接的传输.

的实例DatagramTransport上课从返回loop.create_datagram_endpoint()事件循环法.

class asyncio.SubprocessTransportBaseTransport

表示父项与子系统OS进程之间连接的抽象.

的实例SubprocessTransportclass返回fromevent循环方法loop.subprocess_shell()loop.subprocess_exec().

Base Transport

BaseTransport.close

关闭运输.

如果传输具有outgoingdata的缓冲区,则将异步刷新缓冲的数据。不会再收到任何数据。刷新所有缓冲数据后,协议的protocol.connection_lost()None作为其论点

BaseTransport.is_closing

返回True如果运输正在关闭或关闭.

BaseTransport.get_extra_infoname, default=None

返回有关传输或底层资源使用的信息.

name是一个字符串,表示要获取的传输特定信息.

default是信息不可用时返回的值,或者传输不支持使用给定的第三方事件循环实现或在当前平台上查询它.

例如,以下代码尝试获取传输的underlyingsocket对象:

sock = transport.get_extra_info("socket")if sock is not None:    print(sock.getsockopt(...))

可以在某些传输上查询的信息类别:

  • socket:
    • "peername":socket连接的远程地址,socket.socket.getpeername()None出错时)的结果
    • "socket"socket.socket实例
    • "sockname":socket的自己的地址,socket.socket.getsockname()
  • SSL socket的结果:
    • "compression":压缩算法用作astring,如果连接未压缩,则为None;的结果 ssl.SSLSocket.compression()
    • "cipher":一个三值元组,包含正在使用的thecipher的名称,定义使用的SSL协议的版本,以及正在使用的秘密位的数量;的结果ssl.SSLSocket.cipher()
    • "peercert":同行证书;的结果ssl.SSLSocket.getpeercert()
    • "sslcontext"ssl.SSLContext例如
    • "ssl_object"ssl.SSLObject要么ssl.SSLSocket例如
  • 管:
    • "pipe":管道对象
  • 子:
BaseTransport.set_protocolprotocol

设置新协议.

切换协议只应在记录两个协议以支持切换时才能完成.

BaseTransport.get_protocol

返回当前协议.

只读运输

ReadTransport.is_reading

返回True如果运输工具正在接收新数据

新版本3.7.

ReadTransport.pause_reading ()

因为运输的接收端。没有数据会传递到协议的protocol.data_received()方法,直到resume_reading()被称为

更改版本3.7:该方法是幂等的,即当传输已经暂停或关闭时可以调用它.

ReadTransport.resume_reading (

恢复接收端。协议protocol.data_received()如果有一些数据可供阅读,将再次调用方法.

更改版本3.7:该方法是幂等的,即当传输已经读取时可以调用它.

Write-only Transports

WriteTransport.abort

立即关闭传输,无需等待待处理的操作完成。缓冲的数据将丢失。将不再收到数据。协议protocol.connection_lost()方法最终将与None作为其论点

WriteTransport.can_write_eof

返回True如果运输支持write_eof(), False如果不。

WriteTransport.get_write_buffer_size

返回传输使用的输出缓冲区的当前大小.

WriteTransport.get_write_buffer_limits()

获取highlow水印写流量控制。返回at//le (low, high) lowhigh是正数.

使用set_write_buffer_limits()设置限制.

新版本3.4.2.

WriteTransport.set_write_buffer_limits (high=None, low=None

设置highlow水印用于写入流量控制.

当调用协议的protocol.pause_writing()protocol.resume_writing()方法时,这两个值(以字节数为单位)控制。如果指定,低水印必须小于或等于高水印。不是high也不是low可以是否定的

pause_writing()当缓冲区大小大于或等于时调用high值。如果写入已暂停,则resume_writing()在缓冲区大小小于或等于low值时调用

默认值是特定于实现的。如果仅给出高水印,则低水印默认为小于或等于高水印的实现特定值。设置high为零迫使low为零,并且只要缓冲区变为非空,就会调用pause_writing()。将low设置为零会导致resume_writing()仅在缓冲区为空时才被调用。对任一限制使用零通常是最优的,因为它减少了同时进行I / O和计算的机会.

使用get_write_buffer_limits()来获得限制.

WriteTransport.writedata

写一些data字节到transport.

这个方法不会阻塞;它缓冲数据并安排它异步发送.

WriteTransport.writelines (list_of_data)

写一个列表(或任何可迭代的)数据字节到传输。这是在功能上等同于调用write()由迭代产生的eachelement,但可以更有效地实现.

WriteTransport.write_eof

在刷新所有缓冲的数据后关闭传输的写端。可能仍然会收到数据.

这个方法可以提升NotImplementedError如果运输(例如SSL)不支持半封闭连接.

数据报运输

DatagramTransport.sendtodata, addr=None

发送 data给出的远程对等字节数addr(依赖于运输的目标地址)。如果addrNone,数据将被发送到创建传输时给出的目标地址.

这种方法不会阻止;它缓冲数据并安排它以异步方式发送出来.

DatagramTransport.abort ( )

立即关闭传输,无需等待挂起操作完成。缓冲的数据将丢失。将收到更多数据。协议protocol.connection_lost()方法最终将用None作为其论点

子进程传输

SubprocessTransport.get_pid

将子进程进程id作为整数返回

SubprocessTransport.get_pipe_transportfd

返回与整数文件描述符对应的通信管道的传输fd

  • 0:标准输入的可读流传输(stdin)或None如果使用stdin=PIPE
  • 1创建子进程:标准输出的可写流式传输(stdout),或None如果子进程不是用stdout=PIPE
  • 2创建的:标准错误(stderr)或None的可写流式传输,如果子进程不是用stderr=PIPE
  • 其他fdNone
SubprocessTransport.get_returncode创建的(

返回子进程返回码为整数或None如果没有返回,这类似于subprocess.Popen.returncode属性.

SubprocessTransport.kill

杀掉subprocess.

在POSIX系统上,该函数将SIGKILL发送到子进程。在Windows上,此方法是别名terminate().

参见subprocess.Popen.kill().

SubprocessTransport.send_signalsignal

signal数字发送到子进程,如subprocess.Popen.send_signal().

SubprocessTransport.terminate

停止子过程.

在POSIX系统上,此方法将SIGTERM发送到子进程。在Windows上,Windows API函数TerminateProcess()被称为tostop子进程.

参见subprocess.Popen.terminate().

SubprocessTransport.close

通过调用kill()方法。

如果子进程尚未返回,则关闭stdin, stdout,和stderrpipes.

协议

asyncio提供了一组应该用于实现网络协议的抽象基类。那些课程要和运输一起使用.

抽象基础协议类的子类可以实现一些口语方法。所有这些方法都是回调:它们在某些事件上由传输调用,例如当收到某些数据时。相应的传输应该调用基本协议方法.

基本协议

class asyncio.BaseProtocol

基本协议与所有协议共享的方法.

class asyncio.ProtocolBaseProtocol)

实现流协议的基类(TCP,Unix套接字,等等)。

class asyncio.BufferedProtocolBaseProtocol

用手工控制接收缓冲区实现流协议的基类.

class asyncio.DatagramProtocol(BaseProtocol)

实现数据报(UDP)协议的基类.

class asyncio.SubprocessProtocolBaseProtocol

用于实现与子进程通信的协议的基类(单向管道).

基本协议

所有asyncio协议都可以实现基本协议回调.

连接回调

在所有协议上调用连接回调,正好是pera成功连接一次。所有其他协议回调只能在这两种方法之间进行.

BaseProtocol.connection_made (transport

连接时调用.

transport参数是代表连接的传输。该协议负责存储其传输的参考.

BaseProtocol.connection_lost (exc)

连接丢失或关闭时调用.

参数是异常对象或None。后者意味着接收到常规的EOF,或连接的这一侧被连接或关闭.

流控制回叫

可以通过传输来调用流控制回调来暂停或恢复协议执行的写入.

请参阅set_write_buffer_limits()方法更详细.

BaseProtocol.pause_writing

当运输缓冲区超过高水位时调用.

BaseProtocol.resume_writing()

当运输缓冲区排到低水位线以下时调用.

如果缓冲区大小等于高水位线,则不调用pause_writing():缓冲区大小必须严格超过

//相反,resume_writing()当缓冲区大小等于或低于低水印时调用。这些最终条件非常重要,以确保标记为零时,事情按预期进行.

Streaming Protocols

事件方法,如loop.create_server(),loop.create_unix_server(), loop.create_connection(),loop.create_unix_connection(), loop.connect_accepted_socket(),loop.connect_read_pipe()loop.connect_write_pipe()接受返回流协议的工厂.

Protocol.data_received (data)

当有些数据时调用收到了。data是一个非空的bytesobject包含传入的数据.

数据是缓冲,分块还是重新组装取决于传输。通常,您不应该依赖于特定的语义,而应该使您的解析具有通用性和灵活性。但是,数据总是以正确的顺序接收.

当连接打开时,该方法可被称为任意次数.

但是,最多调用protocol.eof_received()一旦。一旦 eof_received()被调用,data_received()就不再调用了

Protocol.eof_received// ()

当另一端发出信号时它会调用它不再发送任何数据(例如通过调用transport.write_eof(),如果另一端也使用了asyncio).

这个方法可能返回一个假值(包括None),在这种情况下,运输将自行关闭。相反,如果此方法返回atrue值,则使用的协议确定是否关闭传输。由于默认实现返回None,它会隐式关闭连接.

某些传输(包括SSL)不支持半封闭连接,在这种情况下,从此方法返回true将导致连接关闭.

状态机:

start -> connection_made    [-> data_received]*    [-> eof_received]?-> connection_lost -> end

缓冲流式协议

版本3.7中的新功能:重要:这已添加到Python 3.7 on a provisional basis的asyncio中!这是一个可以在Python 3.8.

中完全更改或删除的实验性API。可以使用任何支持Streaming Protocols的事件循环方法来实现.

BufferedProtocol实现允许显式手动分配和控制接收缓冲区。然后,事件循环可以使用协议提供的缓冲区来避免不必要的数据副本。这可以显着提高接收大量数据的协议的性能。复杂的协议实现可以显着减少缓冲区分配的数量.

BufferedProtocol实例上调用以下回调:

BufferedProtocol.get_buffersizehint

被调用以分配新的接收缓冲区.

sizehint是返回缓冲区的建议最小大小。可以接受返回更小或更大的缓冲区sizehint建议。设置为-1时,缓冲区大小可以是任意的。返回一个零大小的缓冲区是错误的.

get_buffer()必须返回一个实现缓冲协议的对象.

BufferedProtocol.buffer_updatednbytes

用接收到的数据更新缓冲区时调用.

nbytes是写入缓冲区的总字节数.

BufferedProtocol.eof_received (

参见protocol.eof_received()方法的文档

get_buffer()可以在连接期间被调用任意次数。但是,protocol.eof_received()最多被调用一次,如果被调用,get_buffer()buffer_updated()将不会被调用.

状态机:

start -> connection_made    [-> get_buffer        [-> buffer_updated]?    ]*    [-> eof_received]?-> connection_lost -> end

数据报协议

数据报协议实例应该通过传递给loop.create_datagram_endpoint()方法的协议实例构建

DatagramProtocol.datagram_received// (data, addr)

收到数据报时调用。data是包含传入数据的字节对象。addr是发送数据的对等体的地址;确切的格式取决于传输.

DatagramProtocol.error_receivedexc

当先前的发送或接收操作引发OSError. exc是个 OSErrorinstance.

当传输(例如UDP)检测到数据报无法传递给其接收者时,在极少数条件下调用此方法。但在许多情况下,无法传送的数据报将被静默地丢弃.

注意

在BSD系统(macOS,FreeBSD等)上,数据报协议不支持流控制,因为没有可靠的方法来检测写入太多数据包导致的sendfailures .

套接字始终显示为“就绪”,并且丢弃多余的数据包。OSErrorerrno设置为errno.ENOBUFS市长可能不会被提出;如果它被提高,它将报告DatagramProtocol.error_received()但是否则被忽略.

子进程协议

数据报协议实例应该通过传递给loop.subprocess_exec()loop.subprocess_shell()方法。

SubprocessProtocol.pipe_data_receivedfd, data

当子进程将数据写入其stdout或stderrpipe时调用.

fd是管道的整数文件描述符.

data是包含接收数据的非空字节对象.

SubprocessProtocol.pipe_connection_lostfd, exc

当其中一个与子进程通信的管道关闭时调用.

fd是已关闭的整数文件描述符.

SubprocessProtocol.process_exited

当子进程退出时调用.

示例

TCP回声服务器

创建一个TCP回应服务器loop.create_server()方法,发送回传数据,并关闭连接:

import asyncioclass EchoServerProtocol(asyncio.Protocol):    def connection_made(self, transport):        peername = transport.get_extra_info("peername")        print("Connection from {}".format(peername))        self.transport = transport    def data_received(self, data):        message = data.decode()        print("Data received: {!r}".format(message))        print("Send: {!r}".format(message))        self.transport.write(data)        print("Close the client socket")        self.transport.close()async def main():    # Get a reference to the event loop as we plan to use    # low-level APIs.    loop = asyncio.get_running_loop()    server = await loop.create_server(        lambda: EchoServerProtocol(),        "127.0.0.1", 8888)    async with server:        await server.serve_forever()asyncio.run(main())

参见

TCP echo服务器使用流示例使用高级asyncio.start_server()功能

TCP Echo客户端

一个TCP echo客户端使用loop.create_connection()方法,发送数据,并等待直到连接关闭:

import asyncioclass EchoClientProtocol(asyncio.Protocol):    def __init__(self, message, on_con_lost, loop):        self.message = message        self.loop = loop        self.on_con_lost = on_con_lost    def connection_made(self, transport):        transport.write(self.message.encode())        print("Data sent: {!r}".format(self.message))    def data_received(self, data):        print("Data received: {!r}".format(data.decode()))    def connection_lost(self, exc):        print("The server closed the connection")        self.on_con_lost.set_result(True)async def main():    # Get a reference to the event loop as we plan to use    # low-level APIs.    loop = asyncio.get_running_loop()    on_con_lost = loop.create_future()    message = "Hello World!"    transport, protocol = await loop.create_connection(        lambda: EchoClientProtocol(message, on_con_lost, loop),        "127.0.0.1", 8888)    # Wait until the protocol signals that the connection    # is lost and close the transport.    try:        await on_con_lost    finally:        transport.close()asyncio.run(main())

参见

TCP echo客户端使用流示例使用高级asyncio.open_connection()功能.

UDP Echo Server

UDP echo服务器使用loop.create_datagram_endpoint()方法发送回收到的数据:

import asyncioclass EchoServerProtocol:    def connection_made(self, transport):        self.transport = transport    def datagram_received(self, data, addr):        message = data.decode()        print("Received %r from %s" % (message, addr))        print("Send %r to %s" % (message, addr))        self.transport.sendto(data, addr)async def main():    print("Starting UDP server")    # Get a reference to the event loop as we plan to use    # low-level APIs.    loop = asyncio.get_running_loop()    # One protocol instance will be created to serve all    # client requests.    transport, protocol = await loop.create_datagram_endpoint(        lambda: EchoServerProtocol(),        local_addr=("127.0.0.1", 9999))    try:        await asyncio.sleep(3600)  # Serve for 1 hour.    finally:        transport.close()asyncio.run(main())

UDP Echo Client

UDP echo客户端,使用loop.create_datagram_endpoint()方法,在收到答案时发送数据并关闭传输

import asyncioclass EchoClientProtocol:    def __init__(self, message, loop):        self.message = message        self.loop = loop        self.transport = None        self.on_con_lost = loop.create_future()    def connection_made(self, transport):        self.transport = transport        print("Send:", self.message)        self.transport.sendto(self.message.encode())    def datagram_received(self, data, addr):        print("Received:", data.decode())        print("Close the socket")        self.transport.close()    def error_received(self, exc):        print("Error received:", exc)    def connection_lost(self, exc):        print("Connection closed")        self.on_con_lost.set_result(True)async def main():    # Get a reference to the event loop as we plan to use    # low-level APIs.    loop = asyncio.get_running_loop()    message = "Hello World!"    transport, protocol = await loop.create_datagram_endpoint(        lambda: EchoClientProtocol(message, loop),        remote_addr=("127.0.0.1", 9999))    try:        await protocol.on_con_lost    finally:        transport.close()asyncio.run(main())

连接现有套接字

等待套接字使用带有协议的loop.create_connection()方法接收数据:

import asyncioimport socketclass MyProtocol(asyncio.Protocol):    def __init__(self, loop):        self.transport = None        self.on_con_lost = loop.create_future()    def connection_made(self, transport):        self.transport = transport    def data_received(self, data):        print("Received:", data.decode())        # We are done: close the transport;        # connection_lost() will be called automatically.        self.transport.close()    def connection_lost(self, exc):        # The socket has been closed        self.on_con_lost.set_result(True)async def main():    # Get a reference to the event loop as we plan to use    # low-level APIs.    loop = asyncio.get_running_loop()    # Create a pair of connected sockets    rsock, wsock = socket.socketpair()    # Register the socket to wait for data.    transport, protocol = await loop.create_connection(        lambda: MyProtocol(loop), sock=rsock)    # Simulate the reception of data from the network.    loop.call_soon(wsock.send, "abc".encode())    try:        await protocol.on_con_lost    finally:        transport.close()        wsock.close()asyncio.run(main())

也可以看看

观察读取事件的文件描述符例子使用低级loop.add_reader()注册FD的方法

注册一个打开的套接字来使用流等待数据示例使用协程中open_connection()函数创建的高级流.

loop.subprocess_exec()和SubprocessProtocol

用于获取asubprocess输出并等待子进程退出的子进程协议示例.

子进程由loop.subprocess_exec()方法创建:

import asyncioimport sysclass DateProtocol(asyncio.SubprocessProtocol):    def __init__(self, exit_future):        self.exit_future = exit_future        self.output = bytearray()    def pipe_data_received(self, fd, data):        self.output.extend(data)    def process_exited(self):        self.exit_future.set_result(True)async def get_date():    # Get a reference to the event loop as we plan to use    # low-level APIs.    loop = asyncio.get_running_loop()    code = "import datetime; print(datetime.datetime.now())"    exit_future = asyncio.Future(loop=loop)    # Create the subprocess controlled by DateProtocol;    # redirect the standard output into a pipe.    transport, protocol = await loop.subprocess_exec(        lambda: DateProtocol(exit_future),        sys.executable, "-c", code,        stdin=None, stderr=None)    # Wait for the subprocess exit using the process_exited()    # method of the protocol.    await exit_future    # Close the stdout pipe.    transport.close()    # Read the output which was collected by the    # pipe_data_received() method of the protocol.    data = bytes(protocol.output)    return data.decode("ascii").rstrip()if sys.platform == "win32":    asyncio.set_event_loop_policy(        asyncio.WindowsProactorEventLoopPolicy())date = asyncio.run(get_date())print(f"Current date: {date}")

参见同样的例子使用高级API编写