Python 中的多进程、多线程编程(关键词:Python/进程/线程/多进程/多线程)
进程
进程是什么?
process: a state of a program during execution. It encompasses various components such as code segments, program counters, stack segments (which accommodate temporary data like local variables, function arguments, and return addresses), heap segments (for dynamic memory allocation), process states, CPU scheduling information, and I/O state information (e.g., open file lists).
进程控制块(Process Control Block)
(详见《操作系统概念(第 7 版)》 - P73)
线程
线程是什么?
线程是 CPU 使用的基本单元,由线程 ID、程序计数器、寄存器集合和栈组成。
进程与线程的区别?
每个线程与同一进程中与其他线程共用代码块和数据块,并负责打开相关文件以及处理信号事件等任务。

(《操作系统概念(第 7 版)》 - P111)
多进程与多线程的区别?
区别 1:不同于线程,在进程之间不存在共享状态;当某个进程修改数据时,改动仅限于该进程中。
区别 2:基于 Python 的多进程中能够实现真正意义上的并行;而基于 Python 的多线程无法实现真正意义上的并行。
(OS概念 - 第 7 版 - P112)
线程优点:
- 资源共享:线程默认继承所属进程的各种资源;
- 经济性:相较于进程而言,创建/切换线程的开销较低;
- 响应能力较强。
Python 中的多线程?
基于 GIL 的机制设计,在同一时间段内只有一个线程能够执行。即使在拥有多个 CPU 核的情况下,在每个时刻也只能有一个线程占用 CPU 资源。由此可见,在 Python 中实现真正意义上的并行是不可能的。
Python 多进程编程
后台进程
当一个进程被标记为后台进程中,在其父进程中一旦退出时(即父进程中发生结束事件),该后台进程中也会相应地退出系统。
以单个进程的形式创建、启动函数
代码清单 - 1
# p337_single_process_with_function.py
from multiprocessing import Process
from time import ctime, sleep
def clock(interval):
while True:
print("The time is %s" % ctime())
sleep(interval)
if __name__ == "__main__":
p = Process(target=clock, args=(2,))
#p.daemon = False
p.start()
#print("hello!")
#p.join()
代码清单 - 2
# p337_single_process_with_function_Daemon_False.py
from multiprocessing import Process
from time import ctime, sleep
def clock(interval):
while True:
print("The time is %s" % ctime())
sleep(interval)
if __name__ == "__main__":
p = Process(target=clock, args=(2,))
p.daemon = False
p.start()
#print("hello!")
#p.join()
以上两段代码均会永远执行。
补充一点背景知识:
- 主进程中daemon标志被设定为False(即主进程中没有设置成为后台进程);
- 当父进程中启动其子时(这些子会被父所继承),其子将获得相同的daemon标志设置(因此这些子也将不被设置为后台进程);
- 如果父过程退出后其子将随之退出(而非父过程不会退出)。
上面两段代码的主要区别在于是否明确地将子进程中程p设定为非后台运行模式。具体而言,在第1段代码中并未对子进程中程p进行显式的设定(即未将其指定为非后台),然而其实际运行效果等同于第2段代码中p.daemon = False这一操作所实现的效果(即将子进程中程p的daemon属性设为False)。由此可见,在实际运行效果上两者并无显著差异
在代码中首先创建了一个Process实例p:
创建了一个Process实例p,并将其目标设为clock函数,
设置其daemon属性为False,
启动了该子进程,
当主进程完成时,
程序继续往下执行,
此时子进程中函数clock陷入死循环,
导致子进程中函数clock陷入死循环,
因此该子进程中函数clock陷入死循环。
如下代码将子进程 p 的daemon 标志设置为 True:
代码清单 - 3
from multiprocessing import Process
from time import ctime, sleep
def clock(interval):
while True:
print("The time is %s" % ctime())
sleep(interval)
if __name__ == "__main__":
p = Process(target=clock, args=(2,))
p.daemon = True
p.start()
#print("hello!")
#p.join()
p.daemon = True 将子进程中台设置为主机进程中台的一次操作;一旦主机进程中止运行,则该后台进程中台也随之终止运行;通过调用 start() 方法启动子进程中台;即使 clock 函数内部存在死循环导致长时间无法终止该台上都会正常退出。
参考资料:
《Python 编程指南》第P337页;
[https://github.com/henry199101/python_CanKao_ShouCE/blob/master/chapter_20/p337_single_process_with_function.py]
注:该代码示例未启用线程间通信;
[https://github.com/henry199101/python_CanKao_ShouCE/blob/master/chapter_20/p337_single_process_with_function_Daemon_False.py]
注:该代码示例已启用线程间通信;
https://github.com/henry199101/python_CanKao_ShouCE/blob/master/chapter_20/p337_single_process_with_function_Daemon_True.py
注:该代码示例默认启用线程间通信。
将进程定义为继承自 Process 的类
代码清单 - 4
from multiprocessing import Process
from time import ctime, sleep
class ClockProcess(Process):
def __init__(self, interval):
Process.__init__(self)
self.interval = interval
def run(self):
while True:
print("The time is %s" % ctime())
sleep(self.interval)
if __name__ == "__main__":
p = ClockProcess(2)
p.start()
代码清单 4 中,进程 p 是非后台进程,与代码清单1、2相同,不再做详细解释。
参考文献如下:
《Python编程语言基础》第337页
此处插入链接
进程间通信
进程间通信机制包括以下几种典型实现方式:管道机制、名称管道、消息队列系统、套接字机制、以及基于同步信号的多线程协调方法等。其中主要的技术手段包括:使用共享内存机制实现进程间的直接访问与互操作性支持等技术手段。
该模块旨在实现其支持的进程间通信机制的两大核心模式:队列和管道。
队列(使用哨兵关闭消费者进程)
from multiprocessing import Process, Queue
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(item)
def producer(sequence, q):
for item in sequence:
q.put(item)
if __name__ == "__main__":
q = Queue()
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
sequence = [1, 2, 3, 4]
producer(sequence, q)
q.put(None)
consumer_process.join()
(https://github.com/henry199101/python_CanKao_ShouCe/blob/master/chapter_20/p340_producer_consumer_shao_bing.py)
(Python 参考手册 - P340)
JoinableQueue
详细描述了队列类Queue的实现过程,并具体阐述了其在生产者与消费者同步机制中的应用。该实现分为两种情况:当队列是可连接的时候以及非连接型的队列结构。其中,在可连接的情况下,默认采用 producers synchronizing consumers 的方式;而当需要独立运行两个不相连的 producer 消费器时,则应选择 nonjoinable 的类型。
Python参考手册 - P339
详细介绍了 Python 中用于实现生产者消费者模型的相关知识。其中指出:当一个队列是可连接的时候,默认采用 producers synchronizing consumers 的方式;而如果需要独立运行两个不相连的 producer 消费器,则应选择 nonjoinable 的类型。
管道


APUE - 3rd - P431
from multiprocessing import Process, Pipe
def consumer(pipe):
output_side, input_side = pipe
input_side.close()
while True:
try:
item = output_side.recv()
except EOFError:
break
print(item)
output_side.close()
def producer(sequence, pipe):
output_side, input_side = pipe
output_side.close()
for item in sequence:
input_side.send(item)
input_side.close()
if __name__ == "__main__":
pipe = Pipe()
consumer_process = Process(target=consumer, args=(pipe,))
consumer_process.start()
sequence = [1,2,3,4]
producer(sequence, pipe)
consumer_process.join()
解释:
Pipe()函数会生成一条管道,并提供两个连接器conn1和conn2;其默认行为是双向通信。
若生产者或消费者未有效利用管道某一端口,则应对此端口予以关闭。
该管道采用基于操作系统维护的引用计数机制,在所有相关进程均完成关闭操作后,并使各进程间的引用计数值归零时方能正确触发EOFError异常。
双向通信的管道(使用管道,编写与进程交互的程序)
from multiprocessing import Process, Pipe
def adder(pipe):
server_side, client_side = pipe
client_side.close()
while True:
try:
x, y = server_side.recv()
except EOFError:
break
result = x + y
server_side.send(result)
print("Done!")
if __name__ == '__main__':
pipe = Pipe()
server_side, client_side = pipe
adder_process = Process(target=adder, args=(pipe,))
adder_process.start()
server_side.close()
client_side.send((3, 4))
print(client_side.recv())
client_side.send(('Hello, ', 'World!'))
print(client_side.recv())
client_side.close()
adder_process.join()
(Python 参考手册 - P342——P343)
解释:
不做详细解释,请看图示,一目了然。

进程池
# 程序来自廖雪峰Python
#https://www.liaoxuefeng.com/wiki/0014316089557264a6b348958f449949df42a6d3a2e542c000/001431927781401bb47ccf187b24c3b955157bb12c5882d000#0
from multiprocessing import Pool
from time import time, sleep
from os import getpid
from random import random
def long_time_task(name):
print("Task %s (%s) starts..." % (name, getpid()))
start = time()
sleep(random() * 3)
end = time()
elapsed_time = end - start
print("Task %s runs %.2f seconds!" % (name, elapsed_time))
if __name__ == "__main__":
print("Main process (%s) starts..." % getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print("Waiting for all subprocesses done!")
p.close()
p.join()
print("All subprocesses done!")
输出:
$ python3 p343_p345_process_pool.py
Main process (2701) starts...
Waiting for all subprocesses done!
Task 0 (2702) starts...
Task 1 (2703) starts...
Task 3 (2704) starts...
Task 2 (2705) starts...
Task 2 runs 0.23 seconds!
Task 4 (2705) starts...
Task 3 runs 0.32 seconds!
Task 0 runs 0.64 seconds!
Task 1 runs 1.49 seconds!
Task 4 runs 1.75 seconds!
All subprocesses done!
基于单线程池实现的异步调用函数;
该方法被设计为用于终止进程池;
该操作被配置为用来等待所有工作队项完成;
由于进程池启动了4个子进程中却存在5项待处理的任务,
最后一个额外的任务必须在前面的所有4个工作队项都完成各自的任务后才能开始处理。
参考文献:
廖雪峰 - Python 3 - 多进程 - Pool
Python 参考手册 - P343——P345
连接
(Python 参考手册 - P352——P353)
服务器:
from multiprocessing.connection import Listener
server = Listener(address=('', 15000), authkey='12345')
while True:
conn = server.accept()
while True:
try:
x, y = conn.recv()
except EOFError:
break
result = x + y
conn.send(result)
conn.close()
客户端:
from multiprocessing.connection import Client
conn = Client(address=('localhost', 15000), authkey='12345')
conn.send((2, 3))
result = conn.recv()
print(result)
conn.send(('Hello, ', 'World!'))
result = conn.recv()
print(result)
conn.close()
Python 多线程编程
以线程的形式创建、启动 1 个函数
from threading import Thread
from time import ctime, sleep
def clock(interval):
while True:
print("The time is %s" % ctime())
sleep(interval)
t = Thread(target=clock, args=(2,))
t.daemon = True
t.start()
将单个线程定义为 1 个类
from threading import Thread
from time import sleep, ctime
class ClockThread(Thread):
def __init__(self, seconds):
Thread.__init__(self)
self.seconds = seconds
self.daemon = False
def run(self):
while True:
print('The time is %s' % ctime())
sleep(self.seconds)
if __name__ == '__main__':
t = ClockThread(2)
t.start()
Lock 对象
(Python 核心编程)
with 语句
信号量
可能暂时略
