Pub/Sub brings the flexibility and reliability of enterprise message-oriented middleware to the cloud. At the same time, Pub/Sub is a scalable, durable event ingestion and delivery system that serves as a foundation for modern stream analytics pipelines. By providing many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independently written applications. Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate systems hosted on the Google Cloud Platform and externally.
At-least-once delivery Synchronous, cross-zone message replication and per-message receipt tracking ensures at-least-once delivery at any scale. | Open Open APIs and client libraries in seven languages support cross-cloud and hybrid deployments. | Exactly-once processing Cloud Dataflow supports reliable, expressive, exactly-once processing of Cloud Pub/Sub streams. |
Global by default Publish from anywhere in the world and consume from anywhere, with consistent latency. No replication necessary. | No provisioning, auto-everything Cloud Pub/Sub does not have shards or partitions. Just set your quota, publish, and consume. | Compliance and security Cloud Pub/Sub is a HIPAA-compliant service, offering fine-grained access controls and end-to-end encryption. |
Integrated Take advantage of integrations with multiple services, such as Cloud Storage and Gmail update events and Cloud Functions for serverless event-driven computing. | Seek and replay Rewind your backlog to any point in time or a snapshot, giving the ability to reprocess the messages. Fast forward to discard outdated data. |
A publisher application creates and sends messages to a topic. Subscriber applications create a subscription to a topic to receive messages from it. Communication can be one-to-many (fan-out), many-to-one (fan-in), and many-to-many.
Google Pub/Sub component client can login to Google Servers using the following methods:
OAuth2
The login is done using a webbrowser where the user logins with his own user and authorizes the PubSub requests.
Service Accounts
The login is done signing the requests using a private key provided by google, these method is recommended for automated services or applications without user interaction.
When a new service account is created, you can download a JSON file with all configurations. This file can be processed by the PubSub component, just call the method LoadSettingsFromFile and pass the JSON filename as argument.
OAuth2
In order to work with Google Pub/Sub API, sgcWebSockets Pub/Sub component uses OAuth2 as default authentication, so first you must set your ClientId and ClientSecret from your google account.
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.Authorization := gcaOAuth2;
oPubSub.GoogleCloudOptions.OAuth2.ClientId := '... your google client id...';
oPubSub.GoogleCloudOptions.OAuth2.ClientSecret := '... your google client secret...';
Service Accounts
Service Accounts requires to build a JWT and pass as an Authorization Token
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.Authorization := gcaJWT;
oPubSub.GoogleCloudOptions.JWT.ClientEmail := '...google email...';
oPubSub.GoogleCloudOptions.JWT.PrivateKeyId := '...private key id...';
oPubSub.GoogleCloudOptions.JWT.PrivateKey.Lines.Text := '...private key certificate...';
This is required in order to get an Authorization Token Key from Google which will be used for all Rest API calls.
All methods return a response, which may be successful or return an error.
Method | Parameters | Description | Example |
CreateSnapshot | project, snapshot, subscription | Creates a snapshot from the requested subscription. Snapshots are used in subscriptions.seek operations, which allow you to manage message acknowledgments in bulk. That is, you can set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot. | CreateSnapshot('pubsub-270909', 'snapshot-1', 'subscription-1') |
DeleteSnapshot | project, snapshot | Removes an existing snapshot | DeleteSnapshot('pubsub-270909', 'snapshot-1') |
ListSnapshots | project | Lists the existing snapshots | ListSnapshots('pubsub-270909') |
Method | Parameters | Description | Example |
AcknowledgeSubscription | |||
CreateSubscription | project, subscription, topic | Creates a subscription to a given topic. If the subscription already exists, returns ALREADY_EXISTS. If the corresponding topic doesn't exist, returns NOT_FOUND. | CreateSubscription('pubsub-270909', 'subscription-1', 'topic-1') |
DeleteSubscripton | project, subscription | Deletes an existing subscription. All messages retained in the subscription are immediately dropped. | DeleteSubscription('pubsub-270909', 'subscription-1') |
GetSubscription | project, subscription | Gets the configuration details of a subscription. | GetSubscription('pubsub-270909', 'subscription-1') |
ListSubscriptions | project | Lists matching subscriptions. | ListSubscriptions('pubsub-270909', 'subscription-1') |
ModifyAckDeadlineSubscription | project, subscription, AckIds | Modifies the ack deadline for a specific message. This method is useful to indicate that more time is needed to process a message by the subscriber, or to make the message available for redelivery if the processing was interrupted. Note that this does not modify the subscription-level ackDeadlineSeconds used for subsequent messages. | |
ModifyPushConfigSubscription | project, subscription | Modifies the PushConfig for a specified subscription.This may be used to change a push subscription to a pull one (signified by an empty PushConfig) or vice versa, or change the endpoint URL and other attributes of a push subscription. Messages will accumulate for delivery continuously through the call regardless of changes to the PushConfig. | |
Pull | project, subscription | Pulls messages from the server. The server may return UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription. | pull('pubsub-270909', 'subscription-1') |
Seek | project, subscription, timeUTC, snapshot | Seeks an existing subscription to a point in time or to a given snapshot, whichever is provided in the request. Snapshots are used in subscriptions.seek operations, which allow you to manage message acknowledgments in bulk. That is, you can set the acknowledgment state of messages in an existing subscription to the state captured by a snapshot. Note that both the subscription and the snapshot must be on the same topic. |
Method | Parameters | Description | Example |
CreateTopic | project, topic | Creates the given topic with the given name | CreateTopic('pubsub-270909', 'topic-1') |
DeleteTopic | project, topic | Deletes the topic with the given name. Returns NOT_FOUND if the topic does not exist. After a topic is deleted, a new topic may be created with the same name; this is an entirely new topic with none of the old configuration or subscriptions. | DeleteTopic('pubsub-270909', 'topic-1') |
GetTopic | project, topic | Gets the configuration of a topic. | GetTopic('pubsub-270909', 'topic-1') |
ListTopics | project | Lists matching topics. | ListTopics('pubsub-270909') |
Publish | project, topic, message | Adds one or more messages to the topic. Returns NOT_FOUND if the topic does not exist. | Publish('pubsub-270909', 'topic-1', 'My First PubSub Message.') |
Method | Parameters | Description | Example |
ListTopicSubscriptions | project, topic | Lists the names of the subscriptions on this topic. | ListTopicSubscriptions('pubsub-270909', 'topic-1') |
Find below the most common methods used with Google Cloud Pub/Sub API
Create a new topic for project with id: pubsub-270909 and topic name topic-1.
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.OAuth2.ClientId := '... your google client id...';
oPubSub.GoogleCloudOptions.OAuth2.ClientSecret := '... your google client secret...';
oPubSub.CreateTopic('pubsub-270909', 'topic-1');
Response from Server
{
"name": "projects/pubsub-270909/topics/topic-1"
}
Publish a new message in new topic created
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.OAuth2.ClientId := '... your google client id...';
oPubSub.GoogleCloudOptions.OAuth2.ClientSecret := '... your google client secret...';
oPubSub.Publish('pubsub-270909', 'topic-1', 'My First Message from sgcWebSockets.'));
Response from Server
{
"messageIds": [
"1050732082561505"
]
}
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.OAuth2.ClientId := '... your google client id...';
oPubSub.GoogleCloudOptions.OAuth2.ClientSecret := '... your google client secret...';
oAttributes := TStringList.Create;
Try
oAttributes.CommaText := 'origin=gcloud-sample,username=gcp';
oPubSub.Publish('pubsub-270909', 'topic-1', 'My First Message from sgcWebSockets.', oAttributes, 'username'));
Finally
oAttributes.Free;
end;
Create a new subscription for project with id: pubsub-270909, with subscription name subscription-1 and topic-1
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.OAuth2.ClientId := '... your google client id...';
oPubSub.GoogleCloudOptions.OAuth2.ClientSecret := '... your google client secret...';
oPubSub.CreateSubscription('pubsub-270909', 'subscription-1', 'topic-1');
Response from Server
{
"name": "projects/pubsub-270909/subscriptions/subscription-1",
"topic": "projects/pubsub-270909/topics/topic-1",
"pushConfig": {},
"ackDeadlineSeconds": 10,
"messageRetentionDuration": "604800s",
"expirationPolicy": {
"ttl": "2678400s"
}
}
Read messages from previous subscription created.
oPubSub := TsgcHTTPGoogleCloud_PubSub_Client.Create(nil);
oPubSub.GoogleCloudOptions.OAuth2.ClientId := '... your google client id...';
oPubSub.GoogleCloudOptions.OAuth2.ClientSecret := '... your google client secret...';
oPubSub.pubsub.Pull('pubsub-270909', 'subscription-1');
Response from Server
{
"receivedMessages": [
{
"ackId": "PjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRREFC08CKF15MEorQVh0Dj4N",
"message": {
"data": "TXkgRmlyc3QgTWVzc2FnZSBmcm9tIHNnY1dlYlNvY2tldHMu",
"messageId": "1050732082561505",
"publishTime": "2020-03-14T15:25:31.505Z"
}
}
]
}
Message is received Encode in Base64, so you must decode first to read contents.
sgcBase_Helpers.DecodeBase64('TXkgRmlyc3QgTWVzc2FnZSBmcm9tIHNnY1dlYlNvY2tldHMu=');