Ab sgcWebSockets 2022.1 wird das AMQP-0.9.1-Protokoll unterstützt. Das Advanced Message Queuing Protocol (AMQP) ist ein offenes Standardprotokoll der Anwendungsschicht für nachrichtenorientierte Middleware. Die definierenden Merkmale von AMQP sind Nachrichtenorientierung, Queuing, Routing (einschließlich Point-to-Point und Publish-and-Subscribe), Zuverlässigkeit und Sicherheit.
AMQP ist ein binäres Protokoll der Anwendungsschicht, das entwickelt wurde, um eine Vielzahl von Messaging-Anwendungen und Kommunikationsmustern effizient zu unterstützen. Es bietet flussgesteuerte, nachrichtenorientierte Kommunikation mit Zustellgarantien wie at-most-once (jede Nachricht wird einmal oder gar nicht zugestellt), at-least-once (jede Nachricht wird sicher zugestellt, aber möglicherweise mehrfach) und exactly-once (die Nachricht trifft garantiert genau einmal ein) sowie Authentifizierung und/oder Verschlüsselung auf Basis von SASL und/oder TLS. Es setzt eine zugrundeliegende zuverlässige Transportschicht wie das Transmission Control Protocol (TCP) voraus.
Kanäle
AMQP ist ein Multi-Channel-Protokoll. Kanäle bieten eine Möglichkeit, eine schwergewichtige TCP/IP-Verbindung in mehrere leichtgewichtige Verbindungen zu multiplexen. Dadurch wird das Protokoll "firewall-freundlicher", da die Portnutzung vorhersehbar ist. Außerdem bedeutet das, dass Traffic-Shaping und andere QoS-Netzwerkfunktionen leicht eingesetzt werden können.
Jeder Kanal läuft in seinem eigenen Thread. Sobald eine neue Nachricht eintrifft, identifiziert der Client zuerst den Kanal und reiht die Nachricht in eine Queue ein, die vom Kanal-Thread verarbeitet wird.
Der Lebenszyklus eines Kanals sieht so aus:
1. Der Client öffnet einen neuen Kanal (Open).
2. Der Server bestätigt, dass der neue Kanal bereit ist (Open-Ok).
3. Client und Server nutzen den Kanal nach Bedarf.
4. Ein Peer (Client oder Server) schließt den Kanal (Close).
5. Der andere Peer bestätigt die Kanalschließung (Close-Ok).
Um einen neuen Kanal zu erstellen, rufst du einfach die Methode OpenChannel auf und übergibst den Kanalnamen als Argument. Das Ereignis OnAMQPChannelOpen wird als Bestätigung des Servers ausgelöst, dass der Kanal geöffnet wurde.
AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
DoLog('#AMQP_channel_open: ' + aChannel);
end;
Exchanges
Die Exchange-Klasse erlaubt einer Anwendung, Exchanges auf dem Server zu verwalten. Diese Klasse ermöglicht es der Anwendung, ihre eigene Verdrahtung zu skripten (statt sich auf eine Konfigurationsschnittstelle zu verlassen). Hinweis: Die meisten Anwendungen benötigen diesen Grad an Komplexität nicht, und ältere Middleware kann diese Semantik wahrscheinlich nicht unterstützen.
Der Lebenszyklus eines Exchanges:
1. Der Client bittet den Server sicherzustellen, dass der Exchange existiert (Declare). Der Client kann das verfeinern in "Erstelle den Exchange, falls er nicht existiert" oder "Warne mich, aber erstelle ihn nicht, falls er nicht existiert".
2. Der Client veröffentlicht Nachrichten an den Exchange.
3. Der Client kann den Exchange löschen (Delete).
Die Methode DeclareExchange erstellt einen neuen Exchange oder verifiziert, dass ein Exchange bereits existiert. Die Methode hat folgende Argumente:
- ChannelName: der Name des Kanals (muss vor dem Aufruf dieser Methode geöffnet sein).
- ExchangeName: der Name des Exchanges, darf nicht länger als 255 Zeichen sein und nicht mit "amq." beginnen (außer wenn der passive-Parameter true ist).
- ExchangeType: der Exchange-Typ, alle AMQP-Server unterstützen "direct"- und "fanout"-Exchanges. Schau in die Server-Dokumentation, um zu erfahren, welche Exchange-Typen unterstützt werden.
- Passive: wenn passive true ist, verifiziert der Server nur, dass der Exchange bereits deklariert ist. Wenn passive false ist und der Exchange nicht existiert, erstellt der Server einen neuen.
- Durable: wenn true, wird der Exchange beim Serverstart neu erstellt. Wenn false, wird der Exchange beim Server-Stopp gelöscht.
- AutoDelete: wenn true, wird der Exchange gelöscht, wenn alle Queues entbunden wurden.
- Internal: immer false.
- NoWait: wenn true, sendet der Server keine Bestätigung an den Client.
Um einen neuen Exchange zu deklarieren, rufst du einfach die Methode DeclareExchange auf und übergibst Kanalname, Exchange-Name und Exchange-Typ als Argumente. Das Ereignis OnAMQPExchangeDeclare wird als Bestätigung des Servers ausgelöst, dass der Exchange deklariert wurde.
AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end;
Queues
Die Queue-Klasse erlaubt einer Anwendung, Message-Queues auf dem Server zu verwalten. Dies ist ein grundlegender Schritt in fast allen Anwendungen, die Nachrichten konsumieren, mindestens um zu verifizieren, dass eine erwartete Message-Queue tatsächlich vorhanden ist.
Der Lebenszyklus einer dauerhaften Message-Queue ist relativ einfach:
1. Der Client behauptet, dass die Message-Queue existiert (Declare mit dem "passive"-Argument).
2. Der Server bestätigt, dass die Message-Queue existiert (Declare-Ok).
3. Der Client liest Nachrichten aus der Message-Queue.
Der Lebenszyklus einer temporären Message-Queue ist interessanter:
1. Der Client erstellt die Message-Queue (Declare, oft ohne Queue-Namen, sodass der Server einen Namen zuweist). Der Server bestätigt (Declare-Ok).
2. Der Client startet einen Consumer auf der Message-Queue. Die genaue Funktionalität eines Consumers wird durch die Basic-Klasse definiert.
3. Der Client storniert den Consumer, entweder explizit oder durch Schließen des Kanals und/oder der Verbindung.
4. Wenn der letzte Consumer aus der Message-Queue verschwindet, löscht der Server nach einer angemessenen Wartezeit die Message-Queue.
AMQP implementiert den Zustellmechanismus für Topic-Abonnements als Message-Queues. Dies ermöglicht interessante Strukturen, bei denen ein Abonnement zwischen einem Pool kooperierender Subscriber-
Anwendungen lastverteilt werden kann.
Der Lebenszyklus eines Abonnements umfasst eine zusätzliche Bind-Phase:
1. Der Client erstellt die Message-Queue (Declare), und der Server bestätigt (Declare-Ok).
2. Der Client bindet die Message-Queue an einen Topic-Exchange (Bind), und der Server bestätigt (Bind-Ok).
3. Der Client verwendet die Message-Queue wie in den vorherigen Beispielen.
Die Methode DeclareQueue erstellt eine neue Queue oder verifiziert, dass eine Queue bereits existiert. Die Methode hat folgende Argumente:
- ChannelName: der Name des Kanals (muss vor dem Aufruf dieser Methode geöffnet sein).
- QueueName: der Name der Queue, darf nicht länger als 255 Zeichen sein und nicht mit "amq." beginnen (außer wenn der passive-Parameter true ist).
- Passive: wenn passive true ist, verifiziert der Server nur, dass die Queue bereits deklariert ist. Wenn passive false ist und die Queue nicht existiert, erstellt der Server eine neue.
- Durable: wenn true, wird die Queue beim Serverstart neu erstellt. Wenn false, wird die Queue beim Server-Stopp gelöscht.
- Exclusive: wenn true, bedeutet das, dass auf die Queue nur die aktuelle Verbindung zugreift.
- AutoDelete: wenn true, wird die Queue gelöscht, wenn alle Consumer die Queue nicht mehr verwenden.
- NoWait: wenn true, sendet der Server keine Bestätigung an den Client.
Um eine neue Queue zu deklarieren, rufst du einfach die Methode DeclareQueue auf und übergibst Kanalname und Queue-Name als Argumente. Das Ereignis OnAMQPQueueDeclare wird als Bestätigung des Servers ausgelöst, dass die Queue deklariert wurde.
AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;
