AMQP Exchanges y Queues (1 / 3)

· Componentes

Desde sgcWebSockets 2022.1 se soporta el protocolo AMQP 0.9.1. El Advanced Message Queuing Protocol (AMQP) es un protocolo de capa de aplicación estándar abierto para middleware orientado a mensajes. Las características definitorias de AMQP son orientación a mensajes, encolado, enrutamiento (incluyendo punto a punto y publicación/suscripción), fiabilidad y seguridad.

AMQP es un protocolo binario de capa de aplicación, diseñado para soportar eficientemente una amplia variedad de aplicaciones de mensajería y patrones de comunicación. Ofrece comunicación orientada a mensajes con control de flujo y garantías de entrega como at-most-once (cada mensaje se entrega una vez o nunca), at-least-once (cada mensaje se entrega con seguridad, pero puede entregarse varias veces) y exactly-once (el mensaje siempre llegará y solo una vez), y autenticación y/o cifrado basados en SASL y/o TLS. Asume un protocolo de transporte fiable subyacente como Transmission Control Protocol (TCP).

Canales 

AMQP es un protocolo multicanal. Los canales proporcionan una forma de multiplexar una conexión TCP/IP pesada en varias conexiones ligeras. Esto hace al protocolo más amigable con los firewalls, ya que el uso de puertos es predecible. También significa que se puede aplicar fácilmente traffic shaping y otras características de QoS de red.

Cada canal corre en su propio hilo, por lo que cada vez que llega un nuevo mensaje, el cliente primero identifica el canal y encola el mensaje en una cola que procesa el hilo del canal.

El ciclo de vida del canal es este:

1. El cliente abre un nuevo canal (Open).

2. El servidor confirma que el nuevo canal está listo (Open-Ok).

3. El cliente y el servidor usan el canal según deseen.

4. Uno de los peers (cliente o servidor) cierra el canal (Close).

5. El otro peer confirma el cierre del canal (Close-Ok).


Para crear un nuevo canal, simplemente llama al método OpenChannel pasando el nombre del canal como argumento. El evento OnAMQPChannelOpen se dispara como confirmación enviada por el servidor de que el canal se ha abierto.

AMQP.OpenChannel('channel_name');
procedure OnAMQPChannelOpen(Sender: TObject; const aChannel: string);
begin
  DoLog('#AMQP_channel_open: ' + aChannel);
end; 

Exchanges 

La clase exchange permite a una aplicación gestionar exchanges en el servidor. Esta clase permite a la aplicación definir su propio cableado por código (en lugar de depender de alguna interfaz de configuración). Nota: la mayoría de aplicaciones no necesitan este nivel de sofisticación, y es poco probable que el middleware heredado pueda soportar esta semántica.

El ciclo de vida del exchange es:

1. El cliente pide al servidor asegurarse de que el exchange existe (Declare). El cliente puede afinar esto a "crear el exchange si no existe" o "avisarme pero no crearlo si no existe".

2. El cliente publica mensajes en el exchange.

3. El cliente puede optar por eliminar el exchange (Delete).

El método DeclareExchange crea un nuevo exchange o verifica que un Exchange ya existe. El método tiene los siguientes argumentos:


Para declarar un nuevo Exchange llama al método DeclareExchange pasando el nombre del canal, el nombre del exchange y el tipo de exchange como argumentos. El evento OnAMQPExchangeDeclare se dispara como confirmación del servidor de que el exchange ha sido declarado.

AMQP.DeclareExchange('channel_name', 'exchange_name', 'direct');
procedure OnAMQPExchangeDeclare(Sender: TObject; const aChannel, aExchange: string);
begin
  DoLog('#AMQP_exchange_declare: [' + aChannel + '] ' + aExchange);
end; 

Colas 

La clase queue permite a una aplicación gestionar colas de mensajes en el servidor. Es un paso básico en casi todas las aplicaciones que consumen mensajes, al menos para verificar que una cola de mensajes esperada está realmente presente.

El ciclo de vida de una cola de mensajes durable es bastante simple:

1. El cliente afirma que la cola de mensajes existe (Declare, con el argumento "passive").

2. El servidor confirma que la cola de mensajes existe (Declare-Ok).

3. El cliente lee mensajes de la cola.

El ciclo de vida de una cola temporal es más interesante:

1. El cliente crea la cola de mensajes (Declare, a menudo sin nombre para que el servidor asigne uno). El servidor confirma (Declare-Ok).

2. El cliente inicia un consumidor en la cola. La funcionalidad precisa de un consumidor la define la clase Basic.

3. El cliente cancela el consumidor, ya sea explícitamente o cerrando el canal y/o la conexión.

4. Cuando el último consumidor desaparece de la cola y tras un tiempo de cortesía, el servidor elimina la cola.

AMQP implementa el mecanismo de entrega para suscripciones a topics mediante colas de mensajes. Esto habilita estructuras interesantes donde una suscripción puede balancearse entre un pool de

aplicaciones suscriptoras cooperantes.

El ciclo de vida de una suscripción implica una etapa adicional de bind:

1. El cliente crea la cola de mensajes (Declare) y el servidor confirma (Declare-Ok).

2. El cliente vincula la cola a un topic exchange (Bind) y el servidor confirma (Bind-Ok).

3. El cliente usa la cola como en los ejemplos anteriores.

El método DeclareQueue crea una nueva cola o verifica que una Cola ya existe. El método tiene los siguientes argumentos:


Para declarar una nueva Cola llama al método DeclareQueue pasando el nombre del canal y el nombre de la cola como argumentos. El evento OnAMQPQueueDeclare se dispara como confirmación del servidor de que el exchange ha sido declarado.

AMQP.DeclareQueue('channel_name', 'queue_name');
procedure OnAMQPQueueDeclare(Sender: TObject; const aChannel, aQueue: string; aMessageCount, aConsumerCount: Integer);
begin
  DoLog('#AMQP_queue_declare: [' + aChannel + '] ' + aQueue);
end;