Pebble

Release:

5.0.7

Date:

Mar 26, 2024

Modern languages should natively support concurrency, threading and synchronization primitives. Their usage should be the most intuitive possible, yet allowing all the required flexibility.

Pebble aims to help managing threads and processes in an easier way. It wraps Python’s standard library threading and multiprocessing objects.

Concurrent Module

@concurrent.process(timeout=None, name=None, daemon=True, context=None)

Runs the decorated function in a concurrent process, taking care of the results and error management.

The decorated function will return a pebble.ProcessFuture object.

If timeout is set, the process will be stopped once expired and the future object will raise a concurrent.futures.TimeoutError exception.

The name parameter let you define the process name.

The daemon parameter switches between daemon and non-daemon threads.

The context parameter can be used to specify the multiprocessing.context object used for starting the process.

@concurrent.thread(name=None, daemon=True)

Runs the decorated function in a concurrent thread, taking care of the results and error management.

The decorated function will return a concurrent.futures.Future object.

The name parameter let you define the thread name.

The daemon parameter switches between daemon and non-daemon threads.

Asynchronous Module

@asynchronous.process(timeout=None, name=None, daemon=True, context=None)

Runs the decorated function in a concurrent process, taking care of the results and error management.

The decorated function will return an asyncio.Future object.

If timeout is set, the process will be stopped once expired and the future object will raise a asyncio.TimeoutError exception.

The name parameter let you define the process name.

The daemon parameter switches between daemon and non-daemon threads.

The context parameter can be used to specify the multiprocessing.context object used for starting the process.

@asynchronous.thread(name=None, daemon=True)

Runs the decorated function in a concurrent thread, taking care of the results and error management.

The decorated function will return a an asyncio.Future object.

The name parameter let you define the thread name.

The daemon parameter switches between daemon and non-daemon threads.

Pebble Module

class pebble.ProcessPool(max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=None, context=None)

A Pool allows to schedule jobs into a Pool of Processes which will perform them concurrently. Process pools work as well as a context manager.

max_workers is an integer representing the amount of desired process workers managed by the pool. If max_tasks is a number greater than zero, each worker will be restarted after performing an equal amount of tasks. initializer must be callable, if passed, it will be called every time a worker is started, receiving initargs as arguments.

The context parameter can be used to specify the multiprocessing.context object used for starting the worker processes.

active

True if the Pool is running, false otherwise.

schedule(function, args=(), kwargs={}, timeout=None)

Schedule the function to be executed within the Pool.

Returns a pebble.ProcessFuture object representing the execution of the callable.

function is the function which is about to be scheduled.

args and kwargs will be passed to the function respectively as its arguments and keyword arguments.

timeout can be None, an integer or a float. Once expired, it will force the timed out task to be interrupted and the worker will be restarted. Future.result() will raise TimeoutError, callbacks will be executed.

map(function, *iterables, chunksize=1, timeout=None)

Concurrently compute the function using arguments from each of the iterables. Stop when the shortest iterable is exhausted.

chunksize controls the size of the chunks the iterables will be broken into before being passed to the function.

timeout is an integer or a float. If given, it will be assigned to chunk of the iterables. If the computation of the given chunk will last longer than timeout, its execution will be terminated and iterating over its result will raise TimeoutError.

A pebble.ProcessMapFuture object is returned. Its result method will return an iterable containing the results of the computation in the same order as they were given.

close()

No more job will be allowed into the Pool, queued jobs will be consumed. To ensure all the jobs are performed call ProcessPool.join() just after closing the Pool.

stop()

The Pool will be stopped abruptly. All enqueued and running jobs will be lost. To ensure the Pool to be released call ProcessPool.join() after stopping the Pool.

join(timeout=None)

Waits for all workers to exit, must not be called before calling either close() or stop(). If timeout is set and some worker is still running after it expired, a TimeoutError will be raised.

The join function must be called only in the main loop. Calling it in a pebble.ProcessFuture callback will result in a deadlock.

class pebble.ThreadPool(max_workers=multiprocessing.cpu_count(), max_tasks=0, initializer=None, initargs=None)

A ThreadPool allows to schedule jobs into a Pool of Threads which will perform them concurrently. Thread pools work as well as a context manager.

max_workers is an integer representing the amount of desired process workers managed by the pool. If max_tasks is a number greater than zero, each worker will be restarted after performing an equal amount of tasks. initializer must be callable, if passed, it will be called every time a worker is started, receiving initargs as arguments.

active

True if the Pool is running, false otherwise.

schedule(function, args=(), kwargs={})

Schedule the function to be executed within the Pool.

Returns a concurrent.futures.Future object representing the execution of the callable.

function is the function which is about to be scheduled.

args and kwargs will be passed to the function respectively as its arguments and keyword arguments.

map(function, *iterables, chunksize=1)

Concurrently compute the function using arguments from each of the iterables. Stop when the shortest iterable is exhausted.

chunksize controls the size of the chunks the iterables will be broken into before being passed to the function.

timeout is an integer or a float. If given, it will be assigned to every chunk of the iterables. If the computation of the given chunk will last longer than timeout, iterating over its result will raise TimeoutError.

A pebble.MapFuture object is returned. Its result method will return an iterable containing the results of the computation in the same order as they were given.

close()

No more job will be allowed into the Pool, queued jobs will be consumed. To ensure all the jobs are performed call ThreadPool.join() just after closing the Pool.

stop()

The ongoing jobs will be performed, all the enqueued ones dropped; this is a fast way to terminate the Pool. To ensure the Pool to be released call ThreadPool.join() after stopping the Pool.

join(timeout=None)

Waits for all workers to exit, must not be called before calling either close() or stop(). If timeout is set and some worker is still running after it expired, a TimeoutError will be raised.

The join function must be called only in the main loop. Calling it in a pebble.ProcessFuture callback will result in a deadlock.

@pebble.synchronized([lock])

A synchronized function prevents two or more callers to interleave its execution preventing race conditions.

The synchronized decorator accepts as optional parameter a Lock, RLock or Semaphore from threading and multiprocessing modules.

If no synchronization object is given, a threading.Lock will be employed. This implies that between different decorated functions only one at a time will be executed.

@pebble.sighandler(signals)

Convenience decorator for setting the decorated function as signal handler for the specified signals.

signals can either be a single signal or a list/tuple of signals.

pebble.waitforthreads(threads, timeout=None)

Waits for one or more Thread to exit or until timeout expires.

threads is a list containing one or more threading.Thread objects. If timeout is not None the function will block for the specified amount of seconds returning an empty list if no Thread is ready.

The function returns a list containing the ready Threads.

Note

Expired Threads are not joined by waitforthreads.

pebble.waitforqueues(queues, timeout=None)

Waits for one or more Queue to be ready or until timeout expires.

queues is a list containing one or more Queue.Queue objects. If timeout is not None the function will block for the specified amount of seconds returning an empty list if no Queue is ready.

The function returns a list containing the ready Queues.

class pebble.ProcessFuture

This class inherits from concurrent.futures.Future. The sole difference with the parent class is the possibility to cancel running calls.

cancel()

Cancel a running or enqueued call. If the call has already completed then the method will return False, otherwise the call will be cancelled and the method will return True. If the call is running, the process executing it will be stopped allowing to reclaim its resources.

class pebble.MapFuture

This class inherits from concurrent.futures.Future. It is returned by the map function of a ThreadPool.

result()

Returns an iterator over the results of the map function. If a call raises an exception, then that exception will be raised when its value is retrieved from the iterator. The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Pool.map().

cancel()

Cancel the computation of enqueued element of the iterables passed to the map function. If all the elements are already in progress or completed then the method will return False. True is returned otherwise.

class pebble.ProcessMapFuture

This class inherits from pebble.ProcessFuture. It is returned by the map function of a ProcessPool.

result()

Returns an iterator over the results of the map function. If a call raises an exception, then that exception will be raised when its value is retrieved from the iterator. The returned iterator raises a concurrent.futures.TimeoutError if __next__() is called and the result isn’t available after timeout seconds from the original call to Pool.map().

cancel()

Cancel the computation of running or enqueued element of the iterables passed to the map function. If all the elements are already completed then the method will return False. True is returned otherwise.

exception pebble.ProcessExpired

Raised by Future.result() functions if the related process died unexpectedly during the execution.

exitcode

Integer representing the process’ exit code.

Programming Guidelines

The Python’s multiprocessing programming guidelines apply as well for all functionalities within the process namespace.

Pool workers termination

When a Future is cancelled or the underlying task times out or the ProcessPool is stopped, the affected worker processes are terminated. As a consequence, scheduled functions which allocate resources such as temporary files or child processes are to be handled carefully. If a worker process is terminated abruptly due to the above reason, such resources will not be relinquished.

Concurrent process decorator

When the start method is either spawn or forkserver, the recommendation is to limit the decoration via concurrent.process to top level functions only. More specifically, inner scope functions decoration is not supported.

The following will not work with the mentioned start methods:

def outer():

    @concurrent.process
    def inner():
        return

    future = inner()

    return future.result()

Examples

Concurrent decorators

Run a function in a separate process and wait for its results.

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Quite often developers need to integrate in their projects third party code which appears to be unstable, to leak memory or to hang. The concurrent function allows to easily take advantage of the isolation offered by processes without the need of handling any multiprocessing primitive.

from pebble import concurrent
from concurrent.futures import TimeoutError
from third_party_lib import unstable_function

@concurrent.process(timeout=10)
def function(arg, kwarg=0):
    unstable_function(arg, kwarg=kwarg)

future = function(1, kwarg=1)

try:
    results = future.result()
except TimeoutError as error:
    print("unstable_function took longer than %d seconds" % error.args[1])
except ProcessExpired as error:
    print("%s. Exit code: %d" % (error, error.exitcode))
except Exception as error:
    print("unstable_function raised %s" % error)
    print(error.traceback)  # Python's traceback of remote process

Asynchronous decorators

As for the concurrent namespace, asynchronous decorators are asyncio compatible instead.

import asyncio

from pebble import asynchronous

@asynchronous.process
def function(arg, kwarg=0):
    return arg + kwarg

async def asynchronous_function():
    result = await function(1, kwarg=1)
    print(result)

asyncio.run(asynchronous_function())

More Pickles

All process APIs accept multiprocessing.context compatible objects. Therefore, it is possible to use alternative implementations such as multiprocess with dill in order to serialize more object types.

import threading
import multiprocess

from pebble import concurrent

@concurrent.process(context=multiprocess.get_context('spawn'))
def function(unpicklable_object):
    """Pickle an unpicklable object."""
    unpicklable_object.pickle_this()

class Unpicklable:
    """This class cannot be serialized by standard Python Pickle."""
    def __init__(self):
        self.lock = threading.Lock()

    def pickle_this(self):
        with self.lock:
            print('Just pickled!')

def main():
    unpicklable = Unpicklable()
    future = function(unpicklable)
    future.result()

if __name__ == '__main__':
    main()

Pools

The ProcessPool has been designed to support task timeouts and critical errors. If a task reaches its timeout, the worker will be interrupted immediately. Abrupt interruptions of the workers are dealt transparently.

The map function returns a Future object to better control its execution. When the first result is ready, the result function will return an iterator. The iterator can be used to retrieve the results no matter their outcome.

from concurrent.futures import TimeoutError
from pebble import ProcessPool, ProcessExpired

def function(n):
    return n

with ProcessPool() as pool:
    future = pool.map(function, range(100), timeout=10)

    iterator = future.result()

    while True:
        try:
            result = next(iterator)
        except StopIteration:
            break
        except TimeoutError as error:
            print("function took longer than %d seconds" % error.args[1])
        except ProcessExpired as error:
            print("%s. Exit code: %d" % (error, error.exitcode))
        except Exception as error:
            print("function raised %s" % error)
            print(error.traceback)  # Python's traceback of remote process

The following example shows how to compute the Fibonacci sequence up to a certain duration after which, all the remaining computations will be cancelled as they would timeout anyway.

from pebble import ProcessPool
from concurrent.futures import TimeoutError

def fibonacci(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fibonacci(n - 1) + fibonacci(n - 2)

with ProcessPool() as pool:
    future = pool.map(fibonacci, range(50), timeout=10)

    try:
        for n in future.result():
            print(n)
    except TimeoutError:
        print("TimeoutError: aborting remaining computations")
        future.cancel()

To compute large collections of elements without incurring in IPC performance limitations, it is possible to use the chunksize parameter of the map function.

from pebble import ProcessPool
from multiprocessing import cpu_count
from concurrent.futures import TimeoutError

def function(n):
    return n

elements = list(range(1000000))

cpus = cpu_count()
size = len(elements)
chunksize = size / cpus
# the timeout will be assigned to each chunk
# therefore, we need to consider its size
timeout = 10 * chunksize

with ProcessPool(max_workers=cpus) as pool:
    future = pool.map(function, elements, chunksize=chunksize, timeout=timeout)

    assert list(future.result()) == elements

Pools and AsyncIO

Pebble pool implemetations are asyncio compatible. Parameters such as timeouts can be passed via the loop.run_in_executor function.

import time
import asyncio

from pebble import ProcessPool

SLEEP = 10
TIMEOUT = 3

def function(seconds):
    print(f"Going to sleep {seconds}s..")
    time.sleep(seconds)
    print(f"Slept {seconds}s.")

async def main():
    loop = asyncio.get_running_loop()
    pool = ProcessPool()

    await loop.run_in_executor(pool, function, TIMEOUT, SLEEP)

asyncio.run(main())

Control process resources usage

By combining the resource module and the ProcessPool initializer function, it is possible to control the amount of resources each process can consume.

In the following example, the memory consumption of each worker process is limited to 1 Kb.

import resource
from pebble import ProcessPool

MAX_MEM = 1024

def initializer(limit):
    """Set maximum amount of memory each worker process can allocate."""
    soft, hard = resource.getrlimit(resource.RLIMIT_AS)
    resource.setrlimit(resource.RLIMIT_AS, (limit, hard))

def function():
    """This function tries to allocate 1Mb worth of string."""
    string = ''

    for _ in range(1024):
        string += 1024 * 'A'

pool = ProcessPool(initializer=initializer, initargs=(MAX_MEM,))
future = pool.schedule(function)

assert isinstance(future.exception(), MemoryError)

Sighandler decorator

The syntax

import signal
from pebble import sighandler

@sighandler((signal.SIGINT, signal.SIGTERM))
def signal_handler(signum, frame):
    print("Termination request received!")

Is equivalent to

import signal

def signal_handler(signum, frame):
    print("Termination request received!")

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

Running the tests

Please refer to the .travis.yml to see how to run the tests.