Apache Kafka-client

Native Apache Kafka-client voor Delphi, C++Builder en .NET. Communiceer rechtstreeks met de broker via het Kafka wire protocol op plain TCP, zonder REST-proxy of externe bibliotheek: produceer en consumeer records, coördineer consumer groups, beheer offsets en administreer topics.

Apache Kafka-subprotocol-client

Een eersteklas Kafka-implementatie die overal draait waar de Delphi-/.NET-runtime draait — van desktopservices tot mobiele apparaten, records streamend van en naar elke standaard Kafka-broker.

Componentklasse

TsgcWSPClient_Kafka

Protocol

Apache Kafka wire protocol over TCP

Platforms

Windows, macOS, Linux, iOS, Android

Editie

Standard / Professional / Enterprise

Produceer, consumeer en administreer Kafka

Alles wat je nodig hebt om records te streamen via het native Kafka-protocol, blootgesteld via gewone Delphi-/.NET-methodes en -events.

Produceren

Roep Produce(topic, value, key, partition) aan om een record met een optionele key en partitie te publiceren, of ProduceBytes voor een binaire payload. Kies de afleveringsgarantie via KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader of kafkaAcksAll); elk resultaat komt binnen in OnKafkaProduce met de topic, partitie en opgeslagen offset.

Consumeren

Subscribe([topic]) gevolgd door Poll(timeoutMs) haalt de beschikbare records op, retourneert een TsgcKafkaMessages-lijst en triggert OnKafkaMessage per record. Lees elk record met GetKeyString, GetValueString, Topic, Partition en Offset. Laat GroupId leeg om alle partities rechtstreeks te lezen zonder group.

Coördinatie van consumer groups

Stel KafkaOptions.Consumer.GroupId in en de client voert coordinator discovery, join en sync, partitietoewijzing en offset commit/fetch automatisch uit. Roep CommitSync aan om de offsets van de laatste Poll te committen zodat de group er bij de volgende sessie na hervat.

Offset-beheer

GetEarliestOffset, GetLatestOffset en GetCommittedOffset retourneren de oudste, de eerstvolgende-te-schrijven en de laatst-gecommitte offsets per topic en partitie. CommitOffset(topic, partition, offset) stelt een expliciete positie in zodat de consumptie precies hervat waar jij wilt.

Topic-administratie

CreateTopic(name, partitions, replication) en DeleteTopic(name) beheren topics op de broker, terwijl GetMetadata([topics]) de cluster- en partitie-layout retourneert (geef een lege array door voor de hele cluster). De client kan ook consumer groups oplijsten.

v2 record batch-formaat

Leest en schrijft het v2 record batch-formaat dat in Apache Kafka 0.11 is geïntroduceerd, met optionele kafkaCompressionGzip-compressie. Getest tegen Apache Kafka 3.x op de standaard broker-poort 9092.

Plaats het component, stel een paar eigenschappen in, klaar

Koppel een TsgcWebSocketClient aan TsgcWSPClient_Kafka, zet Specifications.RFC6455 op false voor plain TCP, configureer KafkaOptions en voer daarna Produce en Poll uit.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produceer een record naar een topic
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consumeren: abonneer eenmalig, roep daarna herhaaldelijk Poll aan (bijv. vanuit een timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// haal records op, commit wanneer een batch is verwerkt
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produceer een record naar een topic
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consumeren: abonneer eenmalig, roep daarna herhaaldelijk Poll aan
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// haal records op, commit wanneer een batch is verwerkt
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produceer een record naar een topic
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consumeren: abonneer eenmalig, roep daarna herhaaldelijk Poll aan
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Specificaties & referenties

Gezaghebbende bronnen voor de protocollen die dit component implementeert.

Documentatie & demo's

Deep-link naar de componentreferentie, pak het kant-en-klare demoproject en download de proefversie.

Online-help — Kafka Volledige eigenschap-, methode- en eventreferentie voor dit component.
Demoproject — Demos\Protocols\Kafka Kant-en-klaar voorbeeldproject. Wordt meegeleverd met het sgcWebSockets-pakket — download hieronder de proefversie.
Gebruikershandleiding (PDF) Uitgebreide handleiding die elk component in de bibliotheek behandelt.

Klaar om te streamen met Apache Kafka?

Download de gratis proefversie en bouw binnen enkele minuten event-streaming-oplossingen.