L'implémentation AMQP 0-9-1 dans sgcWebSockets a reçu une mise à jour complète : 11 corrections de bugs couvrant un ordre de paramètres critique, des incompatibilités de types, la sécurité des threads et des problèmes de perte de données, plus 6 nouvelles fonctionnalités protocolaires dont Basic.Nack, les liaisons Exchange-to-Exchange, les Publisher Confirms, les notifications Connection.Blocked/Unblocked et Connection.UpdateSecret pour le rafraîchissement de jetons. Cet article détaille chaque changement avec des exemples de code.
Table des matières
- Corrections de bugs
- Critique : ordre des paramètres de DeclareExchange
- Octet de type de table de champs
- Corrections de conformité à la spécification
- Autres corrections de bugs
- Nouvelles fonctionnalités
- Basic.Nack — accusés de réception négatifs
- Exchange.Bind/Unbind — liaisons Exchange-to-Exchange
- Classe Confirm — Publisher Confirms
- Connection.Blocked/Unblocked — alarmes de ressources
- Connection.UpdateSecret — rafraîchissement de jetons
- Fichiers modifiés
1. Corrections de bugs
Au total, 11 bugs ont été corrigés dans l'implémentation AMQP 0-9-1, allant d'un ordre de paramètres critique à des corrections de conformité à la spécification.
Critique : ordre des paramètres de DeclareExchange
Les méthodes DeclareExchange et
DeclareExchangeEx passaient
aNoWait,
aAutoDelete et
aInternal dans le mauvais ordre à
DoWrite_ExchDeclare. Cela faisait envoyer le drapeau
auto-delete en tant que
no-wait et vice versa, entraînant un comportement d'exchange inattendu
côté broker.
Avant (incorrect)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Après (corrigé)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
Fichier : sgcAMQP_Client.pas
Octet de type de table de champs
La procédure sgcWriteAMQPFieldTable écrivait toujours
$53 ('S' = long string)
comme indicateur de type pour toutes les valeurs de la table de champs, indépendamment du type réel. Cela signifiait que les doubles, entiers,
booléens et valeurs int64 étaient tous incorrectement étiquetés comme des chaînes dans le format wire.
Before (Incorrect)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
After (Fixed)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
Corrections de conformité à la spécification
| Problème | Avant | Après | Fichier |
|---|---|---|---|
| Type incorrect pour BasicGetEmpty.Reserved1 | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| Lecture incorrecte pour ChannelOpenOk.Reserved1 | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose utilisaient des getters de valeur de méthode spécifiques à la classe | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
Une nouvelle fonction utilitaire générique sgcGetAMQPMethodValue
a été ajoutée à sgcAMQP_Helpers.pas pour résoudre le bon
ID entier de méthode pour toute classe AMQP, remplaçant les getters spécifiques à la classe qui ne fonctionnaient que pour leur propre classe.
Autres corrections de bugs
| Bug | Description | File |
|---|---|---|
| ID de channel manquant pour Channel.CloseOk | DoWrite_ChannCloseOk ne définissait pas oFrame.Header.Channel, donc le close-ok était envoyé sur le canal 0 (niveau connexion) au lieu du canal cible. Paramètre aChannelId: Word ajouté. |
sgcAMQP.pas |
| Faute de frappe dans une constante d'erreur | Remplacement de 'Now Allowed' par 'Not Allowed'. |
sgcAMQP_Const.pas |
| Données de requête manquantes pour QueueUnBind | DoWrite_QueueUnBind ne stockait pas QueueUnBindQueue et QueueUnBindExchange sur la requête du canal, faisant remonter des valeurs vides à l'événement OnAMQPQueueUnBind. |
sgcAMQP_Client.pas |
| Octets restants ignorés après DoRead | Lorsque la boucle de lecture sortait avec 1–7 octets restants (trame partielle), ils étaient silencieusement perdus. Maintenant sauvegardés dans FBytes pour le cycle de lecture suivant. |
sgcAMQP.pas |
| GetChannel ignore aRaiseIfNotFound | Le paramètre aRaiseIfNotFound n'était jamais vérifié. Maintenant, ne lève une exception que lorsque le drapeau est True. |
sgcAMQP.pas |
2. Nouvelles fonctionnalités
Six nouvelles fonctionnalités protocolaires AMQP 0-9-1 ont été implémentées, couvrant des extensions RabbitMQ largement utilisées et des méthodes additionnelles de la spécification.
Basic.Nack — accusés de réception négatifs
Basic.Nack (classe 60, méthode 120) est une extension RabbitMQ
qui permet de rejeter un ou plusieurs messages en une seule fois avec une remise en file optionnelle. Contrairement à
Basic.Reject, elle prend en charge un drapeau
multiple pour rejeter tous les messages jusqu'au
delivery tag spécifié inclus.
| Méthode | Description | Direction |
|---|---|---|
NackMessage |
Envoyer un accusé de réception négatif au broker. | Client → serveur |
OnAMQPBasicNack |
Déclenché quand le serveur envoie un Nack (en mode publisher confirm). | Serveur → client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Rejeter un seul message et le remettre en file
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Rejeter tous les messages non acquittés jusqu'à ce tag, les supprimer
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack Event
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — liaisons Exchange-to-Exchange
Les liaisons Exchange-to-Exchange (classe 40, méthodes 30/31 et 40/51) permettent de router des messages entre exchanges sans file intermédiaire. C'est une extension RabbitMQ qui permet des modèles de topologie puissants comme les hiérarchies fan-out et le partitionnement par topic.
| Method | Description |
|---|---|
BindExchange / BindExchangeEx |
Lier un exchange de destination à un exchange source avec une routing key. La variante Ex attend la réponse du broker de manière synchrone. |
UnbindExchange / UnbindExchangeEx |
Supprimer une liaison Exchange-to-Exchange. |
OnAMQPExchangeBind |
Déclenché quand le broker confirme un exchange bind. |
OnAMQPExchangeUnbind |
Déclenché quand le broker confirme un exchange unbind. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Lier 'downstream-exchange' à 'upstream-exchange' avec une routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // attendre la confirmation
then
Log('Liaison d''exchange créée avec succès');
// Supprimer la liaison
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Classe Confirm — Publisher Confirms
Les publisher confirms (classe 85, méthodes 10/11) permettent au broker d'acquitter la réception des messages publiés.
Une fois un canal mis en mode confirm via Confirm.Select,
le broker enverra Basic.Ack ou
Basic.Nack pour chaque message publié, permettant
une publication fiable sans transactions.
| Méthode / événement | Description |
|---|---|
SelectConfirm / SelectConfirmEx |
Activer le mode publisher confirm sur un canal. |
OnAMQPConfirmSelectOk |
Déclenché quand le broker confirme que le mode confirm est actif. |
OnAMQPBasicAck |
Déclenché quand le broker acquitte un message publié. |
OnAMQPBasicNack |
Déclenché quand le broker rejette un message publié. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Exemple : publication fiable avec confirms
// 1. Activer le mode confirm sur le canal
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Mode confirm activé');
// 2. Publier un message - le broker enverra Ack ou Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Gérer les confirmations du serveur
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — alarmes de ressources
Quand le broker manque de ressources (mémoire, disque), il envoie
Connection.Blocked (classe 10, méthode 60) avec une
chaîne de raison. Quand la condition se résout, il envoie
Connection.Unblocked (méthode 61). Ce sont des notifications uniquement
serveur-vers-client. Cette fonctionnalité est gérée dans la classe de base
TsgcAMQP, ce qui la rend disponible pour tous les composants AMQP.
| Événement | Description |
|---|---|
OnAMQPConnectionBlocked |
Déclenché quand le broker bloque la connexion pour cause de contraintes de ressources. Inclut une chaîne Reason (par ex. 'low on memory'). |
OnAMQPConnectionUnblocked |
Déclenché quand le broker lève le blocage de la connexion. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connexion BLOQUÉE : ' + aReason);
// Mettre en pause la publication pour éviter la perte de messages
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connexion débloquée - reprise');
FPublishingPaused := False;
end;
Connection.UpdateSecret — rafraîchissement de jetons
Connection.UpdateSecret (classe 10, méthode 70) permet
de rafraîchir les identifiants d'authentification sur une connexion active sans se reconnecter. C'est essentiel pour
l'authentification basée sur OAuth2/JWT où les jetons expirent périodiquement.
| Method / Event | Description |
|---|---|
UpdateSecret / UpdateSecretEx |
Envoyer un nouveau secret (jeton) au broker avec une chaîne de raison optionnelle. |
OnAMQPConnectionUpdateSecretOk |
Déclenché quand le broker accepte le nouveau secret. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Rafraîchir le jeton OAuth avant qu'il n'expire
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Jeton rafraîchi avec succès')
else
Log('Échec du rafraîchissement du jeton - reconnexion');
end;
3. Fichiers modifiés
| File | Changements |
|---|---|
sgcAMQP_Const.pas |
Correction de faute ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Nouvelle fonction sgcGetAMQPMethodValue, helpers pour la classe Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mappings d'ID de méthode pour toutes les nouvelles méthodes. |
sgcAMQP_ReadWrite.pas |
Correction de l'octet de type de table de champs — chaque type de valeur écrit maintenant son indicateur de type correct. |
sgcAMQP_Classes.pas |
Nouvelles énumérations (amqpClassConfirm, 12 nouvelles méthodes), 13 nouvelles classes de payload, tables de dispatch mises à jour, correction de sécurité des threads, corrections de conformité, nouveaux champs de stockage de requête. |
sgcAMQP.pas |
8 nouveaux types d'événement, gestion de Connection.Blocked/Unblocked, correction de l'ID de channel pour DoWrite_ChannCloseOk, préservation des octets restants, correction du drapeau GetChannel. |
sgcAMQP_Client.pas |
6 nouveaux gestionnaires de lecture, 5 nouvelles méthodes d'écriture, 11 nouvelles méthodes publiques, 6 nouveaux événements, table de dispatch mise à jour, correction de l'ordre des paramètres, correction des données de requête. |
