记录学习笔记、分享资源工具、交流技术思想、提升工作效率

python实现并发

后端 xiaomudk 6年前 (2015-07-01) 5154次浏览 0个评论
文章目录[隐藏]

1. 以进程的方式实现

a.使用map方法

import multiprocessing.dummy
impor time
a=[1,3,4,5,2,4,2,5,6,7,7,8,3,2,2]

p = multiprocessing.dummy.Pool(2)   #指定线程数

def func(x):
   time.sleep(2) 
   print "-----",x,time.time()
   return x+2

now=time.time()

print p.map(func,a)
print time.time()-now

输出结果:

----- 4 1436240616.97
----- 1 1436240616.97
----- 3 1436240618.97
----- 5 1436240618.97
----- 2 1436240620.97
----- 2 1436240620.97
----- 4 1436240622.98
----- 5 1436240622.98
----- 7 1436240624.98
----- 6 1436240624.98
----- 7 1436240626.98
----- 8 1436240626.98
----- 3 1436240628.99
----- 2 1436240628.99
----- 2 1436240630.99
[3, 5, 6, 7, 4, 6, 4, 7, 8, 9, 9, 10, 5, 4, 4]
16.025149107

这个方法有个缺点,就是参数需要一块给入,像上面就是把所有的参数都存放在a数组里.

b. 使用apply_async

import multiprocessing
import time

def func(msg):
    print msg,time.time()
    time.sleep(2)

if __name__ == "__main__":
    now=time.time()

    pool = multiprocessing.Pool(processes=2)
    for i in ['a','b','c','d','e','f']:
        msg = "hello %s" % (i)
        pool.apply_async(func,(msg,))

    pool.close()   # 关闭pool,使其不再接受新的任务。
    pool.join()    #主进程阻塞等待子进程的退出, join方法要在close之后使用
    print time.time()-now

输出结果:

hello a 1436240838.72
hello b 1436240838.72
hello c 1436240840.73
hello d 1436240840.73
hello e 1436240842.73
hello f 1436240842.73
6.03255391121

2. 以线程的方式实现

a. 使用threading

import threading
import time

# 线程函数
def func(msg):

    print msg,time.time()
    time.sleep(2)
    sem.release()     #释放一个锁

if __name__ =='__main__':

    now=time.time()
    maxThread=2
    #BoundedSemaphore 指定信息量的最大值 
    sem=threading.BoundedSemaphore(maxThread)

    for i in ['a','b','c','d','e','f']:

        sem.acquire()    #获取一个锁, 如果所有的锁都被使用,则会等待。
        msg = "hello %s" % (i)
        threading.Thread(target=func,args=(msg,)).start()

    #再次加下锁,保证上面的并发线程全部执行完(上面的锁被释放完)
    for a in range(maxThread):   
        sem.acquire();

    print time.time()-now

输出结果:

hello a 1436319425.23
hello b 1436319425.23
hello c 1436319427.23
hello d 1436319427.24
hello e 1436319429.23
hello f 1436319429.24
6.02088999748

Semaphore(信号量)是计算机科学史上最古老的同步指令之一。Semaphore管理一个内置的计数器,每当调用acquire()时-1,调用release() 时+1。计数器不能小于0;当计数器为0时,acquire()将阻塞线程至同步锁定状态,直到其他线程调用release()。

基于这个特点,Semaphore经常用来同步一些有“访客上限”的对象,比如连接池。

BoundedSemaphore 与Semaphore的唯一区别在于前者将在调用release()时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常。

参考文章:
Python线程指南

b.使用threadpool

import threadpool
import time

def func(msg):
    print msg,time.time()
    time.sleep(2)
    return 0

if __name__ == "__main__":
    now=time.time()

    data=[]
    pool = threadpool.ThreadPool(2) 
    for i in range(10):
        data.append(i)

    print data
    requests = threadpool.makeRequests(func, data) 

    for req in requests:
        pool.putRequest(req)
    pool.wait() 

输出结果:

0 1437304454.73
1 1437304454.73
2 1437304456.73
3 1437304456.73
4 1437304458.73
5 1437304458.73
6 1437304460.73
7 1437304460.73
8 1437304462.73
9 1437304462.73

threadpool和上面进程的map方法一样,需要把需要执行的进程先放在队列里,最后一块执行。但是threadpool默认支持回调函数,具体可以参看官方文档。

参考文章:
threadpool官方文档
threadpool中文文档

c.使用threading和Queue

from Queue import Queue
import threading
import time

maxThread=2
p = Queue(maxThread)   #设置队列长度

def func(msg):
    print msg,time.time()
    time.sleep(2)
    p.get()            #从队列里取出一个对象

for i in range(10):
    p.put(i)            #往队列里写入一个对象
    threading.Thread(target=func,args=(i,)).start()

输出结果:

0 1437488952.58
1 1437488952.58
2 1437488954.58
3 1437488954.58
4 1437488956.58
5 1437488956.58
6 1437488958.59
7 1437488958.59
8 1437488960.59
9 1437488960.59

Queue队列默认就是堵塞,如果队列空了,或者队列满了都会堵塞。

参考文章:
Queue官方文档
Queue中文文档


本网站采用知识共享署名-相同方式共享 4.0 国际许可协议进行授权
转载请注明原文链接:python实现并发
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址