Skip to content

Python multiprocessing

Split array and process chunks independently

from multiprocessing import Pool
from time import sleep
from typing import Callable, List

import numpy as np


def split_and_merge(
    data: np.ndarray,
    num_processes: int,
    function: Callable[[np.ndarray], float],
    *,
    merge: Callable[[List[float]], float] = sum,
) -> float:
    """Apply *function* to chunks of *data* in parallel and combine the results.

    *data* is divided with ``np.array_split`` into ``num_processes`` nearly
    equal chunks along its first axis. Chunks may be empty when
    ``len(data) < num_processes``. Each chunk is handed to *function* in a
    worker of a ``multiprocessing.Pool`` with ``num_processes`` processes.
    The per-chunk results, in chunk order, are then combined with *merge*.

    Args:
        data: Array to split.
        num_processes: Number of chunks and of worker processes; must be >= 1.
        function: Per-chunk callable. It is sent to the workers by pickling,
            so it must be picklable: a module-level function or a builtin,
            not a lambda or a nested function.
        merge: Reducer applied to the list of per-chunk results. It defaults
            to ``sum``, which reproduces the original behavior.

    Returns:
        Whatever *merge* returns for the list of per-chunk results.

    Raises:
        ValueError: If ``num_processes`` is less than 1.
    """
    if num_processes < 1:
        raise ValueError(f"num_processes must be at least 1, got {num_processes}")

    chunks = np.array_split(data, num_processes)
    with Pool(num_processes) as pool:
        partial_results = pool.map(function, chunks)
    # Reduce after the pool is torn down, so worker processes are not kept
    # alive while the (serial) merge runs.
    return merge(partial_results)


def f(arr: np.ndarray) -> float:
    """Demo worker: sleep for two seconds, then return the sum of *arr*.

    This is defined at module level, outside the ``__main__`` guard, because
    the pool pickles it by name. Under the "spawn" (and "forkserver") start
    methods, worker processes re-import this script as ``__mp_main__``. Code
    under the guard does not run there, so a function defined inside the
    guard would be missing in the workers and unpickling it would fail.
    """
    # The sleep makes the parallel speed-up visible: 5 chunks finish in
    # roughly 2 s with 5 workers, versus roughly 10 s if run serially.
    sleep(2)
    return float(arr.sum())


if __name__ == '__main__':
    # Calculate the sum of 0..999 using 5 processes
    arr = np.arange(1000)
    assert split_and_merge(arr, 5, f) == sum(range(1000)), "Uh oh"

References