Streams是高级异步/等待就绪原语,用于处理网络连接。Streams允许发送和接收数据而不使用回调或低级协议和传输.

以下是使用asynciostreams编写的TCP echo客户端示例:

import asyncioasync def tcp_echo_client(message):    reader, writer = await asyncio.open_connection(        "127.0.0.1", 8888)    print(f"Send: {message!r}")    writer.write(message.encode())    data = await reader.read(100)    print(f"Received: {data.decode()!r}")    print("Close the connection")    writer.close()    await writer.wait_closed()asyncio.run(tcp_echo_client("Hello World!"))

另请参阅下面的示例部分.

流函数

以下顶级asyncio函数可用于创建和使用流:

coroutine asyncio.open_connectionhost=None, port=None, *, loop=None, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None

建立网络连接并返回一对(reader, writer) objects.

返回readerwriter对象是StreamReaderStreamWriterclasses.

loop参数是可选的,当从协程等待这个函数时,总是可以自动确定.

limit确定使用的缓冲区大小限制StreamReader实例。默认情况下limit设置为64 KiB.

其余的参数直接传递给loop.create_connection().

版本3.7中的新功能:ssl_handshake_timeout参数.

coroutine asyncio.start_server (client_connected_cb, host=None, port=None, *, loop=None, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

启动套接字服务器.

每当建立新的客户端连接时,都会调用client_connected_cb回调。它收到了(reader, writer) pairas两个参数,StreamReaderStreamWriter classes.

client_connected_cb可以是一个简单的可调用或协程功能;如果它是一个协程功能,它将自动安排为Task.

loop参数是可选的,并且当从协程中等待这个方法时,总是可以自动确定.

limit确定被使用的缓冲区大小限制StreamReader实例。默认情况下limit设置为64 KiB.

其余的参数直接传递给loop.create_server().

版本3.7中的新功能:ssl_handshake_timeoutstart_servingparameters.

Unix套接字

coroutine asyncio.open_unix_connectionpath=None, *, loop=None, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None

建立一个Unix套接字连接并返回一对(reader, writer).

相近 open_connection()但是在Unix套接字上运行.

另见loop.create_unix_connection().

可用性:Unix .

版本3.7中新增: ssl_handshake_timeout参数

更改版本3.7: path参数现在可以是类似路径的对象

coroutine asyncio.start_unix_serverclient_connected_cb, path=None, *, loop=None, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True

启动一个Unix套接字服务器.

类似于start_server()但适用于Unix套接字.

参见loop.create_unix_server().

文档//可用性:Unix.

新版本3.7: ssl_handshake_timeoutstart_serving参数

更改版本3.7: path参数现在可以是路径类对象.


StreamReader

class asyncio.StreamReader

表示读者对象提供API从IO流中读取数据.

不建议直接实例化StreamReader对象;使用open_connection()start_server()instead.

coroutine read(n=-1)

读到n字节。如果没有提供n,或设置为-1读到EOF并返回所有读取的字节.

如果收到EOF并且内部缓冲区为空,则返回一个空的bytes object.

coroutine readline(

读一行,其中“line”是一个字节发送序列\n.

如果收到EOF并且找不到\n,则该方法返回部分读取数据.

如果收到EOF并且内部缓冲区为空,则返回一个空的bytes object.

coroutine readexactly (n)

准确地说nbytes

//如果在IncompleteReadError之前达到EOF,则调出n。使用IncompleteReadError.partial属性获取部分读取的数据.

coroutine readuntil (separator=b”\n”)

从流中读取数据,直到找到separator.

成功后,数据和分隔符将从内部缓冲区中删除(消耗)。返回的数据最后将包括这些参与者.

如果读取的数据量超过配置的流限制,则会引发LimitOverrunError异常,并将数据保留在内部缓冲区中并可以读取再次

如果在找到完整的分隔符之前达到EOF,则会引发IncompleteReadError异常,并重置内部缓冲区。IncompleteReadError.partial属性可以包含分隔符的一部分.

新版本3.5.2.

at_eof ()

返回True如果缓冲区是空的并且feed_eof()被调用的话

StreamWriter

class asyncio.StreamWriter

表示一个提供API的编写器对象,用于写入IO流的数据.

不建议实例化StreamWriter objectsdirectly;使用open_connection()start_server()instead.

can_write_eof ()

返回True如果基础传输支持write_eof()方法,False否则

write_eof// ()

关闭流后的写入结束缓冲的writedata是冲的.

transport

返回底层的asyncio transport.

get_extra_info(name, default=None)

获取可选的运输信息;看BaseTransport.get_extra_info()了解详情.

write (data)

写到data到溪边.

该方法不受流量控制。调用write()应该跟drain().

writelinesdata

写一个列表(或任何可迭代的)字节到流.

该方法不受流量控制。调用writelines()之后应该drain().

coroutine drain

等到适合继续写入流中。例如:

writer.write(data)await writer.drain()

这是一种与undersIO写缓冲区交互的流控制方法。当缓冲区的大小达到高水印时,drain()阻塞直到缓冲区的大小耗尽到低水位线并且可以恢复写入。当没有什么可以等待时,drain()立即返回.

close ( )

关闭溪流

is_closing

返回True如果溪流关闭或正在关闭的过程中

版本3.7.

coroutine wait_closed

等到溪流关闭.

应该在close()等到底层连接关闭.

版本3.7.

例子

使用流的TCP echo客户端

TCP echo客户端使用asyncio.open_connection()function:

import asyncioasync def tcp_echo_client(message):    reader, writer = await asyncio.open_connection(        "127.0.0.1", 8888)    print(f"Send: {message!r}")    writer.write(message.encode())    data = await reader.read(100)    print(f"Received: {data.decode()!r}")    print("Close the connection")    writer.close()asyncio.run(tcp_echo_client("Hello World!"))

参见

TCP echo客户端协议示例使用低级loop.create_connection()方法

TCP回送服务器使用流

TCP回显服务器使用asyncio.start_server()功能:

import asyncioasync def handle_echo(reader, writer):    data = await reader.read(100)    message = data.decode()    addr = writer.get_extra_info("peername")    print(f"Received {message!r} from {addr!r}")    print(f"Send: {message!r}")    writer.write(data)    await writer.drain()    print("Close the connection")    writer.close()async def main():    server = await asyncio.start_server(        handle_echo, "127.0.0.1", 8888)    addr = server.sockets[0].getsockname()    print(f"Serving on {addr}")    async with server:        await server.serve_forever()asyncio.run(main())

另见

TCP echo服务器协议示例使用loop.create_server()方法

获取HTTP

简单示例查询命令行上传递的URL的HTTP头:

import asyncioimport urllib.parseimport sysasync def print_http_headers(url):    url = urllib.parse.urlsplit(url)    if url.scheme == "https":        reader, writer = await asyncio.open_connection(            url.hostname, 443, ssl=True)    else:        reader, writer = await asyncio.open_connection(            url.hostname, 80)    query = (        f"HEAD {url.path or "/"} HTTP/1.0\r\n"        f"Host: {url.hostname}\r\n"        f"\r\n"    )    writer.write(query.encode("latin-1"))    while True:        line = await reader.readline()        if not line:            break        line = line.decode("latin1").rstrip()        if line:            print(f"HTTP header> {line}")    # Ignore the body, close the socket    writer.close()url = sys.argv[1]asyncio.run(print_http_headers(url))

用法:

python example.py http://example.com/path/page.html

或使用HTTPS:

python example.py https://example.com/path/page.html

注册一个打开的套接字以等待使用流的数据

使用open_connection()函数

import asyncioimport socketasync def wait_for_data():    # Get a reference to the current event loop because    # we want to access low-level APIs.    loop = asyncio.get_running_loop()    # Create a pair of connected sockets.    rsock, wsock = socket.socketpair()    # Register the open socket to wait for data.    reader, writer = await asyncio.open_connection(sock=rsock)    # Simulate the reception of data from the network    loop.call_soon(wsock.send, "abc".encode())    # Wait for data    data = await reader.read(100)    # Got data, we are done: close the socket    print("Received:", data.decode())    writer.close()    # Close the second socket    wsock.close()asyncio.run(wait_for_data())

等待套接字接收数据的协同程序还

使用协议注册一个打开的套接字等待数据示例使用低级协议和loop.create_connection()方法。

观察读取事件的文件描述符示例使用低级loop.add_reader()方法来查看文件描述符.

评论被关闭。