Research

PostgreSQL 15 살펴보기 (부제: Logical Decoding Message 의 이점)

smileostrich 2023. 4. 2. 02:33

PostgreSQL 15를 살펴보다가 재밌는 내용들이 보여서 오랜만에 작성한 포스팅.

 

오늘 포스팅을 하게 된 핵심 콘텐츠는 바로 pg_logical_emit_message(transactional boolean, prefix text, content text) 메소드.

메서드명을 통해, logical decoding message를 emit 할 수 있음을 유추할 수 있는데,
메서드를 바로 설명하기에 앞서 몇 가지 개념을 살펴보고 가자.


1. Logical Decoding

Postgresql 공식 홈페이지 에는 아래와 같이 설명되어 있다.

Logical decoding is the process of extracting all persistent changes to a database's tables into a coherent, easy to understand format which can be interpreted without detailed knowledge of the database's internal state.
(logical decoding은 데이터베이스 테이블의 모든 지속적인 변경 사항을 데이터베이스의 내부 상태에 대한 자세한 지식 없이도 해석할 수 있는 일관되고 이해하기 쉬운 형식으로 추출하는 프로세스이다)

 

즉, postgres 의 변경 데이터 캡처 기능이라고 이해해도 무방하다.

 

2. Replication Slots

Logical Replication 에서의 슬롯(Streaming replication slot 과는 다름)은 원본 서버에서 변경된 순서대로 클라이언트로 재생할 수 있는 변경 사항의 스트림을 나타낸다. 즉, 각 슬롯은 단일 DB 에서 일련의 변경 사항을 스트리밍 한다.

복제 슬롯의 특성들을 한번 살펴보자.

  • crash-safe
    복제 슬롯은 PostgreSQL 클러스터의 모든 데이터베이스를 통틀어 고유한 식별자를 가진다. 또한, 슬롯은 각 슬롯 간의 연결이 독립적으로 유지되기에 crash-safe 한 특성을 지닌다.
  • emit once
    logical 슬롯은 정상 작동 시 각 변경 사항을 한 번만 내보낸다. 각 슬롯의 현재 위치는 체크포인트에서만 유지되므로 충돌이 발생하면 슬롯이 이전 LSN(log sequence number)으로 돌아가 서버가 재시작될 때 최근 변경 사항이 다시 전송될 수 있다. 클라이언트는 decoding할 때 마지막으로 본 LSN 을 기록하고 반복되는 데이터를 건너뛰거나 해당 LSN부터 decoding 을 시작할 수 있다.
  • independent
    단일 데이터베이스에 여러 개의 독립적인 슬롯이 존재할 수 있다. 즉, 각 슬롯은 고유한 상태를 가지므로 서로 다른 컨슈머가 DB 변경 스트림의 서로 다른 지점에서 변경 사항을 수신할 수 있다. (대부분의 애플리케이션의 경우 각 컨슈머 마다 별도의 슬롯이 필요함)
  • Only one receiver may consume changes from a slot at any given time
    (어쩌면 당연하게도) logical 복제 슬롯은 receiver 의 상태에 대해 아무것도 알지 못한다.
    따라서, 동일한 슬롯을 서로 다른 시간에 여러 개의 다른 receiver 가 사용해도 마지막 receiver 가 변경 내용 consume 을 중단한 시점의 변경 내용만 이어서 가져올 수 있게 된다.
    즉, 한 번에 하나의 receiver 만 슬롯 변경 내용을 consume 할 수 있다.

 

logical decoding 메시지, 복제 슬롯은 아래와 같은 메서드들을 통해 관리할 수 있다. (이외에도 LSN, 복제 슬롯 상태 등도 체크할 수 있음)

  • pg_logical_slot_get_binary_changes()
    슬롯에서 바이너리 메시지를 검색
  • pg_logical_slot_peek_changes()
    슬롯의 변경 사항 peek 
  • pg_replication_slot_advance()
    복제 슬롯 전진
  • pg_drop_replication_slot()
    복제 슬롯 삭제

 

어느 정도 개념을 파악했으니, 이제 다시 pg_logical_emit_message(transactional boolean, prefix text, content text) 메소드로 돌아가서, 파라미터를 한번 살펴보자.

  1. transactional boolean
    이 파라미터를 통해 메시지가 현재 트랜잭션의 일부가 되어야 하는지, 아니면 logical 디코더가 레코드를 읽는 즉시 쓰여지고 decoding되어야 하는지를 지정한다.
  2. prefix text
    logical decoding 플러그인이 관심 있는 메시지를 쉽게 인식하는 데 사용할 수 있는 텍스트 접두사
    (k8s label / selector 용도로 이해하면 됨)
  3. content text(or bytea)
    텍스트 또는 바이너리 형식으로 제공되는 메시지의 콘텐츠
    (메시지의 payload 라고 보면 됨)

조금 더 쉽게 얘기해 보자면, WAL 에 메시지를 쓸 수 있게 된다는 걸 의미한다!

logical decoding 을 사용하여 WAL에서 해당 메시지를 검색하고 처리한 후 외부 컨슈머에게 전달할 수 있다.

이 기능이 얼마나 매력적인지 이제 조금씩 감이 잡히기 시작할 것이다.

 

하지만 결국 우리가 궁금하고 관심 있는 건 결국 이걸로 뭘 할 수 있는데? 일 것이다.

이러한 궁금증은 아래 3가지 유즈케이스를 통해 해소해 보자.

  1. outbox pattern 을 통해 마이크로 서비스 간 데이터 propagate
  2. 로깅
  3. 감사 로그

(+TMI. 사실 이 포스트를 작성하기 시작한 몇 시간 전만 하더라도, 감사 로그, 로깅 2가지의 유즈케이스밖에 없었는데, 해당 내용 작성 중 좋은 글 를 발견하게 되었기에 outbox pattern 을 비롯해 내용이 전반적으로 수정되었다. 따라서 내용이 다소 중복될 수 있음)

 

The Wonders of Postgres Logical Decoding Messages

In this article, author Gunnar Morling discusses Postgres database's logical decoding function to retrieve the messages from write-ahead log, process them, and relay them to external consumers, with help of use cases like outbox, audit logs and replication

www.infoq.com

 

우선, outbox pattern 부터 살펴보도록 하겠다.

 

1. outbox pattern 을 통해 마이크로 서비스 간 데이터 propagate

마이크로서비스의 경우 요청을 처리할 때 서비스가 자체 데이터베이스를 업데이트하고 동시에 다른 서비스에 메시지를 보내야 하는 것이 일반적인 요구 사항일 것이다. 

MSA 예시로 흔히 사용되는 전자 상거래 시나리오를 통해 쉽게 이해해 보도록 하자.

전자 상거래 시나리오에서 "주문 처리" 서비스를 생각해 보면, 배송 상태가 "배송 준비중" 에서 '배송됨'으로 변경되면 주문 처리 서비스 DB 의 배송 기록도 그에 따라 업데이트되어야 하지만, '고객' 서비스에도 메시지를 전송하여 고객의 계정 기록을 업데이트하고 고객에게 이메일 알림을 트리거할 수 있도록 해야 한다.

이제 서비스를 연결하기 위해 Kafka 를 사용하는 경우, 주문 처리 서비스가 로컬 데이터베이스 트랜잭션을 발행한 다음 Kafka를 통해 메시지를 보내도록 하는 것만으로는 이 시나리오를 안정적으로 구현할 수 없다. 그 이유는 DB 와 카프카에 대한 공유 트랜잭션이 지원되지 않기 때문이다(기술적인 측면에서 카프카는 XA 와 같은 분산 트랜잭션 프로토콜에 참여할 수 없다). 표면적으로는 모든 것이 정상적으로 보이지만 장애가 발생할 경우 consistency 가 보장되지 않는다. 데이터베이스 트랜잭션은 커밋될 수 있지만 Kafka 를 통해 알림을 보내는 데 실패할 수 있다. (반대로 고객 서비스에는 알림이 전송되지만 로컬 데이터베이스 트랜잭션은 롤백되는 경우도 있음)

 

이 문제를 outbox 패턴을 통해 해결할 수 있게 된다.즉, 한 번에 두 개의 리소스(DB와 카프카)를 업데이트하는 대신 하나의 리소스(DB)만 업데이트하는 것. DB 에서 발송 상태를 업데이트할 때, 보낼 메시지도 outbox 테이블에 쓰게 되는데, 이 작업은 하나의 공유 트랜잭션의 일부로, ACID 트랜잭션에서 얻게 되는 원자성을 보장받게 된다. (즉, 발송 상태 업데이트와 outbox 메시지 모두 유지되거나, 둘 다 유지되지 않음) 이후, change data capture 를 사용해 데이터베이스의 outbox 에서 모든 삽입물을 검색하여 컨슈머에게 전파한다.

 

패턴에 대한 구현 접근 방식에 대해 자세히 살펴보자면,

outbox 이벤트를 전용 outbox 테이블에 삽입하는 대신 WAL에 대한 logical decoding Message로 내보내는 것이 핵심 아이디어이다.

 

두 접근 방식 모두 장단점이 있다.

  • logical decoding 메시지를 통해 라우팅 하는 방식
    장점 1. 관리가 필요하지 않다.
    (outbox 테이블과 달리 트랜잭션 로그에서 메시지가 소비된 후 메시지를 제거할 필요가 없다)
    장점 2. append-only 방식의 특성으로 인해, 메시지 "수정" 자체가 발생하지 않는다. (table-based 방식은 수정 발생 가능)
  • table-based approach
    outbox 메시지의 내용과 관련해, 일반적으로 유연성이 있다. (JSON, Apache Avro, Google 프로토콜 버퍼 등 원하는 형식으로 직렬화 가능)

일반적으로 사용되는 한 가지 접근 방식은 domain-driven design 관점에서 outbox 이벤트와 해당 스키마의 디자인을 살펴보는 것이다. 특히 Debezium 은 메시지에 아래와 같은 속성을 갖도록 권장.

  • id
    컨슈머가 유일성을 보장하기 위해 사용할 수 있는 고유한 메시지 ID (예시 : UUID)
  • aggregate type
    “customer,” “shipment,” “purchase order” 와 같이 이벤트에 대한 집계 종류를 의미한다. 
    Kafka 와 같은 스트리밍 플랫폼을 통해 outbox 이벤트를 전파할 때 하나의 aggregate type 이벤트를 특정 토픽으로 보내는 데 사용할 수 있다.
  • aggregate id
    고객 또는 주문 ID 와 같이 이벤트에 대한 집계 ID. 
    Kafka 에서 레코드 키로 사용될 수 있기에, 하나의 aggregate 와 관련된 모든 이벤트가 동일한 토픽 파티션으로 이동하고 컨슈머가 이러한 이벤트를 올바른 순서로 수신하도록 보장할 수 있다.
  • payload
    실제 메시지 페이로드. 
    "raw" tabel-level CDC 이벤트와 달리, DB 자체에서 여러 테이블에 분산될 수 있는 전체 집계 및 모든 부분을 나타내는 구조일 수 있다.

 

이론은 충분히 살펴봤으니, 이제 outbox 이벤트와 함께 logical decoding message 를 내보내는 과정을 실습해보도록 하자. (혹시 같이 실습해보고 싶으신 분을 위해 링크를 남겨놓겠습니다)

 

우선, 아래와 같이 outbox 메시지를 emit 해주자.

postgresuser@postgres:demodb> SELECT * FROM pg_logical_emit_message(
  true,
  'outbox',
  '{
    "id" : "298c2cc3-71bb-4d2b-b5b4-1b14006d56e6",
    "aggregate_type" : "shipment",
    "aggregate_id" : 42,
    "payload" : {
      "customer_id" : 7398,
      "item_id" : 8123,
      "status" : "SHIPPED",
      "numberOfPackages" : 3,
      "address" : "Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"
    }
  }'
 );

이렇게 하면 트랜잭션 메시지가 생성된다. (동일한 트랜잭션에 삽입된 다른 레코드의 제약 조건 위반으로 인해 트랜잭션이 중단되는 경우 메시지가 전송되지 않음). 이 메시지는 다른 유형의 메시지와 구분할 수 있도록 "outbox" 접두사를 사용하며 실제 페이로드로 JSON 메시지를 포함한다.

변경 이벤트를 검색하여 Kafka 로 전파하는 것과 관련하여 세부 사항은 기본 CDC 도구인 Debezium이 어떻게 배포되는지에 따라 달라진다. Kafka Connect 와 함께 사용할 경우, Debezium 은 단일 메시지 변환(SMT)을 제공하여, (예를 들어) aggregate type 이 포함된 구성 가능한 열을 기반으로 outbox 테이블을 지원하고 아웃박스 이벤트를 Kafka의 여러 토픽으로 라우팅할 수 있음.

(다만, 이 SMT 는 아직 logical decoding message를 outbox 포맷으로 사용하는 것을 지원하지 않고 있기에 아쉬움)

 

아래 코드는 Flink CDC 를 통해 Debezium 를 사용했다. KafkaRecordSerializationSchema 를 implement 한 코드로, outbox 이벤트를 올바른 Kafka 토픽으로 라우팅하고 aggregate ID 를 Kafka 메시지 키로 propagate 해주고 올바른 순서 시맨틱을 보장한다.

private static class OutboxSerializer implements KafkaRecordSerializationSchema<ChangeEvent> {

    private static final long serialVersionUID = 1L;

    private ObjectMapper mapper;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(ChangeEvent element, KafkaSinkContext context, Long timestamp) {
        try {
            JsonNode content = element.getMessage().getContent();

            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(
                    content.get("aggregate_type").asText(),
                    content.get("aggregate_id").asText().getBytes(Charsets.UTF_8),
                    mapper.writeValueAsBytes(content.get("payload")));
                    
            record.headers().add("message_id", content.get("id").asText().getBytes(Charsets.UTF_8));

            return record;
        }
        catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Couldn't serialize outbox message", e);
        }
    }

    @Override
    public void open(InitializationContext context, KafkaSinkContext sinkContext) throws Exception {
        mapper = new ObjectMapper();
        SimpleModule module = new SimpleModule();
        module.addDeserializer(Message.class, new MessageDeserializer());
        mapper.registerModule(module);
    }
}

Flink job 을 적용하면, 다음과 같이 "shpipment" Kafka 토픽에 대한 outbox 메시지를 검사할 수 있다.

docker run --tty --rm \
  --network logical-decoding-network \
  quay.io/debezium/tooling:1.2 \
  kcat -b kafka:9092 -C -o beginning -q -t shipment \
  -f '%k -- %h -- %s\n'

42 -- message_id=298c2cc3-71bb-4d2b-b5b4-1b14006d56e6 -- {"customer_id":7398,"item_id":8123,"status":"SHIPPED","numberOfPackages":3,"address":"Bob Summers, 12 Main St., 90210, Los Angeles/CA, US"}

 

토픽 이름은 지정된 aggregate type 에 해당한다.

예를 들어, 다른 aggregate type 들에 대한 outbox 이벤트를 발행하는 경우, 그에 따라 다른 토픽으로 라우팅된다. (메시지 키는 42이며 집계 ID와 일치함) (aggregate ID 는 Kafka 메시지 헤더로 전파되므로, 컨슈머는 이미 수신 및 처리된 ID 를 추적하고 발생할 수 있는 잠재적 중복을 무시하여 효율적인 중복 제거를 구현할 수 있다.)

마지막으로, outbox 이벤트의 페이로드는 카프카 메시지 값으로 전파된다.

 

지금까지 outbox pattern 에 대해 살펴보았다.

좀 더 살펴보고 싶다면, outbox pattern 을 잘 설명하고 있는 이 포스트 를 참고해보면 좋을것 같다

(+ 여러 서비스 간 Sagas를 구현하기 위해 outbox 패턴을 사용하는 방법에 대한 포스트)

 

이어서 두 번째 usecase 인 로깅을 살펴보도록 하겠다.

 

 

2. 로깅

로그 관리는 일반적으로 전용 API 또는 파일 시스템의 로그에서 변경 사항을 수집하는 Datadog 또는 Splunk 을 주로 사용하지만, 때로는 애플리케이션의 데이터베이스에서 로그 메시지를 관리해야 할 경우도 있다.

Java 진영에서는 log4j2 나 logback 과 같은 라이브러리들이 JDBCAppender 를 제공하기에 로그 메시지를 편하게 관리할 수는 있지만,

문제점이 존재한다.

바로, 트랜잭션이 롤백되는 경우에도 로그 메시지를 기록하여 장애 분석에 도움이 되기를 원하기 때문에 일반적으로 로거에 대한 DB 에 추가로 엑세스 해야한다는 점이다.

 

Non-transactional logical decoding message는 단일 연결을 사용하면서도 트랜잭션이 롤백될 때에도 로그 메시지가 지속되도록 하는 좋은 수단이 될 수 있다.

 

이해를 돕기 위해, 트랜잭션이 2개 있고 그 중 하나는 커밋되고 하나는 롤백되는 상황으로 실습해보겠다.

 

우선, pgcli 셸에서 아래 명령문을 실행해보자.

–- Assuming this table: CREATE TABLE data (id INTEGER, value TEXT);
BEGIN;
INSERT INTO data(id, value) VALUES('1', 'foo');
SELECT * FROM pg_logical_emit_message(false, 'log', 'OK');
INSERT INTO data(id, value) VALUES('2', 'bar');
COMMIT;

BEGIN;
INSERT INTO data(id, value) VALUES('3', 'baz');
SELECT * FROM pg_logical_emit_message(false, 'log', 'ERROR');
INSERT INTO data(id, value) VALUES('4', 'qux');
ROLLBACK;

 

첫 번째 트랜잭션은 새 테이블 'data' 에 2개의 레코드를 삽입하고 logical decoding message 도 전송한다. (두 번째 트랜잭션은 비슷한 변경 사항을 적용하지만 롤백됨)

복제 슬롯에서 변경 이벤트를 검색할 때 아래와 같은 이벤트가 반환된다

postgresuser@postgres:demodb> SELECT * FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL) order by lsn;

+-----------+-------+------------------------------------------------------------+
| lsn       | xid   | data                                                       |
|-----------+-------+------------------------------------------------------------|
| 0/1A483F8 | 768   | BEGIN 768                                                  |
| 0/1A504B8 | 768   | table public.data: INSERT: id[integer]:1 value[text]:'foo' |
| 0/1A50530 | 768   | message: transactional: 0 prefix: log, sz: 2 content:OK    |
| 0/1A50530 | 768   | table public.data: INSERT: id[integer]:2 value[text]:'bar' |
| 0/1A509B8 | 768   | COMMIT 768                                                 |
| 0/1A50A38 | 769   | message: transactional: 0 prefix: log, sz: 5 content:ERROR |
+-----------+-------+------------------------------------------------------------+

 

예상대로 2개의 INSERT 이벤트와 첫 번째 트랜잭션에 대한 로그 메시지가 있다. (INSERT 문에 대한 중단된 트랜잭션은 롤백되었기 때문에 변경 이벤트가 없음)

그러나 logical decoding message 는 트랜잭션이 아니었기 때문에 여전히 WAL에 기록되어 검색할 수 있다.

 

 

3. 감사 로그

보통 개발을 하다보면, 귀찮지만 빼놓을 수 없는것 중 한 가지가 바로 감사 로그일 것이다.

이러한 감사 로그를 작성하는 방법에는 여러 가지가 있는데, 하나씩 살펴보도록 하자.

  1. 데이터가 변경될 때마다 이전 레코드 버전을 별도의 기록 테이블에 따로 로깅하기
    사실 이 방법은 복잡도를 높이고, (감사 로깅용 데이터가 트랜잭션에 추가되기에, write latency 도 증가) 성능에도 영향을 미친다.
  2. 변경 데이터 캡처
    이 방식을 채택하면, 1번 방법과는 달리, 어플리케이션 단에서 추가로 작업할 필요가 없다. 
    테이블에 대해 실행된 모든 삽입, 업데이트 및 삭제에 대한 이벤트가 포함된 변경 이벤트 스트림(예: 정의에 따라 레코드가 변경 불가능한 Apache Kafka의 항목으로 유지됨)은 간단한 형태의 감사 로그로 간주될 수 있다. CDC 프로세스는 비동기식으로 실행되므로 트랜잭션 쓰기에 대한 대기 시간 영향이 없다.

다만, 이 접근 방식에도 한 가지 단점이 있다.

적어도 가장 기본적인 형태로는 주어진 변경을 수행하는 애플리케이션 사용자, 장치 구성 또는 IP 주소와 같은 클라이언트 정보, 사용 사례 식별자 등과 같은 상황별 메타데이터를 캡처하지 않는다는 것이다. (이 데이터는 애플리케이션의 비즈니스 테이블에 저장되지 않으므로 원시 변경 데이터 이벤트에 노출되지 않음)

 

위 단점을 해결하기 위해, 방법을 찾다보니, 아주 좋은 을 통해 인사이트를 얻을 수 있었다.

바로, Apache Flink 를 활용하는 것. (logical decoding message와 스트림 처리를 Apache Flink 와 결합하면 해결됨)

각 트랜잭션이 시작될 때 소스 응용 프로그램은 필요한 모든 메타데이터를 메시지에 기록한다. (수정된 각 레코드에 대한 전체 기록 항목을 작성하는 것과 비교하면, 이 방식은 쓰기에 약간의 오버헤드만 추가되기에 훨씬 뛰어나다) 

이후, Flink job 을 통해 해당 메타데이터로 동일한 트랜잭션의 모든 후속 변경 이벤트를 보강할 수 있다. (Debezium 에서 emit 한 모든 변경 이벤트에는 logical decoding message를 포함하여 발생한 트랜잭션의 ID 가 포함되어 있으므로 한 트랜잭션의 이벤트를 연관시키는 것은 복잡하지 않다)

 

이론만 보다보니, 실제로 한번 테스트 해보고 싶어져 인터넷을 뒤져보다가 gunnarmorling 님께서 이미 좋은 예제를 만들어 주셨기에, 해당 Repo 를 기반으로 테스트 해보았다. (보다 자세한 설명과, 소스코드는 여기를 참고)

이제, 코드를살펴보며 결과를 보도록 하자.

 

아래는 Apache Flink 의 RichFlatMapFunctionimplement 한 코드 중 일부를 발췌했다.

로직은 간단하다.

  • 수신 이벤트의 유형이 "m"(=logical decoding message)이면서, 감사 메타데이터 이벤트인 경우
    -> 이벤트의 내용을 Flink value state 에 넣는다.
  • 들어오는 이벤트가 다른 유형이고 이전에 이벤트의 트랜잭션에 대한 감사 state 를 저장한 적이 있는 경우
    -> 해당 state 로 이벤트를 업데이트 한다.
  • 수신 이벤트의 트랜잭션 ID 가 감사 state 에 저장된 것과 일치하지 않는 경우(예를들어, 처음에 메타데이터 이벤트 없이 트랜잭션이 발행된 경우) -> state store 를 날리고, 이벤트를 그대로 propagate 해준다.
public void flatMap(String value, Collector<String> out) throws Exception {

    ChangeEvent changeEvent = mapper.readValue(value, ChangeEvent.class);
    String op = changeEvent.getOp();
    String txId = changeEvent.getSource().get("txId").asText();

    // logical decoding message
    if (op.equals("m")) {
    	Message message = changeEvent.getMessage();

        // audit metadata message -> remember it
        if (message.getPrefix().equals("audit")) {
            localAuditState = new AuditState(txId, message.getContent());
            return;
        }
        else
            out.collect(value);
    }
    // data change event -> enrich it with the metadata
    else {
        if (txId != null && localAuditState != null) {
            if (txId.equals(localAuditState.getTxId()))
                changeEvent.setAuditData(localAuditState.getState());
    	}
        else
            localAuditState = null;
    }
        
    changeEvent.setTransaction(null);
    out.collect(mapper.writeValueAsString(changeEvent));
}

 

우선 실습을 위해, Postgres용 Flink CDC connector 에 대해 해당 매핑 기능을 수행하는 Flink job 을 실행하고, 일부 데이터 변경을 트리거해준다.

(준비가 되었다면) 이제 트랜잭션 메타데이터(사용자 이름 및 클라이언트 IP 주소)가 포함된 logical decoding message를 emit 하고, 두 개의 INSERT 문을 전송하는 트랜잭션을 살펴보자.

BEGIN;
SELECT * FROM pg_logical_emit_message(true, 'audit', '{ "user" : "bob@example.com", "client" : "10.0.0.1" }');
INSERT INTO inventory.customer(first_name, last_name, email) VALUES ('Bob', 'Green', 'bob@example.com');
INSERT INTO inventory.address
  (customer_id, type, line_1, line_2, zip_code, city, country)
VALUES
  (currval('inventory.customer_id_seq'), 'Home', '12 Main St.', 'sdf', '90210', 'Los Angeles', 'US');
COMMIT;

 

이제, 결과를 살펴보자!

아래와 같이, Apache Flink 가 변경 이벤트를 정상적으로 emit 해주는것을 볼 수 있다 :)

{
  "op" : "c",
  "ts_ms" : 1673434483049,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "customer"
    "lsn" : 24023128,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 1018,
    "first_name" : "Bob",
    "last_name" : "Green",
    "email" : "bobasdf@example.com"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}
{
  "op" : "c",
  "ts_ms" : 1673434483050,
  "source" : {
    "connector" : "postgresql",
    "snapshot" : false,
    "db" : "demodb",
    "table" : "address"
    "lsn" : 24023129,
    "txId" : 555,
    ...
  },
  "before" : null,
  "after" : {
    "id" : 10007,
    "customer_id" : 1018,
    "type" : "Home",
    "line_1" : "12 Main St.",
    "line_2" : "sdf",
    "zip_code" : "90210",
    "city" : "Los Angeles",
    "country" : "US"
  },
  "auditData" : {
    "user" : "bob@example.com",
    "client" : "10.0.0.1"
  }
}

이제 동일한 Flink job 내에서 sink connector 를 추가하고, 내용이 반영된 이벤트를 Kafka 토픽에 기록할 수 있다. (혹은 필요에 따라, 변경 이벤트를 쿼리 가능한 OLAP 저장소로 propagate 할 수 있다고 함).


앞선 두 포스팅(etcd, cockroachD)과는 다르게, 이 글은 발표를 목적으로 작성하진 않았기에,

가볍게 살펴보려고 했는데, 늘 그렇듯 보다보면 궁금해지고 테스트해보고 싶어져서 글이 길어졌다.

오랜만에 살펴본 postgres 가 생각보다 많이 재미있어서 시간 가는줄 모르고 살펴봤는데,

이 글을 읽고 계신 분들도 이 즐거움을 함께 느꼈으면 좋겠다.

 

혹시 이 글을 읽다가, 궁금하신 내용이 있으시다면 언제든 편하게 연락(ian.ilminmoon@gmail.com)주시길 바랍니다 :)

(피드백도 매우 환영합니다!)

'Research' 카테고리의 다른 글

What is IO_uring? (Inside IO_uring)  (1) 2023.05.29