class Kafka::Topic

Defined in:

kafka/topic.cr

Constructors

Instance Method Summary

Constructor Detail

def self.new(rk : Consumer | Producer, topic : String, conf : TopicConfiguration) #

Creates a new topic handle for topic named topic.


[View source]

Instance Method Detail

def consume(partition : Int32, timeout_ms = 1000) : Message | Nil #

Consume a single message from partition.


[View source]
def consume_batch(partition : Int32, timeout_ms = 1000, rkmessages_size = 64) : Array(Message) #

Consume up to rkmessages_size from partition putting a pointer to each message in the application provided array rkmessages (of size rkmessages_size entries).


[View source]
def consume_callback(partition : Int32, timeout_ms = 1000, &consume_cb : Message -> ) #

Consumes messages from partition, calling the provided callback for each consumed messsage.


[View source]
def consume_start(partition : Int32, offset : Int64, rkqu : Queue) #

Start consuming messages for partition at offset, but re-routes incoming messages to the provided queue rkqu.


[View source]
def consume_start(partition : Int32, offset : Int64) #

Start consuming messages for partition at offset.


[View source]
def consume_stop(partition : Int32) #

Stop consuming messages for partition, purging all messages currently in the local queue.


[View source]
def produce(partition : Int32, payload : String, key : String = nil, msg_opaque = nil) #

Produce and send a single message to broker.


[View source]
def produce_batch(partition : Int32) #

Produce multiple messages.


[View source]
def to_unsafe : LibKafka::RdKafkaTopicT #

[View source]