Actualización del cliente AMQP 0.9.1 para Delphi

· Características

La implementación de AMQP 0-9-1 en sgcWebSockets ha recibido una actualización integral: 11 correcciones de errores que abarcan orden crítico de parámetros, discrepancias de tipos, seguridad de hilos y problemas de pérdida de datos, más 6 nuevas características del protocolo como Basic.Nack, vinculaciones Exchange-to-Exchange, Publisher Confirms, notificaciones Connection.Blocked/Unblocked y Connection.UpdateSecret para refrescar tokens. Este artículo detalla cada cambio con ejemplos de código.

Índice de contenidos

  1. Correcciones de errores
  2. Crítico: orden de parámetros de DeclareExchange
  3. Byte de tipo de la field table
  4. Correcciones de cumplimiento de la especificación
  5. Otras correcciones
  6. Nuevas características
  7. Basic.Nack — Negative Acknowledgements
  8. Exchange.Bind/Unbind — Vinculaciones Exchange-to-Exchange
  9. Clase Confirm — Publisher Confirms
  10. Connection.Blocked/Unblocked — Alarmas de recursos
  11. Connection.UpdateSecret — Refresco de token
  12. Archivos modificados

1. Correcciones de errores

Se han corregido un total de 11 errores en la implementación de AMQP 0-9-1, que van desde problemas críticos de orden de parámetros hasta correcciones de cumplimiento de la especificación.

Crítico: orden de parámetros de DeclareExchange

Los métodos DeclareExchange y DeclareExchangeEx pasaban aNoWait, aAutoDelete y aInternal en el orden incorrecto a DoWrite_ExchDeclare. Esto provocaba que el flag auto-delete se enviara como no-wait y viceversa, generando un comportamiento inesperado del exchange en el broker.

Antes (incorrecto)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

Después (corregido)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

Archivo: sgcAMQP_Client.pas


Byte de tipo de la field table

El procedimiento sgcWriteAMQPFieldTable siempre escribía $53 ('S' = long string) como indicador de tipo para todos los valores de la field table, independientemente del tipo real. Esto significaba que doubles, enteros, booleanos y valores int64 quedaban marcados incorrectamente como strings en el formato de cable.

Antes (incorrecto)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

Después (corregido)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

Archivo: sgcAMQP_ReadWrite.pas


Correcciones de cumplimiento de la especificación

Problema Antes Después Archivo
BasicGetEmpty.Reserved1 con tipo erróneo UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 lectura incorrecta sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose usaban getters de valor de método específicos de clase sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

Se ha añadido una nueva función helper genérica sgcGetAMQPMethodValue a sgcAMQP_Helpers.pas para resolver el ID entero de método correcto para cualquier clase AMQP, sustituyendo a los getters específicos de clase que solo funcionaban con su propia clase.


Otras correcciones

Error Descripción Archivo
Channel.CloseOk sin ID de canal DoWrite_ChannCloseOk no establecía oFrame.Header.Channel, así que el close-ok se enviaba por el canal 0 (nivel de conexión) en lugar del canal de destino. Se añadió el parámetro aChannelId: Word. sgcAMQP.pas
Errata en constante de error Se cambió 'Now Allowed' por 'Not Allowed'. sgcAMQP_Const.pas
QueueUnBind sin datos de solicitud DoWrite_QueueUnBind no almacenaba QueueUnBindQueue ni QueueUnBindExchange en la solicitud del canal, lo que hacía que el evento OnAMQPQueueUnBind informase valores vacíos. sgcAMQP_Client.pas
Bytes restantes descartados tras DoRead Cuando el bucle de lectura terminaba con 1–7 bytes restantes (frame parcial), se perdían silenciosamente. Ahora se guardan en FBytes para el siguiente ciclo de lectura. sgcAMQP.pas
GetChannel ignoraba aRaiseIfNotFound El parámetro aRaiseIfNotFound nunca se comprobaba. Ahora solo lanza la excepción cuando el flag es True. sgcAMQP.pas

2. Nuevas características

Se han implementado seis nuevas características del protocolo AMQP 0-9-1, que cubren extensiones muy usadas de RabbitMQ y métodos adicionales de la especificación.


Basic.Nack — Negative Acknowledgements

Basic.Nack (clase 60, método 120) es una extensión de RabbitMQ que permite rechazar uno o varios mensajes a la vez con requeue opcional. A diferencia de Basic.Reject, soporta un flag multiple para rechazar todos los mensajes hasta el delivery tag especificado (incluido).

Método Descripción Dirección
NackMessage Envía un acknowledgement negativo al broker. Cliente → Servidor
OnAMQPBasicNack Se dispara cuando el servidor envía un Nack (en modo publisher confirm). Servidor → Cliente

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

Evento OnAMQPBasicNack

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — Exchange-to-Exchange Bindings

Las vinculaciones exchange-to-exchange (clase 40, métodos 30/31 y 40/51) permiten enrutar mensajes entre exchanges sin una cola intermedia. Esta extensión de RabbitMQ habilita patrones de topología potentes como jerarquías fan-out y particionado por topic.

Método Descripción
BindExchange / BindExchangeEx Vincula un exchange de destino con un exchange de origen mediante una routing key. La variante Ex espera de forma síncrona la respuesta del broker.
UnbindExchange / UnbindExchangeEx Elimina una vinculación exchange-to-exchange.
OnAMQPExchangeBind Se dispara cuando el broker confirma una vinculación de exchange.
OnAMQPExchangeUnbind Se dispara cuando el broker confirma la desvinculación de un exchange.

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm Class — Publisher Confirms

Los publisher confirms (clase 85, métodos 10/11) permiten al broker confirmar la recepción de mensajes publicados. Una vez que un canal entra en modo confirm vía Confirm.Select, el broker envía Basic.Ack o Basic.Nack por cada mensaje publicado, habilitando una publicación fiable sin transacciones.

Método / Evento Descripción
SelectConfirm / SelectConfirmEx Activa el modo publisher confirm en un canal.
OnAMQPConfirmSelectOk Se dispara cuando el broker confirma que el modo confirm está activo.
OnAMQPBasicAck Se dispara cuando el broker confirma un mensaje publicado.
OnAMQPBasicNack Se dispara cuando el broker rechaza (NACK) un mensaje publicado.

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Ejemplo: publicación fiable con confirms

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — Resource Alarms

Cuando el broker se queda corto de recursos (memoria, disco), envía Connection.Blocked (clase 10, método 60) con una cadena de motivo. Cuando la condición desaparece, envía Connection.Unblocked (método 61). Estas notificaciones son solo del servidor al cliente. Esta funcionalidad se gestiona en la clase base TsgcAMQP, haciéndola disponible para todos los componentes AMQP.

Evento Descripción
OnAMQPConnectionBlocked Se dispara cuando el broker bloquea la conexión por restricciones de recursos. Incluye una cadena Reason (p. ej., 'low on memory').
OnAMQPConnectionUnblocked Se dispara cuando el broker libera el bloqueo de la conexión.
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — Token Refresh

Connection.UpdateSecret (clase 10, método 70) permite refrescar las credenciales de autenticación en una conexión activa sin reconectar. Es esencial para autenticación basada en OAuth2/JWT donde los tokens caducan periódicamente.

Método / Evento Descripción
UpdateSecret / UpdateSecretEx Envía un nuevo secreto (token) al broker con una cadena de motivo opcional.
OnAMQPConnectionUpdateSecretOk Se dispara cuando el broker acepta el nuevo secreto.

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. Archivos modificados

Archivo Cambios
sgcAMQP_Const.pas Corrección de errata ('Not Allowed').
sgcAMQP_Helpers.pas Nueva función sgcGetAMQPMethodValue, helpers de la clase Confirm (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), mapeos de ID de método para todos los nuevos métodos.
sgcAMQP_ReadWrite.pas Corrección del byte de tipo de la field table — cada tipo de valor escribe ahora su indicador de tipo correcto.
sgcAMQP_Classes.pas Nuevos enums (amqpClassConfirm, 12 nuevos métodos), 13 nuevas clases de payload, tablas de despacho actualizadas, corrección de seguridad de hilos, correcciones de cumplimiento de la especificación, nuevos campos de almacenamiento de solicitudes.
sgcAMQP.pas 8 nuevos tipos de evento, gestión de Connection.Blocked/Unblocked, corrección del ID de canal en DoWrite_ChannCloseOk, preservación de bytes restantes, corrección del flag en GetChannel.
sgcAMQP_Client.pas 6 nuevos handlers de lectura, 5 nuevos métodos de escritura, 11 nuevos métodos públicos, 6 nuevos eventos, tabla de despacho actualizada, corrección del orden de parámetros, corrección de datos de solicitud.