Python基础(33)之gevent协程模块

Python基础(33)之gevent协程模块

微信搜索 zze_coding 或扫描 👉 二维码关注我的微信公众号获取更多资源推送:

介绍

单线程里执行多个任务代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务 1 时遇到阻塞,就利用阻塞的时间去执行任务 2。如此,才能提高效率,这就用到了 gevent 模块。

协程是单线程下的并发,又称微线程,纤程。英文名 Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

需要强调的是:

  • python 的线程属于内核级别的,即由操作系统控制调度(如单线程遇到 io 或执行时间过长就会被迫交出 cpu 执行权限,切换其他线程运行)
  • 单线程内开启协程,一旦遇到 io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非 io 操作的切换与效率无关) 对比操作系统控制线程的切换,用户在单线程内控制协程的切换

优点

  1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级。
  2. 单线程内就可以实现并发的效果,最大限度地利用 cpu。

缺点

  1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程。
  2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程。

特点

  1. 必须在只有一个单线程里实现并发。
  2. 修改共享数据不需加锁。
  3. 用户程序里自己保存多个控制流的上下文栈。

greenlet 模块

安装

pip3 install greenlet

实现状态切换

from greenlet import greenlet

def func1():
    print('func1 start')
    g2.switch()
    print('func1 end')
    g2.switch()

def func2():
    print('func2 start')
    g1.switch()
    print('func2 end')

g1 = greenlet(func1)
g2 = greenlet(func2)
g1.switch()

'''
result:
    func1 start
    func2 start
    func1 end
    func2 end
'''

顺序执行与切换执行效率对比

#顺序执行
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i

def f2():
    res=1
    for i in range(100000000):
        res*=i

start=time.time()
f1()
f2()
stop=time.time()
print('run time is %s' %(stop-start)) #10.985628366470337

#切换
from greenlet import greenlet
import time
def f1():
    res=1
    for i in range(100000000):
        res+=i
        g2.switch()

def f2():
    res=1
    for i in range(100000000):
        res*=i
        g1.switch()

start=time.time()
g1=greenlet(f1)
g2=greenlet(f2)
g1.switch()
stop=time.time()
print('run time is %s' %(stop-start)) # 52.763017892837524

单纯的切换(在没有 io 的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。

gevent 协程初使用

安装

pip3 install gevent

非协程和协程耗时对比

import gevent
import threading
import os
import time


def func1():
    print('pid:{} threadid:{} from func1 | start'.format(os.getpid(), threading.get_ident()))
    gevent.sleep(1)
    print('pid:{} threadid:{} from func1 | end'.format(os.getpid(), threading.get_ident()))


def func2():
    print('pid:{} threadid:{} from func2 | start'.format(os.getpid(), threading.get_ident()))
    gevent.sleep(1)
    print('pid:{} threadid:{} from func2 | end'.format(os.getpid(), threading.get_ident()))


start = time.time()
func1()
func2()
print('非协程耗时:{}'.format(time.time() - start))

start = time.time()
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
g1.join()
g2.join()
print('协程耗时:{}'.format(time.time() - start))

'''
result:
    pid:12092 threadid:2828 from func1 | start
    pid:12092 threadid:2828 from func1 | end
    pid:12092 threadid:2828 from func2 | start
    pid:12092 threadid:2828 from func2 | end
    非协程耗时:2.008000135421753
    pid:12092 threadid:2828 from func1 | start
    pid:12092 threadid:2828 from func2 | start
    pid:12092 threadid:2828 from func1 | end
    pid:12092 threadid:2828 from func2 | end
    协程耗时:1.0
'''

monkey-识别 io 阻塞

上例 gevent.sleep(2) 模拟的是 gevent 可以识别的 io 阻塞,而 time.sleep(2) 或其他的阻塞,gevent 是不能直接识别的需要用下面一行代码,打补丁,就可以识别了。

from gevent import monkey;monkey.patch_all() # 必须放到被打补丁者的前面

例:

from gevent import monkey;monkey.patch_all()
import gevent
import threading
import os
import time


def func1():
    print('pid:{} threadid:{} from func1 | start'.format(os.getpid(), threading.get_ident()))
    time.sleep(1)
    print('pid:{} threadid:{} from func1 | end'.format(os.getpid(), threading.get_ident()))


def func2():
    print('pid:{} threadid:{} from func2 | start'.format(os.getpid(), threading.get_ident()))
    time.sleep(1)
    print('pid:{} threadid:{} from func2 | end'.format(os.getpid(), threading.get_ident()))


start = time.time()
func1()
func2()
print('非协程耗时:{}'.format(time.time() - start))

start = time.time()
g1 = gevent.spawn(func1)
g2 = gevent.spawn(func2)
g1.join()
g2.join()
print('协程耗时:{}'.format(time.time() - start))

'''
result:
    pid:7200 threadid:43458064 from func1 | start
    pid:7200 threadid:43458064 from func1 | end
    pid:7200 threadid:43458064 from func2 | start
    pid:7200 threadid:43458064 from func2 | end
    非协程耗时:2.004999876022339
    pid:7200 threadid:55386728 from func1 | start
    pid:7200 threadid:55387544 from func2 | start
    pid:7200 threadid:55386728 from func1 | end
    pid:7200 threadid:55387544 from func2 | end
    协程耗时:1.000999927520752
'''

统计网页长度示例

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time


def get_page(url):
    print('GET: %s' % url)
    response = requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' % (len(response.text), url))


start_time = time.time()
gevent.joinall([
    gevent.spawn(get_page, 'https://www.python.org/'),
    gevent.spawn(get_page, 'https://www.yahoo.com/'),
    gevent.spawn(get_page, 'https://github.com/'),
])
stop_time = time.time()
print('run time is %s' % (stop_time - start_time))
'''
result:
    GET: https://www.python.org/
    GET: https://www.yahoo.com/
    GET: https://github.com/
    64127 bytes received from https://github.com/
    48854 bytes received from https://www.python.org/
    502701 bytes received from https://www.yahoo.com/
    run time is 1.9760000705718994
'''

单线程下的 socket 并发

# server 端
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent

# 如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()

def server(server_ip, port):
    s = socket(AF_INET, SOCK_STREAM)
    s.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    s.bind((server_ip, port))
    s.listen(5)
    while True:
        conn, addr = s.accept()
        gevent.spawn(talk, conn, addr)

def talk(conn, addr):
    try:
        while True:
            res = conn.recv(1024)
            print('client %s:%s msg: %s' % (addr[0], addr[1], res))
            conn.send(res.upper())
    except Exception as e:
        print(e)
    finally:
        conn.close()


if __name__ == '__main__':
    server('127.0.0.1', 8080)
# client 端
from threading import Thread
from socket import *
import threading


def client(server_ip, port):
    c = socket(AF_INET, SOCK_STREAM)  # 套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
    c.connect((server_ip, port))

    count = 0
    while True:
        c.send(('%s say hello %s' % (threading.current_thread().getName(), count)).encode('utf-8'))
        msg = c.recv(1024)
        print(msg.decode('utf-8'))
        count += 1


if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=client, args=('127.0.0.1', 8080))
        t.start()

Copyright: 采用 知识共享署名4.0 国际许可协议进行许可

Links: https://www.zze.xyz/archives/python-base-33.html

Buy me a cup of coffee ☕.