AMQP 0.9.1 Delphi クライアントの更新

· 機能

sgcWebSockets の AMQP 0-9-1 実装は包括的なアップデートを受けました。重要なパラメーター順序、型の不一致、スレッドセーフティ、データ損失の問題に対する 11 件のバグ修正と、Basic.Nack、Exchange-to-Exchange バインディング、Publisher Confirms、Connection.Blocked/Unblocked 通知、トークン更新のための Connection.UpdateSecret を含む 6 件の新しいプロトコル機能を追加しました。本記事ではすべての変更点をコード例とともに詳しく解説します。

目次

  1. バグ修正
  2. 重大:DeclareExchange パラメーター順序
  3. フィールドテーブル型バイト
  4. 仕様準拠の修正
  5. その他のバグ修正
  6. 新機能
  7. Basic.Nack — 否定応答
  8. Exchange.Bind/Unbind — エクスチェンジ間バインディング
  9. Confirm クラス — パブリッシャー確認
  10. Connection.Blocked/Unblocked — リソースアラーム
  11. Connection.UpdateSecret — トークン更新
  12. 変更されたファイル

1. バグ修正

AMQP 0-9-1 実装全体で合計 11 件のバグが修正されました。重大なパラメーター順序の問題から仕様準拠の修正まで多岐にわたります。

重大:DeclareExchange パラメーター順序

DeclareExchange および DeclareExchangeEx メソッドが aNoWaitaAutoDeleteaInternal を誤った順序で DoWrite_ExchDeclare に渡していました。これにより auto-delete フラグが no-wait として送信されるなど、ブローカーで予期しない動作が発生していました。

修正前(誤り)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aNoWait, aAutoDelete, aInternal, aArguments);

修正後(正しい)

DoWrite_ExchDeclare(GetChannel(aChannel), aExchange, aExchangeType,
  aPassive, aDurable, aAutoDelete, aInternal, aNoWait, aArguments);

File: sgcAMQP_Client.pas


フィールドテーブル型バイト

sgcWriteAMQPFieldTable プロシージャは、実際の値の型に関わらず、すべてのフィールドテーブル値の型インジケーターとして常に $53'S' = 長文字列)を書き込んでいました。その結果、double・integer・boolean・int64 の値がすべてワイヤーフォーマット上で誤って文字列としてタグ付けされていました。

修正前(誤り)

sgcWriteAMQPByte($53, vBytes);  // Always wrote 'S' for ALL types
case vType of
  'd': sgcWriteAMQPDouble(...);
  'I': sgcWriteAMQPInt32(...);
  ...

修正後(正しい)

case vType of
  'd':
  begin
    sgcWriteAMQPByte(Byte('d'), vBytes);  // Correct type per value
    sgcWriteAMQPDouble(...);
  end;
  'I':
  begin
    sgcWriteAMQPByte(Byte('I'), vBytes);
    sgcWriteAMQPInt32(...);
  end;
  'L':
  begin
    sgcWriteAMQPByte(Byte('L'), vBytes);
    sgcWriteAMQPInt64(...);
  end;
  't':
  begin
    sgcWriteAMQPByte(Byte('t'), vBytes);
    sgcWriteAMQPBoolean(...);
  end;
  'S':
  begin
    sgcWriteAMQPByte(Byte('S'), vBytes);
    sgcWriteAMQPLongString(...);
  end;

File: sgcAMQP_ReadWrite.pas


仕様準拠の修正

問題 修正前 修正後 ファイル
BasicGetEmpty.Reserved1 の型が誤っていた UInt16 / sgcReadAMQPUInt16 string / sgcReadAMQPShortString sgcAMQP_Classes.pas
ChannelOpenOk.Reserved1 の読み取りが誤っていた sgcReadAMQPShortString sgcReadAMQPLongString sgcAMQP_Classes.pas
ConnectionClose/ChannelClose がクラス固有のメソッド値ゲッターを使用していた sgcGetAMQPConnectionValue(FailMethodId) sgcGetAMQPMethodValue(FailClassId, FailMethodId) sgcAMQP_Classes.pas, sgcAMQP_Helpers.pas

任意の AMQP クラスの正しいメソッド整数 ID を解決するための新しい汎用ヘルパー関数 sgcGetAMQPMethodValuesgcAMQP_Helpers.pas に追加されました。自分のクラスでしか機能しなかったクラス固有のゲッターを置き換えます。


その他のバグ修正

バグ 説明 File
Channel.CloseOk のチャンネル ID が欠落していた DoWrite_ChannCloseOkoFrame.Header.Channel を設定していなかったため、close-ok が対象チャンネルではなくチャンネル 0(接続レベル)で送信されていました。aChannelId: Word パラメーターを追加しました。 sgcAMQP.pas
エラー定数のタイポ 'Now Allowed''Not Allowed' に修正しました。 sgcAMQP_Const.pas
QueueUnBind のリクエストデータが欠落していた DoWrite_QueueUnBind がチャンネルリクエストに QueueUnBindQueueQueueUnBindExchange を保存していなかったため、OnAMQPQueueUnBind イベントが空の値を報告していました。 sgcAMQP_Client.pas
DoRead 後に残余バイトが破棄されていた 読み取りループが 1〜7 バイトの残余バイト(部分フレーム)で終了した場合、それらが無音で失われていました。次の読み取りサイクルのために FBytes に保存されるようになりました。 sgcAMQP.pas
GetChannel が aRaiseIfNotFound を無視していた aRaiseIfNotFound パラメーターが一切チェックされていませんでした。フラグが True のときのみ例外を発生させるようになりました。 sgcAMQP.pas

2. 新機能

広く使用されている RabbitMQ 拡張と追加の仕様メソッドをカバーする 6 つの新しい AMQP 0-9-1 プロトコル機能が実装されました。


Basic.Nack — 否定応答

Basic.Nack(クラス 60、メソッド 120)は、オプションの再キューイングで 1 つ以上のメッセージを一度に拒否できる RabbitMQ 拡張です。Basic.Reject とは異なり、指定された配信タグまでのすべてのメッセージを拒否する multiple フラグをサポートしています。

Method Description Direction
NackMessage ブローカーに否定応答を送信します。 Client → Server
OnAMQPBasicNack サーバーが Nack を送信したとき(パブリッシャー確認モード)に発火します。 Server → Client

NackMessage

procedure NackMessage(const aChannel: string;
  aDeliveryTag: UInt64; aMultiple, aRequeue: Boolean);
// Reject a single message and requeue it
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, False, True);
// Reject all unacknowledged messages up to this tag, discard them
sgcAMQPClient.NackMessage('my-channel', vDeliveryTag, True, False);

OnAMQPBasicNack Event

procedure TForm1.sgcAMQPClientBasicNack(Sender: TObject;
  const aChannel: string;
  const aNack: TsgcAMQPFramePayload_Method_BasicNack);
begin
  Log('Nack received - DeliveryTag: ' + IntToStr(aNack.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aNack.Multiple, True) +
    ', Requeue: ' + BoolToStr(aNack.Requeue, True));
end;

Exchange.Bind/Unbind — エクスチェンジ間バインディング

エクスチェンジ間バインディング(クラス 40、メソッド 30/31 および 40/51)は、中間キューなしにエクスチェンジ間でメッセージをルーティングできます。これはファンアウト階層やトピックパーティショニングなど強力なトポロジーパターンを実現する RabbitMQ 拡張です。

Method Description
BindExchange / BindExchangeEx ルーティングキーを使用して送信先エクスチェンジを送信元エクスチェンジにバインドします。Ex バリアントはブローカーの応答を同期的に待機します。
UnbindExchange / UnbindExchangeEx エクスチェンジ間バインディングを削除します。
OnAMQPExchangeBind ブローカーがエクスチェンジのバインドを確認したときに発火します。
OnAMQPExchangeUnbind ブローカーがエクスチェンジのアンバインドを確認したときに発火します。

BindExchange / BindExchangeEx

procedure BindExchange(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  const aArguments: string = '');
function BindExchangeEx(const aChannel, aDestination, aSource,
  aRoutingKey: string; aNoWait: Boolean;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT;
  const aArguments: string = ''): Boolean;
// Bind 'downstream-exchange' to 'upstream-exchange' with routing key
if sgcAMQPClient.BindExchangeEx('my-channel',
  'downstream-exchange',  // destination
  'upstream-exchange',    // source
  'orders.#',             // routing key
  False)                  // wait for confirmation
then
  Log('Exchange binding created successfully');
// Remove the binding
sgcAMQPClient.UnbindExchange('my-channel',
  'downstream-exchange', 'upstream-exchange', 'orders.#', False);

Confirm クラス — パブリッシャー確認

パブリッシャー確認(クラス 85、メソッド 10/11)により、ブローカーがパブリッシュされたメッセージの受信を確認できます。Confirm.Select でチャンネルが確認モードに設定されると、ブローカーはパブリッシュされた各メッセージに対して Basic.Ack または Basic.Nack を送信し、トランザクションなしの信頼性の高いパブリッシングを実現します。

Method / Event Description
SelectConfirm / SelectConfirmEx チャンネルでパブリッシャー確認モードを有効にします。
OnAMQPConfirmSelectOk ブローカーが確認モードのアクティブ化を確認したときに発火します。
OnAMQPBasicAck ブローカーがパブリッシュされたメッセージを肯定応答したときに発火します。
OnAMQPBasicNack ブローカーがパブリッシュされたメッセージを否定応答したときに発火します。

SelectConfirm / SelectConfirmEx

procedure SelectConfirm(const aChannel: string; aNoWait: Boolean);
function SelectConfirmEx(const aChannel: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;

サンプル:確認を使用した信頼性の高いパブリッシング

// 1. Enable confirm mode on the channel
if sgcAMQPClient.SelectConfirmEx('my-channel') then
  Log('Confirm mode enabled');
// 2. Publish a message - broker will send Ack or Nack
sgcAMQPClient.PublishMessage('my-channel', 'my-exchange',
  'routing.key', 'Hello World');
// 3. Handle server confirmations
procedure TForm1.sgcAMQPClientBasicAck(Sender: TObject;
  const aChannel: string;
  const aAck: TsgcAMQPFramePayload_Method_BasicAck);
begin
  Log('Message confirmed - DeliveryTag: ' + IntToStr(aAck.DeliveryTag) +
    ', Multiple: ' + BoolToStr(aAck.Multiple, True));
end;

Connection.Blocked/Unblocked — リソースアラーム

ブローカーのリソース(メモリ・ディスク)が不足すると、理由文字列とともに Connection.Blocked(クラス 10、メソッド 60)を送信します。状態が解消されると Connection.Unblocked(メソッド 61)を送信します。これらはサーバーからクライアントへの通知のみです。この機能はベースクラス TsgcAMQP で処理されるため、すべての AMQP コンポーネントで利用できます。

Event Description
OnAMQPConnectionBlocked ブローカーがリソース制約により接続をブロックしたときに発火します。Reason 文字列(例:'low on memory')を含みます。
OnAMQPConnectionUnblocked ブローカーが接続ブロックを解除したときに発火します。
procedure TForm1.sgcAMQPConnectionBlocked(Sender: TObject;
  const aReason: string);
begin
  Log('Connection BLOCKED: ' + aReason);
  // Pause publishing to avoid message loss
  FPublishingPaused := True;
end;
procedure TForm1.sgcAMQPConnectionUnblocked(Sender: TObject);
begin
  Log('Connection unblocked - resuming');
  FPublishingPaused := False;
end;

Connection.UpdateSecret — トークン更新

Connection.UpdateSecret(クラス 10、メソッド 70)は、再接続することなくアクティブな接続で認証資格情報を更新できます。トークンが定期的に失効する OAuth2/JWT ベースの認証に不可欠です。

Method / Event Description
UpdateSecret / UpdateSecretEx オプションの理由文字列とともに新しいシークレット(トークン)をブローカーに送信します。
OnAMQPConnectionUpdateSecretOk ブローカーが新しいシークレットを受け入れたときに発火します。

UpdateSecret / UpdateSecretEx

procedure UpdateSecret(const aNewSecret, aReason: string);
function UpdateSecretEx(const aNewSecret, aReason: string;
  aTimeout: Integer = CS_AMQP_DEFAULT_TIMEOUT): Boolean;
// Refresh the OAuth token before it expires
var
  vNewToken: string;
begin
  vNewToken := GetRefreshedOAuthToken();
  if sgcAMQPClient.UpdateSecretEx(vNewToken, 'token refresh') then
    Log('Token refreshed successfully')
  else
    Log('Token refresh failed - reconnecting');
end;

3. 変更されたファイル

ファイル 変更内容
sgcAMQP_Const.pas タイポ修正('Not Allowed')。
sgcAMQP_Helpers.pas 新しい sgcGetAMQPMethodValue 関数、Confirm クラスヘルパー(sgcGetAMQPConfirm/sgcGetAMQPConfirmValue)、すべての新メソッドのメソッド ID マッピングを追加。
sgcAMQP_ReadWrite.pas フィールドテーブル型バイト修正 — 各値の型が正しい型インジケーターを書き込むようになりました。
sgcAMQP_Classes.pas 新しい列挙型(amqpClassConfirm、12 の新メソッド)、13 の新しいペイロードクラス、ディスパッチテーブルの更新、スレッドセーフティ修正、仕様準拠修正、新しいリクエスト保存フィールド。
sgcAMQP.pas 8 つの新しいイベント型、Connection.Blocked/Unblocked 処理、DoWrite_ChannCloseOk チャンネル ID 修正、残余バイト保持、GetChannel フラグ修正。
sgcAMQP_Client.pas 6 つの新しい読み取りハンドラー、5 つの新しい書き込みメソッド、11 の新しいパブリックメソッド、6 つの新しいイベント、ディスパッチテーブルの更新、パラメーター順序修正、リクエストデータ修正。