class Kafka::Consumer
- Kafka::Consumer
- Reference
- Object
Overview
represents a kafka (polling) consumer based on: https://github.com/edenhill/librdkafka/blob/master/examples/rdkafka_consumer_example.c
Should be initialized in this order before consuming:
kafka = Kafka::Consumer.new(conf) # conf should have group.id set
kafka.add_brokers "localhost:9092"
kafka.set_topic_partition "some topic"
Defined in:
kafka/consumer.crConstructors
-
.new(conf : Config)
creates a new kafka handle using provided config.
Instance Method Summary
- #add_brokers(brokerList : String)
-
#consume(timeout_ms : Int32 = 25) : Message | Nil
dequeues a single message Will start consume session, if not already started.
-
#running : Bool
returns true if a consumer session is active
-
#set_topic_partition(name : String, partition : Int32 = 0)
Set the topic to use for produce() calls.
-
#stop
closes the consumer if running.
Constructor Detail
creates a new kafka handle using provided config. Throws exception on error
Instance Method Detail
dequeues a single message Will start consume session, if not already started. returns message or nil
def set_topic_partition(name : String, partition : Int32 = 0)
#
Set the topic to use for produce() calls. Raises exception on error.