介绍
multiprocess 模块的完全模仿了 threading 模块的接口,二者在使用层面,有很大的相似性,例:
import threading
import time
def func():
print('start sub thread1')
print(threading.currentThread()) # <Thread(sub thread1, started 11832)>
time.sleep(10)
print('end sub thread1')
thread = threading.Thread(target=func)
thread.start()
print(thread.is_alive()) # True
print(thread.getName()) # Thread-1
thread.setName('sub thread1')
print(thread.getName()) # sub thread1
print(threading.currentThread()) # <_MainThread(MainThread, started 9708)>
print(threading.enumerate()) # [<_MainThread(MainThread, started 9708)>, <Thread(sub thread1, started 11832)>]
print(threading.activeCount()) # 2
Thread实例对象的方法
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
threading模块提供的一些方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
进程与线程的关系
1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
3)调度和切换:线程上下文切换比进程上下文切换要快得多。
4)在多线程操作系统中,进程不是一个可执行的实体。
5)进程是资源分配的最小单位,线程是 CPU 调度的最小单位,每一个进程中至少有一个线程。
使用
创建线程
from threading import Thread
def func():
print('from sub threading')
p = Thread(target=func)
p.start()
p.join()
# result:
# from sub threading
继承 Thread 创建线程
from threading import Thread
class MyThread(Thread):
def run(self):
print('from sub thread,threadid:{}'.format(self.ident))
my_thread = MyThread()
my_thread.start()
my_thread.join()
# result:
# from sub thread,threadid:9332
数据共享
同一进程内的线程之间共享进程内的数据。
from threading import Thread
def func():
global i
i = 1
i = 10
thread = Thread(target=func)
thread.start()
thread.join()
print(i) # 1
守护线程与守护进程的对比
无论是进程还是线程,都遵循:守护进程/线程会等待主进程/线程运行完毕后被销毁。
需要强调的是:运行完毕并非终止运行。
- 对主进程来说,运行完毕指的是主进程代码运行完毕
- 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
# 守护进程
from multiprocessing import Process
import time
def notDaemonFunc():
print('start notDaemonFunc')
time.sleep(10)
print('end notDaemonFunc')
def daemonFunc():
print('start daemonFunc')
time.sleep(5)
print('end daemonFunc') # 主进程代码早已执行完毕没机会执行
if __name__ == '__main__':
notDaemonProcess = Process(target=notDaemonFunc)
notDaemonProcess.start()
damonProcess = Process(target=daemonFunc)
damonProcess.daemon = True
damonProcess.start()
time.sleep(1)
print('执行完毕')
# 主进程代码执行完毕时守护进程立马结束
# result:
# start notDaemonFunc
# start daemonFunc
# 执行完毕
# end notDaemonFunc
# 守护线程
from threading import Thread
import time
def notDaemonFunc():
print('start notDaemonFunc')
time.sleep(10)
print('end notDaemonFunc')
def daemonFunc():
print('start daemonFunc')
time.sleep(5)
print('end daemonFunc')
notDaemonThread = Thread(target=notDaemonFunc)
notDaemonThread.start()
damonThread = Thread(target=daemonFunc)
damonThread.daemon = True
damonThread.start()
time.sleep(1)
print('执行完毕')
# result:
# start notDaemonFunc
# start daemonFunc
# 执行完毕
# end daemonFunc
# end notDaemonFunc
Lock(锁)
同步锁
# 未加锁
from threading import Thread
import time
def work():
global n
temp = n
time.sleep(0.1)
n = temp - 1
if __name__ == '__main__':
n = 100
l = []
for i in range(100):
p = Thread(target=work)
l.append(p)
p.start()
for p in l:
p.join()
print(n) # 期望0 但结果可能为99 98
# 加锁
from threading import Thread, Lock
import time
def work(lock):
with lock:
global n
temp = n
time.sleep(0.1)
n = temp - 1
if __name__ == '__main__':
n = 100
l = []
lock = Lock()
for i in range(100):
p = Thread(target=work, args=(lock,))
l.append(p)
p.start()
for p in l:
p.join()
print(n) # 0
死锁
是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
# 吃面示例
import time
from threading import Thread, Lock
noodle_lock = Lock()
fork_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print('%s 抢到了面条' % name)
time.sleep(1)
fork_lock.acquire()
print('%s 抢到了筷子' % name)
print('%s 吃面' % name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print('%s 抢到了筷子' % name)
time.sleep(1)
noodle_lock.acquire()
print('%s 抢到了面条' % name)
print('%s 吃面' % name)
noodle_lock.release()
fork_lock.release()
t1 = Thread(target=eat1, args=('tom',))
t2 = Thread(target=eat2, args=('jerry',))
t1.start()
t2.start()
#result:
# tom 抢到了面条
# jerry 抢到了叉子
死锁的解决-递归锁
在 Python 中为了支持在同一线程中多次请求同一资源,提供了可重入锁 RLock
。这个 RLock
内部维护着一个 Lock
和一个 counter
变量,counter
记录了 acquire
的次数,从而使得资源可以被多次请求。直到一个线程所有的 acquire
都被 release
,其他的线程才能获得资源。
# 解决吃面死锁问题
import time
from threading import Thread, RLock
fork_lock = noodle_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print('%s 抢到了面条' % name)
time.sleep(1)
fork_lock.acquire()
print('%s 抢到了筷子' % name)
print('%s 吃面' % name)
fork_lock.release()
noodle_lock.release()
def eat2(name):
fork_lock.acquire()
print('%s 抢到了筷子' % name)
time.sleep(1)
noodle_lock.acquire()
print('%s 抢到了面条' % name)
print('%s 吃面' % name)
noodle_lock.release()
fork_lock.release()
t1 = Thread(target=eat1, args=('tom',))
t2 = Thread(target=eat2, args=('jerry',))
t1.start()
t2.start()
# result:
# tom 抢到了面条
# tom 抢到了筷子
# tom 吃面
# jerry 抢到了筷子
# jerry 抢到了面条
# jerry 吃面
Semaphore(信号量)
from threading import Thread, Semaphore
import time
def func(num, s):
s.acquire()
print('编号:{} 正在执行,'.format(num), time.strftime("%Y-%m-%d %X"))
time.sleep(1)
s.release()
s = Semaphore(2)
[Thread(target=func, args=(i, s)).start() for i in range(10)]
# result:
# 编号:0 正在执行, 2018-09-12 20:33:09
# 编号:1 正在执行, 2018-09-12 20:33:09
# 编号:2 正在执行, 2018-09-12 20:33:10
# 编号:3 正在执行, 2018-09-12 20:33:10
# 编号:4 正在执行, 2018-09-12 20:33:11
# 编号:5 正在执行, 2018-09-12 20:33:11
# 编号:7 正在执行, 2018-09-12 20:33:12
# 编号:6 正在执行, 2018-09-12 20:33:12
# 编号:9 正在执行, 2018-09-12 20:33:13
# 编号:8 正在执行, 2018-09-12 20:33:13
Event(事件)
from threading import Thread, Event
import time
# 获取指定秒数后的时间
def get_addsec_time(sec=0):
return time.strftime("%Y-%m-%d %X", time.localtime(time.time() + sec))
def func(e):
print('func准备执行')
e.wait() # 当e.is_set()为True时执行后面代码
print('执行了,当前时间:{}'.format(time.strftime("%Y-%m-%d %X")))
e = Event()
print(e.is_set()) # False 初始是阻塞状态
e.set()
print(e.is_set()) # True 不阻塞
e.clear()
print(e.is_set()) # False 恢复阻塞
after_five_sec = get_addsec_time(5) # 5秒后的时间
Thread(target=func, args=(e,)).start()
while True:
print('当前时间:{}'.format(time.strftime("%Y-%m-%d %X")))
time.sleep(1)
if time.strftime("%Y-%m-%d %X") == after_five_sec:
print('5秒过去了')
e.set()
break;
# result:
# False
# True
# False
# func准备执行
# 当前时间:2018-09-12 20:37:27
# 当前时间:2018-09-12 20:37:28
# 当前时间:2018-09-12 20:37:29
# 当前时间:2018-09-12 20:37:30
# 当前时间:2018-09-12 20:37:31
# 5秒过去了
# 执行了,当前时间:2018-09-12 20:37:32
Condition(条件)
使得线程等待,只有满足某条件时,才释放 n 个线程。
Python 提供的 Condition
对象提供了对复杂线程同步问题的支持。Condition
被称为条件变量,除了提供与 Lock
类似的 acquire
和 release
方法外,还提供了 wait
和 notify
方法。线程首先 acquire
一个条件变量,然后判断一些条件。如果条件不满足则 wait
;如果条件满足,进行一些处理改变条件后,通过 notify
方法通知其他线程,其他处于 wait
状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
import threading
def run(n):
con.acquire()
print('prepare')
con.wait()
print("run the thread: %s" % n)
con.release()
con = threading.Condition()
for i in range(5):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input('>>>')
if inp == 'q':
break
con.acquire()
con.notify(int(inp))
con.release()
print('------------------------')
#result:
# prepare
# prepare
# prepare
# prepare
# prepare
# >>>3
# ------------------------
# run the thread: 2
# run the thread: 1
# run the thread: 0
# >>>3
# ------------------------
# run the thread: 4
# run the thread: 3
# >>>q
Timer(定时器)
指定 n 秒后执行某个函数。
from threading import Timer
import time
def func():
print('in func,current time:{}'.format(time.strftime('%X')))
print('in main,current time:{}'.format(time.strftime('%X')))
# 5秒后执行
t = Timer(5, func)
t.start()
# result:
# in main,current time:20:53:52
# in func,current time:20:53:57
扩展
queen 模块
在上述 threading 模块知识点中并没有出现一个和 multiprocessing 模块中 Queen
对应的队列,这是因为 Python 本身给我们提供的 queen 就是线程安全的,而同个进程的线程之间资源是可以共享的,所以我们可以直接使用 queen。
queue.Queue(maxsize=0) 先进先出
import queue
q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
result:
first
second
third
'''
queue.LifoQueue(maxsize=0) 后进先出
import queue
q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')
print(q.get())
print(q.get())
print(q.get())
'''
result:
third
second
first
'''
queue.PriorityQueue(maxsize=0) 优先级
import queue
q = queue.PriorityQueue()
# put进入一个元组,第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))
print(q.get())
print(q.get())
print(q.get())
'''
数字越小优先级越高,优先级高的优先出队
result:
(10, 'b')
(20, 'a')
(30, 'c')
'''
线程池之 concurrent.futures 模块
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor:进程池,提供异步调用
executor = ProcessPoolExecutor(max_workers=n):初始化进程池 max_workers指定池内最大进程数
executor.submit(fn, *args, **kwargs):异步提交任务
executor.map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作
executor.shutdown(wait=True) :相当于multiprocessing模块中的pool.close()+pool.join()操作,wait=True时,等待池内所有任务执行完毕回收完资源后才继续.wait=False时,立即返回,并不会等待池内的任务执行完毕,但不管wait参数为何值,整个程序都会等到所有任务执行完毕,submit和map必须在shutdown之前
executor.submit().result(timeout=None):取得结果
executor.submit().result(timeout=None):取得结果
executor.submit().add_done_callback(fn):给任务添加回调函数
创建进程池
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import os, time
def func(n):
print('{} is runing ,current time:{}'.format(os.getpid(), time.strftime('%X')))
time.sleep(1)
return 'pid:{} finished'.format(os.getpid())
if __name__ == '__main__':
executor = ProcessPoolExecutor(max_workers=2)
result_list = []
for i in range(1, 6):
result = executor.submit(func, i)
result_list.append(result)
executor.shutdown(True)
print('---------------get result-----------------')
for result in result_list:
print(result.result())
'''
result:
3444 is runing ,current time:21:32:39
2404 is runing ,current time:21:32:39
3444 is runing ,current time:21:32:40
2404 is runing ,current time:21:32:40
3444 is runing ,current time:21:32:41
---------------get result-----------------
pid:3444 finished
pid:2404 finished
pid:3444 finished
pid:2404 finished
pid:3444 finished
'''
map 使用
from concurrent.futures import ThreadPoolExecutor
import threading
import time
def task(n):
print('threadId:{} is runing,current time:{}'.format(threading.currentThread().ident, time.strftime('%X')))
time.sleep(1)
return n ** 2
if __name__ == '__main__':
executor = ThreadPoolExecutor(max_workers=2)
# for i in range(11):
# future=executor.submit(task,i)
executor.map(task, range(1, 5)) # map取代了for+submit
'''
result:
threadId:5324 is runing,current time:21:53:24
threadId:3444 is runing,current time:21:53:24
threadId:5324 is runing,current time:21:53:25
threadId:3444 is runing,current time:21:53:25
'''
回调函数
from concurrent.futures import ThreadPoolExecutor
import threading
import time
def callback_func(result):
print(result.result())
def func(i):
return i * i
executor = ThreadPoolExecutor(5)
[executor.submit(func, i).add_done_callback(callback_func) for i in range(1, 5)]
'''
result:
1
4
9
16
'''
评论区