class Kafka::Consumer

Defined in:

kafka/consumer.cr
kafka/consumer/rebalance.cr

Constructors

Instance Method Summary

Constructor Detail

def self.new(config : Hash(String, String)) #

[View source]

Instance Method Detail

def close #

Close the consumer and destroy the Kafka handle.

Calls the rd_kafka_consumer_close and rd_kafka_destroy C functions.


[View source]
def each(timeout = 250, &) #

Loops indefinitely calling #poll at the given interval timeout.

At the beginning of each loop, Fiber.yield is called allow other Fibers to run.


[View source]
def poll(timeout_ms : Int32) : Message | Nil #

Poll the consumer for messages or events.

Calls the rd_kafka_consumer_poll C function.


[View source]
def subscribe(*topics) #

Subscribe to topics using balanced consumer groups.

Supports regex - start topic with '^'. For example:

consumer.subcribe("^foo") # will match any topics that starts with foo.

Raises a Kafka::ConsumerException when the subscribe fails.

Calls the rd_kafka_subscribe C function.


[View source]