介绍
单线程里执行多个任务代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务 1 时遇到阻塞,就利用阻塞的时间去执行任务 2。如此,才能提高效率,这就用到了 gevent 模块。
协程是单线程下的并发,又称微线程,纤程。英文名 Coroutine。一句话说明什么是协程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。
需要强调的是:
- python 的线程属于内核级别的,即由操作系统控制调度(如单线程遇到 io 或执行时间过长就会被迫交出 cpu 执行权限,切换其他线程运行)
- 单线程内开启协程,一旦遇到 io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非 io 操作的切换与效率无关) 对比操作系统控制线程的切换,用户在单线程内控制协程的切换
优点
- 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级。
- 单线程内就可以实现并发的效果,最大限度地利用 cpu。
缺点
- 协程的本质是单线程下的并发,无法利用多核;但可以在一个程序中开启多个进程,每个进程内开启多个线程,每个线程内再开启协程。
- 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。
特点
- 必须在单个线程内实现并发。
- 修改共享数据不需加锁。
- 用户程序里自己保存多个控制流的上下文栈。
greenlet 模块
安装
pip3 install greenlet
实现状态切换
from greenlet import greenlet
def func1():
    """Print the start and end markers, yielding to ``g2`` after each one.

    Relies on the module-level greenlet ``g2``. The second switch gives
    ``func2`` the chance to print its own end marker.
    """
    for marker in ('func1 start', 'func1 end'):
        print(marker)
        # Hand control to func2; this greenlet resumes here when g2
        # switches back.
        g2.switch()
def func2():
    """Print a start marker, yield to ``g1``, then print an end marker.

    Relies on the module-level greenlet ``g1``.
    """
    print('func2 start')
    # Pause here and let func1 continue; execution resumes on the next
    # line once g1 switches back to this greenlet.
    g1.switch()
    print('func2 end')
# Wrap both functions in greenlets, then enter func1 with the first switch.
g1, g2 = greenlet(func1), greenlet(func2)
g1.switch()
'''
result:
func1 start
func2 start
func1 end
func2 end
'''
顺序执行与切换执行效率对比
#顺序执行
import time
def f1(n=100000000):
    """Accumulate the integers of ``range(n)`` on top of an initial 1.

    Pure computation with no I/O; used as a workload for timing plain
    sequential execution.

    Args:
        n: Number of loop iterations. Defaults to the original
            hard-coded 100000000, so ``f1()`` behaves as before.

    Returns:
        The accumulated value, i.e. ``1 + sum(range(n))``.
    """
    res = 1
    for i in range(n):
        res += i
    return res
def f2(n=100000000):
    """Multiply an initial 1 by every integer of ``range(n)``.

    Pure computation with no I/O; used as a workload for timing plain
    sequential execution.

    Args:
        n: Number of loop iterations. Defaults to the original
            hard-coded 100000000, so ``f2()`` behaves as before.

    Returns:
        The final product. ``range`` starts at 0, so for any ``n >= 1``
        the product is 0 from the first iteration onward.
    """
    res = 1
    for i in range(n):
        res *= i
    return res
# Time the two workloads when they simply run one after the other.
start = time.time()
f1()
f2()
stop = time.time()
elapsed = stop - start
print('run time is %s' % elapsed)  # recorded by the author: 10.985628366470337
#切换
from greenlet import greenlet
import time
def f1():
    """Add up ``range(100000000)``, switching to ``g2`` as it goes.

    Relies on the module-level greenlet ``g2``.
    """
    total = 1
    for step in range(100000000):
        total += step
        # NOTE(review): SOURCE lost its indentation; the switch is placed
        # inside the loop because the recorded ~52 s runtime implies one
        # switch per iteration. Confirm against the original article.
        g2.switch()
def f2():
    """Multiply through ``range(100000000)``, switching to ``g1`` as it goes.

    Relies on the module-level greenlet ``g1``.
    """
    product = 1
    for step in range(100000000):
        product *= step
        # NOTE(review): SOURCE lost its indentation; the switch is placed
        # inside the loop because the recorded ~52 s runtime implies one
        # switch per iteration. Confirm against the original article.
        g1.switch()
# Time the same two workloads when they run as greenlets that switch
# back and forth.
start = time.time()
g1, g2 = greenlet(f1), greenlet(f2)
g1.switch()
stop = time.time()
elapsed = stop - start
print('run time is %s' % elapsed)  # recorded by the author: 52.763017892837524
单纯的切换(在没有 io 的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。
gevent 协程初使用
安装
pip3 install gevent
非协程和协程耗时对比
import gevent
import threading
import os
import time
def func1():
    """Log pid/thread id, sleep one second via ``gevent.sleep``, log again."""
    pid, tid = os.getpid(), threading.get_ident()
    print('pid:{} threadid:{} from func1 | start'.format(pid, tid))
    gevent.sleep(1)
    # Look the ids up again after the sleep, as the original did.
    pid, tid = os.getpid(), threading.get_ident()
    print('pid:{} threadid:{} from func1 | end'.format(pid, tid))
def func2():
    """Log pid/thread id, sleep one second via ``gevent.sleep``, log again."""
    pid, tid = os.getpid(), threading.get_ident()
    print('pid:{} threadid:{} from func2 | start'.format(pid, tid))
    gevent.sleep(1)
    # Look the ids up again after the sleep, as the original did.
    pid, tid = os.getpid(), threading.get_ident()
    print('pid:{} threadid:{} from func2 | end'.format(pid, tid))
# Baseline: call the two functions directly, one after the other.
start = time.time()
func1()
func2()
elapsed = time.time() - start
print('非协程耗时:{}'.format(elapsed))

# Same work as greenlets: spawn both, then wait for both to finish.
start = time.time()
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
gevent.joinall([g1, g2])
elapsed = time.time() - start
print('协程耗时:{}'.format(elapsed))
'''
result:
pid:12092 threadid:2828 from func1 | start
pid:12092 threadid:2828 from func1 | end
pid:12092 threadid:2828 from func2 | start
pid:12092 threadid:2828 from func2 | end
非协程耗时:2.008000135421753
pid:12092 threadid:2828 from func1 | start
pid:12092 threadid:2828 from func2 | start
pid:12092 threadid:2828 from func1 | end
pid:12092 threadid:2828 from func2 | end
协程耗时:1.0
'''
monkey-识别 io 阻塞
上例 gevent.sleep(1) 模拟的是 gevent 可以识别的 io 阻塞,而 time.sleep(1)
或其他的阻塞,gevent 是不能直接识别的,需要用下面一行代码打补丁,才可以识别。
from gevent import monkey;monkey.patch_all() # 必须放到被打补丁者的前面
例:
from gevent import monkey;monkey.patch_all()
import gevent
import threading
import os
import time
def func1():
    """Log pid/thread id, sleep one second via ``time.sleep``, log again."""
    ids = (os.getpid(), threading.get_ident())
    print('pid:{} threadid:{} from func1 | start'.format(*ids))
    time.sleep(1)
    # Look the ids up again after the sleep, as the original did.
    ids = (os.getpid(), threading.get_ident())
    print('pid:{} threadid:{} from func1 | end'.format(*ids))
def func2():
    """Log pid/thread id, sleep one second via ``time.sleep``, log again."""
    ids = (os.getpid(), threading.get_ident())
    print('pid:{} threadid:{} from func2 | start'.format(*ids))
    time.sleep(1)
    # Look the ids up again after the sleep, as the original did.
    ids = (os.getpid(), threading.get_ident())
    print('pid:{} threadid:{} from func2 | end'.format(*ids))
# Baseline: call the two functions directly, one after the other.
start = time.time()
func1()
func2()
elapsed = time.time() - start
print('非协程耗时:{}'.format(elapsed))

# Same work as greenlets: spawn both, then join them in spawn order.
start = time.time()
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
for task in (g1, g2):
    task.join()
elapsed = time.time() - start
print('协程耗时:{}'.format(elapsed))
'''
result:
pid:7200 threadid:43458064 from func1 | start
pid:7200 threadid:43458064 from func1 | end
pid:7200 threadid:43458064 from func2 | start
pid:7200 threadid:43458064 from func2 | end
非协程耗时:2.004999876022339
pid:7200 threadid:55386728 from func1 | start
pid:7200 threadid:55387544 from func2 | start
pid:7200 threadid:55386728 from func1 | end
pid:7200 threadid:55387544 from func2 | end
协程耗时:1.000999927520752
'''
统计网页长度示例
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
    """Fetch *url* and print the size of the response body.

    Prints the URL before issuing the request. The size line is printed
    only when the response status code is 200; other statuses are
    ignored silently.

    Args:
        url: Address passed unchanged to ``requests.get``.
    """
    print('GET: %s' % url)
    response = requests.get(url)
    if response.status_code == 200:
        # Measure the raw body: len(response.text) would count decoded
        # characters, which is not what a "bytes received" message claims.
        print('%d bytes received from %s' % (len(response.content), url))
# Fetch all pages in separate greenlets and time the whole batch.
start_time = time.time()
urls = [
    'https://www.python.org/',
    'https://www.yahoo.com/',
    'https://github.com/',
]
gevent.joinall([gevent.spawn(get_page, url) for url in urls])
stop_time = time.time()
elapsed = stop_time - start_time
print('run time is %s' % elapsed)
'''
result:
GET: https://www.python.org/
GET: https://www.yahoo.com/
GET: https://github.com/
64127 bytes received from https://github.com/
48854 bytes received from https://www.python.org/
502701 bytes received from https://www.yahoo.com/
run time is 1.9760000705718994
'''
单线程下的 socket 并发
# server 端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
# 如果不想用monkey.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip, port):
    """Accept TCP connections forever, handling each one in its own greenlet.

    Args:
        server_ip: Address to bind the listening socket to.
        port: TCP port to bind the listening socket to.

    The accept loop has no exit condition; the function only leaves via
    an exception, in which case the listening socket is closed before
    the exception propagates.
    """
    s = socket(AF_INET, SOCK_STREAM)
    try:
        s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        s.bind((server_ip, port))
        s.listen(5)
        while True:
            conn, addr = s.accept()
            # One greenlet per client; talk() closes conn when it is done.
            gevent.spawn(talk, conn, addr)
    finally:
        # Release the listening socket if setup or accept() fails or the
        # process is interrupted.
        s.close()
def talk(conn, addr):
    """Echo every message from one client back in upper case.

    Args:
        conn: Connected socket for this client.
        addr: ``(host, port)`` pair of the client, used for logging.

    The loop ends when the peer closes the connection (``recv`` returns
    an empty bytes object) or when an exception occurs, which is printed
    rather than propagated. The connection is always closed on exit.
    """
    try:
        while True:
            res = conn.recv(1024)
            if not res:
                # Empty read means the client closed its end; without this
                # check the loop would spin forever on b''.
                break
            print('client %s:%s msg: %s' % (addr[0], addr[1], res))
            # sendall keeps writing until the whole reply is sent; plain
            # send() may transmit only part of the buffer.
            conn.sendall(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()
if __name__ == '__main__':
    # Start the echo server on loopback port 8080 when run as a script.
    server(server_ip='127.0.0.1', port=8080)
# client 端
from threading import Thread
from socket import *
import threading
def client(server_ip, port):
    """Connect to the server and exchange greeting messages until it hangs up.

    Args:
        server_ip: Address of the server to connect to.
        port: TCP port of the server.

    Each round sends ``'<thread name> say hello <count>'`` encoded as
    UTF-8, prints the decoded reply and increments the counter. The loop
    ends when the server closes the connection; the socket is always
    closed on exit.
    """
    # The socket must be created inside the function (local namespace).
    # Created at module level it would be shared by every thread, so all
    # clients would use one socket object and the same client port.
    c = socket(AF_INET, SOCK_STREAM)
    try:
        c.connect((server_ip, port))
        count = 0
        while True:
            # Thread.name replaces the deprecated getName(); sendall avoids
            # the partial writes that plain send() allows.
            greeting = '%s say hello %s' % (threading.current_thread().name, count)
            c.sendall(greeting.encode('utf-8'))
            msg = c.recv(1024)
            if not msg:
                # Empty read means the server closed the connection.
                break
            print(msg.decode('utf-8'))
            count += 1
    finally:
        c.close()
if __name__ == '__main__':
    # Start 500 client threads against the local server.
    for _ in range(500):
        worker = Thread(target=client, args=('127.0.0.1', 8080))
        worker.start()
评论区