Apache Kafka İstemcisi

Delphi, C++Builder ve .NET için yerel Apache Kafka istemcisi. Düz TCP üzerinde Kafka tel protokolü aracılığıyla broker ile doğrudan iletişim kurun, REST proxy veya harici kütüphane olmadan: kayıtları üretin ve tüketin, tüketici gruplarını koordine edin, offset'leri yönetin ve topic'leri yönetin.

Apache Kafka alt protokol istemcisi

Delphi / .NET çalışma zamanının çalıştığı her yerde çalışan birinci sınıf bir Kafka uygulaması — masaüstü servislerinden mobil cihazlara, kayıtları herhangi bir standart Kafka broker'ına ve ondan akıtır.

Bileşen sınıfı

TsgcWSPClient_Kafka

Protokol

TCP üzerinden Apache Kafka tel protokolü

Platformlar

Windows, macOS, Linux, iOS, Android

Sürüm

Standard / Professional / Enterprise

Kafka üretin, tüketin ve yönetin

Yerel Kafka protokolü üzerinden kayıt akıtmak için ihtiyacınız olan her şey, düz Delphi / .NET metotları ve olayları aracılığıyla sunulur.

Üret

İsteğe bağlı bir key ve partition ile bir kayıt yayınlamak için Produce(topic, value, key, partition) çağırın veya ikili yük için ProduceBytes kullanın. İletim garantisini KafkaOptions.Producer.Acks (kafkaAcksNone, kafkaAcksLeader veya kafkaAcksAll) aracılığıyla seçin; her sonuç, topic, partition ve saklanan offset ile OnKafkaProduce içine ulaşır.

Tüket

Subscribe([topic]), ardından Poll(timeoutMs) mevcut kayıtları getirir, bir TsgcKafkaMessages listesi döndürür ve kayıt başına OnKafkaMessage tetikler. Her birini GetKeyString, GetValueString, Topic, Partition ve Offset ile okuyun. Tüm partition'ları grup olmadan doğrudan okumak için GroupId'yi boş bırakın.

Tüketici grubu koordinasyonu

KafkaOptions.Consumer.GroupId ayarlayın; istemci koordinatör keşfini, katılma ve senkronizasyonu, partition atamasını ve offset commit/fetch işlemini otomatik olarak gerçekleştirir. Son Poll'un offset'lerini commit etmek için CommitSync çağırın; böylece grup bir sonraki oturumda bunlardan sonra devam eder.

Offset yönetimi

GetEarliestOffset, GetLatestOffset ve GetCommittedOffset, topic ve partition başına en eski, yazılacak sonraki ve son commit edilen offset'leri döndürür. CommitOffset(topic, partition, offset) açık bir konum belirler; böylece tüketim tam olarak istediğiniz yerden devam eder.

Topic yönetimi

CreateTopic(name, partitions, replication) ve DeleteTopic(name) broker üzerindeki topic'leri yönetir; GetMetadata([topics]) ise küme ve partition düzenini döndürür (tüm küme için boş bir dizi geçirin). İstemci ayrıca tüketici gruplarını listeleyebilir.

v2 record batch biçimi

Apache Kafka 0.11'de tanıtılan v2 record batch biçimini, isteğe bağlı kafkaCompressionGzip sıkıştırmasıyla okur ve yazar. Varsayılan broker portu 9092'de Apache Kafka 3.x'e karşı test edilmiştir.

Bileşeni bırakın, birkaç özellik ayarlayın, başlayın

Bir TsgcWebSocketClient ile TsgcWSPClient_Kafka'yı eşleştirin, düz TCP için Specifications.RFC6455'i false yapın, KafkaOptions'ı yapılandırın, ardından Produce ve Poll kullanın.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // bir topic'e bir kayıt üret
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // tüket: bir kez subscribe et, ardından tekrar tekrar Poll çağır (örn. bir zamanlayıcıdan)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// kayıtları getir, bir parti işlendiğinde commit et
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// bir topic'e bir kayıt üret
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// tüket: bir kez subscribe et, ardından tekrar tekrar Poll çağır
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// kayıtları getir, bir parti işlendiğinde commit et
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// bir topic'e bir kayıt üret
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// tüket: bir kez subscribe et, ardından tekrar tekrar Poll çağır
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

Spesifikasyonlar ve referanslar

Bu bileşenin uyguladığı protokoller için yetkili kaynaklar.

Belgeler ve Demolar

Bileşen referansına doğrudan bağlanın, çalışmaya hazır demo projesini alın ve denemeyi indirin.

Çevrimiçi Yardım — Kafka Bu bileşen için tam özellik, metot ve olay referansı.
Demo Projesi — Demos\Protocols\Kafka Çalışmaya hazır örnek proje. sgcWebSockets paketinin içinde gelir — denemeyi aşağıdan indirin.
Kullanıcı Kılavuzu (PDF) Kütüphanedeki her bileşeni kapsayan kapsamlı kılavuz.

Apache Kafka ile Akıtmaya Hazır mısınız?

Ücretsiz denemeyi indirin ve dakikalar içinde olay akışı çözümleri oluşturmaya başlayın.