class: center, middle

# Tricks for Efficient Multicore Computing

.normal[
Thomas Moreau - **Olivier Grisel**
]

.affiliations[
![CMLA](images/logo_cmla.png)
![ENS Paris-Saclay](images/logo_ens.png)
![Inria](images/inria-logo.png)
]
.normal[
https://ogrisel.github.io/decks
]

---

# Outline

### Introduction to the `concurrent.futures` API

### Thread-based vs Process-based multicore computing

### Thread-based parallelism for array processing with BLAS

---

# Embarrassingly parallel computation

.normal2[
Three APIs available:

- `multiprocessing.Pool`: the first implementation
- `concurrent.futures`: newer API reusing some `multiprocessing` under the hood
- `loky`: robustification of `concurrent.futures`
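
A minimal sketch of the same parallel map with each API, assuming `loky` is installed and a picklable, module-level function (this code is not from the original deck; guard it with `if __name__ == "__main__"` on platforms where *spawn* is the default):

```python
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
from loky import get_reusable_executor

def slow_square(x):
    return x ** 2  # stand-in for a heavy computation

with Pool(4) as pool:                          # multiprocessing
    r1 = pool.map(slow_square, range(8))

with ProcessPoolExecutor(max_workers=4) as e:  # concurrent.futures
    r2 = list(e.map(slow_square, range(8)))

executor = get_reusable_executor(max_workers=4)    # loky
r3 = list(executor.map(slow_square, range(8)))
```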
]

---

class: middle, center

# `concurrent.futures`

---

## The `Future` object

### `Future`: a reference to the result of an asynchronous computation

### 4 states: .state[Not started], .state[Running], .state[Cancelled] and .state[Done]

### Can be checked with `f.running()`, `f.cancelled()`, `f.done()`

.left-column[.normal[
### Blocking methods

* `f.result(timeout=None)`
* `f.exception(timeout=None)`
]]
.right-column[.normal[
Wait for the computation to be done.
]]
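.reset-column[ ]

.normal[
A minimal sketch of polling and collecting a `Future` (assuming the `executor` and `fit_model` of the next slides; not code from the original deck):

```python
future = executor.submit(fit_model, params)
if not future.done():       # non-blocking state check
    print("still running")
model = future.result(timeout=None)  # blocks until Done
error = future.exception()  # None if nothing was raised
```
]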
---

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

*def fit_model(params):
*    # Heavy computation
*    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/1_init.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

*# Create an executor with 4 threads
*with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/2_spawn.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

*    # Submit an asynchronous job and return a Future
*    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/3_submit1.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

*    # Submit other job
*    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/4_submit2.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

*    # Run other computation
*    ...

    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/5_running.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

*    # Blocking call, wait and return the result
*    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/6_collect1.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

*    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
*    model2 = future2.result(timeout=None)

# The resources have been cleaned up
print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/7_collect2.png)
]

---
count: false

### The `Executor`: a worker pool

.wide-left-column[
```python
from concurrent.futures import ThreadPoolExecutor

def fit_model(params):
    # Heavy computation
    return model

# Create an executor with 4 threads
with ThreadPoolExecutor(max_workers=4) as executor:

    # Submit an asynchronous job and return a Future
    future1 = executor.submit(fit_model, param1)

    # Submit other job
    future2 = executor.submit(fit_model, param2)

    # Run other computation
    ...

    # Blocking call, wait and return the result
    model1 = future1.result(timeout=None)
    model2 = future2.result(timeout=None)

*# The resources have been cleaned up
*print(model1, model2)
```
]
.small-right-column[
![:scale 100%](images/8_final.png)
]

---

class: middle, center

# Choosing the type of worker:
# `Thread` or `Process`?

---

# Thread Worker

.left-column[.normal[
- Real system thread:
  - pthread
  - windows thread
- All the computations are done within a **single** interpreter.
]]
.normal[
**Advantages:**

- Fast spawning
- Low memory overhead
- No communication overhead (shared Python objects)
]
.reset-column[ ]

--
count: false

.normal[.centered[
Wait... shared Python objects and a single interpreter?!?
]]

--
count: false

.centered[
## There is only one GIL!
]

---

# Python Global Interpreter Lock (GIL)

.normal[
**Global lock every time we access a Python object**

Not designed for efficient multicore computing

Released when performing long I/O operations

Released by some libraries: numpy, pandas, sklearn, Cython `with nogil`...
]

---

# Thread Worker

.normal[
Multiple threads running Python code:

![:scale 85%](images/thread-2.png)

This is no quicker than sequential execution, even on a multicore machine.
]

---

# Thread Worker

.normal[
Threads hold the GIL when running Python code. They release it when blocking for I/O:

![](images/thread-0.png)

Or when calling into some native library:

![](images/thread-1.png)
]
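---

# Checking that the GIL is released

.normal[
A minimal sketch (not from the original deck): the same two-thread pool runs a pure Python loop, then a numpy matrix product. Only the numpy calls can overlap, since numpy releases the GIL during the computation.

```python
import numpy as np
from concurrent.futures import ThreadPoolExecutor

def pure_python(n):   # holds the GIL for the whole loop
    while n > 0:
        n -= 1

def numpy_dot(a):     # numpy releases the GIL during the product
    return a.dot(a)

a = np.random.rand(2000, 2000)
with ThreadPoolExecutor(max_workers=2) as e:
    list(e.map(pure_python, [10_000_000] * 2))  # ~sequential
    list(e.map(numpy_dot, [a, a]))              # ~parallel
```
]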
---
exclude: true

# Thread Worker

.left-column[
Even worse:
```python
>>> from threading import Thread
>>> def count(n):
...     while n > 0:
...         n -= 1
>>> %timeit count(10000000)
1 loop, best of 3: 409 ms per loop

>>> %%timeit
... t1 = Thread(target=count, args=(5000000,))
... t2 = Thread(target=count, args=(5000000,))
... t1.start(); t2.start()
... t1.join(); t2.join()
1 loop, best of 3: 401 ms per loop
```
]
.right-column[.normal[
The threads spend more time trying to acquire the GIL than computing.
]]

---

# Process Worker

.left-column[.normal[
- Create a new Python interpreter per worker.
- Each worker runs in **its own** interpreter.
]]
.normal[
**Drawbacks:**

- *Slow* spawning
- Higher memory overhead
- Higher communication overhead
]
.reset-column[ ]

--
count: false

.normal[
**No GIL contention anymore!**

The computation can be done in parallel, even for pure Python code.
]
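---

# Process Worker

.normal[
Only the import changes to switch the earlier example to process workers (a sketch, assuming `fit_model` and its arguments are picklable):

```python
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor(max_workers=4) as executor:
    future1 = executor.submit(fit_model, param1)
    model1 = future1.result(timeout=None)
```
]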
---

class: center, middle

# Methods to create a new interpreter:
# *fork* or *spawn*

*spawn* is `fork` followed by `exec` on POSIX

---

# Launching a new interpreter: *fork*

.normal[
Duplicate the current interpreter (only on UNIX)

.left-column[
**Advantages:**

- Low spawning overhead
- The interpreter is warm: modules are already *imported*
]
.right-column[
**Drawbacks:**

- Bad interaction with multithreaded programs
- Does not respect the POSIX specification
]
.reset-column[ ]

$\Rightarrow$ **crash**: numpy on OSX (Accelerate), OpenMP (XGBoost, spaCy, OpenCV...)
]

---

# Launching a new interpreter: *spawn*

.normal[
Create a new interpreter from scratch

.left-column[
**Advantages:**

- Safe (respects POSIX)
- Fresh interpreter, without extra libraries
]
.right-column[
**Drawbacks:**

- Slower to start (50-300ms)
- Need to reload the libraries, redefine the functions...
]
]
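---

# Selecting the start method

.normal[
A minimal sketch (not from the original deck): `multiprocessing.get_context` selects *fork* or *spawn* explicitly, assuming a picklable, module-level `work` function.

```python
import multiprocessing as mp

def work(x):
    return x * x

if __name__ == "__main__":
    # "spawn" starts a fresh interpreter; "fork" duplicates this one (UNIX only)
    ctx = mp.get_context("spawn")
    with ctx.Pool(4) as pool:
        print(pool.map(work, range(8)))
```
]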
---

# `Thread` vs `Process`

.table[
| | Thread | Process .subitem[(fork)] | Process .subitem[(spawn)] |
|---------------|:------------------:|:------------------:|:------------------:|
| Efficient multicore on pure Python code | ![](images/no.jpg) | ![](images/ok.jpg) | ![](images/ok.jpg) |
| No communication overhead | ![](images/ok.jpg) | ![](images/no.jpg) | ![](images/no.jpg) |
| POSIX safe | ![](images/ok.jpg) | ![](images/no.jpg) | ![](images/ok.jpg) |
| Low spawning overhead | ![](images/ok.jpg) | ![](images/ok.jpg) | ![](images/no.jpg) |
]

---

# `Thread` vs `Process`

.table[
| | Thread | Process .subitem[(fork)] | Process .subitem[(spawn)] |
|---------------|:------------------:|:------------------:|:------------------:|
| Efficient multicore on pure Python code | ![](images/no.jpg) | ![](images/ok.jpg) | ![](images/ok.jpg) |
| No communication overhead | ![](images/ok.jpg) | ![](images/no.jpg) | ![](images/no.jpg) |
| POSIX safe | ![](images/ok.jpg) | ![](images/no.jpg) | ![](images/ok.jpg) |
| Low spawning overhead | ![](images/ok.jpg) | ![](images/ok.jpg) | ![](images/no.jpg) |
]
.centered[.normal[
$\Rightarrow$ Hide the spawning overhead by reusing the pool of processes
]]

---

class: middle, center

# Reusing a `ProcessPoolExecutor`
---

# Reusing a `ProcessPoolExecutor`

.normal2[
Reuse a previously started global `ProcessPoolExecutor` singleton

The spawning overhead is only paid once

__Main issue:__ is this robust?
]
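.normal[
A minimal sketch of the naive singleton pattern this implies (a hypothetical `get_executor` helper, not loky's implementation):

```python
from concurrent.futures import ProcessPoolExecutor

_executor = None  # module-level singleton

def get_executor(max_workers=4):
    # Start the pool once, then hand back the same instance
    global _executor
    if _executor is None:
        _executor = ProcessPoolExecutor(max_workers=max_workers)
    return _executor
```
]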
---

# Managing the state of the executor

.normal[
Example deadlock:

```python
>>> from concurrent.futures import ProcessPoolExecutor
>>> with ProcessPoolExecutor(max_workers=4) as e:
*...     e.submit(lambda: 1)
...
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 241, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda>
at 0x7f5c787bd488>: attribute lookup <lambda>
on __main__ failed
^C
```

Deadlock: the exception is printed on stderr but cannot be caught.
]

---

class: middle, center

# Robust and reusable pool of workers: `loky`

---

# A reusable `ProcessPoolExecutor`

.wide-left-column[
```python
*>>> from loky import get_reusable_executor
*>>> executor = get_reusable_executor(max_workers=4)
*>>> print(executor.executor_id)
*0
>>> executor.submit(id, 42).result()
139655595838272
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
>>> executor.submit(func, unpicklable_obj).result()
Traceback (most recent call last):
...
BrokenExecutorError
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
1
>>> executor.submit(id, 42).result()
139655595838272
```
]
.small-right-column[
Create a `ProcessPoolExecutor` using the factory function `get_reusable_executor`.

.small[]
]

---
count: false

# A reusable `ProcessPoolExecutor`

.wide-left-column[
```python
>>> from loky import get_reusable_executor
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
*>>> executor.submit(id, 42).result()
*139655595838272
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
>>> executor.submit(func, unpicklable_obj).result()
Traceback (most recent call last):
...
BrokenExecutorError
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
1
>>> executor.submit(id, 42).result()
139655595838272
```
]
.small-right-column[
Create a `ProcessPoolExecutor` using the factory function `get_reusable_executor`.

The executor can be used exactly as `ProcessPoolExecutor`.
]

---
count: false

# A reusable `ProcessPoolExecutor`

.wide-left-column[
```python
>>> from loky import get_reusable_executor
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
>>> executor.submit(id, 42).result()
139655595838272
*>>> executor = get_reusable_executor(max_workers=4)
*>>> print(executor.executor_id)
*0
>>> executor.submit(func, unpicklable_obj).result()
Traceback (most recent call last):
...
BrokenExecutorError
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
1
>>> executor.submit(id, 42).result()
139655595838272
```
]
.small-right-column[
Create a `ProcessPoolExecutor` using the factory function `get_reusable_executor`.

The executor can be used exactly as `ProcessPoolExecutor`.

When the factory is called elsewhere, reuse the same executor if it is working.
]

---
count: false

# A reusable `ProcessPoolExecutor`

.wide-left-column[
```python
>>> from loky import get_reusable_executor
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
>>> executor.submit(id, 42).result()
139655595838272
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
*>>> executor.submit(func, unpicklable_obj).result()
*Traceback (most recent call last):
*...
*BrokenExecutorError
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
1
>>> executor.submit(id, 42).result()
139655595838272
```
]
.small-right-column[
Create a `ProcessPoolExecutor` using the factory function `get_reusable_executor`.

The executor can be used exactly as `ProcessPoolExecutor`.

When the factory is called elsewhere, reuse the same executor if it is working.

When the executor is broken, an exception is raised without deadlock.
]

---
count: false

# A reusable `ProcessPoolExecutor`

.wide-left-column[
```python
>>> from loky import get_reusable_executor
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
>>> executor.submit(id, 42).result()
139655595838272
>>> executor = get_reusable_executor(max_workers=4)
>>> print(executor.executor_id)
0
>>> executor.submit(func, unpicklable_obj).result()
Traceback (most recent call last):
...
BrokenExecutorError
*>>> executor = get_reusable_executor(max_workers=4)
*>>> print(executor.executor_id)
*1
*>>> executor.submit(id, 42).result()
*139655595838272
```
]
.small-right-column[
Create a `ProcessPoolExecutor` using the factory function `get_reusable_executor`.

The executor can be used exactly as `ProcessPoolExecutor`.

When the factory is called elsewhere, reuse the same executor if it is working.

When the executor is broken, an exception is raised without deadlock.

Fetching the singleton again creates a fresh working instance.
]

---

class: middle, center

# Thread-based multicore computing for arrays with BLAS

---

# BLAS

### NumPy is a Python wrapper for BLAS routines

### Efficient implementations:

- OpenBLAS (numpy from PyPI or conda-forge)
- Intel MKL (numpy from Anaconda)

### `np.dot(A, B)` will use all cores by default

### Control the number of threads with environment variables:

- `OPENBLAS_NUM_THREADS=2 python mycode.py`
- `MKL_NUM_THREADS=2 python mycode.py`
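---

# Controlling BLAS threads from Python

.normal[
A minimal sketch (not from the original deck): the variables must be set before numpy is first imported, otherwise the BLAS thread pool is already initialized.

```python
import os

# Assumption: this runs before any `import numpy` in the process
os.environ["OPENBLAS_NUM_THREADS"] = "2"
os.environ["MKL_NUM_THREADS"] = "2"

import numpy as np

a = np.random.rand(2000, 2000)
b = np.random.rand(2000, 2000)
c = np.dot(a, b)  # the GEMM now uses at most 2 BLAS threads
```
]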
---

class: center, middle

# Roofline Analysis

---

# Roofline Analysis

### Compute bound: speed of the CPU, e.g. 100 GFLOP/s

### Memory bound: speed of the RAM, e.g. 50 GByte/s

### Arithmetic Intensity: compute / data transfer

### Example: matrix-matrix multiplication, `(n, k)` by `(k, m)`

.normal[
- Compute: `~ 2 * m * n * k` FLOP
- Data: `(m * k + n * k) + m * n` elements
- e.g. `m = n = k = 1000` in float64: `2e9` FLOP for `2.4e7` bytes, i.e. ~83 FLOP/Byte $\Rightarrow$ compute bound
]

---

class: center, middle

![MKL roofline](images/roofline_mkl.png)

---

class: center, middle

![OpenBLAS roofline](images/roofline_openblas.png)

---

class: center, middle

![MKL roofline 10 cores](images/roofline_mkl_10_cores.png)

---

class: center, middle

![Speed up 10 cores](images/speedup_mkl_10_cores.png)

---

class: middle, center

## Multicore scalability is limited by I/O for workloads with low arithmetic intensity

---

class: middle, center

# Conclusion

---

# Conclusion

.normal3[
- `Thread` is the best worker type for multicore programs if your code releases the **GIL** in performance-critical sections.
- Array operations already benefit from threading via numpy + BLAS (e.g. MKL or OpenBLAS).
- `loky` uses `Process` workers with *spawn* and tries to reuse the pool of processes as much as possible.
- `loky` will be used by default in future `joblib` and `scikit-learn` releases.
]

---

class: middle, center

# Thanks for your attention!

.normal[
Slides available at [ogrisel.github.io/decks/2017_euroscipy_parallelism](https://ogrisel.github.io/decks/2017_euroscipy_parallelism)
Based on Thomas Moreau's slides, available at [tommoral.github.io/pyparis17/](https://tommoral.github.io/pyparis17/)
More on the GIL by Dave Beazley: [dabeaz.com/python/GIL.pdf](http://dabeaz.com/python/GIL.pdf)
![:scale 1em](images/github.png) Loky project: [github.com/tommoral/loky](https://github.com/tommoral/loky)
![:scale 1em](images/twitter.png) @[ogrisel](https://twitter.com/ogrisel)
] .filler[]