관리 메뉴

기억하기 프로젝트

[Kafka] Producer 에 대한 정리 본문

카테고리 없음

[Kafka] Producer 에 대한 정리

sy89 2020. 5. 5. 14:50

요약

  • 카프카에 메시지를 전송하는 프로듀서
  • 동기식, 비동기식 프로듀서
  • 프로듀서 구성 매개변수
  • 직렬처리기 - Avro
  • 파티션에 메시지를 쓰는 원리

 

카프카 프로듀서란

  • 메시지를 생산(프로듀스) 해서 카프카에 토픽으로 메시지를 보내는 애플리케이션, 서버 등을 프로듀서라고 한다.

ProducerRecord 생성 → 데이터 직렬화 → 파티셔너로 메시지를 적절히 배정 → 카프카 브로커로 전송

 

카프카 프로듀서 구성에 필요한 필수 속성 세가지

메시지를 담은 ProducerRecord 객체를 생성했고, kafka broker로 보내려 한다. 이때 꼭 필요한 세 가지 속성

  • bootstrap.servers 
    • broker 목록(host:port)
    • 최소한 두 개의 브로커를 포함하는게 좋다 (장애대비)
  • key.serializer
    • 메시지 를 직렬화 하기 위한 클래스 이름을 설정
    • kafka client package에는 ByteArraySerializer, StringSerializer, IntegerSerializer 기본 제공
  • value.serializer
    • 메시지 을 직렬화 하기 위한 클래스 이름을 설정
    • key serializer와 마찬가지로, 기본 제공해주는 Serializer
Properties kafkaProperties = new Properties();		// 속성과 값을 갖는 properties 객체 생성
kafkaProperties.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProperties.put("key.serializer", StringSerializer.class);
kafkaProperties.put("value.serializer", StringSerializer.class);

Producer<String, String> producer = new KafkaProducer<String, String>(kafkaProperties);		// 새로운 프로듀서 객체를 생성. 이때 메시지 키와 값의 타입을 설정하고 Properties 객체를 생성자 인자로 전달

 

카프카에 메시지 전송하기 

메시지 전송 방식 세가지

    1. Fire-and-forget(전송 후 망각)
      1. send() 메시지로 전송만 하고, 성공 또는 실패에 따른 후속 조치를 취하지 않음
      2. 일부 메시지 유실 가능성 있음

        << 아래 1-a 예시로 참고 >>

    2. Syncronous send(동기식 전송)
      1. 메시지 전송 후 자바의 Future 객체 반환됨
      2. .get() 하면 작업이 완료될 때 까지 기다림
      3. 브로커로부터 처리 결과가 반환 되므로 성공 여부를 알 수 있음

      4. 에러없이 전송될 경우, RecordMetadata 객체를 받게 되고, 카프카에 쓴 메시지의 topic, offset.. 정보를 알아낼 수 있음

        << 아래 2-a 예시로 참고 >>

        * RecordMetadata 참고 : https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/RecordMetadata.html

         

    3. Asyncronous send(비동기식 전송)
      1. 카프카 브로커의 응답을 기다리지 않고 처리
      2. 메시지 전송 후 콜백 메서드를 통해 성공 실패 여부 확인 가능
      3. 메시지 전송에 완전 실패했을 경우, 예외 발생시키거나, 에러를 로그에 쓰거나, 향후에 분석하기 위해 에러 파일에 메시지를 쓸 수 있음.

        << 아래 3-a 예시로 참고>>

 

/* 1.a Fire-and-forget(전송 후 망각) 예시 */
ProducerRecord<String, String> record = 
		new ProducerRecord<>("CustomerCountry", "Precision Products", "France");	// 데이터를 저장할 토픽 이름, 카프카로 전송할 키와 값을 인자로 받는 생성자를 사용한다.
try {
	producer.send(record);
} catch (Exception e) {		// 카프카에게 메시지를 전송하기 전에 프로듀서에서 에러가 생기면 예외 발생
	e.printStackTrace();	// SerializationException(메시지 직렬화 처리 실패), BufferExhaustedException TimeoutException(버퍼가 가득차면), InterruptException(메시지 전송하는 스레드 중단)
}
/* 2.a Syncronous send(동기식 전송) 예시 */
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
	producer.send(record).get();	
} catch (Exception e) {		// 재시도 가능한(retriable) 에러 - connection 에러, no leader 에러 : 자동으로 재시도 하다가, 재시도 횟수 소진 or 해결되지 않았을 경우 예외 반환
	e.printStackTrace();	// 재시도로 해결되지 않는 에러 - 메시지 크기가 너무 클 때.. : 재시도 하지 않고 즉시 예외 반환
}
/* 3.a Asyncronous send(비동기식 전송) 예시 */
private class DemoProducerCallback implements Callback {
	@Override
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {	// 카프카가 예외를 반환한다면 이곳에서 예외를 받게 된다.
		if (e != null) {
			e.printStackTrace();	
		}
	}
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());	// send() 호출하여 메시지 전송시 콜백 객체를 인자로 전달한다.

 

프로듀서 구성하기

acks

  • 전송된 ProducerRecord를 수신하는 파티션 리플리카(복제 서버로 동작하는 브로커)의 수를 제어
  • 프로듀서가 서버에 메시지를 보낸 후 요청을 완료하기 전 승인의 수
  • 메시지 유실 가능성에 큰 영향을 주며, 다음 세 가지로 설정 가능 
    • acks = 0 : 프로듀서는 브로커의 응답을 기다리지 않는다
    • acks = 1 : 리더는 데이터를 기록, 팔로워는 신경쓰지 않음
    • acks = all : 무손실, 동기화된 모든 리플리카가 메시지를 받으면 프로듀서가 브로커의 성공 응답을 받음

buffer.memory

  • 브로커에게 전송될 메시지의 버퍼로 사용할 메모리 양(byte)

compression.type

  • 기본적인 메시지는 압축되지 않은 상태로 전송되지만, 이 값을 설정하면 압축되어 전송 - snappy, gzip, lz4

retries

  • 최대 재전송 회수. (default=2147483647) retry.backoff.ms(100ms)로 재전송간에 시간을 조정할 수 있음.

batch.size

  • 같은 파티션에 쓰는 다수의 레코드를 배치 단위로 관리하는데, 이 배치에 사용될 메모리양을 말함.(byte)
  • 너무 작게 설정할 경우, 프로듀서가 자주 메시지를 전송해야 하므로 성능 저하 가능성이 있음.

linger.ms

  • 현재의 배치를 전송하기 전까지 기다리는 시간(default=0)
  • batch.size 가 가득 차서 전송이 되거나, linger.ms 시간이 다 되어 전송이 되거나.

client.id

  • 어떤 클라이언트에서 전송된 메시지인지 식별하기 위해 브로커가 사용

max.in.flight.requests.per.connection

  • blocking 되기 전까지 응답이 오지 않은 메시지들을 몇 개까지 허용할 것인지. 이값을 1로 설정하면 메시지의 전송 순서대로 브로커가 쓰게 된다. (default = 5)

timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms

  • 데이터 전송시(request.timeout.ms), 메타데이터 요청할 때(metadata.fetch.timeout.ms)
  • 프로듀서가 서버의 응답을 기다리는 제한 시간
  • timeout.ms는 동기화된 리플리카들이 메시지를 인지하는동안 브로커가 대기하는 시간
    • acks 매개변수의 설정에 따라 달라진다
  • https://kafka.apache.org/documentation/#upgrade_1100_notable
  • 만약 서버의 응답 없이 제한 시간이 경과되면 프로듀서는 재전송을 하거나 예외, 콜백을 전달하여 에러에 응답한다. 

max.block.ms

  • send() 메서드 호출시, 프로듀서의 전송 버퍼가 가득 차거나 메타데이터를 요청했지만 사용할 수 없을 때 이 시간동안 일시 중단됨 
  • 그 다음에 max.block.ms의 시간이 되면 시간 경과 예외가 발생

max.request.size

  • 전송될 수 있는 가장 큰 메시지의 크기, 프로듀서가 하나의 요청으로 전송할 수 있는 메시지의 최대 개수 모두를 이 매개변수로 제한함
  • 브로커에 message.max.bytes 메시지의 최대 크기 제한값과 max.request.size의 값이 일치되도록 설정하는 것이 좋음
  • 브로커가 거부하는 크기의 메시지를 프로듀서가 전송하지 않을 것이기 때문

receive.buffer.bytes와 send.buffer.bytes

  • TCP 소켓이 사용하는 (송수신) 버퍼 크기
  • -1로 사용하면 운영체제 기본값 사용
  • 프로듀서, 컨슈머가 서로 다른 데이터센터의 브로커들과 통신할 때 이 값을 증가시키는 것이 좋음

 

 

직렬처리기

  • 프로듀서의 필수 구성에는 직렬처리기가 포함됨
  • ByteArraySerializer, StringSerializer, IntegerSerializer 기본 제공
  • 커스텀 직렬처리기 
    • 취약점 - 타입 변경시 기존 메시지와 새로운 메시지 간의 호환성을 유지하는데 문제가 생길 가능성
    • 범용 직렬처리기&역직렬처리기 사용을 권장

 

  • 스키마 레지스트리
    • 스키마를 관리하는 중앙 저장소 역할
    • RESFTful 인터페이스를 사용해서 스키마를 관리하거나 조회하는 기능 담당
    • 내부적으로 아파치 Avro를 사용

 

파티션

  • ProducerRecord 객체에서의 키 목적
    • 메시지를 식별하는 추가 정보를 갖는 것
    • 메시지를 쓰는 토픽의 여러 파티션 중 하나를 결정하기 위해
    • 같은 키를 갖는 모든 메시지는 같은 파티션에 저장됨
ProducerRecord<String, String> record = 
	new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

// 키가 없는 메시지
ProducerRecord<String, String> record = 
	new ProducerRecord<>("CustomerCountry", "USA");	// 이 경우 키는 null로 저장

 

  • 키가 null 이고, 카프카 기본 파티셔너 사용시 
    • 사용 가능한 토픽의 파티션들 중 하나가 무작위로 선택되어 해당 레코드가 저장됨
    • RR 알고리즘을 사용하여, 각 파티션에 저장되는 메시지 개수의 균형을 맞춤
  • 키가 있으면서, 카프카 기본 파티셔너 사용시
    • 키의 해시 값을 구하고 그 값에 따라 특정 파티션에 메시지를 저장
  • 선택된 파티션에 데이터를 쓸 수 없다면 에러가 발생 하겠지만, 거의 발생하지 않음.
  • 같은 키와 대응되는 파티션이 변경되지 않게 하려면, 충분한 수의 파티션을 갖는 토픽을 생성하고 이후로는 파티션을 추가하지 않는 것이 가장 쉬운 방법임.
  • 커스텀 파티셔너 구현