这是将不同大小的多个影像阵列保存在回圈中以及同时使用执行绪/行程的定时示例:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter
import numpy as np
from cv2 import cv2
def save_img(idx, image, dst):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
if __name__ == '__main__':
l1 = np.random.randint(0, 255, (100, 50, 50, 1))
l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
temp_dir = tempfile.mkdtemp()
workers = 4
t1 = perf_counter()
for ll in l1, l2, l3:
t = perf_counter()
for i, img in enumerate(ll):
save_img(i, img, temp_dir)
print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
for executor in ThreadPoolExecutor, ProcessPoolExecutor:
with executor(workers) as ex:
futures = [
ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
]
for f in as_completed(futures):
f.result()
print(
f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
)
我在 i5 mbp 上得到这些持续时间:
Time for 100: 0.09495482999999982 seconds
Time for 100 (ThreadPoolExecutor): 0.14151873999999998 seconds
Time for 100 (ProcessPoolExecutor): 1.5136184309999998 seconds
Time for 1000: 0.36972280300000016 seconds
Time for 1000 (ThreadPoolExecutor): 0.619205703 seconds
Time for 1000 (ProcessPoolExecutor): 2.016624468 seconds
Time for 10000: 4.232915643999999 seconds
Time for 10000 (ThreadPoolExecutor): 7.251599262 seconds
Time for 10000 (ProcessPoolExecutor): 13.963426469999998 seconds
难道执行绪/行程不需要更少的时间来实作同样的事情吗?在这种情况下为什么不呢?
uj5u.com热心网友回复:
代码中的计时t
是错误的,因为在测验池之前没有重置计时器。然而,时间的相对顺序是正确的。一个可能的定时器复位代码是:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter
import numpy as np
from cv2 import cv2
def save_img(idx, image, dst):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
if __name__ == '__main__':
l1 = np.random.randint(0, 255, (100, 50, 50, 1))
l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
temp_dir = tempfile.mkdtemp()
workers = 4
for ll in l1, l2, l3:
t = perf_counter()
for i, img in enumerate(ll):
save_img(i, img, temp_dir)
print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
for executor in ThreadPoolExecutor, ProcessPoolExecutor:
t = perf_counter()
with executor(workers) as ex:
futures = [
ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
]
for f in as_completed(futures):
f.result()
print(
f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
)
多执行绪速度更快,特别是对于 I/O 系结行程。在这种情况下,压缩影像是 CPU 密集型的,因此根据 OpenCV 和 python 包装器的实作,多执行绪可能会慢得多。在很多情况下,罪魁祸首是 CPython 的 GIL,但我不确定是否是这种情况(我不知道 GIL 是否在imwrite
呼叫程序中被释放)。在我的设定(i7 8th gen)中,执行绪与 100 张影像的回圈一样快,而 1000 和 10000 张影像的速度几乎没有快。如果ThreadPoolExecutor
重用执行绪,则将新任务分配给现有执行绪会产生开销。如果它不重用执行绪,则启动新执行绪会产生开销。
多处理绕过了 GIL 问题,但还有一些其他问题。首先,在行程之间传递资料需要一些时间,而在影像的情况下,它可能非常昂贵。其次,在 windows 的情况下,生成一个新行程需要很多时间。查看开销(行程和执行绪)的一个简单测验是将save_image
函式更改为一个什么都不做但仍需要酸洗等的函式:
def save_img(idx, image, dst):
if idx != idx:
print("impossible!")
并通过一个类似的没有自变量的方式来查看产生行程的开销等。
我的设定中的计时显示仅生成 10000 个行程需要 2.3 秒,而酸洗需要 0.6 秒,这远远超过处理所需的时间。
提高吞吐量并将开销保持在最低限度的一种方法是中断块上的作业,并将每个块提交给作业人员:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter
import numpy as np
from cv2 import cv2
def save_img(idx, image, dst):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
def multi_save_img(idx_start, images, dst):
for idx, image in zip(range(idx_start, idx_start len(images)), images):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
if __name__ == '__main__':
l1 = np.random.randint(0, 255, (100, 50, 50, 1))
l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
temp_dir = tempfile.mkdtemp()
workers = 4
for ll in l1, l2, l3:
t = perf_counter()
for i, img in enumerate(ll):
save_img(i, img, temp_dir)
print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
chunk_size = len(ll)//workers
ends = [chunk_size * (_ 1) for _ in range(workers)]
ends[-1] = len(ll) % workers
starts = [chunk_size * _ for _ in range(workers)]
for executor in ThreadPoolExecutor, ProcessPoolExecutor:
t = perf_counter()
with executor(workers) as ex:
futures = [
ex.submit(multi_save_img, start, ll[start:end], temp_dir) for (start, end) in zip(starts, ends)
]
for f in as_completed(futures):
f.result()
print(
f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
)
对于多处理和多执行绪方法,这应该比简单的 for 有显著的提升。
0 评论