module PG::Replication::Handler

Defined in:

pg/replication.cr

Instance Method Summary

Instance Method Detail

abstract def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &) #

This method must be defined in order to tell the Connection how much of the WAL has been flushed and applied.

# Register a MyHandler instance (class shown below) for the given
# publication and replication slot.
connection = PG.listen_replication(
  db_url,
  handler: MyHandler.new,
  publication_name: "my_publication",
  slot_name: "my_replication_slot"
)

class MyHandler
  include PG::Replication::Handler

  # Yields to the given block first, then reports progress back to the
  # connection: every frame updates `last_wal_byte_flushed`, and a frame
  # carrying a `Commit` message also updates `last_wal_byte_applied`.
  def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &)
    yield

    connection.last_wal_byte_flushed = data.wal_end

    # Only commit messages advance the applied position.
    return unless data.message.is_a? PG::Replication::Commit

    connection.last_wal_byte_applied = data.wal_end
  end
end

[View source]
def received(frame) #

Override this method with a parameter typed as any of the WALMessage subclasses to handle replication messages of that type.

class MyHandler
  include PG::Replication::Handler

  # Using some hypothetical Kafka client for CDC
  def initialize(@kafka : Kafka::Client)
  end

  # Handles `Insert` messages only: serializes the insert's OID and tuple
  # data with MessagePack and hands the result to the Kafka publisher.
  def received(insert : PG::Replication::Insert)
    publisher = @kafka.publisher
    payload = {oid: insert.oid, data: insert.tuple_data}.to_msgpack
    publisher.publish(payload)
  end
end

[View source]