Delphi AMQP 0.9.1 Client Update

The AMQP 0-9-1 implementation in sgcWebSockets has received a comprehensive update: 11 bug fixes covering critical parameter ordering, type mismatches, thread safety, and data loss issues, plus 6 new protocol features including Basic.Nack, Exchange-to-Exchange bindings, Publisher Confirms, Connection.Blocked/Unblocked notifications, and Connection.UpdateSecret for token refresh. This article details every change with code examples.

Contents

  1. Bug Fixes
     - Critical: DeclareExchange Parameter Order
     - Field Table Type Byte
     - Spec Compliance Fixes
     - Other Bug Fixes
  2. New Features
     - Basic.Nack: Negative Acknowledgements
     - Exchange.Bind/Unbind: Exchange-to-Exchange Bindings
     - Confirm Class: Publisher Confirms
     - Connection.Blocked/Unblocked: Resource Alarms
     - Connection.UpdateSecret: Token Refresh
  3. Files Modified

1. Bug Fixes

A total of 11 bugs were fixed across the AMQP 0-9-1 implementation, ranging from critical parameter ordering issues to spec compliance corrections.

Critical: DeclareExchange Parameter Order

The DeclareExchange and DeclareExchangeEx methods passed aNoWait, aAutoDelete, and aInternal to DoWrite_ExchDeclare in the wrong order. As a result the flags were written in the wrong positions (for example, the no-wait value was sent where the broker expects auto-delete), leading to unexpected exchange behavior on the broker.

Before (Incorrect)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

After (Fixed)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

File: sgcAMQP_Client.pas
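
To see why the argument order matters on the wire: AMQP 0-9-1 packs the Exchange.Declare flags into a single octet in the order passive, durable, auto-delete, internal, no-wait. The sketch below is illustrative only. PackExchangeDeclareFlags is a hypothetical helper, not the library's DoWrite_ExchDeclare, and it assumes DoWrite_ExchDeclare expects its flags in that same spec order.

```pascal
program ExchangeDeclareFlags;

{$APPTYPE CONSOLE}

// Hypothetical helper (not part of sgcWebSockets): packs the five
// Exchange.Declare flags into one octet, lowest bit first, in spec order.
function PackExchangeDeclareFlags(aPassive, aDurable, aAutoDelete,
  aInternal, aNoWait: Boolean): Byte;
begin
  Result := 0;
  if aPassive then Result := Result or $01;     // bit 0
  if aDurable then Result := Result or $02;     // bit 1
  if aAutoDelete then Result := Result or $04;  // bit 2
  if aInternal then Result := Result or $08;    // bit 3
  if aNoWait then Result := Result or $10;      // bit 4
end;

const
  // What the caller asked for: a durable, auto-delete exchange.
  cPassive = False;
  cDurable = True;
  cAutoDelete = True;
  cInternal = False;
  cNoWait = False;

begin
  // Fixed order: auto-delete, internal, no-wait.
  WriteLn(PackExchangeDeclareFlags(cPassive, cDurable,
    cAutoDelete, cInternal, cNoWait));   // prints 6 (durable + auto-delete)

  // Old order: no-wait, auto-delete, internal shifted by one slot.
  WriteLn(PackExchangeDeclareFlags(cPassive, cDurable,
    cNoWait, cAutoDelete, cInternal));   // prints 10 (durable + internal)
end.
```

With the old order, the broker in this example would receive a durable internal exchange instead of a durable auto-delete one.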


Field Table Type Byte

The sgcWriteAMQPFieldTable procedure always wrote $53 ('S' = long string) as the type indicator for all field table values, regardless of the actual value type. This meant doubles, integers, booleans, and int64 values were all incorrectly tagged as strings in the wire format.

Before (Incorrect)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

After (Fixed)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas
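
As a rough illustration of what a correctly tagged field value looks like on the wire, the following sketch encodes a boolean, a 32-bit integer, and a long string, each preceded by its own type byte. The Append* routines are hypothetical stand-ins written for this example; the real sgcWriteAMQP* procedures have their own signatures.

```pascal
program FieldTableValues;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

// Hypothetical helpers for illustration only.
procedure AppendByte(var aBytes: TBytes; aValue: Byte);
begin
  SetLength(aBytes, Length(aBytes) + 1);
  aBytes[High(aBytes)] := aValue;
end;

// Appends a 32-bit value in network (big-endian) byte order.
procedure AppendUInt32(var aBytes: TBytes; aValue: Cardinal);
begin
  AppendByte(aBytes, (aValue shr 24) and $FF);
  AppendByte(aBytes, (aValue shr 16) and $FF);
  AppendByte(aBytes, (aValue shr 8) and $FF);
  AppendByte(aBytes, aValue and $FF);
end;

procedure AppendBooleanField(var aBytes: TBytes; aValue: Boolean);
begin
  AppendByte(aBytes, Ord('t'));          // type byte: boolean
  AppendByte(aBytes, Ord(aValue));       // 0 or 1
end;

procedure AppendInt32Field(var aBytes: TBytes; aValue: Integer);
begin
  AppendByte(aBytes, Ord('I'));          // type byte: 32-bit signed integer
  AppendUInt32(aBytes, Cardinal(aValue));
end;

procedure AppendLongStringField(var aBytes: TBytes; const aValue: string);
var
  vData: TBytes;
  i: Integer;
begin
  vData := TEncoding.UTF8.GetBytes(aValue);
  AppendByte(aBytes, Ord('S'));          // type byte: long string
  AppendUInt32(aBytes, Cardinal(Length(vData)));  // 4-byte length prefix
  for i := 0 to High(vData) do
    AppendByte(aBytes, vData[i]);
end;

function ToHex(const aBytes: TBytes): string;
var
  i: Integer;
begin
  Result := '';
  for i := 0 to High(aBytes) do
    Result := Result + IntToHex(Integer(aBytes[i]), 2);
end;

var
  vBytes: TBytes;
begin
  vBytes := nil;
  AppendBooleanField(vBytes, True);
  WriteLn(ToHex(vBytes));                // 7401

  vBytes := nil;
  AppendInt32Field(vBytes, 42);
  WriteLn(ToHex(vBytes));                // 490000002A

  vBytes := nil;
  AppendLongStringField(vBytes, 'hi');
  WriteLn(ToHex(vBytes));                // 53000000026869
end.
```

Under the old behavior the integer 42 would have gone out as 53 0000002A, which a decoder reads as the start of a long string with a declared length of 42 bytes.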


Spec Compliance Fixes

| Issue | Before | After | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 wrong type | UInt16 / sgcReadAMQPUInt16 | string / sgcReadAMQPShortString | sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 wrong read | sgcReadAMQPShortString | sgcReadAMQPLongString | sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose used class-specific method value getters | sgcGetAMQPConnectionValue(FailMethodId) | sgcGetAMQPMethodValue(FailClassId, FailMethodId) | sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |

A new generic helper function, sgcGetAMQPMethodValue, was added to sgcAMQP_Helpers.pas to resolve the correct method integer ID for any AMQP class, replacing the class-specific getters that only worked for their own class.
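
The reason a class-aware lookup is needed is that AMQP method IDs are only unique within a class. The sketch below uses a hypothetical DescribeMethod function (it is not sgcGetAMQPMethodValue and covers only method ID 40) to show how the same ID resolves to a different method in each class.

```pascal
program MethodLookup;

{$APPTYPE CONSOLE}

// Hypothetical lookup for illustration: resolves a (class id, method id)
// pair to a method name. Only method id 40 is covered in this sketch.
function DescribeMethod(aClassId, aMethodId: Word): string;
begin
  Result := 'unknown';
  if aMethodId <> 40 then
    Exit;
  case aClassId of
    10: Result := 'connection.open';
    20: Result := 'channel.close';
    40: Result := 'exchange.unbind';
    50: Result := 'queue.delete';
    60: Result := 'basic.publish';
  end;
end;

begin
  // A close frame reporting a failed publish carries class 60, method 40.
  WriteLn(DescribeMethod(60, 40));   // basic.publish

  // Resolving the same method id against the connection class alone
  // would name the wrong method.
  WriteLn(DescribeMethod(10, 40));   // connection.open
end.
```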


Other Bug Fixes

| Bug | Description | File |
|---|---|---|
| Channel.CloseOk missing channel ID | DoWrite_ChannCloseOk did not set oFrame.Header.Channel, so the close-ok was sent on channel 0 (connection level) instead of the target channel. Added an aChannelId: Word parameter. | sgcAMQP.pas |
| Typo in error constant | Changed 'Now Allowed' to 'Not Allowed'. | sgcAMQP_Const.pas |
| QueueUnBind missing request data | DoWrite_QueueUnBind did not store QueueUnBindQueue and QueueUnBindExchange in the channel request, causing the OnAMQPQueueUnBind event to report empty values. | sgcAMQP_Client.pas |
| Remaining bytes discarded after DoRead | When the read loop exited with 1 to 7 remaining bytes (a partial frame), they were silently lost. They are now saved to FBytes for the next read cycle. | sgcAMQP.pas |
| GetChannel ignores aRaiseIfNotFound | The aRaiseIfNotFound parameter was never checked. GetChannel now raises only when the flag is True. | sgcAMQP.pas |
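
The remaining-bytes fix can be pictured with a small buffering sketch. An AMQP 0-9-1 frame consists of a 7-byte header (type, channel, payload size), the payload, and a one-byte frame-end marker, so a read may end partway through a frame. ConsumeFrames and Append below are hypothetical helpers written for this example, not the library's DoRead; the sketch counts complete frames, keeps the unread tail, and does not validate the frame-end octet.

```pascal
program FrameBuffer;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

const
  FRAME_HEADER_SIZE = 7;  // type (1) + channel (2) + payload size (4)
  FRAME_END_SIZE = 1;     // frame-end octet ($CE)

// Hypothetical helper: appends raw bytes, as a socket read would.
procedure Append(var aBuffer: TBytes; const aData: array of Byte);
var
  i, vOld: Integer;
begin
  vOld := Length(aBuffer);
  SetLength(aBuffer, vOld + Length(aData));
  for i := 0 to High(aData) do
    aBuffer[vOld + i] := aData[i];
end;

// Hypothetical helper: counts the complete frames at the start of aBuffer
// and removes them, leaving any trailing partial frame for the next read.
function ConsumeFrames(var aBuffer: TBytes): Integer;
var
  vPos, vPayloadSize, vFrameSize: Integer;
begin
  Result := 0;
  vPos := 0;
  while Length(aBuffer) - vPos >= FRAME_HEADER_SIZE do
  begin
    // Payload size is a big-endian 32-bit value at header offset 3
    // (small sizes assumed in this sketch).
    vPayloadSize := (aBuffer[vPos + 3] shl 24) or (aBuffer[vPos + 4] shl 16) or
      (aBuffer[vPos + 5] shl 8) or aBuffer[vPos + 6];
    vFrameSize := FRAME_HEADER_SIZE + vPayloadSize + FRAME_END_SIZE;
    if Length(aBuffer) - vPos < vFrameSize then
      Break;  // frame not fully received yet
    Inc(vPos, vFrameSize);
    Inc(Result);
  end;
  // Keep the unread tail instead of discarding it.
  aBuffer := Copy(aBuffer, vPos, Length(aBuffer) - vPos);
end;

var
  vBuffer: TBytes;
  vFrames: Integer;
begin
  vBuffer := nil;

  // One complete heartbeat frame plus the first 3 bytes of the next one.
  Append(vBuffer, [$08, 0, 0, 0, 0, 0, 0, $CE, $08, 0, 0]);
  vFrames := ConsumeFrames(vBuffer);
  WriteLn(vFrames, ' frame(s), ', Length(vBuffer), ' byte(s) kept');  // 1 frame(s), 3 byte(s) kept

  // The rest of the second frame arrives with the next read.
  Append(vBuffer, [0, 0, 0, 0, $CE]);
  vFrames := ConsumeFrames(vBuffer);
  WriteLn(vFrames, ' frame(s), ', Length(vBuffer), ' byte(s) kept');  // 1 frame(s), 0 byte(s) kept
end.
```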

2. New Features

Six new AMQP 0-9-1 protocol features were implemented, covering widely used RabbitMQ extensions and additional spec methods.


Basic.Nack: Negative Acknowledgements

Basic.Nack (class 60, method 120) is a RabbitMQ extension that allows rejecting one or more messages at once with optional requeue. Unlike Basic.Reject, it supports a multiple flag to reject all messages up to and including the specified delivery tag.

| Method / Event | Description | Direction |
|---|---|---|
| NackMessage | Sends a negative acknowledgement to the broker. | Client → Server |
| OnAMQPBasicNack | Fired when the server sends a Nack (in publisher confirm mode). | Server → Client |

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);

// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);

// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack Event

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind: Exchange-to-Exchange Bindings

Exchange-to-exchange bindings (class 40, methods 30/31 and 40/51) allow routing messages between exchanges without an intermediate queue. This is a RabbitMQ extension that enables topology patterns such as fan-out hierarchies and topic partitioning.

| Method / Event | Description |
|---|---|
| BindExchange / BindExchangeEx | Binds a destination exchange to a source exchange with a routing key. The Ex variant waits synchronously for the broker response. |
| UnbindExchange / UnbindExchangeEx | Removes an exchange-to-exchange binding. |
| OnAMQPExchangeBind | Fired when the broker confirms an exchange bind. |
| OnAMQPExchangeUnbind | Fired when the broker confirms an exchange unbind. |

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;

// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // aNoWait = False: wait for confirmation
then
  Log('Exchange binding created successfully');

// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm Class: Publisher Confirms

Publisher confirms (class 85, methods 10/11) allow the broker to acknowledge receipt of published messages. Once a channel is put into confirm mode via Confirm.Select, the broker sends Basic.Ack or Basic.Nack for each published message, enabling reliable publishing without transactions.

| Method / Event | Description |
|---|---|
| SelectConfirm / SelectConfirmEx | Enables publisher confirm mode on a channel. |
| OnAMQPConfirmSelectOk | Fired when the broker confirms that confirm mode is active. |
| OnAMQPBasicAck | Fired when the broker acknowledges a published message. |
| OnAMQPBasicNack | Fired when the broker negatively acknowledges a published message. |

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

Example: Reliable Publishing with Confirms

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');

// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');

// 3. Handle server confirmations in the OnAMQPBasicAck event handler
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
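
To act on confirms, an application typically needs to know which published messages are still outstanding, and a confirmation with Multiple set to True settles every tag up to and including the reported one. A minimal sketch of that bookkeeping follows. TConfirmTracker is a hypothetical class, not part of sgcWebSockets, and it assumes the usual confirm-mode behavior where delivery tags on a channel start at 1 and increase by one per publish.

```pascal
program ConfirmTracker;

{$APPTYPE CONSOLE}

uses
  System.Generics.Collections;

type
  // Hypothetical helper: tracks delivery tags awaiting a broker confirm.
  TConfirmTracker = class
  private
    FNextTag: UInt64;
    FPending: TList<UInt64>;
  public
    constructor Create;
    destructor Destroy; override;
    function RegisterPublish: UInt64;
    procedure Confirm(aDeliveryTag: UInt64; aMultiple: Boolean);
    function PendingCount: Integer;
  end;

constructor TConfirmTracker.Create;
begin
  inherited Create;
  FNextTag := 0;
  FPending := TList<UInt64>.Create;
end;

destructor TConfirmTracker.Destroy;
begin
  FPending.Free;
  inherited;
end;

// Call once per published message; returns the tag the broker will use.
function TConfirmTracker.RegisterPublish: UInt64;
begin
  Inc(FNextTag);
  FPending.Add(FNextTag);
  Result := FNextTag;
end;

// Removes the confirmed tag, or every tag up to it when aMultiple is True.
procedure TConfirmTracker.Confirm(aDeliveryTag: UInt64; aMultiple: Boolean);
var
  i: Integer;
begin
  for i := FPending.Count - 1 downto 0 do
    if (FPending[i] = aDeliveryTag) or
       (aMultiple and (FPending[i] < aDeliveryTag)) then
      FPending.Delete(i);
end;

function TConfirmTracker.PendingCount: Integer;
begin
  Result := FPending.Count;
end;

var
  vTracker: TConfirmTracker;
begin
  vTracker := TConfirmTracker.Create;
  try
    vTracker.RegisterPublish;           // tag 1
    vTracker.RegisterPublish;           // tag 2
    vTracker.RegisterPublish;           // tag 3

    vTracker.Confirm(2, True);          // settles tags 1 and 2
    WriteLn(vTracker.PendingCount);     // 1

    vTracker.Confirm(3, False);         // settles tag 3 only
    WriteLn(vTracker.PendingCount);     // 0
  finally
    vTracker.Free;
  end;
end.
```

In an application, RegisterPublish would be called alongside each PublishMessage and Confirm from the OnAMQPBasicAck and OnAMQPBasicNack handlers using the DeliveryTag and Multiple values they receive. If those events fire on a different thread than the publisher, the tracker would also need a lock.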

Connection.Blocked/Unblocked: Resource Alarms

When the broker runs low on resources (memory, disk), it sends Connection.Blocked (class 10, method 60) with a reason string. When the condition clears, it sends Connection.Unblocked (method 61). These are server-to-client-only notifications. The feature is handled in the base TsgcAMQP class, making it available to all AMQP components.

| Event | Description |
|---|---|
| OnAMQPConnectionBlocked | Fired when the broker blocks the connection due to resource constraints. Includes a Reason string (e.g., 'low on memory'). |
| OnAMQPConnectionUnblocked | Fired when the broker lifts the connection block. |

procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;

procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret: Token Refresh

Connection.UpdateSecret (class 10, method 70) allows refreshing the authentication credentials on an active connection without reconnecting. This is essential for OAuth2/JWT-based authentication, where tokens expire periodically.

| Method / Event | Description |
|---|---|
| UpdateSecret / UpdateSecretEx | Sends a new secret (token) to the broker with an optional reason string. |
| OnAMQPConnectionUpdateSecretOk | Fired when the broker accepts the new secret. |

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

// Refresh the OAuth token before it expires.
// RefreshAMQPToken is a hypothetical method name for this example;
// GetRefreshedOAuthToken is assumed to be provided by the application.
procedure TForm1.RefreshAMQPToken;
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. Files Modified

| File | Changes |
|---|---|
| sgcAMQP_Const.pas | Typo fix ('Not Allowed'). |
| sgcAMQP_Helpers.pas | New sgcGetAMQPMethodValue function, Confirm class helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method ID mappings for all new methods. |
| sgcAMQP_ReadWrite.pas | Field table type byte fix: each value type now writes its correct type indicator. |
| sgcAMQP_Classes.pas | New enums (amqpClassConfirm, 12 new methods), 13 new payload classes, updated dispatch tables, thread safety fix, spec compliance fixes, new request storage fields. |
| sgcAMQP.pas | 8 new event types, Connection.Blocked/Unblocked handling, DoWrite_ChannCloseOk channel ID fix, remaining bytes preservation, GetChannel flag fix. |
| sgcAMQP_Client.pas | 6 new read handlers, 5 new write methods, 11 new public methods, 6 new events, updated dispatch table, parameter order fix, request data fix. |