module
PG::Replication::Handler
Defined in:
pg/replication.crInstance Method Summary
-
#received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &)
This method must be defined in order to tell the
Connectionhow much of the WAL has been flushed and applied. -
#received(frame)
Override this method with any of the
WALMessagesubclasses to handle receiving replication messages of that type.
Instance Method Detail
abstract
def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &)
#
This method must be defined in order to tell the Connection how much of
the WAL has been flushed and applied.
connection = PG.listen_replication db_url,
handler: MyHandler.new,
publication_name: "my_publication",
slot_name: "my_replication_slot"
class MyHandler
include PG::Replication::Handler
def received(data : PG::Replication::XLogData, connection : PG::Replication::Connection, &)
yield
connection.last_wal_byte_flushed = data.wal_end
if data.message.is_a? PG::Replication::Commit
connection.last_wal_byte_applied = data.wal_end
end
end
end
def received(frame)
#
Override this method with any of the WALMessage subclasses to handle
receiving replication messages of that type.
class MyHandler
include PG::Replication::Handler
# Using some hypothetical Kafka client for CDC
def initialize(@kafka : Kafka::Client)
end
def received(insert : PG::Replication::Insert)
@kafka.publisher.publish({
oid: insert.oid,
data: insert.tuple_data,
}.to_msgpack)
end
end