TsgcWSPClient_AMQP

The TsgcWSClient_AMQP client implements the full AMQP 0.9.6 protocol following the OASIS specification. The client supports Plain TCP and WebSocket connections, TLS (secure) connections are supported too.

 

Connection

AMQP 0.9.6 protocols defines the concept of channels, which allows to share a single socket connection with several virtual channels, the client implements an internal thread which reads the bytes received and dispatch every message to the correct channel (which already runs in his own thread), so, if you are running an AMQP connection with 5 channels, the client will run 6 threads (5 threads which handle the data of every channel and 1 thread which handles the data of the connection).

 

Before connect to an AMQP server, configure the following properties of the AMQP protocol

 

 

The AMQP HeartBeat can be configured too before connect to server, you can enable or disable the use of heartbeats.

 

 

Once the AMQP client has been configured, attach to a TsgcWebSocketClient and now you can configure the server connection properties to connect to the AMQP Server.

Set the property value Specifications.RFC6455 to false if using Plain TCP connection instead of WebSocket connection.

 


oAMQP := TsgcWSPClient_AMQP.Create(nil);
oAMQP.AMQPOptions.Locale := 'en_US';
oAMQP.AMQPOptions.MaxChannels := 100;
oAMQP.AMQPOptions.MaxFrameSize := 16384;
oAMQP.AMQPOptions.VirtualHost := '/';
oAMQP.HeartBeat.Enabled := true;
oAMQP.HeartBeat.Interval := 60;
 
oClient := TsgcWebSocketClient.Create(nil);
oAMQP.Client := oClient;
oClient.Specifications.RFC6455 := false;
oClient.Host := 'www.esegece.com';
oClient.Port := 5672;
oClient.Active := True;

Channels

Once the AMQP client has connected, it can open the first channel.


oAMQP.OpenChannel('channel_name');

Exchanges

When a Channel is opened, the client can declare new exchanges, verify than exists... use the method DeclareExchange to declare a new exchange.

 


oAMQP.DeclareExchange('channel_name', 'exchange_name');

Queues

When a Channel is opened, the client can declare new queues, verify than exists... use the method DeclareQueue to declare a new Queue. The queues are not provided by default by the server (unlike the exchanges), so it's always required to declare a new queue (unless a queue has been already created by another client).

 


oAMQP.DeclareQueue('channel_name', 'queue_name');

Binding Queues

Once the Exchanges and Queues are configured, you may need to bind queues to exchanges, this way the exchanges can know which messages will be dispatched to the queues.

 

AMQP Servers automatically bind the queues to "direct" exchange using the queue name as routing key. This allows to send a message to a specific queue without the need to declare a binding (just calling PublishMessage method and pasing the Exchange argument as empty value and the name of the queue in the RoutingKey argument).

 


oAMQP.BindQueue('channel_name', 'queue_name', 'exchange_name', 'routing_key');

Send Messages

Call the method PublishMessage to publish a new AMQP message. The method allows to publish a String or TStream message.

 


oAMQP.PublishMessage('channel_name', 'exchange_name', 'routing_key', 'Hello from sgcWebSockets!!!');

Receive Messages

AMQP allows to receive the messages in 2 modes:

 

 

Request By Client


oAMQP.GetMessage('channel_name', 'queue_name');
 
procedure OnAMQPGetOk(Sender: TObject; const aChannel: string; 
  const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent)
begin
  DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;

Pushed By Server


oAMQP.Consume('channel_name', 'queue_name');
 
procedure OnAMQPGetOk(Sender: TObject; const aChannel, aConsumerTag: string)
begin
  DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;