Client Apache Kafka

Client Apache Kafka nativo per Delphi, C++Builder e .NET. Dialoga direttamente con il broker tramite il wire protocol Kafka su TCP puro, senza proxy REST né librerie esterne: produce e consuma record, coordina i consumer group, gestisce gli offset e amministra i topic.

Client per il subprotocollo Apache Kafka

Un'implementazione Kafka di prima classe che gira ovunque giri il runtime Delphi / .NET — dai servizi desktop ai dispositivi mobili, trasmettendo record da e verso qualsiasi broker Kafka conforme allo standard.

Classe del componente

TsgcWSPClient_Kafka

Protocollo

Wire protocol Apache Kafka su TCP

Piattaforme

Windows, macOS, Linux, iOS, Android

Edizione

Standard / Professional / Enterprise

Produci, consuma e amministra Kafka

Tutto ciò che serve per trasmettere record sul protocollo Kafka nativo, esposto tramite semplici metodi ed eventi Delphi / .NET.

Produci

Chiama Produce(topic, value, key, partition) per pubblicare un record con chiave e partizione opzionali, oppure ProduceBytes per un payload binario. Scegli la garanzia di consegna tramite KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader o kafkaAcksAll); ogni risultato arriva in OnKafkaProduce con il topic, la partizione e l'offset memorizzato.

Consuma

Subscribe([topic]) e poi Poll(timeoutMs) recupera i record disponibili, restituendo una lista TsgcKafkaMessages e sollevando OnKafkaMessage per ogni record. Leggi ciascuno con GetKeyString, GetValueString, Topic, Partition e Offset. Lascia GroupId vuoto per leggere tutte le partizioni direttamente senza un gruppo.

Coordinamento dei consumer group

Imposta KafkaOptions.Consumer.GroupId e il client esegue automaticamente la scoperta del coordinator, il join e il sync, l'assegnazione delle partizioni e il commit/fetch degli offset. Chiama CommitSync per fare il commit degli offset dell'ultimo Poll, così il gruppo riprende da essi nella sessione successiva.

Gestione degli offset

GetEarliestOffset, GetLatestOffset e GetCommittedOffset restituiscono il più vecchio, il prossimo da scrivere e l'ultimo offset committato per topic e partizione. CommitOffset(topic, partition, offset) imposta una posizione esplicita così il consumo riprende esattamente dove vuoi.

Amministrazione dei topic

CreateTopic(name, partitions, replication) e DeleteTopic(name) gestiscono i topic sul broker, mentre GetMetadata([topics]) restituisce il layout di cluster e partizioni (passa un array vuoto per l'intero cluster). Il client può anche elencare i consumer group.

Formato record batch v2

Legge e scrive il formato record batch v2 introdotto in Apache Kafka 0.11, con compressione kafkaCompressionGzip opzionale. Testato con Apache Kafka 3.x sulla porta broker predefinita 9092.

Trascina il componente, imposta qualche proprietà e via

Abbina un TsgcWebSocketClient a un TsgcWSPClient_Kafka, imposta Specifications.RFC6455 a false per il TCP puro, configura KafkaOptions, quindi richiama Produce e Poll.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // produce un record verso un topic
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // consuma: subscribe una volta, poi Poll ripetutamente (es. da un timer)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// recupera i record, fai il commit quando un batch è elaborato
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// produce un record verso un topic
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// consuma: subscribe una volta, poi Poll ripetutamente
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// recupera i record, fai il commit quando un batch è elaborato
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// produce un record verso un topic
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// consuma: subscribe una volta, poi Poll ripetutamente
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Specifiche e riferimenti

Fonti autorevoli per i protocolli implementati da questo componente.

Documentazione e Demo

Collegamenti diretti al riferimento del componente, al progetto demo pronto all'uso e al download della versione di prova.

Guida online — Kafka Riferimento completo di proprietà, metodi ed eventi di questo componente.
Progetto demo — Demos\Protocols\Kafka Progetto di esempio pronto all'uso. Incluso nel pacchetto sgcWebSockets — scarica la versione di prova qui sotto.
Manuale utente (PDF) Manuale completo che copre ogni componente della libreria.

Pronto a fare streaming con Apache Kafka?

Scarica la versione di prova gratuita e inizia a costruire soluzioni di event streaming in pochi minuti.