Worker Pool design pattern with PHP

Photo by Museums Victoria on Unsplash

In this article, I will show you how to use the Worker Pool design pattern with PHP.

In the first part, I will use the Pool and Threaded classes to demonstrate parallel processing. Both are part of the object-oriented API of the pthreads extension, which must be installed separately and requires a thread-safe (ZTS) build of PHP (https://www.php.net/manual/en/class.thread.php). pthreads v3 also works only from the command line (CLI).
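Before running the examples, it is worth checking whether your PHP binary qualifies. The following is a minimal sketch. `PHP_ZTS` is a predefined PHP constant (1 on thread-safe builds) and `extension_loaded()` is a standard function. The function name `canUsePthreads` is hypothetical and is not part of the article's code.

```php
<?php
// Hypothetical helper (not part of the article's code): reports whether this
// PHP binary can run the pthreads examples.
function canUsePthreads(): bool
{
    // PHP_ZTS is 1 on thread-safe (ZTS) builds and 0 on regular builds.
    $isThreadSafe = (bool) PHP_ZTS;

    // ZTS alone is not enough: the pthreads extension must also be loaded.
    $hasExtension = extension_loaded('pthreads');

    return $isThreadSafe && $hasExtension;
}

echo canUsePthreads()
    ? "pthreads is available" . PHP_EOL
    : "pthreads is NOT available (a ZTS build with the pthreads extension is required)" . PHP_EOL;
```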

Worker Pool is a design pattern that involves creating a pool of worker threads or processes and assigning tasks to them for parallel execution. This pattern provides a solution for managing concurrent execution and controlling the number of simultaneous tasks being processed.

The benefits of using the Worker Pool pattern in this code include:

  1. Efficient resource utilization: By capping the number of concurrent workers at the size of the pool, the code controls resource usage and prevents an excessive number of threads or processes from overwhelming the system.

  2. Improved performance: Running tasks in parallel on multiple workers can significantly shorten the total run time. For I/O-bound work such as HTTP requests, the workers overlap their network waits instead of waiting for one request after another.

  3. Simplified task assignment: The worker pool abstracts away the complexities of managing individual workers. The main process simply submits tasks to the pool, and the pool takes care of task distribution and worker management.

  4. Scalability: The Worker Pool pattern allows for easy scalability. The code can be adapted to handle larger workloads by increasing the size of the worker pool without significant changes to the overall structure.

By employing the Worker Pool pattern, the code achieves better control over process management, resource utilization, and task execution, leading to improved efficiency and performance.

Let us assume that we have a queue (for the sake of simplicity a directory) containing text messages waiting to be sent to various recipients.

The queue might contain thousands of messages, so we cannot deliver all of them at once: that would overload the gateway (if its rate limiter did not block us first).

Our job is to send out every message through the gateway using the HTTP protocol.

If we deliver a message successfully, we delete it from the queue; if not, we keep it so we can try to deliver it again later.
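To make these queue rules concrete before any threads are involved, here is a minimal sequential sketch: each file in a directory is one message, a delivered message is deleted, and a failed one stays on disk. The `drainQueue` function, the `recipient|text` file format, the phone numbers, and the fake gateway callback are all assumptions made for illustration; the real HTTP call is replaced by a stub.

```php
<?php
// Sequential sketch of the directory-based queue (no threads involved).
// $send is a hypothetical stand-in for the real HTTP call to the gateway.
function drainQueue(string $dir, callable $send): array
{
    $report = ['sent' => [], 'kept' => []];

    // Every regular file in the directory is one queued message.
    foreach (glob($dir . '/*') ?: [] as $path) {
        if (!is_file($path)) {
            continue;
        }

        if ($send(file_get_contents($path))) {
            unlink($path);                        // delivered: remove it from the queue
            $report['sent'][] = basename($path);
        } else {
            $report['kept'][] = basename($path);  // failed: keep it for a later retry
        }
    }

    return $report;
}

// Usage: a throwaway queue with three messages in a made-up "recipient|text"
// format, and a fake gateway that rejects one (made-up) phone number.
$dir = sys_get_temp_dir() . '/sms_queue_' . uniqid();
mkdir($dir);
file_put_contents($dir . '/msg1.txt', '36201234567|Hello');
file_put_contents($dir . '/msg2.txt', '36301111111|Hi there');
file_put_contents($dir . '/msg3.txt', '36709876543|Good morning');

$fakeGateway = function (string $message): bool {
    return strpos($message, '36301111111|') !== 0;
};

$report = drainQueue($dir, $fakeGateway);
print_r($report);
```

Processing the files one by one like this is exactly what the worker pool below parallelizes, while still bounding how many requests hit the gateway at the same time.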

Now, let’s take a look at the sample code:

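class MyPool extends Pool {

    // Results collected from finished tasks
    public $data = [];
    // Number of tasks submitted so far
    private $numTasks = 0;

    // Count every submitted task so process() knows how many results to wait for
    public function submit(Threaded $task): int {
        $this->numTasks++;
        return parent::submit($task);
    }

    public function process() {
        // Run this loop until every submitted task has delivered a result
        while (count($this->data) < $this->numTasks) {
            $this->collect(function (SmsWorker $task) {
                // Read the flag only once: the task may finish between two reads,
                // and a task removed without its result being recorded would
                // keep this loop running forever
                $done = $task->isDone();
                if ($done) {
                    $this->data[] = (object) [
                        'complete' => $task->complete,
                        'result' => $task->result,
                        'recipient' => $task->recipient
                    ];
                }
                // Returning true lets the pool collect (remove) the task
                return $done;
            });
            // Avoid a busy loop that burns a CPU core while the workers run
            usleep(1000);
        }

        // All jobs are done, we can shut down the pool
        $this->shutdown();

        return $this->data;
    }

}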

I have moved all the utility functions into a Helpers class (not shown here, but included in the full code linked at the end), so you can focus on the important parts.
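The author's Helpers class lives in the full code. As a stand-in, here is a hypothetical sketch of what its four methods might look like, based only on how they are called in this article: `scanDirectory(DIR)`, `processSms($fileName)` returning `url` and `recipient`, `sendMessage($url, '200')`, and `printMessage($message)`. The `recipient|text` file format, the gateway URL `https://gateway.example.com/send`, and its `to`/`text` parameters are assumptions, not the author's implementation.

```php
<?php
// Hypothetical Helpers sketch; the author's real implementation is in the full code.
// Assumptions: one message per file in "recipient|text" format, and a made-up
// gateway URL that takes "to" and "text" query parameters.
if (!defined('DIR')) {
    define('DIR', sys_get_temp_dir() . '/sms_queue');
}

class Helpers
{
    // Returns the file names (not full paths) of all queued messages.
    public static function scanDirectory(string $dir): array
    {
        $entries = scandir($dir);
        if ($entries === false) {
            throw new Exception("Cannot read queue directory: {$dir}");
        }
        // Drop "." and ".." (and any subdirectories): only files are messages.
        return array_values(array_filter($entries, function ($name) use ($dir) {
            return is_file($dir . '/' . $name);
        }));
    }

    // Reads one queued message and builds the gateway URL for it.
    public static function processSms(string $fileName): array
    {
        $parts = explode('|', trim(file_get_contents(DIR . '/' . $fileName)), 2);
        $recipient = $parts[0];
        $text = $parts[1] ?? '';

        return [
            'recipient' => $recipient,
            'url' => 'https://gateway.example.com/send?'
                . http_build_query(['to' => $recipient, 'text' => $text]),
        ];
    }

    // Calls the gateway and compares the HTTP status code with the expected one.
    public static function sendMessage(string $url, string $expectedStatus): bool
    {
        $context = stream_context_create(['http' => [
            'ignore_errors' => true,    // still read the response on 4xx/5xx
            'follow_location' => 0,     // so the first status line is the only one
            'timeout' => 10,
        ]]);

        $body = @file_get_contents($url, false, $context);
        if ($body === false || empty($http_response_header)) {
            return false;
        }

        // The first header line looks like "HTTP/1.1 200 OK".
        return preg_match('#^HTTP/\S+\s+(\d{3})#', $http_response_header[0], $m) === 1
            && $m[1] === $expectedStatus;
    }

    // Prints a timestamped line to the console.
    public static function printMessage(string $message): void
    {
        echo '[' . date('H:i:s') . '] ' . $message . PHP_EOL;
    }
}
```

Keeping these helpers static and free of shared state is a deliberate choice here: each worker thread can call them without coordinating with the others.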

The MyPool class extends the Pool class from pthreads, which manages a pool of worker threads. We need to extend it because the original class has no built-in way to gather the data produced by the tasks.

The process method collects the results of the tasks and shuts down the pool:

  1. It loops as long as the $data array holds fewer entries than the $numTasks property; once the two are equal, all tasks have been completed.

  2. Inside the loop, it calls the collect method of the parent class, which takes a callback function as a parameter and returns an integer that represents the number of remaining tasks in the pool.

  3. The callback function takes an SmsWorker object as a parameter and returns a boolean that indicates whether the task is done; returning true tells the pool that the task can be collected (removed from the pool).

  4. If the task is done, it adds an object with three properties: complete, which marks the task as finished (it is always true at this point, whether or not the SMS was delivered); result, which contains the result message of the task and shows whether sending succeeded; and recipient, which is the phone number of the recipient of the SMS.

  5. All this information is then stored in the $data property of the class.
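Because pthreads needs a special PHP build, here is a plain-PHP simulation of the same collect loop that runs anywhere. FakeTask and FakePool are hypothetical stand-ins, not pthreads classes: a FakeTask finishes after a set number of isDone() checks, and FakePool::collect() removes every task for which the collector returns true. The simulation also shows why the collector reads isDone() only once. If it called isDone() a second time for its return value, a task could report true there without its result having been recorded; the pool would then drop it and count($data) would never reach $numTasks.

```php
<?php
// Plain-PHP simulation of MyPool::process() (no pthreads needed).
// FakeTask and FakePool are hypothetical stand-ins: a task finishes after a set
// number of isDone() checks, and collect() removes every task for which the
// collector returns true and returns how many tasks are left.
class FakeTask
{
    public $recipient;
    public $result = null;
    public $complete = false;
    private $checksLeft;

    public function __construct(string $recipient, int $checks)
    {
        $this->recipient = $recipient;
        $this->checksLeft = $checks;
    }

    public function isDone(): bool
    {
        // Each check lets the simulated work progress, so the answer can
        // change from one call to the next, just like a real running task.
        if (!$this->complete && --$this->checksLeft <= 0) {
            $this->complete = true;
            $this->result = "SMS successfully sent to {$this->recipient} phone number";
        }
        return $this->complete;
    }
}

class FakePool
{
    private $tasks = [];

    public function submit(FakeTask $task): void
    {
        $this->tasks[] = $task;
    }

    public function collect(callable $collector): int
    {
        foreach ($this->tasks as $i => $task) {
            if ($collector($task)) {
                unset($this->tasks[$i]);
            }
        }
        return count($this->tasks);
    }
}

$pool = new FakePool();
$numTasks = 0;
foreach ([['36201234567', 3], ['36301111111', 1], ['36709876543', 2]] as [$recipient, $checks]) {
    $pool->submit(new FakeTask($recipient, $checks));
    $numTasks++;
}

$data = [];
while (count($data) < $numTasks) {
    $pool->collect(function (FakeTask $task) use (&$data) {
        $done = $task->isDone();   // read the flag exactly once
        if ($done) {
            $data[] = (object) [
                'complete' => $task->complete,
                'result' => $task->result,
                'recipient' => $task->recipient,
            ];
        }
        return $done;              // true: the pool may drop this task
    });
}

foreach ($data as $row) {
    echo $row->recipient, ': ', $row->result, PHP_EOL;
}
```

With these made-up numbers the second task finishes first, so $data lists the recipients in completion order rather than submission order.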

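class SmsWorker extends Threaded {

    private $url;
    private $file_name;

    // Public so that MyPool's collector can read them once the task is done
    public $recipient;
    public $result;
    public $complete = false;

    public function __construct($url, $recipient, $file_name) {
        $this->url = $url;
        $this->recipient = $recipient;
        $this->file_name = $file_name;
    }

    // Executed by one of the pool's worker threads
    public function run() {

        try {
            // Helpers and DIR (the queue directory) are defined elsewhere in the full code
            if (Helpers::sendMessage($this->url, '200')) {
                $this->result = 'SMS successfully sent to '.$this->recipient.' phone number';
                // Delivered: remove the message from the queue
                unlink(DIR.'/'.$this->file_name);
            } else {
                // Not delivered: keep the file so it can be retried later
                $this->result = 'Failed to send SMS to '.$this->recipient.' phone number';
            }
        } catch (Exception $e) {
            $this->result = 'Error: ' . $e->getMessage();
        }

        // Always mark the task as finished, even after a failure or an error
        $this->complete = true;

    }

    public function isDone() {
        return $this->complete;
    }

}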

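The Threaded class is the base class of the pthreads API. Threaded objects can be safely shared between threads, and in a Pool each Threaded task is executed by one of the pool's Worker threads.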

The SmsWorker class extends the Threaded class and represents a task that sends an SMS to a recipient using a URL.

It stores the result of its execution in its own properties and sets its $complete flag when it finishes; the pool reads that flag through isDone() while collecting.

class SmsClient {

    private $maxProcess;
    private $results = [];

    // $maxProcess is the pool size: the maximum number of worker threads
    public function __construct($maxProcess) {
        $this->maxProcess = $maxProcess;
    }

    public function run() {

        try {
            // Every file in the queue directory is one message
            $files = Helpers::scanDirectory(DIR);
            $pool = new MyPool($this->maxProcess);

            $widSuffix = 1;

            foreach ($files as $fileName) {
                $smsData = Helpers::processSms($fileName);
                // submit() returns the ID of the worker that accepted the task
                $wid = $pool->submit(new SmsWorker($smsData['url'], $smsData['recipient'], $fileName));
                // Label the task as "<worker id>.<sequence number>"
                $wid = $wid . '.' . $widSuffix++;
                // Keyed by recipient: assumes each recipient appears only once in the queue
                $this->results[$smsData['recipient']][] = $wid;
                Helpers::printMessage("SMS client ({$wid}) started");
            }

            // process() blocks until every task is done and returns their results
            foreach ($pool->process() as $task) {
                Helpers::printMessage('SMS client ('.$this->results[$task->recipient][0].') finished: '.$task->result);
                $this->results[$task->recipient][] = $task->result;
            }
        } catch (Exception $e) {
            Helpers::printMessage('Error: ' . $e->getMessage());
        }

    }

}

The SmsClient class is the “central part” of the code. It:

  1. Reads all messages in our queue.

  2. Creates a pool of workers.

  3. Submits one SmsWorker task per message to the pool, passing in the necessary information (recipient, gateway URL, and file name).

  4. Keeps track of the workers and their responses, as they complete, in an array keyed by the recipient's phone number.

  5. Prints the final results.

By applying the Worker Pool pattern, the code sends the SMS messages concurrently while never running more than $maxProcess worker threads at a time. The gateway is therefore not flooded with requests, and the network waits of the requests that do run overlap instead of adding up.

If you run the code, it prints output similar to the following:

Sample output of our queue processor. Image created by the author.

To demonstrate how the pthreads API submits worker tasks and collects results, I have set the maximum number of threads to 2.

In our example, the queue contains 15 messages, so we submit 15 tasks (one SmsWorker per message), which are processed by the two worker threads.

The worker ID (the number before the ‘.’) is the index of the pool worker that accepted the task, so with a pool size of 2 it is either 0 or 1. The number after the ‘.’ is a sequence number that makes each task's label unique.

As you can observe, the results are not collected in the order of submission. In a multithreaded system, each task finishes when its own work completes (here, when its HTTP request returns), so results are collected in completion order, which may differ from the order in which the tasks were submitted.
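The labels and the out-of-order collection can be reproduced with a small model. The sketch below rests on assumptions that are not stated in the article: that Pool::submit hands tasks to workers in round-robin order, that each worker runs its own tasks one after another, and that the task durations are the invented values shown. The function name `simulateCollectionOrder` is hypothetical.

```php
<?php
// Hypothetical sketch: predicts the labels SmsClient prints ("<worker id>.<sequence>")
// and the order in which results would be collected. Assumptions: Pool::submit
// assigns tasks to workers round-robin, each worker runs its own tasks one after
// another, and the durations are invented for the example.
function simulateCollectionOrder(array $durationsMs, int $poolSize): array
{
    $workerClock = array_fill(0, $poolSize, 0);  // time at which each worker becomes free
    $finishedAt = [];

    foreach ($durationsMs as $i => $duration) {
        $sequence = $i + 1;
        $workerId = $i % $poolSize;               // round-robin assignment
        $workerClock[$workerId] += $duration;     // starts after the worker's earlier tasks
        $finishedAt["{$workerId}.{$sequence}"] = $workerClock[$workerId];
    }

    asort($finishedAt);                           // earliest completion first
    return array_keys($finishedAt);
}

print_r(simulateCollectionOrder([300, 100, 50, 400, 120], 2));
```

With these durations, task 1.2 finishes first even though 0.1 was submitted before it, which mirrors the kind of reordering visible in the sample output.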

You can find the full code here: https://gist.github.com/dihjital/6552914cfba7541161050a7c4f64dd3b