GuriLogs.

KAFKA CONNECT (w/ group.id) 본문

Data Engineering/Kafka

KAFKA CONNECT (w/ group.id)

guri-sh 2024. 6. 2. 17:13

KAFKA CONNECT

출처 : https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

 

다양한 데이터 소스 시스템에서 발생한 데이터 이벤트를 다른 데이터 타겟 시스템으로 별도의 Kafka Client 코딩 없이 Seamless하게 실시간으로 전달하기 위해 만들어진 Kafka Component.

 

Kafka Connect의 구성요소

  • Connect Cluster - 여러 개의 Connect를 group.id로 묶어서 하나의 Connect Cluster가 됨
  • Connect
    • Connect가 기동되면 JVM Process가 띄워짐. 이거를 워커가 함
    • 그래서 Connect를 띄운다 = 워커 프로세스를 띄운다  (이 부분이 제일 헷갈렸던 부분)
  • Connector - REST API를 통해 Connect 위 기동을 시킬 수 있음. Connect 위에 올라가는 Source와 Sink를 연결하는 컴포넌트
  • Task - 커넥터들이 일을 수행하는 단위, Source와 직접 인터페이스를 하는 요소
  • Thread - Task는 Thread 단위로 구성됨. 소스에서 멀티 스레드가 가능하면 여러 개의 Task를 띄울 수 있음. 즉 Task = Thread

하나의 노드(인스턴스)에서 여러 개의 Connect / Connector를 띄울 수 있음 (대신 포트는 다르게 설정) 다른 노드에서 여러 개의 Connect를 띄워도 group.id로 묶으면 하나로 생성할 수 있음.

출처 : me

흠.. OK, 근데 나는 워커를 두 개 이상 띄우고 싶어

고가용성, 확장성을 위해 운영에서는 카프카 커넥트 워커를 한 개가 아닌 두 개 이상으로 운영할 것이다.

이 때, Group ID를 활용한다.

 

위에서 언급했듯이 Connect는 하나의 워커일 뿐이다.

그런데 우리가 운영을 할 때 워커를 한 개만 띄우면 고가용성, 확장성이 보장되지 않는다.

그럼 워커를 2개 이상을 띄우려면 어떻게 해야 하나?

- Connect 서버를 두 개 띄우면 된다.

어 그러면 두 개의 서버가 서로 통신은 어떻게 하는거고 두 개의 워커가 어떻게 하나의 커넥터로서 작동하게 하나?

 

- 이 연결 역할을 하는 것이 group.id 이다.

Connect 설정의 group.id를 동일하게 부여하고 connect를 띄우면 (같은 kafka bootstrap일 때) 두 개의 connect는 하나의 connector에 대한 워커로서 그룹화가 된다.

이는 kafka connect를 분산 모드(distributed mode)로 띄웠을 때만 적용된다. 왜냐면 분산 모드일 때는 offset, config, status를 kafka cluster의 토픽 안에다 저장하므로 각 서버(워커)들이 동일한 토픽 데이터에 접근할 수 있다.

이렇게 Connect(워커)를 2개 이상 띄우면, 멀티 스레드가 가능한 Connector라면 2개 이상의 워커들이 작동을 할 것이고 멀티 스레드를 지원하지 않는 Connector(ex. Debezium)라면 한 개만 작동하고 나머지는 메인 워커가 죽었을 때를 대비하는 대기열로 띄워져있는다.

 

주요 Kafka Connect configuration

 참고 : https://kafka.apache.org/27/documentation.html#connectconfigs

중요한 부분만 발췌하여 작성한다.

  • group.id
    • A unique string that identifies the Connect cluster group this worker belongs to.
  • config.storage.topic / offset.storage.topic / status.storage.topic
    • config, offset, status 정보를 저장할 토픽 이름 지정
  • heartbeat.interval.ms (default : 3000 (3 seconds))
    • 카프카의 그룹 관리 기능을 사용할 때 group coordinator에게 보내는 heartbeat (살아있다고 신호보내는 것)
    • session.timeout.ms보다 낮게 설정해야 하지만 일반적으로 이 값의 1/3 이하로 설정하는 것이 좋다.
  • rebalance.timeout.ms (default : 60000 (1 minute))
    • 리밸런싱이 시작된 후 각 작업자가 그룹에 참여할 수 있는 최대 허용 시간
    • 시간 제한을 초과하면 작업자가 그룹에서 제거되어 오프셋 커밋이 실패
  • session.timeout.ms (default : 10000 (10 seconds))
    • 워커 장애를 감지하는 데 사용되는 타임아웃. 워커는 주기적으로 하트비트를 전송하여 브로커에 활성 상태를 알린다.
    • 이 세션 시간 제한이 만료되기 전에 브로커가 하트비트를 수신하지 못하면 브로커는 그룹에서 워커를 제거하고 재밸런싱을 시작
  • connections.max.idle.ms (default : 540000 (9 minutes))
    • 이 구성에서 지정한 밀리초가 지나면 유휴 연결을 닫는다. (지정한 시간동안 아무 작업이 없으면 연결을 끊는다)
  • listeners
    • REST API가 수신 대기할 쉼표로 구분된 URI 목록. 지원되는 프로토콜은 HTTP와 HTTPS.
    • 모든 인터페이스에 바인딩하려면 호스트 이름을 0.0.0.0으로, 기본 인터페이스에 바인딩하려면 값을 비워둔다.
      ex) HTTP://myhost:8083,HTTPS://myhost:8084
  • reconnect.backoff.max.ms (default : 1000 (1 second))
    • 반복적으로 연결에 실패한 브로커에 다시 연결할 때 대기할 수 있는 최대 시간(밀리초).
    • 이 값을 제공하면 호스트당 백오프는 연결이 연속적으로 실패할 때마다 이 최대값까지 기하급수적으로 증가한다.
      백오프 증가를 계산한 후에는 연결 폭주를 방지하기 위해 20%의 무작위 지터가 추가된다.
  • reconnect.backoff.ms (default : 50)
    • 지정된 호스트에 다시 연결을 시도하기 전에 대기할 기본 시간. 호스트에 반복적으로 연결되는 것을 방지할 수 있다.
      이 백오프는 클라이언트가 브로커에 연결하려는 모든 시도에 적용된다.
  • rest.advertised.host.name / rest.advertised.port / rest.advertised.listener
    • 다른 워커에 전달하는 내 hostname, port, listener (protocol)
  • retry.backoff.ms (default : 100)
    • 지정된 토픽 파티션에 실패한 요청을 다시 시도하기 전에 대기할 시간. 일부 장애 시나리오에서 요청이 반복적으로 타이트하게 전송되는 것을 방지할 수 있다.