Apache Kafka 클라이언트

Delphi, C++Builder 및 .NET용 네이티브 Apache Kafka 클라이언트예요. REST 프록시나 외부 라이브러리 없이 일반 TCP 위의 Kafka 와이어 프로토콜로 브로커와 직접 통신해요. 레코드를 생산·소비하고, 컨슈머 그룹을 조율하며, 오프셋을 관리하고, 토픽을 운영할 수 있어요.

Apache Kafka 서브프로토콜 클라이언트

Delphi / .NET 런타임이 실행되는 모든 곳에서 동작하는 일급 Kafka 구현이에요 — 데스크톱 서비스부터 모바일 기기까지, 표준 Kafka 브로커와 레코드를 주고받아요.

컴포넌트 클래스

TsgcWSPClient_Kafka

프로토콜

TCP 위의 Apache Kafka 와이어 프로토콜

플랫폼

Windows, macOS, Linux, iOS, Android

에디션

Standard / Professional / Enterprise

Kafka 생산, 소비, 운영

네이티브 Kafka 프로토콜로 레코드를 스트리밍하는 데 필요한 모든 기능을, 일반 Delphi / .NET 메서드와 이벤트로 제공해요.

생산(Produce)

Produce(topic, value, key, partition)를 호출하면 선택적 키와 파티션과 함께 레코드를 발행하고, 바이너리 페이로드는 ProduceBytes로 보낼 수 있어요. 전달 보장은 KafkaOptions.Producer.Acks(kafkaAcksNone, kafkaAcksLeader 또는 kafkaAcksAll)로 선택하며, 각 결과는 토픽, 파티션, 저장된 오프셋과 함께 OnKafkaProduce로 도착해요.

소비(Consume)

Subscribe([topic])Poll(timeoutMs)가 사용 가능한 레코드를 가져와 TsgcKafkaMessages 목록을 반환하고 레코드마다 OnKafkaMessage를 발생시켜요. 각 레코드는 GetKeyString, GetValueString, Topic, Partition, Offset으로 읽어요. GroupId를 비워 두면 그룹 없이 모든 파티션을 직접 읽어요.

컨슈머 그룹 조율

KafkaOptions.Consumer.GroupId를 설정하면 클라이언트가 코디네이터 검색, 조인 및 동기화, 파티션 할당, 오프셋 커밋/조회를 자동으로 수행해요. CommitSync를 호출하면 마지막 Poll의 오프셋을 커밋하여 다음 세션에서 그 이후부터 그룹이 재개돼요.

오프셋 관리

GetEarliestOffset, GetLatestOffset, GetCommittedOffset은 토픽과 파티션별로 가장 오래된 오프셋, 다음에 기록될 오프셋, 마지막으로 커밋된 오프셋을 반환해요. CommitOffset(topic, partition, offset)은 명시적 위치를 지정하여 원하는 지점부터 정확히 소비를 재개해요.

토픽 운영

CreateTopic(name, partitions, replication)DeleteTopic(name)으로 브로커의 토픽을 관리하고, GetMetadata([topics])는 클러스터와 파티션 레이아웃을 반환해요(전체 클러스터를 보려면 빈 배열을 전달하세요). 클라이언트는 컨슈머 그룹 목록도 조회할 수 있어요.

v2 레코드 배치 포맷

Apache Kafka 0.11에서 도입된 v2 레코드 배치 포맷을 읽고 쓰며, 선택적으로 kafkaCompressionGzip 압축을 지원해요. 기본 브로커 포트 9092에서 Apache Kafka 3.x로 테스트했어요.

컴포넌트를 배치하고 몇 가지 속성만 설정하면 끝

TsgcWebSocketClient와 TsgcWSPClient_Kafka를 짝지어 일반 TCP를 위해 Specifications.RFC6455를 false로 설정하고, KafkaOptions를 구성한 다음 Produce와 Poll을 호출하세요.

uses
  sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;

var
  WSClient: TsgcWebSocketClient;
  Kafka: TsgcWSPClient_Kafka;
begin
  Kafka := TsgcWSPClient_Kafka.Create(nil);
  Kafka.KafkaOptions.ClientId := 'my-delphi-app';
  Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
  Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
  Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
  Kafka.OnKafkaMessage := KafkaMessage;
  Kafka.OnKafkaProduce := KafkaProduce;

  WSClient := TsgcWebSocketClient.Create(nil);
  Kafka.Client := WSClient;
  WSClient.Specifications.RFC6455 := False;
  WSClient.Host := '127.0.0.1';
  WSClient.Port := 9092;
  WSClient.Active := True;

  // 토픽에 레코드 생산
  Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');

  // 소비: 한 번 구독한 뒤 반복해서 Poll(예: 타이머에서)
  Kafka.Subscribe(['my-topic']);
end;

procedure TForm1.KafkaMessage(Sender: TObject;
  const Message: TsgcKafkaMessage);
begin
  Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;

procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
  Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
  Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
    [Topic, Partition, Offset]));
end;

// 레코드를 가져오고, 배치 처리가 끝나면 커밋
var
  Messages: TsgcKafkaMessages;
begin
  Messages := Kafka.Poll(1000);
  try
    if Messages.Count > 0 then
      Kafka.CommitSync;
  finally
    Messages.Free;
  end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;

TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;

// 토픽에 레코드 생산
Kafka->Produce("my-topic", "Hello Kafka", "key-1");

// 소비: 한 번 구독한 뒤 반복해서 Poll
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));

void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
  Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}

// 레코드를 가져오고, 배치 처리가 끝나면 커밋
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
  if (Messages->Count > 0)
    Kafka->CommitSync();
} __finally {
  Messages->Free();
}
using esegece.sgcWebSockets;

var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
  Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
  Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");

var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;

// 토픽에 레코드 생산
Kafka.Produce("my-topic", "Hello Kafka", "key-1");

// 소비: 한 번 구독한 뒤 반복해서 Poll
Kafka.Subscribe(new string[] { "my-topic" });

TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
  if (Messages.Count > 0)
    Kafka.CommitSync();
} finally {
  Messages.Free();
}

사양 & 참고 자료

이 컴포넌트가 구현하는 프로토콜의 공인된 원문 자료예요.

문서 & 데모

컴포넌트 레퍼런스로 바로 이동하고, 바로 실행 가능한 데모 프로젝트를 받고, 평가판을 다운로드하세요.

온라인 도움말 — Kafka 이 컴포넌트의 전체 속성, 메서드, 이벤트 레퍼런스예요.
데모 프로젝트 — Demos\Protocols\Kafka 바로 실행 가능한 예제 프로젝트예요. sgcWebSockets 패키지에 포함돼 있어요 — 아래에서 평가판을 다운로드하세요.
사용자 매뉴얼 (PDF) 라이브러리의 모든 컴포넌트를 다루는 종합 매뉴얼이에요.

Apache Kafka로 스트리밍할 준비가 되셨나요?

무료 평가판을 다운로드하고 몇 분 만에 이벤트 스트리밍 솔루션 구축을 시작하세요.