Apache Kafka Client

Nativer Apache-Kafka-Client für Delphi, C++Builder und .NET. Kommunizieren Sie direkt über das Kafka-Wire-Protokoll auf reinem TCP mit dem Broker, ohne REST-Proxy oder externe Bibliothek: produzieren und konsumieren Sie Datensätze, koordinieren Sie Consumer-Gruppen, verwalten Sie Offsets und administrieren Sie Topics.

Apache-Kafka-Subprotokoll-Client

Eine erstklassige Kafka-Implementierung, die überall dort läuft, wo die Delphi- bzw. .NET-Laufzeitumgebung läuft — von Desktop-Diensten bis zu Mobilgeräten und streamt Datensätze zu und von jedem standardkonformen Kafka-Broker.

Komponentenklasse

TsgcWSPClient_Kafka

Protokoll

Apache Kafka Wire-Protokoll über TCP

Plattformen

Windows, macOS, Linux, iOS, Android

Edition

Standard / Professional / Enterprise

Kafka produzieren, konsumieren und administrieren

Alles, was Sie zum Streamen von Datensätzen über das native Kafka-Protokoll benötigen, bereitgestellt über schlichte Delphi-/.NET-Methoden und -Events.

Produce

Rufen Sie Produce(topic, value, key, partition) auf, um einen Datensatz mit optionalem Schlüssel und optionaler Partition zu veröffentlichen, oder ProduceBytes für einen binären Payload. Wählen Sie die Zustellgarantie über KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader oder kafkaAcksAll); jedes Ergebnis trifft in OnKafkaProduce mit Topic, Partition und gespeichertem Offset ein.

Consume

Subscribe([topic]) und anschließend Poll(timeoutMs) ruft die verfügbaren Datensätze ab, liefert eine TsgcKafkaMessages-Liste zurück und löst pro Datensatz OnKafkaMessage aus. Lesen Sie jeden mit GetKeyString, GetValueString, Topic, Partition und Offset. Lassen Sie GroupId leer, um alle Partitionen direkt ohne Gruppe zu lesen.

Koordination von Consumer-Gruppen

Setzen Sie KafkaOptions.Consumer.GroupId und der Client führt Coordinator-Discovery, Join und Sync, Partitionszuweisung sowie Offset-Commit/-Fetch automatisch aus. Rufen Sie CommitSync auf, um die Offsets des letzten Poll zu committen, sodass die Gruppe in der nächsten Session danach fortsetzt.

Offset-Verwaltung

GetEarliestOffset, GetLatestOffset und GetCommittedOffset liefern den ältesten, den als Nächstes zu schreibenden und den zuletzt committeten Offset pro Topic und Partition zurück. CommitOffset(topic, partition, offset) setzt eine explizite Position, sodass der Konsum genau dort fortsetzt, wo Sie es möchten.

Topic-Administration

CreateTopic(name, partitions, replication) und DeleteTopic(name) verwalten Topics auf dem Broker, während GetMetadata([topics]) das Cluster- und Partitionslayout zurückliefert (übergeben Sie ein leeres Array für das gesamte Cluster). Der Client kann zudem Consumer-Gruppen auflisten.

v2-Record-Batch-Format

Liest und schreibt das in Apache Kafka 0.11 eingeführte v2-Record-Batch-Format, mit optionaler kafkaCompressionGzip-Komprimierung. Getestet gegen Apache Kafka 3.x auf dem standardmäßigen Broker-Port 9092.

Komponente einfügen, einige Eigenschaften setzen, loslegen

Kombinieren Sie einen TsgcWebSocketClient mit einem TsgcWSPClient_Kafka, setzen Sie Specifications.RFC6455 für reines TCP auf false, konfigurieren Sie KafkaOptions und führen Sie anschließend Produce und Poll aus.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // einen Datensatz an ein Topic produzieren
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // konsumieren: einmal abonnieren, dann wiederholt Poll (z. B. per Timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// Datensätze abrufen, committen, sobald ein Batch verarbeitet wurde
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// einen Datensatz an ein Topic produzieren
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// konsumieren: einmal abonnieren, dann wiederholt Poll
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// Datensätze abrufen, committen, sobald ein Batch verarbeitet wurde
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// einen Datensatz an ein Topic produzieren
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// konsumieren: einmal abonnieren, dann wiederholt Poll
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Spezifikationen & Referenzen

Maßgebliche Quellen für die von dieser Komponente implementierten Protokolle.

Dokumentation & Demos

Springen Sie direkt zur Komponentenreferenz, holen Sie sich das einsatzbereite Demoprojekt und laden Sie die Testversion herunter.

Online-Hilfe — Kafka Vollständige Referenz zu Eigenschaften, Methoden und Events dieser Komponente.
Demoprojekt — Demos\Protocols\Kafka Einsatzbereites Beispielprojekt. Im sgcWebSockets-Paket enthalten — laden Sie unten die Testversion herunter.
Benutzerhandbuch (PDF) Ausführliches Handbuch zu allen Komponenten der Bibliothek.

Bereit, mit Apache Kafka zu streamen?

Laden Sie die kostenlose Testversion herunter und entwickeln Sie in wenigen Minuten Event-Streaming-Lösungen.