class Kafka::Consumer

Overview

represents a kafka (polling) consumer based on: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.c

Should be initialized in this order before consuming:

   kafka = Kafka::Consumer.new(conf) # conf should have group.id set
   kafka.add_brokers "localhost:9092"
   kafka.set_topic_partition "some topic"

Defined in:

kafka/consumer.cr

Constructors

Instance Method Summary

Constructor Detail

def self.new(conf : Config) #

creates a new kafka handle using provided config. Throws exception on error


[View source]

Instance Method Detail

def add_brokers(brokerList : String) #

[View source]
def consume(timeout_ms : Int32 = 25) : Message | Nil #

dequeues a single message Will start consume session, if not already started. returns message or nil


[View source]
def running : Bool #

returns true if a consumer session is active


[View source]
def set_topic_partition(name : String, partition : Int32 = 0) #

Set the topic to use for produce() calls. Raises exception on error.


[View source]
def stop #

closes the consumer if running.


[View source]