Delphi / .NET 런타임이 실행되는 모든 곳에서 동작하는 일급 Kafka 구현이에요 — 데스크톱 서비스부터 모바일 기기까지, 표준 Kafka 브로커와 레코드를 주고받아요.
TsgcWSPClient_Kafka
TCP 위의 Apache Kafka 와이어 프로토콜
Windows, macOS, Linux, iOS, Android
Standard / Professional / Enterprise
네이티브 Kafka 프로토콜로 레코드를 스트리밍하는 데 필요한 모든 기능을, 일반 Delphi / .NET 메서드와 이벤트로 제공해요.
Produce(topic, value, key, partition)를 호출하면 선택적 키와 파티션과 함께 레코드를 발행하고, 바이너리 페이로드는 ProduceBytes로 보낼 수 있어요. 전달 보장은 KafkaOptions.Producer.Acks(kafkaAcksNone, kafkaAcksLeader 또는 kafkaAcksAll)로 선택하며, 각 결과는 토픽, 파티션, 저장된 오프셋과 함께 OnKafkaProduce로 도착해요.
Subscribe([topic]) 후 Poll(timeoutMs)가 사용 가능한 레코드를 가져와 TsgcKafkaMessages 목록을 반환하고 레코드마다 OnKafkaMessage를 발생시켜요. 각 레코드는 GetKeyString, GetValueString, Topic, Partition, Offset으로 읽어요. GroupId를 비워 두면 그룹 없이 모든 파티션을 직접 읽어요.
KafkaOptions.Consumer.GroupId를 설정하면 클라이언트가 코디네이터 검색, 조인 및 동기화, 파티션 할당, 오프셋 커밋/조회를 자동으로 수행해요. CommitSync를 호출하면 마지막 Poll의 오프셋을 커밋하여 다음 세션에서 그 이후부터 그룹이 재개돼요.
GetEarliestOffset, GetLatestOffset, GetCommittedOffset은 토픽과 파티션별로 가장 오래된 오프셋, 다음에 기록될 오프셋, 마지막으로 커밋된 오프셋을 반환해요. CommitOffset(topic, partition, offset)은 명시적 위치를 지정하여 원하는 지점부터 정확히 소비를 재개해요.
CreateTopic(name, partitions, replication)과 DeleteTopic(name)으로 브로커의 토픽을 관리하고, GetMetadata([topics])는 클러스터와 파티션 레이아웃을 반환해요(전체 클러스터를 보려면 빈 배열을 전달하세요). 클라이언트는 컨슈머 그룹 목록도 조회할 수 있어요.
Apache Kafka 0.11에서 도입된 v2 레코드 배치 포맷을 읽고 쓰며, 선택적으로 kafkaCompressionGzip 압축을 지원해요. 기본 브로커 포트 9092에서 Apache Kafka 3.x로 테스트했어요.
TsgcWebSocketClient와 TsgcWSPClient_Kafka를 짝지어 일반 TCP를 위해 Specifications.RFC6455를 false로 설정하고, KafkaOptions를 구성한 다음 Produce와 Poll을 호출하세요.
uses
sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client;
var
WSClient: TsgcWebSocketClient;
Kafka: TsgcWSPClient_Kafka;
begin
Kafka := TsgcWSPClient_Kafka.Create(nil);
Kafka.KafkaOptions.ClientId := 'my-delphi-app';
Kafka.KafkaOptions.Producer.Acks := kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId := 'my-group';
Kafka.KafkaOptions.Consumer.OffsetReset := kafkaOffsetEarliest;
Kafka.OnKafkaMessage := KafkaMessage;
Kafka.OnKafkaProduce := KafkaProduce;
WSClient := TsgcWebSocketClient.Create(nil);
Kafka.Client := WSClient;
WSClient.Specifications.RFC6455 := False;
WSClient.Host := '127.0.0.1';
WSClient.Port := 9092;
WSClient.Active := True;
// 토픽에 레코드 생산
Kafka.Produce('my-topic', 'Hello Kafka', 'key-1');
// 소비: 한 번 구독한 뒤 반복해서 Poll(예: 타이머에서)
Kafka.Subscribe(['my-topic']);
end;
procedure TForm1.KafkaMessage(Sender: TObject;
const Message: TsgcKafkaMessage);
begin
Memo1.Lines.Add(Message.GetKeyString + ' = ' + Message.GetValueString);
end;
procedure TForm1.KafkaProduce(Sender: TObject; const Topic: string;
Partition: Integer; Offset: Int64; ErrorCode: Integer);
begin
Memo1.Lines.Add(Format('Produced to %s [%d] at offset %d',
[Topic, Partition, Offset]));
end;
// 레코드를 가져오고, 배치 처리가 끝나면 커밋
var
Messages: TsgcKafkaMessages;
begin
Messages := Kafka.Poll(1000);
try
if Messages.Count > 0 then
Kafka.CommitSync;
finally
Messages.Free;
end;
end;
// uses: sgcWebSocket, sgcWebSocket_Protocol_Kafka_Client
TsgcWSPClient_Kafka *Kafka = new TsgcWSPClient_Kafka(this);
Kafka->KafkaOptions->ClientId = "my-cbuilder-app";
Kafka->KafkaOptions->Producer->Acks = kafkaAcksLeader;
Kafka->KafkaOptions->Consumer->GroupId = "my-group";
Kafka->KafkaOptions->Consumer->OffsetReset = kafkaOffsetEarliest;
Kafka->OnKafkaMessage = KafkaMessage;
Kafka->OnKafkaProduce = KafkaProduce;
TsgcWebSocketClient *WSClient = new TsgcWebSocketClient(this);
Kafka->Client = WSClient;
WSClient->Specifications->RFC6455 = false;
WSClient->Host = "127.0.0.1";
WSClient->Port = 9092;
WSClient->Active = true;
// 토픽에 레코드 생산
Kafka->Produce("my-topic", "Hello Kafka", "key-1");
// 소비: 한 번 구독한 뒤 반복해서 Poll
Kafka->Subscribe(ARRAYOFCONST(("my-topic")));
void __fastcall TForm1::KafkaMessage(TObject *Sender, TsgcKafkaMessage *Message)
{
Memo1->Lines->Add(Message->GetKeyString() + " = " + Message->GetValueString());
}
// 레코드를 가져오고, 배치 처리가 끝나면 커밋
TsgcKafkaMessages *Messages = Kafka->Poll(1000);
try {
if (Messages->Count > 0)
Kafka->CommitSync();
} __finally {
Messages->Free();
}
using esegece.sgcWebSockets;
var Kafka = new TsgcWSPClient_Kafka();
Kafka.KafkaOptions.ClientId = "my-net-app";
Kafka.KafkaOptions.Producer.Acks = TsgcKafkaAcks.kafkaAcksLeader;
Kafka.KafkaOptions.Consumer.GroupId = "my-group";
Kafka.KafkaOptions.Consumer.OffsetReset = TsgcKafkaOffsetReset.kafkaOffsetEarliest;
Kafka.OnKafkaMessage += (sender, message) =>
Console.WriteLine(message.GetKeyString() + " = " + message.GetValueString());
Kafka.OnKafkaProduce += (sender, topic, partition, offset, error) =>
Console.WriteLine($"Produced to {topic} [{partition}] at offset {offset}");
var WSClient = new TsgcWebSocketClient();
Kafka.Client = WSClient;
WSClient.Specifications.RFC6455 = false;
WSClient.Host = "127.0.0.1";
WSClient.Port = 9092;
WSClient.Active = true;
// 토픽에 레코드 생산
Kafka.Produce("my-topic", "Hello Kafka", "key-1");
// 소비: 한 번 구독한 뒤 반복해서 Poll
Kafka.Subscribe(new string[] { "my-topic" });
TsgcKafkaMessages Messages = Kafka.Poll(1000);
try {
if (Messages.Count > 0)
Kafka.CommitSync();
} finally {
Messages.Free();
}
이 컴포넌트가 구현하는 프로토콜의 공인된 원문 자료예요.