Share nothing, copy by value and go \parallel

Photo by ArtisanalPhoto on Unsplash

Parallel processing is a popular concept — especially these days — because it allows for the efficient execution of tasks by dividing them into smaller subtasks that can be processed simultaneously.

This usually leads to faster computation and increased performance, making it particularly useful for tasks that require high computational power, such as scientific simulations, data analysis, and artificial intelligence.

While parallel processing has its advantages, we tend not to talk about its drawbacks:

  • It adds complexity and overhead to the design and implementation of the system.

  • It may introduce synchronization issues or race conditions if the tasks share data or resources.

  • It may not be suitable for tasks that are not independent or parallelizable.

  • It may waste or under-use resources if the pool size is too large or too small for the workload.

Sounds like a nightmare? You are right …

When it comes to PHP, we tend to focus on its capabilities for web application development and not so much on its CLI.

“Parallel” is a PHP extension that allows for parallel execution of PHP code using multiple threads (it requires a thread-safe, ZTS, build of PHP). It provides functions and constructs to manage parallelism and parallel data processing.

Now let’s see how the parallel extension tackles the disadvantages listed above:

  • Allowing the programmer to use either a functional or an object-oriented API depending on their needs and preferences.

  • Bootstrapping all executing runtimes identically with a custom bootstrap file.

  • Providing parallel\{Channel, Events, Sync, Future} classes that can be used for sharing data, polling events, synchronizing shared resources and retrieving results between runtimes safely and efficiently.
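To make the first point concrete, here is a minimal sketch of both API styles. It assumes a ZTS build of PHP with the parallel extension installed; the closures and values are made up for illustration. A shared bootstrap file would be passed as `new Runtime('bootstrap.php')` for the object-oriented style or registered with `parallel\bootstrap()` for the functional style; it is left out here so the script stays self-contained.

```php
<?php
// Assumes a ZTS PHP build with the parallel extension loaded.
use parallel\Runtime;
use function parallel\run;

// Object-oriented API: an explicit runtime (a thread) that can be reused for many tasks.
$runtime = new Runtime();
$future = $runtime->run(function (int $a, int $b): int {
    return $a + $b;
}, [2, 3]);
$sum = $future->value(); // blocks until the task has finished

// Functional API: parallel\run() schedules the task on a runtime managed by the extension.
$functionalFuture = run(function (string $name): string {
    return "hello {$name}";
}, ['parallel']);
$greeting = $functionalFuture->value();

$runtime->close(); // graceful shutdown of the explicit runtime
echo $sum, PHP_EOL, $greeting, PHP_EOL;
```

Both calls hand back a `parallel\Future`, so results are collected the same way whichever style you pick.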

So let us see it in action.

In one of my previous articles, I showed how the worker pool design pattern can be implemented with the pthreads API.

It was not a painful exercise; however, collecting the results from the workers was not straightforward.

In the parallel API, the central element is the channel.

A channel is used to share data, arguments and even closures between runtimes.

You can create buffered channels, where send() returns immediately as long as there is free capacity, or unbuffered channels, where send() blocks until another runtime receives the value.
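A minimal sketch of the difference, again assuming a ZTS PHP build with ext-parallel. The buffered channel lets a single thread queue values and read them back in FIFO order; the unbuffered channel needs a receiver in another runtime before send() can return. The channel contents are arbitrary example strings.

```php
<?php
// Assumes a ZTS PHP build with the parallel extension loaded.
use parallel\Channel;
use parallel\Runtime;

// Buffered channel with capacity 2: send() returns at once while there is room.
$buffered = new Channel(2);
$buffered->send('first');
$buffered->send('second');
$firstOut = $buffered->recv();  // values come out in FIFO order
$secondOut = $buffered->recv();

// Unbuffered channel: send() blocks until another runtime calls recv().
$unbuffered = new Channel();
$runtime = new Runtime();
$future = $runtime->run(function ($channel) {
    return $channel->recv(); // the receiving side lives in a separate runtime
}, [$unbuffered]);
$unbuffered->send('handshake'); // returns once the runtime has received it
$received = $future->value();
$runtime->close();

echo $firstOut, ' ', $secondOut, ' ', $received, PHP_EOL;
```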

To recreate our last example (messages in a queue waiting to be delivered asynchronously by a pool of workers), we will again have a pool of workers, which communicate with the main thread through two channels:

  • a channel that is used to receive the tasks for execution, let us call it taskChannel,

  • and a channel that is used to share the results of their work, let us call it resultChannel.

The “semantic architecture” of the program:

Image by the author (created with ASCII Flow)

For all of this to work, we need to create the workers and assign each worker one or more tasks to execute. If a worker receives multiple tasks, it executes them sequentially (FIFO), but the workers do their jobs independently of each other.

Please notice that the entire system is synchronised by the resultChannel!
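The round-robin rule behind the worker assignment can be checked without threads at all. This pure-PHP sketch (the function name `scheduleSlots` is hypothetical) only models how many task-processor runs each worker gets and in which order it handles them; in the real program, which message a given run picks up depends on when it calls recv() on the shared taskChannel.

```php
<?php
// Pure PHP: no parallel extension needed.
// Scheduling slot $index goes to worker $index % $maxWorkers,
// and each worker handles its slots in the order they were assigned (FIFO).
function scheduleSlots(int $taskCount, int $maxWorkers): array
{
    $slots = [];
    for ($index = 0; $index < $taskCount; $index++) {
        $slots[$index % $maxWorkers][] = $index;
    }
    return $slots;
}

$slots = scheduleSlots(5, 2);
foreach ($slots as $worker => $indexes) {
    echo "worker {$worker}: " . implode(', ', $indexes) . PHP_EOL;
}
// worker 0: 0, 2, 4
// worker 1: 1, 3
```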

public function run() {
    // Runtime is parallel\Runtime (imported with `use parallel\Runtime;` at the top of the file)
    $tasks = $this->createTasks(Helpers::scanDirectory(DIR));

    $futures = []; // Keep track of task executors
    $workers = []; // Keep track of the worker pool

    // Create the worker pool (round-robin) and schedule one task processor per task
    foreach (array_keys($tasks) as $index) {
        $workerIndex = $index % $this->maxWorkers; // Calculate the worker index
        $workers[$workerIndex] ??= new Runtime('bootstrap.php'); // reuse the runtime once created
        $futures[] = $workers[$workerIndex]->run(
            $this->createTaskProcessor(),
            [$workerIndex, $this->taskChannel, $this->resultChannel]
        );
    }

    // Sending tasks to the workers: non-blocking as long as taskChannel is buffered
    foreach ($tasks as $task) {
        $this->taskChannel->send($task);
    }

    $this->taskChannel->close();

    // recv() blocks, therefore receiving is kept out of the sending loop.
    // We have to receive exactly one result for each message in our queue.
    foreach ($tasks as $task) {
        Helpers::printMessage($this->resultChannel->recv());
    }

    $this->resultChannel->close();

    // Graceful shutdown: close() lets each runtime finish its scheduled tasks
    foreach ($workers as $worker) {
        $worker->close();
    }
}

We create a pool of workers and we fire them up with a task processor which will wait for a task to execute.

This is accomplished through a closure that we share with our pool of workers:

public function createTaskProcessor(): \Closure {
    // static: the closure does not use $this
    return static function ($workerId, $taskChannel, $resultChannel) {
        // Again, recv() is blocking, so the worker does not do
        // anything until a task arrives ...
        [$closure, $args] = $taskChannel->recv();
        // Execute the task and send the result back
        $result = $closure(...$args);
        $resultChannel->send("({$workerId}) {$result}");
    };
}

Our closure receives an id (for demonstration purposes only), the task channel and the result channel.

Once it runs, it waits on the task channel. When a task arrives, it executes it and sends the result back through the result channel. Each run of the closure handles exactly one task, which is why run() schedules one task processor per task.
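Putting the pieces together, here is a condensed, self-contained sketch of the same worker pool. It assumes a ZTS PHP build with ext-parallel. Both channels are created with `Channel::Infinite` capacity, which is an assumption since the article does not show how they are constructed, and a hypothetical `strtoupper` task stands in for `sendSms()`.

```php
<?php
// Assumes a ZTS PHP build with the parallel extension loaded.
use parallel\Channel;
use parallel\Runtime;

$maxWorkers    = 2;
$taskChannel   = new Channel(Channel::Infinite); // buffered: send() does not wait for a worker
$resultChannel = new Channel(Channel::Infinite); // buffered: workers never block on send()

// Hypothetical tasks standing in for sendSms(): each one is a [closure, arguments] pair.
$tasks = [];
foreach (['a', 'b', 'c', 'd', 'e'] as $letter) {
    $tasks[] = [static function (string $s): string { return strtoupper($s); }, [$letter]];
}

// Task processor: one run handles exactly one task, like createTaskProcessor().
$processor = static function (int $workerId, Channel $taskChannel, Channel $resultChannel): void {
    [$closure, $args] = $taskChannel->recv(); // blocks until a task arrives
    $resultChannel->send("({$workerId}) " . $closure(...$args));
};

// Round-robin: each task adds one processor run to worker $index % $maxWorkers.
$workers = [];
foreach (array_keys($tasks) as $index) {
    $workerIndex = $index % $maxWorkers;
    $workers[$workerIndex] ??= new Runtime();
    $workers[$workerIndex]->run($processor, [$workerIndex, $taskChannel, $resultChannel]);
}

foreach ($tasks as $task) {
    $taskChannel->send($task);
}

// One result per task; the arrival order depends on thread timing.
$results = [];
foreach ($tasks as $_) {
    $results[] = $resultChannel->recv();
}

foreach ($workers as $worker) {
    $worker->close(); // waits for the runtime's scheduled tasks
}
$taskChannel->close();
$resultChannel->close();

print_r($results);
```

Because every processor run handles exactly one task, worker 0 (three runs) always reports three results and worker 1 (two runs) reports two, even though which letter each worker gets can vary from run to run.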

What a wonderful concept!

So let’s take a look at the task itself. We have a method that creates a task for every message in our queue:

protected function createTasks(array $files): array {
    return array_map(function ($fileName) {
        ['url' => $url, 'recipient' => $recipient] = Helpers::processSms($fileName);
        // A task is a [closure, arguments] pair; $this->sendSms(...) is
        // first-class callable syntax (PHP 8.1+) and yields a Closure
        return [$this->sendSms(...), [$url, $recipient, $fileName]];
    }, $files);
}

As you can see, the task itself contains a closure together with the arguments it requires.
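The `[$this->sendSms(...), [...]]` pair can be tried out in plain PHP 8.1+, with no threads involved. In this sketch the `SmsSender` class, its `sendSms()` body and the example URL are hypothetical stand-ins (the real method lives in the author's Gist); it only shows that `$this->sendSms(...)` yields a `Closure` and that the worker's `$closure(...$args)` call spreads the argument array into positional parameters.

```php
<?php
// Pure PHP 8.1+, no parallel extension needed.
final class SmsSender
{
    // Hypothetical stand-in for the article's sendSms(): it only formats a message.
    public function sendSms(string $url, string $recipient, string $fileName): string
    {
        return "sent {$fileName} to {$recipient} via {$url}";
    }

    // Builds a [closure, arguments] pair the same way createTasks() does.
    public function createTask(string $url, string $recipient, string $fileName): array
    {
        return [$this->sendSms(...), [$url, $recipient, $fileName]];
    }
}

$task = (new SmsSender())->createTask('https://sms.example.test', '+10000000000', 'msg-001.json');

// What the task processor does once a task arrives:
[$closure, $args] = $task;
$result = $closure(...$args);
echo $result, PHP_EOL; // sent msg-001.json to +10000000000 via https://sms.example.test
```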

We are sharing a closure with a closure that will execute it.

This is next-level parallel programming, and we are not even using events, synchronisation and other cool features of the parallel API!

As usual, the full source code can be found in my Gist.