Aller au contenu

Programmation PHP/RabbitMQ

Un livre de Wikilivres.

RabbitMQ est un logiciel de messages en protocole AMQP. Il permet donc à des processus de produire des messages JSON dans des files d'attente pour que d'autres les consomme ensuite[1].

composer require php-amqplib/php-amqplib

L'installation du serveur est multi-plateforme. Sous Linux[2] :

apt-get install rabbitmq-server

Test de fonctionnement :

telnet localhost 5672

Site de gestion

[modifier | modifier le wikicode]

Une interface graphique existe pour lire et manipuler les messages manuellement, c'est le management plugin[3]. Pour l'activer :

/usr/sbin/rabbitmq-plugins enable rabbitmq_management

Test de fonctionnement depuis le serveur :

curl localhost:15672

Depuis le client : http://mon_serveur:15672

On la trouve aussi sur Docker[4].

Pour trouver le fichier de configuration :

cat  /usr/sbin/rabbitmq-server | grep RABBITMQ_ENV

Les identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". Pour tester :

curl -i -u guest:guest http://localhost:15672/api/whoami

Si cela ne fonctionne pas, configurer le serveur avec rabbitmqctl. Exemple sous Linux :

/usr/sbin/rabbitmqctl add_user userDev mon_mot_de_passe
/usr/sbin/rabbitmqctl set_permissions -p / userDev '.*' '.*' '.*'
/usr/sbin/rabbitmqctl set_user_tags userDev management
/usr/sbin/rabbitmqctl list_users
$connection = new AMQPStreamConnection($host, $port, $login, $password);
...
$connection->close();
 Sur le framework Symfony, on peut utiliser le composant Messenger à la place.

Création de queue et routage

[modifier | modifier le wikicode]

Pour créer une queue simple prête à recevoir des messages :

        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false);
icône image Image externe
Schéma des différents types de routage RabbitMQ sur le site : (en) Jyoti Sachdeva, « Getting Started With RabbitMQ: Python »,

Une autre manière de poster des messages est en passant par un exchange. On en distingue plusieurs types[5] :

  • direct : une seule queue recevra le message (patron de conception producteur/consommateur).
  • fanout : toutes les queues liée à l’exchange recevront le message (patron de conception producteur/abonné).
  • topic : les queues de l’exchange inscrites aux sujets concernés recevront le message (selon un motif dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)[6].
  • headers : routage par en-tête de message plutôt que par "routing key".

Dans cet exemple, on rattache la queue à un exchange "Bus" :

        $this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus');
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus');

Exemple de topic : on ne publie pas dans la queue mais dans l’exchange qui leur routera ensuite le message.

        $this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false);
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus');
        $this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false);
        $this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus');

Pour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent :

        $this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null);

Le mode DLX (Dead Letter Exchanges) permet de transférer un message d'une queue dans une autre après un certain temps[7].

        $amqpMessage = new AMQPMessage(json_encode('Hello World!'),
            ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
        );
        $this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1');

Par défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().

        $this->rabbitMqConnection->getChannel()->basic_consume(
            'Wikibooks.Queue1',
            gethostname() . '#' . rand(1, 9999),
            false,
            false,
            false,
            false,
            [$this, 'consumeCallback']
        );

        while (count($this->rabbitMqConnection->getChannel()->callbacks)) {
            $this->rabbitMqConnection->getChannel()->wait();
        }

    public function consumeCallback(?AMQPMessage $msg)
    {
        if (empty($msg)) {
            return null;
        }

        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

        var_dump(json_decode($msg->getBody()));
    }

En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues.