기억하기 프로젝트
[Kafka] Producer 에 대한 정리 본문
요약
- 카프카에 메시지를 전송하는 프로듀서
- 동기식, 비동기식 프로듀서
- 프로듀서 구성 매개변수
- 직렬처리기 - Avro
- 파티션에 메시지를 쓰는 원리
카프카 프로듀서란
- 메시지를 생산(프로듀스) 해서 카프카에 토픽으로 메시지를 보내는 애플리케이션, 서버 등을 프로듀서라고 한다.
카프카 프로듀서 구성에 필요한 필수 속성 세가지
메시지를 담은 ProducerRecord 객체를 생성했고, kafka broker로 보내려 한다. 이때 꼭 필요한 세 가지 속성
- bootstrap.servers
- broker 목록(host:port)
- 최소한 두 개의 브로커를 포함하는게 좋다 (장애대비)
- key.serializer
- 메시지 키를 직렬화 하기 위한 클래스 이름을 설정
- kafka client package에는 ByteArraySerializer, StringSerializer, IntegerSerializer 기본 제공
- value.serializer
- 메시지 값을 직렬화 하기 위한 클래스 이름을 설정
- key serializer와 마찬가지로, 기본 제공해주는 Serializer
Properties kafkaProperties = new Properties(); // 속성과 값을 갖는 properties 객체 생성
kafkaProperties.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProperties.put("key.serializer", StringSerializer.class);
kafkaProperties.put("value.serializer", StringSerializer.class);
Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProperties); // 새로운 프로듀서 객체를 생성. 이때 메시지 키와 값의 타입을 설정하고 Properties 객체를 생성자 인자로 전달
카프카에 메시지 전송하기
메시지 전송 방식 세가지
- Fire-and-forget(전송 후 망각)
- send() 메시지로 전송만 하고, 성공 또는 실패에 따른 후속 조치를 취하지 않음
-
일부 메시지 유실 가능성 있음
<< 아래 1-a 예시로 참고 >>
- Syncronous send(동기식 전송)
- 메시지 전송 후 자바의 Future 객체 반환됨
- .get() 하면 작업이 완료될 때 까지 기다림
-
브로커로부터 처리 결과가 반환 되므로 성공 여부를 알 수 있음
-
에러없이 전송될 경우, RecordMetadata 객체를 받게 되고, 카프카에 쓴 메시지의 topic, offset.. 정보를 알아낼 수 있음
<< 아래 2-a 예시로 참고 >>
* RecordMetadata 참고 : https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/RecordMetadata.html
- Asyncronous send(비동기식 전송)
- 카프카 브로커의 응답을 기다리지 않고 처리
- 메시지 전송 후 콜백 메서드를 통해 성공 실패 여부 확인 가능
-
메시지 전송에 완전 실패했을 경우, 예외 발생시키거나, 에러를 로그에 쓰거나, 향후에 분석하기 위해 에러 파일에 메시지를 쓸 수 있음.
<< 아래 3-a 예시로 참고>>
/* 1.a Fire-and-forget(전송 후 망각) 예시 */
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); // 데이터를 저장할 토픽 이름, 카프카로 전송할 키와 값을 인자로 받는 생성자를 사용한다.
try {
producer.send(record);
} catch (Exception e) { // 카프카에게 메시지를 전송하기 전에 프로듀서에서 에러가 생기면 예외 발생
e.printStackTrace(); // SerializationException(메시지 직렬화 처리 실패), BufferExhaustedException TimeoutException(버퍼가 가득차면), InterruptException(메시지 전송하는 스레드 중단)
}
/* 2.a Syncronous send(동기식 전송) 예시 */
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) { // 재시도 가능한(retriable) 에러 - connection 에러, no leader 에러 : 자동으로 재시도 하다가, 재시도 횟수 소진 or 해결되지 않았을 경우 예외 반환
e.printStackTrace(); // 재시도로 해결되지 않는 에러 - 메시지 크기가 너무 클 때.. : 재시도 하지 않고 즉시 예외 반환
}
/* 3.a Asyncronous send(비동기식 전송) 예시 */
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) { // 카프카가 예외를 반환한다면 이곳에서 예외를 받게 된다.
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback()); // send() 호출하여 메시지 전송시 콜백 객체를 인자로 전달한다.
프로듀서 구성하기
- 대부분의 구성 매개변수는 기본값을 갖고 있지만, 일부 매개변수는 메모리 사용, 성능, 신뢰성 등에 큰 영향을 준다.
- producer 설정 참고 : https://docs.confluent.io/current/installation/configuration/producer-configs.html
acks
- 전송된 ProducerRecord를 수신하는 파티션 리플리카(복제 서버로 동작하는 브로커)의 수를 제어
- 프로듀서가 서버에 메시지를 보낸 후 요청을 완료하기 전 승인의 수
- 메시지 유실 가능성에 큰 영향을 주며, 다음 세 가지로 설정 가능
- acks = 0 : 프로듀서는 브로커의 응답을 기다리지 않는다
- acks = 1 : 리더는 데이터를 기록, 팔로워는 신경쓰지 않음
- acks = all : 무손실, 동기화된 모든 리플리카가 메시지를 받으면 프로듀서가 브로커의 성공 응답을 받음
buffer.memory
- 브로커에게 전송될 메시지의 버퍼로 사용할 메모리 양(byte)
compression.type
- 기본적인 메시지는 압축되지 않은 상태로 전송되지만, 이 값을 설정하면 압축되어 전송 - snappy, gzip, lz4
retries
- 최대 재전송 회수. (default=2147483647) retry.backoff.ms(100ms)로 재전송간에 시간을 조정할 수 있음.
batch.size
- 같은 파티션에 쓰는 다수의 레코드를 배치 단위로 관리하는데, 이 배치에 사용될 메모리양을 말함.(byte)
- 너무 작게 설정할 경우, 프로듀서가 자주 메시지를 전송해야 하므로 성능 저하 가능성이 있음.
linger.ms
- 현재의 배치를 전송하기 전까지 기다리는 시간(default=0)
- batch.size 가 가득 차서 전송이 되거나, linger.ms 시간이 다 되어 전송이 되거나.
client.id
- 어떤 클라이언트에서 전송된 메시지인지 식별하기 위해 브로커가 사용
max.in.flight.requests.per.connection
- blocking 되기 전까지 응답이 오지 않은 메시지들을 몇 개까지 허용할 것인지. 이값을 1로 설정하면 메시지의 전송 순서대로 브로커가 쓰게 된다. (default = 5)
timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms
- 데이터 전송시(request.timeout.ms),
메타데이터 요청할 때(metadata.fetch.timeout.ms) - 프로듀서가 서버의 응답을 기다리는 제한 시간
timeout.ms는 동기화된 리플리카들이 메시지를 인지하는동안 브로커가 대기하는 시간acks 매개변수의 설정에 따라 달라진다
- https://kafka.apache.org/documentation/#upgrade_1100_notable
- 만약 서버의 응답 없이 제한 시간이 경과되면 프로듀서는 재전송을 하거나 예외, 콜백을 전달하여 에러에 응답한다.
max.block.ms
- send() 메서드 호출시, 프로듀서의 전송 버퍼가 가득 차거나 메타데이터를 요청했지만 사용할 수 없을 때 이 시간동안 일시 중단됨
- 그 다음에 max.block.ms의 시간이 되면 시간 경과 예외가 발생
max.request.size
- 전송될 수 있는 가장 큰 메시지의 크기, 프로듀서가 하나의 요청으로 전송할 수 있는 메시지의 최대 개수 모두를 이 매개변수로 제한함
- 브로커에 message.max.bytes 메시지의 최대 크기 제한값과 max.request.size의 값이 일치되도록 설정하는 것이 좋음
- 브로커가 거부하는 크기의 메시지를 프로듀서가 전송하지 않을 것이기 때문
receive.buffer.bytes와 send.buffer.bytes
- TCP 소켓이 사용하는 (송수신) 버퍼 크기
- -1로 사용하면 운영체제 기본값 사용
- 프로듀서, 컨슈머가 서로 다른 데이터센터의 브로커들과 통신할 때 이 값을 증가시키는 것이 좋음
직렬처리기
- 프로듀서의 필수 구성에는 직렬처리기가 포함됨
- ByteArraySerializer, StringSerializer, IntegerSerializer 기본 제공
- 커스텀 직렬처리기
- 취약점 - 타입 변경시 기존 메시지와 새로운 메시지 간의 호환성을 유지하는데 문제가 생길 가능성
- 범용 직렬처리기&역직렬처리기 사용을 권장
- 스키마 레지스트리
- 스키마를 관리하는 중앙 저장소 역할
- RESFTful 인터페이스를 사용해서 스키마를 관리하거나 조회하는 기능 담당
- 내부적으로 아파치 Avro를 사용
파티션
- ProducerRecord 객체에서의 키 목적
- 메시지를 식별하는 추가 정보를 갖는 것
- 메시지를 쓰는 토픽의 여러 파티션 중 하나를 결정하기 위해
- 같은 키를 갖는 모든 메시지는 같은 파티션에 저장됨
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
// 키가 없는 메시지
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "USA"); // 이 경우 키는 null로 저장
- 키가 null 이고, 카프카 기본 파티셔너 사용시
- 사용 가능한 토픽의 파티션들 중 하나가 무작위로 선택되어 해당 레코드가 저장됨
- RR 알고리즘을 사용하여, 각 파티션에 저장되는 메시지 개수의 균형을 맞춤
- 키가 있으면서, 카프카 기본 파티셔너 사용시
- 키의 해시 값을 구하고 그 값에 따라 특정 파티션에 메시지를 저장
- 선택된 파티션에 데이터를 쓸 수 없다면 에러가 발생 하겠지만, 거의 발생하지 않음.
- 같은 키와 대응되는 파티션이 변경되지 않게 하려면, 충분한 수의 파티션을 갖는 토픽을 생성하고 이후로는 파티션을 추가하지 않는 것이 가장 쉬운 방법임.
- 커스텀 파티셔너 구현