Die AMQP-0-9-1-Implementierung in sgcWebSockets hat ein umfassendes Update erhalten: 11 Bugfixes zu kritischer Parameterreihenfolge, Typkonflikten, Thread-Sicherheit und Datenverlust-Problemen sowie 6 neue Protokollfunktionen wie Basic.Nack, Exchange-to-Exchange-Bindings, Publisher Confirms, Connection.Blocked/Unblocked- Benachrichtigungen und Connection.UpdateSecret für die Token-Erneuerung. Dieser Artikel beschreibt jede Änderung mit Codebeispielen.
Inhaltsverzeichnis
- Bugfixes
- Kritisch: Reihenfolge der DeclareExchange-Parameter
- Type-Byte der Field Table
- Korrekturen zur Spec-Compliance
- Weitere Bugfixes
- Neue Funktionen
- Basic.Nack — Negative Bestätigungen
- Exchange.Bind/Unbind — Exchange-zu-Exchange-Bindings
- Confirm-Klasse — Publisher Confirms
- Connection.Blocked/Unblocked — Ressourcen-Alarme
- Connection.UpdateSecret — Token-Erneuerung
- Geänderte Dateien
1. Bugfixes
Insgesamt wurden 11 Bugs in der AMQP-0-9-1-Implementierung behoben, von kritischen Problemen bei der Parameterreihenfolge bis hin zu Korrekturen der Spec-Compliance.
Kritisch: Reihenfolge der DeclareExchange-Parameter
Die Methoden DeclareExchange und
DeclareExchangeEx übergaben
aNoWait,
aAutoDelete und
aInternal in falscher Reihenfolge an
DoWrite_ExchDeclare. Dadurch wurde das
auto-delete-Flag als
no-wait gesendet und umgekehrt, was zu unerwartetem Exchange-
Verhalten am Broker führte.
Vorher (Fehlerhaft)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
Nachher (Korrigiert)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
Datei: sgcAMQP_Client.pas
Type-Byte der Field Table
Die Prozedur sgcWriteAMQPFieldTable schrieb stets
$53 ('S' = long string)
als Typindikator für alle Field-Table-Werte, unabhängig vom tatsächlichen Werttyp. Dadurch wurden Doubles, Integer,
Booleans und Int64-Werte im Wire-Format fälschlicherweise alle als Strings markiert.
Vorher (Fehlerhaft)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
Nachher (Korrigiert)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
Datei: sgcAMQP_ReadWrite.pas
Korrekturen zur Spec-Compliance
| Problem | Vorher | Nachher | Datei |
|---|---|---|---|
| BasicGetEmpty.Reserved1 falscher Typ | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 falscher Lesevorgang | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose verwendeten klassen-spezifische Method-Value-Getter | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
Eine neue generische Helper-Funktion sgcGetAMQPMethodValue
wurde zu sgcAMQP_Helpers.pas hinzugefügt, um die korrekte
Method-Integer-ID für jede AMQP-Klasse aufzulösen und die klassen-spezifischen Getter zu ersetzen, die nur für ihre eigene Klasse funktionierten.
Weitere Bugfixes
| Bug | Beschreibung | Datei |
|---|---|---|
| Channel.CloseOk fehlende Channel-ID | DoWrite_ChannCloseOk setzte oFrame.Header.Channel nicht, sodass das close-ok auf Channel 0 (Verbindungsebene) statt am Ziel-Channel gesendet wurde. Parameter aChannelId: Word hinzugefügt. |
sgcAMQP.pas |
| Tippfehler in Fehlerkonstante | 'Now Allowed' in 'Not Allowed' geändert. |
sgcAMQP_Const.pas |
| QueueUnBind fehlende Request-Daten | DoWrite_QueueUnBind speicherte QueueUnBindQueue und QueueUnBindExchange nicht in der Channel-Request, sodass das Ereignis OnAMQPQueueUnBind leere Werte meldete. |
sgcAMQP_Client.pas |
| Verbleibende Bytes nach DoRead verworfen | Wenn die Read-Schleife mit 1–7 verbleibenden Bytes (partial frame) endete, gingen diese still verloren. Sie werden jetzt in FBytes für den nächsten Lesezyklus gespeichert. |
sgcAMQP.pas |
| GetChannel ignoriert aRaiseIfNotFound | Der Parameter aRaiseIfNotFound wurde nie geprüft. Eine Exception wird jetzt nur ausgelöst, wenn das Flag True ist. |
sgcAMQP.pas |
2. Neue Funktionen
Sechs neue AMQP-0-9-1-Protokollfunktionen wurden implementiert, die weit verbreitete RabbitMQ-Erweiterungen und zusätzliche Spec-Methoden abdecken.
Basic.Nack — Negative Bestätigungen
Basic.Nack (Klasse 60, Methode 120) ist eine RabbitMQ-
Erweiterung, die das Ablehnen einer oder mehrerer Nachrichten auf einmal mit optionalem Requeue ermöglicht. Im Gegensatz zu
Basic.Reject unterstützt sie ein
multiple-Flag, um alle Nachrichten bis einschließlich des
angegebenen Delivery-Tags abzulehnen.
| Methode | Beschreibung | Richtung |
|---|---|---|
NackMessage |
Sendet eine negative Bestätigung an den Broker. | Client → Server |
OnAMQPBasicNack |
Wird ausgelöst, wenn der Server ein Nack sendet (im Publisher-Confirm-Modus). | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack-Ereignis
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — Exchange-zu-Exchange-Bindings
Exchange-zu-Exchange-Bindings (Klasse 40, Methoden 30/31 und 40/51) ermöglichen das Routing von Nachrichten zwischen Exchanges ohne eine zwischengeschaltete Queue. Das ist eine RabbitMQ-Erweiterung, die mächtige Topologie-Muster wie Fan-out-Hierarchien und Topic-Partitionierung ermöglicht.
| Methode | Beschreibung |
|---|---|
BindExchange / BindExchangeEx |
Bindet einen Ziel-Exchange an einen Quell-Exchange mit einem Routing-Schlüssel. Die Ex-Variante wartet synchron auf die Broker-Antwort. |
UnbindExchange / UnbindExchangeEx |
Entfernt ein Exchange-zu-Exchange-Binding. |
OnAMQPExchangeBind |
Wird ausgelöst, wenn der Broker einen Exchange-Bind bestätigt. |
OnAMQPExchangeUnbind |
Wird ausgelöst, wenn der Broker einen Exchange-Unbind bestätigt. |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm-Klasse — Publisher Confirms
Publisher Confirms (Klasse 85, Methoden 10/11) ermöglichen dem Broker, den Empfang veröffentlichter Nachrichten zu bestätigen.
Sobald ein Channel über Confirm.Select in den Confirm-Modus versetzt wird,
sendet der Broker Basic.Ack oder
Basic.Nack für jede veröffentlichte Nachricht, was
zuverlässiges Publishing ohne Transaktionen ermöglicht.
| Methode / Ereignis | Beschreibung |
|---|---|
SelectConfirm / SelectConfirmEx |
Aktiviert den Publisher-Confirm-Modus auf einem Channel. |
OnAMQPConfirmSelectOk |
Wird ausgelöst, wenn der Broker bestätigt, dass der Confirm-Modus aktiv ist. |
OnAMQPBasicAck |
Wird ausgelöst, wenn der Broker eine veröffentlichte Nachricht bestätigt. |
OnAMQPBasicNack |
Wird ausgelöst, wenn der Broker eine veröffentlichte Nachricht negativ bestätigt. |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
Beispiel: Zuverlässiges Publishing mit Confirms
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — Ressourcen-Alarme
Wenn dem Broker die Ressourcen ausgehen (Speicher, Disk), sendet er
Connection.Blocked (Klasse 10, Methode 60) mit einem
Grund-String. Wenn die Situation behoben ist, sendet er
Connection.Unblocked (Methode 61). Das sind reine Server-zu-Client-
Benachrichtigungen. Diese Funktion wird in der Basis-
Klasse TsgcAMQP behandelt, sodass sie allen AMQP-Komponenten zur Verfügung steht.
| Ereignis | Beschreibung |
|---|---|
OnAMQPConnectionBlocked |
Wird ausgelöst, wenn der Broker die Verbindung wegen Ressourcenbeschränkungen blockiert. Enthält einen Reason-String (z.B. 'low on memory'). |
OnAMQPConnectionUnblocked |
Wird ausgelöst, wenn der Broker die Verbindungsblockade aufhebt. |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — Token-Erneuerung
Connection.UpdateSecret (Klasse 10, Methode 70) ermöglicht
das Erneuern der Authentifizierungsdaten auf einer aktiven Verbindung, ohne sich neu zu verbinden. Das ist wichtig für
OAuth2/JWT-basierte Authentifizierung, bei der Tokens periodisch ablaufen.
| Methode / Ereignis | Beschreibung |
|---|---|
UpdateSecret / UpdateSecretEx |
Sendet ein neues Secret (Token) an den Broker mit einem optionalen Grund-String. |
OnAMQPConnectionUpdateSecretOk |
Wird ausgelöst, wenn der Broker das neue Secret akzeptiert. |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. Geänderte Dateien
| Datei | Änderungen |
|---|---|
sgcAMQP_Const.pas |
Tippfehler-Korrektur ('Not Allowed'). |
sgcAMQP_Helpers.pas |
Neue Funktion sgcGetAMQPMethodValue, Confirm-Klassen-Helper (sgcGetAMQPConfirm/sgcGetAMQPConfirmValue), Method-ID-Mappings für alle neuen Methoden. |
sgcAMQP_ReadWrite.pas |
Field-Table-Type-Byte-Fix — jeder Werttyp schreibt jetzt seinen korrekten Typindikator. |
sgcAMQP_Classes.pas |
Neue Enums (amqpClassConfirm, 12 neue Methoden), 13 neue Payload-Klassen, aktualisierte Dispatch-Tabellen, Thread-Sicherheits-Fix, Spec-Compliance-Fixes, neue Request-Speicherfelder. |
sgcAMQP.pas |
8 neue Ereignistypen, Connection.Blocked/Unblocked-Handling, DoWrite_ChannCloseOk-Channel-ID-Fix, Erhalt verbleibender Bytes, GetChannel-Flag-Fix. |
sgcAMQP_Client.pas |
6 neue Read-Handler, 5 neue Write-Methoden, 11 neue öffentliche Methoden, 6 neue Ereignisse, aktualisierte Dispatch-Tabelle, Parameter-Reihenfolge-Fix, Request-Daten-Fix. |
