Client Apache Kafka

Client Apache Kafka natif pour Delphi, C++Builder et .NET. Dialogue directement avec le broker via le protocole filaire Kafka sur TCP brut, sans proxy REST ni bibliothèque externe : produis et consomme des enregistrements, coordonne les groupes de consommateurs, gère les offsets et administre les topics.

Client de sous-protocole Apache Kafka

Une implémentation Kafka de premier ordre qui s'exécute partout où tourne le runtime Delphi / .NET — des services desktop aux appareils mobiles, en streamant des enregistrements depuis et vers tout broker Kafka standard.

Classe du composant

TsgcWSPClient_Kafka

Protocole

Protocole filaire Apache Kafka sur TCP

Plateformes

Windows, macOS, Linux, iOS, Android

Édition

Standard / Professional / Enterprise

Produis, consomme et administre Kafka

Tout ce qu'il faut pour streamer des enregistrements via le protocole Kafka natif, exposé à travers de simples méthodes et événements Delphi / .NET.

Produire

Appelle Produce(topic, value, key, partition) pour publier un enregistrement avec une clé et une partition optionnelles, ou ProduceBytes pour une charge utile binaire. Choisis la garantie de livraison via KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader ou kafkaAcksAll) ; chaque résultat arrive dans OnKafkaProduce avec le topic, la partition et l'offset stocké.

Consommer

Subscribe([topic]) puis Poll(timeoutMs) récupère les enregistrements disponibles, retourne une liste TsgcKafkaMessages et déclenche OnKafkaMessage par enregistrement. Lis chacun d'eux avec GetKeyString, GetValueString, Topic, Partition et Offset. Laisse GroupId vide pour lire directement toutes les partitions sans groupe.

Coordination des groupes de consommateurs

Règle KafkaOptions.Consumer.GroupId et le client effectue automatiquement la découverte du coordinateur, le join et le sync, l'assignation des partitions et le commit/fetch des offsets. Appelle CommitSync pour valider les offsets du dernier Poll afin que le groupe reprenne après eux à la prochaine session.

Gestion des offsets

GetEarliestOffset, GetLatestOffset et GetCommittedOffset retournent les offsets le plus ancien, prochain à écrire et dernier validé par topic et partition. CommitOffset(topic, partition, offset) fixe une position explicite afin que la consommation reprenne exactement où tu le souhaites.

Administration des topics

CreateTopic(name, partitions, replication) et DeleteTopic(name) gèrent les topics sur le broker, tandis que GetMetadata([topics]) retourne la disposition du cluster et des partitions (passe un tableau vide pour tout le cluster). Le client peut aussi lister les groupes de consommateurs.

Format de batch d'enregistrements v2

Lit et écrit le format de batch d'enregistrements v2 introduit dans Apache Kafka 0.11, avec compression kafkaCompressionGzip optionnelle. Testé avec Apache Kafka 3.x sur le port broker par défaut 9092.

Dépose le composant, configure quelques propriétés, c'est parti

Associe un TsgcWebSocketClient à TsgcWSPClient_Kafka, mets Specifications.RFC6455 à false pour du TCP brut, configure KafkaOptions, puis Produce et Poll.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produire un enregistrement vers un topic
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consommer : subscribe une fois, puis Poll en boucle (p. ex. depuis un timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// récupérer les enregistrements, valider quand un lot est traité
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produire un enregistrement vers un topic
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consommer : subscribe une fois, puis Poll en boucle
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// récupérer les enregistrements, valider quand un lot est traité
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produire un enregistrement vers un topic
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consommer : subscribe une fois, puis Poll en boucle
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Spécifications & références

Sources faisant autorité pour les protocoles que ce composant implémente.

Documentation & démos

Lien direct vers la référence du composant, récupère le projet de démo prêt à l'emploi et télécharge l'essai.

Aide en ligne — Kafka Référence complète des propriétés, méthodes et événements pour ce composant.
Projet de démo — Demos\Protocols\Kafka Projet d'exemple prêt à l'emploi. Livré dans le package sgcWebSockets — télécharge l'essai ci-dessous.
Manuel utilisateur (PDF) Manuel exhaustif couvrant chaque composant de la bibliothèque.

Prêt à streamer avec Apache Kafka ?

Télécharge l'essai gratuit et commence à bâtir des solutions d'event-streaming en quelques minutes.