La implementación de AMQP 0-9-1 en sgcWebSockets ha recibido una actualización integral: 11 correcciones de errores que abarcan orden crítico de parámetros, discrepancias de tipos, seguridad de hilos y problemas de pérdida de datos, más 6 nuevas características del protocolo como Basic.Nack, vinculaciones Exchange-to-Exchange, Publisher Confirms, notificaciones Connection.Blocked/Unblocked y Connection.UpdateSecret para refrescar tokens. Este artículo detalla cada cambio con ejemplos de código.
Índice de contenidos
- Correcciones de errores
- Crítico: orden de parámetros de DeclareExchange
- Byte de tipo de la field table
- Correcciones de cumplimiento de la especificación
- Otras correcciones
- Nuevas características
- Basic.Nack — Negative Acknowledgements
- Exchange.Bind/Unbind — Vinculaciones Exchange-to-Exchange
- Clase Confirm — Publisher Confirms
- Connection.Blocked/Unblocked — Alarmas de recursos
- Connection.UpdateSecret — Refresco de token
- Archivos modificados
1. Correcciones de errores
Se han corregido un total de 11 errores en la implementación de AMQP 0-9-1, que van desde problemas críticos de orden de parámetros hasta correcciones de cumplimiento de la especificación.
Crítico: orden de parámetros de DeclareExchange
Los métodos DeclareExchange y
DeclareExchangeEx pasaban
aNoWait,
aAutoDelete y
aInternal en el orden incorrecto a
DoWrite_ExchDeclare. Esto provocaba que el flag
auto-delete se enviara como
no-wait y viceversa, generando un comportamiento inesperado
del exchange en el broker.
Antes (incorrecto)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Después (corregido)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
Archivo: sgcAMQP_Client.pas
Byte de tipo de la field table
El procedimiento sgcWriteAMQPFieldTable siempre escribía
$53 ('S' = long string)
como indicador de tipo para todos los valores de la field table, independientemente del tipo real. Esto significaba que doubles, enteros,
booleanos y valores int64 quedaban marcados incorrectamente como strings en el formato de cable.
Antes (incorrecto)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
Después (corregido)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
Archivo: sgcAMQP_ReadWrite.pas
Correcciones de cumplimiento de la especificación
| Problema | Antes | Después | Archivo |
|---|---|---|---|
| BasicGetEmpty.Reserved1 con tipo erróneo | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 lectura incorrecta | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose usaban getters de valor de método específicos de clase | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
Se ha añadido una nueva función helper genérica sgcGetAMQPMethodValue
a sgcAMQP_Helpers.pas para resolver el ID entero de método correcto
para cualquier clase AMQP, sustituyendo a los getters específicos de clase que solo funcionaban con su propia clase.
Otras correcciones
| Error | Descripción | Archivo |
|---|---|---|
| Channel.CloseOk sin ID de canal | DoWrite_ChannCloseOk no establecía oFrame.Header.Channel, así que el close-ok se enviaba por el canal 0 (nivel de conexión) en lugar del canal de destino. Se añadió el parámetro aChannelId: Word. |
sgcAMQP.pas |
| Errata en constante de error | Se cambió 'Now Allowed' por 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind sin datos de solicitud | DoWrite_QueueUnBind no almacenaba QueueUnBindQueue ni QueueUnBindExchange en la solicitud del canal, lo que hacía que el evento OnAMQPQueueUnBind informase valores vacíos. |
sgcAMQP_Client.pas |
| Bytes restantes descartados tras DoRead | Cuando el bucle de lectura terminaba con 1–7 bytes restantes (frame parcial), se perdían silenciosamente. Ahora se guardan en FBytes para el siguiente ciclo de lectura. |
sgcAMQP.pas |
| GetChannel ignoraba aRaiseIfNotFound | El parámetro aRaiseIfNotFound nunca se comprobaba. Ahora solo lanza la excepción cuando el flag es True. |
sgcAMQP.pas |
2. Nuevas características
Se han implementado seis nuevas características del protocolo AMQP 0-9-1, que cubren extensiones muy usadas de RabbitMQ y métodos adicionales de la especificación.
Basic.Nack — Negative Acknowledgements
Basic.Nack (clase 60, método 120) es una extensión de RabbitMQ
que permite rechazar uno o varios mensajes a la vez con requeue opcional. A diferencia de
Basic.Reject, soporta un flag
multiple para rechazar todos los mensajes hasta el delivery tag
especificado (incluido).
| Método | Descripción | Dirección |
|---|---|---|
NackMessage |
Envía un acknowledgement negativo al broker. | Cliente → Servidor |
OnAMQPBasicNack |
Se dispara cuando el servidor envía un Nack (en modo publisher confirm). | Servidor → Cliente |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
Evento OnAMQPBasicNack
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange-to-Exchange Bindings
Las vinculaciones exchange-to-exchange (clase 40, métodos 30/31 y 40/51) permiten enrutar mensajes entre exchanges sin una cola intermedia. Esta extensión de RabbitMQ habilita patrones de topología potentes como jerarquías fan-out y particionado por topic.
| Método | Descripción |
|---|---|
BindExchange / BindExchangeEx |
Vincula un exchange de destino con un exchange de origen mediante una routing key. La variante Ex espera de forma síncrona la respuesta del broker. |
UnbindExchange / UnbindExchangeEx |
Elimina una vinculación exchange-to-exchange. |
OnAMQPExchangeBind |
Se dispara cuando el broker confirma una vinculación de exchange. |
OnAMQPExchangeUnbind |
Se dispara cuando el broker confirma la desvinculación de un exchange. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm Class — Publisher Confirms
Los publisher confirms (clase 85, métodos 10/11) permiten al broker confirmar la recepción de mensajes publicados.
Una vez que un canal entra en modo confirm vía Confirm.Select,
el broker envía Basic.Ack o
Basic.Nack por cada mensaje publicado, habilitando una
publicación fiable sin transacciones.
| Método / Evento | Descripción |
|---|---|
SelectConfirm / SelectConfirmEx |
Activa el modo publisher confirm en un canal. |
OnAMQPConfirmSelectOk |
Se dispara cuando el broker confirma que el modo confirm está activo. |
OnAMQPBasicAck |
Se dispara cuando el broker confirma un mensaje publicado. |
OnAMQPBasicNack |
Se dispara cuando el broker rechaza (NACK) un mensaje publicado. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Ejemplo: publicación fiable con confirms
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — Resource Alarms
Cuando el broker se queda corto de recursos (memoria, disco), envía
Connection.Blocked (clase 10, método 60) con una
cadena de motivo. Cuando la condición desaparece, envía
Connection.Unblocked (método 61). Estas notificaciones son
solo del servidor al cliente. Esta funcionalidad se gestiona en la clase base
TsgcAMQP, haciéndola disponible para todos los componentes AMQP.
| Evento | Descripción |
|---|---|
OnAMQPConnectionBlocked |
Se dispara cuando el broker bloquea la conexión por restricciones de recursos. Incluye una cadena Reason (p. ej., 'low on memory'). |
OnAMQPConnectionUnblocked |
Se dispara cuando el broker libera el bloqueo de la conexión. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — Token Refresh
Connection.UpdateSecret (clase 10, método 70) permite
refrescar las credenciales de autenticación en una conexión activa sin reconectar. Es esencial para
autenticación basada en OAuth2/JWT donde los tokens caducan periódicamente.
| Método / Evento | Descripción |
|---|---|
UpdateSecret / UpdateSecretEx |
Envía un nuevo secreto (token) al broker con una cadena de motivo opcional. |
OnAMQPConnectionUpdateSecretOk |
Se dispara cuando el broker acepta el nuevo secreto. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Archivos modificados
| Archivo | Cambios |
|---|---|
sgcAMQP_Const.pas |
Corrección de errata ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Nueva función sgcGetAMQPMethodValue, helpers de la clase Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mapeos de ID de método para todos los nuevos métodos. |
sgcAMQP_ReadWrite.pas |
Corrección del byte de tipo de la field table — cada tipo de valor escribe ahora su indicador de tipo correcto. |
sgcAMQP_Classes.pas |
Nuevos enums (amqpClassConfirm, 12 nuevos métodos), 13 nuevas clases de payload, tablas de despacho actualizadas, corrección de seguridad de hilos, correcciones de cumplimiento de la especificación, nuevos campos de almacenamiento de solicitudes. |
sgcAMQP.pas |
8 nuevos tipos de evento, gestión de Connection.Blocked/Unblocked, corrección del ID de canal en DoWrite_ChannCloseOk, preservación de bytes restantes, corrección del flag en GetChannel. |
sgcAMQP_Client.pas |
6 nuevos handlers de lectura, 5 nuevos métodos de escritura, 11 nuevos métodos públicos, 6 nuevos eventos, tabla de despacho actualizada, corrección del orden de parámetros, corrección de datos de solicitud. |
