python里的阻塞函数,Python 阻塞

python里并发执行协程时部分阻塞超时怎么办

在前面的例子里学习了并发地执行多个协程来下载图片,也许其中一个协程永远下载不了,一直阻塞,这时怎么办呢?

创新互联从2013年开始,先为巧家等服务建站,巧家等地企业,进行企业商务咨询服务。为巧家企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

碰到这种需求时不要惊慌,可以使用wait()里的timeout参数来设置等待时间,也就是从这个函数开始运行算起,如果时间到达协程没有执行完成,就可以不再等它们了,直接从wait()函数里返回,返回之后就可以判断那些没有执行成功的,可以把这些协程取消掉。例子如下:

[python] view plain copy

import asyncio

async def phase(i):

print('in phase {}'.format(i))

try:

await asyncio.sleep(0.1 * i)

except asyncio.CancelledError:

print('phase {} canceled'.format(i))

raise

else:

print('done with phase {}'.format(i))

return 'phase {} result'.format(i)

async def main(num_phases):

print('starting main')

phases = [

phase(i)

for i in range(num_phases)

]

print('waiting 0.1 for phases to complete')

completed, pending = await asyncio.wait(phases, timeout=0.1)

print('{} completed and {} pending'.format(

len(completed), len(pending),

))

# Cancel remaining tasks so they do not generate errors

# as we exit without finishing them.

if pending:

print('canceling tasks')

for t in pending:

t.cancel()

print('exiting main')

event_loop = asyncio.get_event_loop()

try:

event_loop.run_until_complete(main(3))

finally:

event_loop.close()

结果输出如下:

starting main

waiting 0.1 for phases to complete

in phase 0

in phase 2

in phase 1

done with phase 0

1 completed and 2 pending

canceling tasks

exiting main

phase 1 canceled

phase 2 canceled

python多进程中队列不空时阻塞,求解为什么

最近接触一个项目,要在多个虚拟机中运行任务,参考别人之前项目的代码,采用了多进程来处理,于是上网查了查python中的多进程

一、先说说Queue(队列对象)

Queue是python中的标准库,可以直接import 引用,之前学习的时候有听过著名的“先吃先拉”与“后吃先吐”,其实就是这里说的队列,队列的构造的时候可以定义它的容量,别吃撑了,吃多了,就会报错,构造的时候不写或者写个小于1的数则表示无限多

import Queue

q = Queue.Queue(10)

向队列中放值(put)

q.put(‘yang')

q.put(4)

q.put([‘yan','xing'])

在队列中取值get()

默认的队列是先进先出的

q.get()

‘yang'

q.get()

4

q.get()

[‘yan', ‘xing']

当一个队列为空的时候如果再用get取则会堵塞,所以取队列的时候一般是用到

get_nowait()方法,这种方法在向一个空队列取值的时候会抛一个Empty异常

所以更常用的方法是先判断一个队列是否为空,如果不为空则取值

队列中常用的方法

Queue.qsize() 返回队列的大小

Queue.empty() 如果队列为空,返回True,反之False

Queue.full() 如果队列满了,返回True,反之False

Queue.get([block[, timeout]]) 获取队列,timeout等待时间

Queue.get_nowait() 相当Queue.get(False)

非阻塞 Queue.put(item) 写入队列,timeout等待时间

Queue.put_nowait(item) 相当Queue.put(item, False)

二、multiprocessing中使用子进程概念

from multiprocessing import Process

可以通过Process来构造一个子进程

p = Process(target=fun,args=(args))

再通过p.start()来启动子进程

再通过p.join()方法来使得子进程运行结束后再执行父进程

from multiprocessing import Process

import os

# 子进程要执行的代码

def run_proc(name):

print 'Run child process %s (%s)...' % (name, os.getpid())

if __name__=='__main__':

print 'Parent process %s.' % os.getpid()

p = Process(target=run_proc, args=('test',))

print 'Process will start.'

p.start()

p.join()

print 'Process end.'

三、在multiprocessing中使用pool

如果需要多个子进程时可以考虑使用进程池(pool)来管理

from multiprocessing import Pool

from multiprocessing import Pool

import os, time

def long_time_task(name):

print 'Run task %s (%s)...' % (name, os.getpid())

start = time.time()

time.sleep(3)

end = time.time()

print 'Task %s runs %0.2f seconds.' % (name, (end - start))

if __name__=='__main__':

print 'Parent process %s.' % os.getpid()

p = Pool()

for i in range(5):

p.apply_async(long_time_task, args=(i,))

print 'Waiting for all subprocesses done...'

p.close()

p.join()

print 'All subprocesses done.'

pool创建子进程的方法与Process不同,是通过

p.apply_async(func,args=(args))实现,一个池子里能同时运行的任务是取决你电脑的cpu数量,如我的电脑现在是有4个cpu,那会子进程task0,task1,task2,task3可以同时启动,task4则在之前的一个某个进程结束后才开始

上面的程序运行后的结果其实是按照上图中1,2,3分开进行的,先打印1,3秒后打印2,再3秒后打印3

代码中的p.close()是关掉进程池子,是不再向里面添加进程了,对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

当时也可以是实例pool的时候给它定义一个进程的多少

如果上面的代码中p=Pool(5)那么所有的子进程就可以同时进行

三、多个子进程间的通信

多个子进程间的通信就要采用第一步中说到的Queue,比如有以下的需求,一个子进程向队列中写数据,另外一个进程从队列中取数据,

#coding:gbk

from multiprocessing import Process, Queue

import os, time, random

# 写数据进程执行的代码:

def write(q):

for value in ['A', 'B', 'C']:

print 'Put %s to queue...' % value

q.put(value)

time.sleep(random.random())

# 读数据进程执行的代码:

def read(q):

while True:

if not q.empty():

value = q.get(True)

print 'Get %s from queue.' % value

time.sleep(random.random())

else:

break

if __name__=='__main__':

# 父进程创建Queue,并传给各个子进程:

q = Queue()

pw = Process(target=write, args=(q,))

pr = Process(target=read, args=(q,))

# 启动子进程pw,写入:

pw.start()

# 等待pw结束:

pw.join()

# 启动子进程pr,读取:

pr.start()

pr.join()

# pr进程里是死循环,无法等待其结束,只能强行终止:

print

print '所有数据都写入并且读完'

四、关于上面代码的几个有趣的问题

if __name__=='__main__':

# 父进程创建Queue,并传给各个子进程:

q = Queue()

p = Pool()

pw = p.apply_async(write,args=(q,))

pr = p.apply_async(read,args=(q,))

p.close()

p.join()

print

print '所有数据都写入并且读完'

如果main函数写成上面的样本,本来我想要的是将会得到一个队列,将其作为参数传入进程池子里的每个子进程,但是却得到

RuntimeError: Queue objects should only be shared between processes through inheritance

的错误,查了下,大意是队列对象不能在父进程与子进程间通信,这个如果想要使用进程池中使用队列则要使用multiprocess的Manager类

if __name__=='__main__':

manager = multiprocessing.Manager()

# 父进程创建Queue,并传给各个子进程:

q = manager.Queue()

p = Pool()

pw = p.apply_async(write,args=(q,))

time.sleep(0.5)

pr = p.apply_async(read,args=(q,))

p.close()

p.join()

print

print '所有数据都写入并且读完'

这样这个队列对象就可以在父进程与子进程间通信,不用池则不需要Manager,以后再扩展multiprocess中的Manager类吧

关于锁的应用,在不同程序间如果有同时对同一个队列操作的时候,为了避免错误,可以在某个函数操作队列的时候给它加把锁,这样在同一个时间内则只能有一个子进程对队列进行操作,锁也要在manager对象中的锁

#coding:gbk

from multiprocessing import Process,Queue,Pool

import multiprocessing

import os, time, random

# 写数据进程执行的代码:

def write(q,lock):

lock.acquire() #加上锁

for value in ['A', 'B', 'C']:

print 'Put %s to queue...' % value

q.put(value)

lock.release() #释放锁

# 读数据进程执行的代码:

def read(q):

while True:

if not q.empty():

value = q.get(False)

print 'Get %s from queue.' % value

time.sleep(random.random())

else:

break

if __name__=='__main__':

manager = multiprocessing.Manager()

# 父进程创建Queue,并传给各个子进程:

q = manager.Queue()

lock = manager.Lock() #初始化一把锁

p = Pool()

pw = p.apply_async(write,args=(q,lock))

pr = p.apply_async(read,args=(q,))

p.close()

p.join()

print

print '所有数据都写入并且读完'

python wait()函数问题

看了你发的函数:

def Wait(self):

self._app.MainLoop()

看名字应该是启动了阻塞循环,去处理app的请求,这个就是需要一直运行的,因为一旦停止了,你的app请求就没发处理了。

如果你需要启动后再执行的别的程序,可以使用多进程,把这个启动放在别的进程里去执行。

如果解决了您的问题请采纳!

如果未解决请继续追问

python2.7怎么实现异步

改进之前

之前,我的查询步骤很简单,就是:

前端提交查询请求 -- 建立数据库连接 -- 新建游标 -- 执行命令 -- 接受结果 -- 关闭游标、连接

这几大步骤的顺序执行。

这里面当然问题很大:

建立数据库连接实际上就是新建一个套接字。这是进程间通信的几种方法里,开销最大的了。

在“执行命令”和“接受结果”两个步骤中,线程在阻塞在数据库内部的运行过程中,数据库连接和游标都处于闲置状态。

这样一来,每一次查询都要顺序的新建数据库连接,都要阻塞在数据库返回结果的过程中。当前端提交大量查询请求时,查询效率肯定是很低的。

第一次改进

之前的模块里,问题最大的就是第一步——建立数据库连接套接字了。如果能够一次性建立连接,之后查询能够反复服用这个连接就好了。

所以,首先应该把数据库查询模块作为一个单独的守护进程去执行,而前端app作为主进程响应用户的点击操作。那么两条进程怎么传递消息呢?翻了几天Python文档,终于构思出来:用队列queue作为生产者(web前端)向消费者(数据库后端)传递任务的渠道。生产者,会与SQL命令一起,同时传递一个管道pipe的连接对象,作为任务完成后,回传结果的渠道。确保,任务的接收方与发送方保持一致。

作为第二个问题的解决方法,可以使用线程池来并发获取任务队列中的task,然后执行命令并回传结果。

第二次改进

第一次改进的效果还是很明显的,不用任何测试手段。直接点击页面链接,可以很直观地感觉到反应速度有很明显的加快。

但是对于第二个问题,使用线程池还是有些欠妥当。因为,CPython解释器存在GIL问题,所有线程实际上都在一个解释器进程里调度。线程稍微开多一点,解释器进程就会频繁的切换线程,而线程切换的开销也不小。线程多一点,甚至会出现“抖动”问题(也就是刚刚唤醒一个线程,就进入挂起状态,刚刚换到栈帧或内存的上下文,又被换回内存或者磁盘),效率大大降低。也就是说,线程池的并发量很有限。

试过了多进程、多线程,只能在单个线程里做文章了。

Python中的asyncio库

Python里有大量的协程库可以实现单线程内的并发操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio库同样可以实现协程并发。asyncio库大大降低了Python中协程的实现难度,就像定义普通函数那样就可以了,只是要在def前面多加一个async关键词。async def函数中,需要阻塞在其他async def函数的位置前面可以加上await关键词。

import asyncio

async def wait():

await asyncio.sleep(2)

async def execute(task):

process_task(task)

await wait()

continue_job()

async def函数的执行稍微麻烦点。需要首先获取一个loop对象,然后由这个对象代为执行async def函数。

loop = asyncio.get_event_loop()

loop.run_until_complete(execute(task))

loop.close()

loop在执行execute(task)函数时,如果遇到await关键字,就会暂时挂起当前协程,转而去执行其他阻塞在await关键词的协程,从而实现协程并发。

不过需要注意的是,run_until_complete()函数本身是一个阻塞函数。也就是说,当前线程会等候一个run_until_complete()函数执行完毕之后,才会继续执行下一部函数。所以下面这段代码并不能并发执行。

for task in task_list:

loop.run_until_complete(task)

对与这个问题,asyncio库也有相应的解决方案:gather函数。

loop = asyncio.get_event_loop()

tasks = [asyncio.ensure_future(execute(task))

for task in task_list]

loop.run_until_complete(asyncio.gather(*tasks))

loop.close()

当然了,async def函数的执行并不只有这两种解决方案,还有call_soon与run_forever的配合执行等等,更多内容还请参考官方文档。

Python下的I/O多路复用

协程,实际上,也存在上下文切换,只不过开销很轻微。而I/O多路复用则完全不存在这个问题。

目前,Linux上比较火的I/O多路复用API要算epoll了。Tornado,就是通过调用C语言封装的epoll库,成功解决了C10K问题(当然还有Pypy的功劳)。

在Linux里查文档,可以看到epoll只有三类函数,调用起来比较方便易懂。

创建epoll对象,并返回其对应的文件描述符(file descriptor)。

int epoll_create(int size);

int epoll_create1(int flags);

控制监听事件。第一个参数epfd就对应于前面命令创建的epoll对象的文件描述符;第二个参数表示该命令要执行的动作:监听事件的新增、修改或者删除;第三个参数,是要监听的文件对应的描述符;第四个,代表要监听的事件。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

等候。这是一个阻塞函数,调用者会等候内核通知所注册的事件被触发。

int epoll_wait(int epfd, struct epoll_event *events,

int maxevents, int timeout);

int epoll_pwait(int epfd, struct epoll_event *events,

int maxevents, int timeout,

const sigset_t *sigmask);

在Python的select库里:

select.epoll()对应于第一类创建函数;

epoll.register(),epoll.unregister(),epoll.modify()均是对控制函数epoll_ctl的封装;

epoll.poll()则是对等候函数epoll_wait的封装。

Python里epoll相关API的最大问题应该是在epoll.poll()。相比于其所封装的epoll_wait,用户无法手动指定要等候的事件,也就是后者的第二个参数struct epoll_event *events。没法实现精确控制。因此只能使用替代方案:select.select()函数。

根据Python官方文档,select.select(rlist, wlist, xlist[, timeout])是对Unix系统中select函数的直接调用,与C语言API的传参很接近。前三个参数都是列表,其中的元素都是要注册到内核的文件描述符。如果想用自定义类,就要确保实现了fileno()方法。

其分别对应于:

rlist: 等候直到可读

wlist: 等候直到可写

xlist: 等候直到异常。这个异常的定义,要查看系统文档。

select.select(),类似于epoll.poll(),先注册文件和事件,然后保持等候内核通知,是阻塞函数。

实际应用

Psycopg2库支持对异步和协程,但和一般情况下的用法略有区别。普通数据库连接支持不同线程中的不同游标并发查询;而异步连接则不支持不同游标的同时查询。所以异步连接的不同游标之间必须使用I/O复用方法来协调调度。

所以,我的大致实现思路是这样的:首先并发执行大量协程,从任务队列中提取任务,再向连接池请求连接,创建游标,然后执行命令,并返回结果。在获取游标和接受查询结果之前,均要阻塞等候内核通知连接可用。

其中,连接池返回连接时,会根据引用连接的协程数量,返回负载最轻的连接。这也是自己定义AsyncConnectionPool类的目的。

我的代码位于:bottle-blog/dbservice.py

存在问题

当然了,这个流程目前还一些问题。

首先就是每次轮询拿到任务之后,都会走这么一个流程。

获取连接 -- 新建游标 -- 执行任务 -- 关闭游标 -- 取消连接引用

本来,最好的情况应该是:在轮询之前,就建好游标;在轮询时,直接等候内核通知,执行相应任务。这样可以减少轮询时的任务量。但是如果协程提前对应好连接,那就不能保证在获取任务时,保持各连接负载均衡了。

所以这一块,还有工作要做。

还有就是epoll没能用上,有些遗憾。

以后打算写点C语言的内容,或者用Python/C API,或者用Ctypes包装共享库,来实现epoll的调用。

最后,请允许我吐槽一下Python的epoll相关文档:简直太弱了!!!必须看源码才能弄清楚功能。

python:如何以非阻塞的方式读

代码是这样的:

subp = subprocess.Popen(["d:/T1.exe"], shell=True, stdout=subprocess.PIPE, bufsize=0)

subp.stdout.read()

但是发现read和readline函数是阻塞方式调用的,一定要subprocess运行结束才能返回数据。

新闻名称:python里的阻塞函数,Python 阻塞
链接地址:https://www.cdcxhl.com/article26/hsihcg.html

成都网站建设公司_创新互联,为您提供移动网站建设面包屑导航网页设计公司品牌网站建设企业建站网站策划

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联

网站托管运营