The TsgcWSClient_AMQP client implements the full AMQP 0.9.6 protocol following the OASIS specification. The client supports Plain TCP and WebSocket connections, TLS (secure) connections are supported too.
AMQP 0.9.6 protocols defines the concept of channels, which allows to share a single socket connection with several virtual channels, the client implements an internal thread which reads the bytes received and dispatch every message to the correct channel (which already runs in his own thread), so, if you are running an AMQP connection with 5 channels, the client will run 6 threads (5 threads which handle the data of every channel and 1 thread which handles the data of the connection).
Before connect to an AMQP server, configure the following properties of the AMQP protocol
AMQPOptions.Locale: it's the message locale to use, it's a negotiated value, so can change when compared with the supported locales supported by the server. The default value is "en_US".
AMQPOptions.MaxChannels: it's the maximum number of channels which can be opened, it's a negotiated value, so can change when compared with the server configuration. The default value is 65535.
AMQPOptions.MaxFrameSize: it's the maximum size in bytes of the AMQP frame, it's a negotiated value, so can change when compared with the server configuration. The default value is 2147483647.
AMQPOptions.VirtualHost: it's the name of the virtual host. The default value is "/".
The AMQP HeartBeat can be configured too before connect to server, you can enable or disable the use of heartbeats.
HeartBeat.Enabled: set to true if the client supports HeartBeats.
HeartBeat.Interval: the desired interval in seconds.
Once the AMQP client has been configured, attach to a TsgcWebSocketClient and now you can configure the server connection properties to connect to the AMQP Server.
Set the property value Specifications.RFC6455 to false if using Plain TCP connection instead of WebSocket connection.
oAMQP := TsgcWSPClient_AMQP.Create(nil);
oAMQP.AMQPOptions.Locale := 'en_US';
oAMQP.AMQPOptions.MaxChannels := 100;
oAMQP.AMQPOptions.MaxFrameSize := 16384;
oAMQP.AMQPOptions.VirtualHost := '/';
oAMQP.HeartBeat.Enabled := true;
oAMQP.HeartBeat.Interval := 60;
oClient := TsgcWebSocketClient.Create(nil);
oAMQP.Client := oClient;
oClient.Specifications.RFC6455 := false;
oClient.Host := 'www.esegece.com';
oClient.Port := 5672;
oClient.Active := True;
Once the AMQP client has connected, it can open the first channel.
oAMQP.OpenChannel('channel_name');
When a Channel is opened, the client can declare new exchanges, verify than exists... use the method DeclareExchange to declare a new exchange.
oAMQP.DeclareExchange('channel_name', 'exchange_name');
When a Channel is opened, the client can declare new queues, verify than exists... use the method DeclareQueue to declare a new Queue. The queues are not provided by default by the server (unlike the exchanges), so it's always required to declare a new queue (unless a queue has been already created by another client).
oAMQP.DeclareQueue('channel_name', 'queue_name');
Once the Exchanges and Queues are configured, you may need to bind queues to exchanges, this way the exchanges can know which messages will be dispatched to the queues.
AMQP Servers automatically bind the queues to "direct" exchange using the queue name as routing key. This allows to send a message to a specific queue without the need to declare a binding (just calling PublishMessage method and pasing the Exchange argument as empty value and the name of the queue in the RoutingKey argument).
oAMQP.BindQueue('channel_name', 'queue_name', 'exchange_name', 'routing_key');
Call the method PublishMessage to publish a new AMQP message. The method allows to publish a String or TStream message.
oAMQP.PublishMessage('channel_name', 'exchange_name', 'routing_key', 'Hello from sgcWebSockets!!!');
AMQP allows to receive the messages in 2 modes:
Request By Client
oAMQP.GetMessage('channel_name', 'queue_name');
procedure OnAMQPGetOk(Sender: TObject; const aChannel: string;
const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent)
begin
DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;
Pushed By Server
oAMQP.Consume('channel_name', 'queue_name');
procedure OnAMQPGetOk(Sender: TObject; const aChannel, aConsumerTag: string)
begin
DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;