本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名原文链接~~~

Python基础(32)之threading线程模块

微信搜索 zze_coding 或扫描 👉 二维码关注我的微信公众号获取更多资源推送:


介绍

multiprocess 模块的完全模仿了 threading 模块的接口,二者在使用层面,有很大的相似性,例:

import threading
import time


def func():
    print('start sub thread1')
    print(threading.currentThread())  # <Thread(sub thread1, started 11832)>
    time.sleep(10)
    print('end sub thread1')


thread = threading.Thread(target=func)
thread.start()
print(thread.is_alive())  # True
print(thread.getName())  # Thread-1
thread.setName('sub thread1')
print(thread.getName())  # sub thread1
print(threading.currentThread())  # <_MainThread(MainThread, started 9708)>
print(threading.enumerate())  # [<_MainThread(MainThread, started 9708)>, <Thread(sub thread1, started 11832)>]
print(threading.activeCount())  # 2
Thread实例对象的方法
  isAlive(): 返回线程是否活动的。
  getName(): 返回线程名。
  setName(): 设置线程名。

threading模块提供的一些方法:
  threading.currentThread(): 返回当前的线程变量。
  threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
  threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

进程与线程的关系

1)地址空间和其它资源(如打开文件):进程间相互独立,同一进程的各线程间共享。某进程内的线程在其它进程不可见。
2)通信:进程间通信IPC,线程间可以直接读写进程数据段(如全局变量)来进行通信——需要进程同步和互斥手段的辅助,以保证数据的一致性。
3)调度和切换:线程上下文切换比进程上下文切换要快得多。
4)在多线程操作系统中,进程不是一个可执行的实体。
5)进程是资源分配的最小单位,线程是 CPU 调度的最小单位,每一个进程中至少有一个线程。

使用

创建线程

from threading import Thread

def func():
    print('from sub threading')

p = Thread(target=func)
p.start()
p.join()
# result:
    # from sub threading

继承 Thread 创建线程

from threading import Thread

class MyThread(Thread):
    def run(self):
        print('from sub thread,threadid:{}'.format(self.ident))

my_thread = MyThread()
my_thread.start()
my_thread.join()

# result:
    # from sub thread,threadid:9332

数据共享

同一进程内的线程之间共享进程内的数据。

from threading import Thread

def func():
    global i
    i = 1

i = 10
thread = Thread(target=func)
thread.start()
thread.join()
print(i)  # 1

守护线程与守护进程的对比

无论是进程还是线程,都遵循:守护进程/线程会等待主进程/线程运行完毕后被销毁。

需要强调的是:运行完毕并非终止运行。

  1. 对主进程来说,运行完毕指的是主进程代码运行完毕
  2. 对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕
# 守护进程
from multiprocessing import Process
import time


def notDaemonFunc():
    print('start notDaemonFunc')
    time.sleep(10)
    print('end notDaemonFunc')


def daemonFunc():
    print('start daemonFunc')
    time.sleep(5)
    print('end daemonFunc')  # 主进程代码早已执行完毕没机会执行


if __name__ == '__main__':
    notDaemonProcess = Process(target=notDaemonFunc)
    notDaemonProcess.start()
    damonProcess = Process(target=daemonFunc)
    damonProcess.daemon = True
    damonProcess.start()
    time.sleep(1)
    print('执行完毕')

# 主进程代码执行完毕时守护进程立马结束

# result:
    # start notDaemonFunc
    # start daemonFunc
    # 执行完毕
    # end notDaemonFunc
# 守护线程
from threading import Thread
import time


def notDaemonFunc():
    print('start notDaemonFunc')
    time.sleep(10)
    print('end notDaemonFunc')


def daemonFunc():
    print('start daemonFunc')
    time.sleep(5)
    print('end daemonFunc')


notDaemonThread = Thread(target=notDaemonFunc)
notDaemonThread.start()
damonThread = Thread(target=daemonFunc)
damonThread.daemon = True
damonThread.start()
time.sleep(1)
print('执行完毕')

# result:
    # start notDaemonFunc
    # start daemonFunc
    # 执行完毕
    # end daemonFunc
    # end notDaemonFunc

Lock(锁)

同步锁

# 未加锁
from threading import Thread
import time

def work():
    global n
    temp = n
    time.sleep(0.1)
    n = temp - 1

if __name__ == '__main__':
    n = 100
    l = []
    for i in range(100):
        p = Thread(target=work)
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 期望0 但结果可能为99 98
# 加锁
from threading import Thread, Lock
import time

def work(lock):
    with lock:
        global n
        temp = n
        time.sleep(0.1)
        n = temp - 1

if __name__ == '__main__':
    n = 100
    l = []
    lock = Lock()
    for i in range(100):
        p = Thread(target=work, args=(lock,))
        l.append(p)
        p.start()
    for p in l:
        p.join()

    print(n)  # 0

死锁

是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

# 吃面示例
import time
from threading import Thread, Lock

noodle_lock = Lock()
fork_lock = Lock()


def eat1(name):
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    time.sleep(1)
    fork_lock.acquire()
    print('%s 抢到了筷子' % name)
    print('%s 吃面' % name)
    fork_lock.release()
    noodle_lock.release()


def eat2(name):
    fork_lock.acquire()
    print('%s 抢到了筷子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()


t1 = Thread(target=eat1, args=('tom',))
t2 = Thread(target=eat2, args=('jerry',))
t1.start()
t2.start()
#result:
    # tom 抢到了面条
    # jerry 抢到了叉子

死锁的解决-递归锁

在 Python 中为了支持在同一线程中多次请求同一资源,提供了可重入锁 RLock。这个 RLock 内部维护着一个 Lock 和一个 counter 变量,counter 记录了 acquire 的次数,从而使得资源可以被多次请求。直到一个线程所有的 acquire 都被 release,其他的线程才能获得资源。

# 解决吃面死锁问题
import time
from threading import Thread, RLock

fork_lock = noodle_lock = RLock()


def eat1(name):
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    time.sleep(1)
    fork_lock.acquire()
    print('%s 抢到了筷子' % name)
    print('%s 吃面' % name)
    fork_lock.release()
    noodle_lock.release()


def eat2(name):
    fork_lock.acquire()
    print('%s 抢到了筷子' % name)
    time.sleep(1)
    noodle_lock.acquire()
    print('%s 抢到了面条' % name)
    print('%s 吃面' % name)
    noodle_lock.release()
    fork_lock.release()


t1 = Thread(target=eat1, args=('tom',))
t2 = Thread(target=eat2, args=('jerry',))
t1.start()
t2.start()

# result:
    # tom 抢到了面条
    # tom 抢到了筷子
    # tom 吃面
    # jerry 抢到了筷子
    # jerry 抢到了面条
    # jerry 吃面

Semaphore(信号量)

from threading import Thread, Semaphore
import time


def func(num, s):
    s.acquire()
    print('编号:{} 正在执行,'.format(num), time.strftime("%Y-%m-%d %X"))
    time.sleep(1)
    s.release()


s = Semaphore(2)
[Thread(target=func, args=(i, s)).start() for i in range(10)]

# result:
    # 编号:0 正在执行, 2018-09-12 20:33:09
    # 编号:1 正在执行, 2018-09-12 20:33:09
    # 编号:2 正在执行, 2018-09-12 20:33:10
    # 编号:3 正在执行, 2018-09-12 20:33:10
    # 编号:4 正在执行, 2018-09-12 20:33:11
    # 编号:5 正在执行, 2018-09-12 20:33:11
    # 编号:7 正在执行, 2018-09-12 20:33:12
    # 编号:6 正在执行, 2018-09-12 20:33:12
    # 编号:9 正在执行, 2018-09-12 20:33:13
    # 编号:8 正在执行, 2018-09-12 20:33:13

Event(事件)

from threading import Thread, Event
import time


# 获取指定秒数后的时间
def get_addsec_time(sec=0):
    return time.strftime("%Y-%m-%d %X", time.localtime(time.time() + sec))


def func(e):
    print('func准备执行')
    e.wait()  # 当e.is_set()为True时执行后面代码
    print('执行了,当前时间:{}'.format(time.strftime("%Y-%m-%d %X")))


e = Event()
print(e.is_set())  # False 初始是阻塞状态
e.set()
print(e.is_set())  # True 不阻塞
e.clear()
print(e.is_set())  # False 恢复阻塞
after_five_sec = get_addsec_time(5)  # 5秒后的时间
Thread(target=func, args=(e,)).start()
while True:
    print('当前时间:{}'.format(time.strftime("%Y-%m-%d %X")))
    time.sleep(1)
    if time.strftime("%Y-%m-%d %X") == after_five_sec:
        print('5秒过去了')
        e.set()
        break;

# result:
    # False
    # True
    # False
    # func准备执行
    # 当前时间:2018-09-12 20:37:27
    # 当前时间:2018-09-12 20:37:28
    # 当前时间:2018-09-12 20:37:29
    # 当前时间:2018-09-12 20:37:30
    # 当前时间:2018-09-12 20:37:31
    # 5秒过去了
    # 执行了,当前时间:2018-09-12 20:37:32

Condition(条件)

使得线程等待,只有满足某条件时,才释放 n 个线程。

Python 提供的 Condition 对象提供了对复杂线程同步问题的支持。Condition 被称为条件变量,除了提供与 Lock 类似的 acquirerelease 方法外,还提供了 waitnotify 方法。线程首先 acquire 一个条件变量,然后判断一些条件。如果条件不满足则 wait;如果条件满足,进行一些处理改变条件后,通过 notify 方法通知其他线程,其他处于 wait 状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。

import threading


def run(n):
    con.acquire()
    print('prepare')
    con.wait()
    print("run the thread: %s" % n)
    con.release()

con = threading.Condition()
for i in range(5):
    t = threading.Thread(target=run, args=(i,))
    t.start()

while True:
    inp = input('>>>')
    if inp == 'q':
        break
    con.acquire()
    con.notify(int(inp))
    con.release()
    print('------------------------')

#result:
    # prepare
    # prepare
    # prepare
    # prepare
    # prepare
    # >>>3
    # ------------------------
    # run the thread: 2
    # run the thread: 1
    # run the thread: 0
    # >>>3
    # ------------------------
    # run the thread: 4
    # run the thread: 3
    # >>>q

Timer(定时器)

指定 n 秒后执行某个函数。

from threading import Timer
import time


def func():
    print('in func,current time:{}'.format(time.strftime('%X')))


print('in main,current time:{}'.format(time.strftime('%X')))
# 5秒后执行
t = Timer(5, func)
t.start()

# result:
    # in main,current time:20:53:52
    # in func,current time:20:53:57

扩展

queen 模块

在上述 threading 模块知识点中并没有出现一个和 multiprocessing 模块中 Queen 对应的队列,这是因为 Python 本身给我们提供的 queen 就是线程安全的,而同个进程的线程之间资源是可以共享的,所以我们可以直接使用 queen。

queue.Queue(maxsize=0) 先进先出

import queue

q=queue.Queue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())

'''
result:
    first
    second
    third
'''

queue.LifoQueue(maxsize=0) 后进先出

import queue

q = queue.LifoQueue()
q.put('first')
q.put('second')
q.put('third')

print(q.get())
print(q.get())
print(q.get())
'''
result:
    third
    second
    first
'''

queue.PriorityQueue(maxsize=0) 优先级

import queue

q = queue.PriorityQueue()
# put进入一个元组,第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
q.put((20, 'a'))
q.put((10, 'b'))
q.put((30, 'c'))

print(q.get())
print(q.get())
print(q.get())
'''
数字越小优先级越高,优先级高的优先出队
result:
    (10, 'b')
    (20, 'a')
    (30, 'c')
'''

线程池之 concurrent.futures 模块

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor:进程池,提供异步调用

executor = ProcessPoolExecutor(max_workers=n):初始化进程池 max_workers指定池内最大进程数
executor.submit(fn, *args, **kwargs):异步提交任务
executor.map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作
executor.shutdown(wait=True)  :相当于multiprocessing模块中的pool.close()+pool.join()操作,wait=True时,等待池内所有任务执行完毕回收完资源后才继续.wait=False时,立即返回,并不会等待池内的任务执行完毕,但不管wait参数为何值,整个程序都会等到所有任务执行完毕,submit和map必须在shutdown之前
executor.submit().result(timeout=None):取得结果
executor.submit().result(timeout=None):取得结果
executor.submit().add_done_callback(fn):给任务添加回调函数

创建进程池

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import os, time


def func(n):
    print('{} is runing ,current time:{}'.format(os.getpid(), time.strftime('%X')))
    time.sleep(1)
    return 'pid:{} finished'.format(os.getpid())


if __name__ == '__main__':
    executor = ProcessPoolExecutor(max_workers=2)
    result_list = []
    for i in range(1, 6):
        result = executor.submit(func, i)
        result_list.append(result)
    executor.shutdown(True)
    print('---------------get result-----------------')
    for result in result_list:
        print(result.result())

'''
result:
    3444 is runing ,current time:21:32:39
    2404 is runing ,current time:21:32:39
    3444 is runing ,current time:21:32:40
    2404 is runing ,current time:21:32:40
    3444 is runing ,current time:21:32:41
    ---------------get result-----------------
    pid:3444 finished
    pid:2404 finished
    pid:3444 finished
    pid:2404 finished
    pid:3444 finished
'''

map 使用

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def task(n):
    print('threadId:{} is runing,current time:{}'.format(threading.currentThread().ident, time.strftime('%X')))
    time.sleep(1)
    return n ** 2


if __name__ == '__main__':
    executor = ThreadPoolExecutor(max_workers=2)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task, range(1, 5))  # map取代了for+submit

'''
result:
    threadId:5324 is runing,current time:21:53:24
    threadId:3444 is runing,current time:21:53:24
    threadId:5324 is runing,current time:21:53:25
    threadId:3444 is runing,current time:21:53:25
'''

回调函数

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def callback_func(result):
    print(result.result())


def func(i):
    return i * i


executor = ThreadPoolExecutor(5)
[executor.submit(func, i).add_done_callback(callback_func) for i in range(1, 5)]

'''
result:
    1
    4
    9
    16
'''

如果这篇文章对您有帮助,可点击下方链接分享给你的朋友们😋,如果遇到问题欢迎评论、留言~~~😇

评论

公众号:zze_coding

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×