The AMQP 0-9-1 implementation em sgcWebSockets has received um comprehensive update: 11 bug fixes covering critical parameter ordering, type mismatches, thread safety, e data loss issues, plus 6 new protocol features incluindo Basic.Nack, Exchange-to-Exchange bindings, Publisher Confirms, Conexão.Blocked/Unblocked notificações, e Conexão.UpdateSecret para token refresh. Este artigo details every change with code exemplos.
Sumário
- Bug Fixes
- Critical: DeclareExchange Parameter Order
- Field Table Type Byte
- Spec Compliance Fixes
- Other Bug Fixes
- New Features
- Basic.Nack — Negative Acknowledgements
- Exchange.Bind/Unbind — Exchange-to-Exchange Bindings
- Confirm Class — Publisher Confirms
- Conexão.Blocked/Unblocked — Resource Alarms
- Conexão.UpdateSecret — Token Refresh
- Files Modified
1. Correções de Bugs
A total de 11 bugs foram fixed across o AMQP 0-9-1 implementation, ranging um partir de critical parameter ordering issues para spec compliance corrections.
Critical: DeclareExchange Parameter Order
The DeclareExchange and
DeclareExchangeEx métodos passed
aNoWait,
aAutoDelete, and
aInternal no wrong order to
DoWrite_ExchDeclare. This caused the
auto-delete flag para be sent as
no-wait e vice versa, leading para unexpected exchange
behavior no broker.
Before (Incorrect)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
After (Fixed)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
Field Table Type Byte
The sgcWriteAMQPFieldTable procedure sempre wrote
$53 ('S' = long string)
como o type indicator para all field table values, regardless do actual value type. This meant doubles, integers,
booleans, e int64 values were all incorrectly tagged como strings no wire format.
Before (Incorrect)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
After (Fixed)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
Spec Compliance Fixes
| Issue | Before | After | File |
|---|---|---|---|
| BasicGetEmpty.Reserved1 wrong type | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 wrong read | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose used class-specific method value getters | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
A new generic helper function sgcGetAMQPMethodValue
was added para sgcAMQP_Helpers.pas para resolve o correct
method integer ID para any AMQP class, replacing o class-specific getters that somente worked para their own class.
Other Bug Fixes
| Bug | Description | File |
|---|---|---|
| Channel.CloseOk missing channel ID | DoWrite_ChannCloseOk did not set oFrame.Header.Channel, so o fechar-ok was sent em channel 0 (conexão level) instead do target channel. Added aChannelId: Word parameter. |
sgcAMQP.pas |
| Typo em error constant | Changed 'Now Allowed' para 'Not Allowed'. |
sgcAMQP_Const.pas |
| QueueUnBind missing request data | DoWrite_QueueUnBind did not store QueueUnBindQueue e QueueUnBindExchange no channel request, causing o OnAMQPQueueUnBind event para report empty values. |
sgcAMQP_Client.pas |
| Remaining bytes discarded after DoRead | Quando o read loop exited com 1–7 remaining bytes (partial frame), they were silently lost. Now saved para FBytes para o next read cycle. |
sgcAMQP.pas |
| GetChannel ignores aRaiseIfNotFound | The aRaiseIfNotFound parameter was nunca checked. Now somente raises quando o flag is True. |
sgcAMQP.pas |
2. Novos Recursos
Six new AMQP 0-9-1 protocol features foram implemented, covering widely-used RabbitMQ extensions and additional spec métodos.
Basic.Nack — Negative Acknowledgements
Basic.Nack (class 60, method 120) is um RabbitMQ
extension that allows rejecting one ou more messages em once com optional requeue. Unlike
Basic.Reject, it suporta a
multiple flag para reject all messages até e incluindo the
specified delivery tag.
| Method | Description | Direction |
|---|---|---|
NackMessage |
Send um negative acknowledgement para o broker. | Client → Server |
OnAMQPBasicNack |
Fired when o servidor sends um Nack (in publisher confirm mode). | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack Event
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange-to-Exchange Bindings
Exchange-to-exchange bindings (class 40, métodos 30/31 e 40/51) allow routing messages entre exchanges sem um intermediate queue. This is um RabbitMQ extension that enables powerful topology patterns como fan-out hierarchies e topic partitioning.
| Method | Description |
|---|---|
BindExchange / BindExchangeEx |
Bind um destination exchange para um source exchange com um routing key. The Ex variant waits synchronously para o broker response. |
UnbindExchange / UnbindExchangeEx |
Remover um exchange-to-exchange binding. |
OnAMQPExchangeBind |
Fired quando o broker confirms um exchange bind. |
OnAMQPExchangeUnbind |
Fired quando o broker confirms um exchange unbind. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm Class — Publisher Confirms
Publisher confirms (class 85, métodos 10/11) allow o broker para acknowledge receipt de published messages.
Once um canal is put into confirm mode via Confirm.Select,
o broker will send Basic.Ack or
Basic.Nack para each published message, enabling
reliable publishing sem transactions.
| Method / Event | Description |
|---|---|
SelectConfirm / SelectConfirmEx |
Enable publisher confirm mode em um canal. |
OnAMQPConfirmSelectOk |
Fired quando o broker confirms that confirm mode is active. |
OnAMQPBasicAck |
Fired quando o broker acknowledges um published message. |
OnAMQPBasicNack |
Fired quando o broker negatively acknowledges um published message. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Exemplo: Reliable Publishing com Confirms
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Conexão.Blocked/Unblocked — Resource Alarms
Quando o broker runs low em resources (memory, disk), it sends
Connection.Blocked (class 10, method 60) com um
reason string. Quando o condition clears, it sends
Connection.Unblocked (method 61). These are server-to-client
somente notificações. This feature is handled no base
TsgcAMQP class, making it available para all AMQP components.
| Event | Description |
|---|---|
OnAMQPConnectionBlocked |
Fired quando o broker blocks o conexão due para resource constraints. Includes um Reason string (e.g., 'low on memory'). |
OnAMQPConnectionUnblocked |
Fired quando o broker lifts o conexão block. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Conexão.UpdateSecret — Token Refresh
Connection.UpdateSecret (class 10, method 70) allows
refreshing o autenticação credentials em um active conexão sem reconnecting. This is essential for
OAuth2/JWT-based autenticação where tokens expire periodically.
| Method / Event | Description |
|---|---|
UpdateSecret / UpdateSecretEx |
Send um novo secret (token) para o broker com um optional reason string. |
OnAMQPConnectionUpdateSecretOk |
Fired quando o broker accepts o novo secret. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Arquivos Modificados
| File | Changes |
|---|---|
sgcAMQP_Const.pas |
Typo fix ('Not Allowed'). |
sgcAMQP_Helpers.pas |
New sgcGetAMQPMethodValue function, Confirm class helpers (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), method ID mappings para all new métodos. |
sgcAMQP_ReadWrite.pas |
Field table type byte fix — each value type now writes its correct type indicator. |
sgcAMQP_Classes.pas |
New enums (amqpClassConfirm, 12 new métodos), 13 new payload classes, updated dispatch tables, thread safety fix, spec compliance fixes, new request storage fields. |
sgcAMQP.pas |
8 new event types, Conexão.Blocked/Unblocked handling, DoWrite_ChannCloseOk channel ID fix, remaining bytes preservation, GetChannel flag fix. |
sgcAMQP_Client.pas |
6 new read handlers, 5 new write métodos, 11 new public métodos, 6 new events, updated dispatch table, parameter order fix, request data fix. |
