Klient Apache Kafka

Natywny klient Apache Kafka dla Delphi, C++Builder i .NET. Komunikuj się z brokerem bezpośrednio przez protokół przewodowy Kafka na zwykłym TCP, bez proxy REST ani zewnętrznej biblioteki: produkuj i konsumuj rekordy, koordynuj grupy konsumentów, zarządzaj offsetami i administruj tematami.

Klient subprotokołu Apache Kafka

Pierwszorzędna implementacja Kafka, która działa wszędzie tam, gdzie działa środowisko uruchomieniowe Delphi / .NET — od usług na komputerach stacjonarnych po urządzenia mobilne, przesyłając strumieniowo rekordy do i z dowolnego standardowego brokera Kafka.

Klasa komponentu

TsgcWSPClient_Kafka

Protokół

Protokół przewodowy Apache Kafka przez TCP

Platformy

Windows, macOS, Linux, iOS, Android

Edycja

Standard / Professional / Enterprise

Produkuj, konsumuj i administruj Kafka

Wszystko, czego potrzebujesz, aby przesyłać strumieniowo rekordy przez natywny protokół Kafka, udostępnione przez zwykłe metody i zdarzenia Delphi / .NET.

Produkcja

Wywołaj Produce(topic, value, key, partition), aby opublikować rekord z opcjonalnym kluczem i partycją, lub ProduceBytes dla ładunku binarnego. Wybierz gwarancję dostarczenia poprzez KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader lub kafkaAcksAll); każdy wynik trafia do OnKafkaProduce wraz z tematem, partycją i zapisanym offsetem.

Konsumpcja

Subscribe([topic]), a następnie Poll(timeoutMs) pobiera dostępne rekordy, zwracając listę TsgcKafkaMessages i wywołując OnKafkaMessage dla każdego rekordu. Odczytaj każdy z nich za pomocą GetKeyString, GetValueString, Topic, Partition i Offset. Pozostaw GroupId puste, aby odczytywać wszystkie partycje bezpośrednio bez grupy.

Koordynacja grupy konsumentów

Ustaw KafkaOptions.Consumer.GroupId, a klient automatycznie wykonuje wykrywanie koordynatora, dołączenie i synchronizację, przypisanie partycji oraz commit/fetch offsetów. Wywołaj CommitSync, aby zatwierdzić offsety ostatniego Poll, dzięki czemu grupa wznawia pracę po nich w następnej sesji.

Zarządzanie offsetami

GetEarliestOffset, GetLatestOffset i GetCommittedOffset zwracają najstarszy, kolejny do zapisu i ostatnio zatwierdzony offset dla każdego tematu i partycji. CommitOffset(topic, partition, offset) ustawia jawną pozycję, dzięki czemu konsumpcja wznawia się dokładnie tam, gdzie chcesz.

Administracja tematami

CreateTopic(name, partitions, replication) i DeleteTopic(name) zarządzają tematami na brokerze, podczas gdy GetMetadata([topics]) zwraca układ klastra i partycji (przekaż pustą tablicę dla całego klastra). Klient może również wyświetlać listę grup konsumentów.

Format batcha rekordów v2

Odczytuje i zapisuje format batcha rekordów v2 wprowadzony w Apache Kafka 0.11, z opcjonalną kompresją kafkaCompressionGzip. Przetestowany z Apache Kafka 3.x na domyślnym porcie brokera 9092.

Upuść komponent, ustaw kilka właściwości, gotowe

Sparuj TsgcWebSocketClient z TsgcWSPClient_Kafka, ustaw Specifications.RFC6455 na false dla zwykłego TCP, skonfiguruj KafkaOptions, a następnie Produce i Poll.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produkuj rekord do tematu
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // konsumuj: zasubskrybuj raz, następnie wielokrotnie Poll (np. z timera)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// pobierz rekordy, zatwierdź po przetworzeniu batcha
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produkuj rekord do tematu
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// konsumuj: zasubskrybuj raz, następnie wielokrotnie Poll
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// pobierz rekordy, zatwierdź po przetworzeniu batcha
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produkuj rekord do tematu
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// konsumuj: zasubskrybuj raz, następnie wielokrotnie Poll
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Specyfikacje i odniesienia

Wiarygodne źródła dla protokołów, które implementuje ten komponent.

Dokumentacja i wersje demo

Przejdź bezpośrednio do dokumentacji komponentu, pobierz gotowy do uruchomienia projekt demo i wersję próbną.

Pomoc online — Kafka Pełna dokumentacja właściwości, metod i zdarzeń tego komponentu.
Projekt demo — Demos\Protocols\Kafka Gotowy do uruchomienia projekt przykładowy. Dostarczany w pakiecie sgcWebSockets — pobierz wersję próbną poniżej.
Podręcznik użytkownika (PDF) Kompleksowy podręcznik obejmujący każdy komponent w bibliotece.

Gotowy, aby przesyłać strumieniowo z Apache Kafka?

Pobierz bezpłatną wersję próbną i zacznij budować rozwiązania do strumieniowania zdarzeń w kilka minut.