队列

asyncio队列的设计类似于queue模块的类。尽管asyncio队列不是线程安全的,但它们被设计为专门用于异步/等待代码.

请注意,asyncio队列的方法没有timeout参数;使用asyncio.wait_for()使用atimeout.

另见下面的示例部分.

队列

class asyncio.Queuemaxsize=0, *, loop=None

先进先出(FIFO)队列

如果maxsize小于或等于零,队列大小是无限的。如果是大于的整数0, 然后await put()当队列达到maxsize直到一个项被get().

删除不同于标准库线程queue,队列的大小总是已知的,可以通过调用qsize()方法返回

//这个课是不是线程安全的.

maxsize

队列中允许的项目数量

empty

返回True如果队列是空的,False除此以外。

full

返回True如果有 maxsize队列中的项目

如果队列用maxsize=0(默认值)初始化,则full()永远不会返回True.

coroutine get

从队列中删除并返回一个项目。如果队列是空的,请等待一个项目可用.

get_nowait ()

如果一个项目立即可用,则返回一个项目,否则加注QueueEmpty.

coroutine join

阻止,直到队列中的所有项目都被收到并处理完毕.

每当项目添加到队列时,未完成任务的计数就会增加。每当消费者协程调用时,计数就会下降task_done()表示该项目已被检索,并且其上的所有工作都已完成。当未完成任务的数量减少到零时,join() unblocks.

coroutine put (item )

将项目放入队列。如果队列已满,请在添加项目之前等待afree slot可用.

put_nowait (item)

将一个项目放入队列中而不阻塞.

如果没有立即可用的空闲插槽,请抬起QueueFull.

qsize

返回队列中的项目数.

task_done(

表示以前排队的任务已完成.

由队列使用者使用。对于每个get()使用tofetch一个任务,随后调用task_done()告诉thequeue任务处理完成.

如果join()目前正在阻止,它将在处理完所有物品时恢复(意味着task_done()对于put()进入队列).

提高ValueError如果被召唤的次数多于队列中放置的项目的数量.

优先级队列

class asyncio.PriorityQueue

Queue的变种按优先级顺序检索条目(最低的第一个).

条目通常是形式的元组(priority_number, data).

LIFO Queue

class asyncio.LifoQueue

Queue的一个变体,它首先检索最近的addedentries(last in,first out).

例外

exception asyncio.QueueEmpty

get_nowait()方法调用空队列时引发此异常.

exception asyncio.QueueFull

put_nowait()方法被调用到队列已经到达maxsize.

时引发异常例子

队列可用于在几个并发任务之间分配工作负载:

import asyncioimport randomimport timeasync def worker(name, queue):    while True:        # Get a "work item" out of the queue.        sleep_for = await queue.get()        # Sleep for the "sleep_for" seconds.        await asyncio.sleep(sleep_for)        # Notify the queue that the "work item" has been processed.        queue.task_done()        print(f"{name} has slept for {sleep_for:.2f} seconds")async def main():    # Create a queue that we will use to store our "workload".    queue = asyncio.Queue()    # Generate random timings and put them into the queue.    total_sleep_time = 0    for _ in range(20):        sleep_for = random.uniform(0.05, 1.0)        total_sleep_time += sleep_for        queue.put_nowait(sleep_for)    # Create three worker tasks to process the queue concurrently.    tasks = []    for i in range(3):        task = asyncio.create_task(worker(f"worker-{i}", queue))        tasks.append(task)    # Wait until the queue is fully processed.    started_at = time.monotonic()    await queue.join()    total_slept_for = time.monotonic() - started_at    # Cancel our worker tasks.    for task in tasks:        task.cancel()    # Wait until all worker tasks are cancelled.    await asyncio.gather(*tasks, return_exceptions=True)    print("====")    print(f"3 workers slept in parallel for {total_slept_for:.2f} seconds")    print(f"total expected sleep time: {total_sleep_time:.2f} seconds")asyncio.run(main())

评论被关闭。