AMQP 1.0.0 is centered around the concept of messages. Messages can carry data, instructions, or commands and are the fundamental units of communication.
The protocol operates on a brokered messaging model. Brokers, which can be servers or intermediary entities, manage the routing and delivery of messages between producers and consumers.
Queues are storage entities within the broker where messages are temporarily stored. Exchanges define the rules for routing messages from producers to queues based on criteria like message content or routing keys.
AMQP 1.0.0 supports various security mechanisms, including authentication and authorization, to ensure secure communication between clients and brokers.
AMQP 1.0.0 includes mechanisms for flow control, allowing consumers to indicate their ability to handle incoming messages at a given rate. This helps prevent overwhelming consumers with a large number of messages.
A queue acts as a buffer that stores messages that are consumed later. A queue can also be declared with a number of attributes during creation. For instance, it can be marked as durable, auto-delete and exclusive, where exclusive means that it can be used by only one connection and this queue will be deleted when that connection closes.
A channel routes messages to a queue depending on the exchange type and bindings between the exchange and the queue. For a queue to receive messages, it must be bound to at least one exchange. AMQP 0.9.1 brokers should provide four exchange types - direct exchange, fanout exchange, topic exchange, and header exchange. An exchange can be declared with a number of attributes during creation. For instance, it can be marked as durable so that it survives a broker restart, or it can be marked as auto-delete meaning that it’s automatically deleted when the last queue is unbound.
A binding is a relation between a queue and an exchange consisting of a set of rules that the exchange uses (among other things) to route messages to queues.
A message is an entity sent from the publisher to the queue and finally subscribed to by the consumer. Each message contains a set of headers defining properties such as life duration, durability, and priority. AMQP 0.9.1 also has a built-in feature called message acknowledgment that is used to confirm message delivery and/or processing.
A channel is a virtual connection inside a connection, between two AMQP peers. Message publishing or consuming to or from a queue is performed over a channel (AMQP). A channel is multiplexed, one single connection can have multiple channels.
// Creating AMQP client
oAMQP := TsgcWSPClient_AMQP1.Create(nil);
// Setting AMQP authentication options
oAMQP.AMQPOptions.Authentication.AuthType := amqp1authSASLPlain;
oAMQP.AMQPOptions.Authentication.Username := 'sgc';
oAMQP.AMQPOptions.Authentication.Password := 'sgc';
// Creating WebSocket client
oClient := TsgcWebSocketClient.Create(nil);
// Setting WebSocket specifications
oClient.Specifications.RFC6455 := False;
// Setting WebSocket client properties
oClient.Host := 'www.esegece.com';
oClient.Port := 5671;
oClient.TLS := True;
// Assigning WebSocket client to AMQP client
oAMQP.Client := oClient;
// Activating WebSocket client
oClient.Active := True;
AMQP1.CreateSession('MySession');
procedure OnAMQPSessionOpen(Sender: TObject; const aSession: TsgcAMQP1Session; const aBegin: TsgcAMQP1FrameBegin);
begin
ShowMessage('#session-open: ' + aSession.Id);
end;
AMQP1.CreateSenderLink('MySession', 'MySenderLink');
procedure procedure TfrmClientAMQP1.AMQP1AMQPLinkOpen(Sender: TObject; const aSession: TsgcAMQP1Session; const aLink: TsgcAMQP1Link; const aAttach: TsgcAMQP1FrameAttach);
begin
ShowMessage('#link-open: ' + aLink.Name);
end;
AMQP1.SendMessage('MySession', 'MySenderLink', 'My first AMQP Message');
// Create websocket client and set server options
oClient := TsgcWebSocketClient.Create(nil);
oClient.Host := 'www.esegece.com';
oClient.Port := 5672;
// Create AMQP protocol and assign to websocket client
oAMQP := TsgcWSPClient_AMQP.Create(nil);
oAMQP.Client := oClient;
// AQMP Authentication
procedure OnAMQPAuthentication(Sender: TObject; aMechanisms: TsgcAMQPAuthentications; var Mechanism: TsgcAMQPAuthentication; var User, Password: string);
begin
User := 'sgc';
Password := 'sgc';
end;
// Handle AMQP methods
oAMQP.OnAMQPConnect := OnAMQPConnectHandler;
oAMQP.OnAMQPDisconnect := OnAMQPDisconnectHandler;
// connect to server
oClient.Active := True;
AMQP.Consume('channel_name', 'exchange_name', 'consumer_tag');
procedure OnAMQPBasicGetOk(Sender: TObject; const aChannel: string; const aGetOk: TsgcAMQPFramePayload_Method_BasicGetOk; const aContent: TsgcAMQPMessageContent);
begin
DoLog('#AMQP_basic_GetOk: ' + aChannel + ' ' + IntToStr(aGetOk.MessageCount) + ' ' + aContent.Body.AsString);
end;
AMQP.PublishMessage('channel_name', 'exchange_name', 'routing_key', 'Hello from sgcWebSockets!!!');
procedure OnAMQPBasicReturn(Sender: TObject; const aChannel: string; const aReturn: TsgcAMQPFramePayload_Method_BasicReturn; const aContent: TsgcAMQPMessageContent);
begin
DoLog('#AMQP_basic_return: ' + aChannel + ' ' + IntToStr(aReturn.ReplyCode) + ' ' + aReturn.ReplyText + ' ' + aContent.Body.AsString);
end;
AMQP.SetQoS('channel_name', 1024000, 100, false);
procedure OnAMQPBasicQoS(Sender: TObject; const aChannel: string; const aQoS: TsgcAMQPFramePayload_Method_BasicQoS);
begin
DoLog('#AMQP_basic_qos: ' + aChannel + ' ' + IntToStr(aQoS.PrefetchSize) + ' ' + IntToStr(aQoS.PrefetchCount) + ' ' + BoolToStr(aQoS.Global>;
end;