sgcWebSockets の AMQP 0-9-1 実装は包括的なアップデートを受けました。重要なパラメーター順序、型の不一致、スレッドセーフティ、データ損失の問題に対する 11 件のバグ修正と、Basic.Nack、Exchange-to-Exchange バインディング、Publisher Confirms、Connection.Blocked/Unblocked 通知、トークン更新のための Connection.UpdateSecret を含む 6 件の新しいプロトコル機能を追加しました。本記事ではすべての変更点をコード例とともに詳しく解説します。
目次
- バグ修正
- 重大:DeclareExchange パラメーター順序
- フィールドテーブル型バイト
- 仕様準拠の修正
- その他のバグ修正
- 新機能
- Basic.Nack — 否定応答
- Exchange.Bind/Unbind — エクスチェンジ間バインディング
- Confirm クラス — パブリッシャー確認
- Connection.Blocked/Unblocked — リソースアラーム
- Connection.UpdateSecret — トークン更新
- 変更されたファイル
1. バグ修正
AMQP 0-9-1 実装全体で合計 11 件のバグが修正されました。重大なパラメーター順序の問題から仕様準拠の修正まで多岐にわたります。
重大:DeclareExchange パラメーター順序
DeclareExchange および DeclareExchangeEx メソッドが aNoWait、aAutoDelete、aInternal を誤った順序で DoWrite_ExchDeclare に渡していました。これにより auto-delete フラグが no-wait として送信されるなど、ブローカーで予期しない動作が発生していました。
修正前(誤り)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);
修正後(正しい)
DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);
File: sgcAMQP_Client.pas
フィールドテーブル型バイト
sgcWriteAMQPFieldTable プロシージャは、実際の値の型に関わらず、すべてのフィールドテーブル値の型インジケーターとして常に $53('S' = 長文字列)を書き込んでいました。その結果、double・integer・boolean・int64 の値がすべてワイヤーフォーマット上で誤って文字列としてタグ付けされていました。
修正前(誤り)
sgcWriteAMQPByte($53, vBytes); // Always wrote 'S' for ALL types
case vType of
'd': sgcWriteAMQPDouble(...);
'I': sgcWriteAMQPInt32(...);
...
修正後(正しい)
case vType of
'd':
begin
sgcWriteAMQPByte(Byte('d'), vBytes); // Correct type per value
sgcWriteAMQPDouble(...);
end;
'I':
begin
sgcWriteAMQPByte(Byte('I'), vBytes);
sgcWriteAMQPInt32(...);
end;
'L':
begin
sgcWriteAMQPByte(Byte('L'), vBytes);
sgcWriteAMQPInt64(...);
end;
't':
begin
sgcWriteAMQPByte(Byte('t'), vBytes);
sgcWriteAMQPBoolean(...);
end;
'S':
begin
sgcWriteAMQPByte(Byte('S'), vBytes);
sgcWriteAMQPLongString(...);
end;
File: sgcAMQP_ReadWrite.pas
仕様準拠の修正
| 問題 | 修正前 | 修正後 | ファイル |
|---|---|---|---|
| BasicGetEmpty.Reserved1 の型が誤っていた | UInt16 / sgcReadAMQPUInt16 |
string / sgcReadAMQPShortString |
sgcAMQP_Classes.pas |
| ChannelOpenOk.Reserved1 の読み取りが誤っていた | sgcReadAMQPShortString |
sgcReadAMQPLongString |
sgcAMQP_Classes.pas |
| ConnectionClose/ChannelClose がクラス固有のメソッド値ゲッターを使用していた | sgcGetAMQPConnectionValue(FailMethodId) |
sgcGetAMQPMethodValue(FailClassId, FailMethodId) |
sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas |
任意の AMQP クラスの正しいメソッド整数 ID を解決するための新しい汎用ヘルパー関数 sgcGetAMQPMethodValue が sgcAMQP_Helpers.pas に追加されました。自分のクラスでしか機能しなかったクラス固有のゲッターを置き換えます。
その他のバグ修正
| バグ | 説明 | File |
|---|---|---|
| Channel.CloseOk のチャンネル ID が欠落していた | DoWrite_ChannCloseOk が oFrame.Header.Channel を設定していなかったため、close-ok が対象チャンネルではなくチャンネル 0(接続レベル)で送信されていました。aChannelId: Word パラメーターを追加しました。 |
sgcAMQP.pas |
| エラー定数のタイポ | 'Now Allowed' を 'Not Allowed' に修正しました。 |
sgcAMQP_Const.pas |
| QueueUnBind のリクエストデータが欠落していた | DoWrite_QueueUnBind がチャンネルリクエストに QueueUnBindQueue と QueueUnBindExchange を保存していなかったため、OnAMQPQueueUnBind イベントが空の値を報告していました。 |
sgcAMQP_Client.pas |
| DoRead 後に残余バイトが破棄されていた | 読み取りループが 1〜7 バイトの残余バイト(部分フレーム)で終了した場合、それらが無音で失われていました。次の読み取りサイクルのために FBytes に保存されるようになりました。 |
sgcAMQP.pas |
| GetChannel が aRaiseIfNotFound を無視していた | aRaiseIfNotFound パラメーターが一切チェックされていませんでした。フラグが True のときのみ例外を発生させるようになりました。 |
sgcAMQP.pas |
2. 新機能
広く使用されている RabbitMQ 拡張と追加の仕様メソッドをカバーする 6 つの新しい AMQP 0-9-1 プロトコル機能が実装されました。
Basic.Nack — 否定応答
Basic.Nack(クラス 60、メソッド 120)は、オプションの再キューイングで 1 つ以上のメッセージを一度に拒否できる RabbitMQ 拡張です。Basic.Reject とは異なり、指定された配信タグまでのすべてのメッセージを拒否する multiple フラグをサポートしています。
| Method | Description | Direction |
|---|---|---|
NackMessage |
ブローカーに否定応答を送信します。 | Client → Server |
OnAMQPBasicNack |
サーバーが Nack を送信したとき(パブリッシャー確認モード)に発火します。 | Server → Client |
NackMessage
procedure NackMessage(const aChannel: string;
aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);
OnAMQPBasicNack Event
procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
const aChannel: string;
const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
', Multiple: ' + BoolToStr(aNack.Multiple, True) +
', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;
Exchange.Bind/Unbind — エクスチェンジ間バインディング
エクスチェンジ間バインディング(クラス 40、メソッド 30/31 および 40/51)は、中間キューなしにエクスチェンジ間でメッセージをルーティングできます。これはファンアウト階層やトピックパーティショニングなど強力なトポロジーパターンを実現する RabbitMQ 拡張です。
| Method | Description |
|---|---|
BindExchange / BindExchangeEx |
ルーティングキーを使用して送信先エクスチェンジを送信元エクスチェンジにバインドします。Ex バリアントはブローカーの応答を同期的に待機します。 |
UnbindExchange / UnbindExchangeEx |
エクスチェンジ間バインディングを削除します。 |
OnAMQPExchangeBind |
ブローカーがエクスチェンジのバインドを確認したときに発火します。 |
OnAMQPExchangeUnbind |
ブローカーがエクスチェンジのアンバインドを確認したときに発火します。 |
BindExchange / BindExchangeEx
procedure BindExchange(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
aRoutingKey: string; aNoWait: Boolean;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
'downstream-exchange', // destination
'upstream-exchange', // source
'orders.#', // routing key
False) // wait for confirmation
then
Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
'downstream-exchange', 'upstream-exchange', 'orders.#', False);
Confirm クラス — パブリッシャー確認
パブリッシャー確認(クラス 85、メソッド 10/11)により、ブローカーがパブリッシュされたメッセージの受信を確認できます。Confirm.Select でチャンネルが確認モードに設定されると、ブローカーはパブリッシュされた各メッセージに対して Basic.Ack または Basic.Nack を送信し、トランザクションなしの信頼性の高いパブリッシングを実現します。
| Method / Event | Description |
|---|---|
SelectConfirm / SelectConfirmEx |
チャンネルでパブリッシャー確認モードを有効にします。 |
OnAMQPConfirmSelectOk |
ブローカーが確認モードのアクティブ化を確認したときに発火します。 |
OnAMQPBasicAck |
ブローカーがパブリッシュされたメッセージを肯定応答したときに発火します。 |
OnAMQPBasicNack |
ブローカーがパブリッシュされたメッセージを否定応答したときに発火します。 |
SelectConfirm / SelectConfirmEx
procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
サンプル:確認を使用した信頼性の高いパブリッシング
// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
const aChannel: string;
const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;
Connection.Blocked/Unblocked — リソースアラーム
ブローカーのリソース(メモリ・ディスク)が不足すると、理由文字列とともに Connection.Blocked(クラス 10、メソッド 60)を送信します。状態が解消されると Connection.Unblocked(メソッド 61)を送信します。これらはサーバーからクライアントへの通知のみです。この機能はベースクラス TsgcAMQP で処理されるため、すべての AMQP コンポーネントで利用できます。
| Event | Description |
|---|---|
OnAMQPConnectionBlocked |
ブローカーがリソース制約により接続をブロックしたときに発火します。Reason 文字列(例:'low on memory')を含みます。 |
OnAMQPConnectionUnblocked |
ブローカーが接続ブロックを解除したときに発火します。 |
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
const aReason: string);
begin
Log('Connection BLOCKED: ' + aReason);
// Pause publishing to avoid message loss
FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
Log('Connection unblocked - resuming');
FPublishingPaused := False;
end;
Connection.UpdateSecret — トークン更新
Connection.UpdateSecret(クラス 10、メソッド 70)は、再接続することなくアクティブな接続で認証資格情報を更新できます。トークンが定期的に失効する OAuth2/JWT ベースの認証に不可欠です。
| Method / Event | Description |
|---|---|
UpdateSecret / UpdateSecretEx |
オプションの理由文字列とともに新しいシークレット(トークン)をブローカーに送信します。 |
OnAMQPConnectionUpdateSecretOk |
ブローカーが新しいシークレットを受け入れたときに発火します。 |
UpdateSecret / UpdateSecretEx
procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
vNewToken: string;
begin
vNewToken := GetRefreshedOAuthToken();
if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
Log('Token refreshed successfully')
else
Log('Token refresh failed - reconnecting');
end;
3. 変更されたファイル
| ファイル | 変更内容 |
|---|---|
sgcAMQP_Const.pas |
タイポ修正('Not Allowed')。 |
sgcAMQP_Helpers.pas |
新しい sgcGetAMQPMethodValue 関数、Confirm クラスヘルパー(sgcGetAMQPConfirm/sgcGetAMQPConfirmValue)、すべての新メソッドのメソッド ID マッピングを追加。 |
sgcAMQP_ReadWrite.pas |
フィールドテーブル型バイト修正 — 各値の型が正しい型インジケーターを書き込むようになりました。 |
sgcAMQP_Classes.pas |
新しい列挙型(amqpClassConfirm、12 の新メソッド)、13 の新しいペイロードクラス、ディスパッチテーブルの更新、スレッドセーフティ修正、仕様準拠修正、新しいリクエスト保存フィールド。 |
sgcAMQP.pas |
8 つの新しいイベント型、Connection.Blocked/Unblocked 処理、DoWrite_ChannCloseOk チャンネル ID 修正、残余バイト保持、GetChannel フラグ修正。 |
sgcAMQP_Client.pas |
6 つの新しい読み取りハンドラー、5 つの新しい書き込みメソッド、11 の新しいパブリックメソッド、6 つの新しいイベント、ディスパッチテーブルの更新、パラメーター順序修正、リクエストデータ修正。 |
