class Kafka::Producer

Defined in:

kafka/producer.cr
kafka/producer/delivery_report.cr
kafka/producer/statistics.cr

Constant Summary

DEFAULT_POLL_INTERVAL_SECONDS = 5

Constructor Detail

def self.new(config : Hash(String, String), stats_path = "", poll_interval : Int32 = DEFAULT_POLL_INTERVAL_SECONDS)


Instance Method Detail

def finalize

Destroys the Kafka handle.

Calls the rd_kafka_destroy C function.


def flush(timeout = 1000)

Waits until all outstanding produce requests and related operations have completed. Call this before destroying a producer instance so that all queued and in-flight produce requests finish before the program terminates.

Calls the rd_kafka_flush C function.
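
The produce-then-flush pattern described above can be sketched as a small helper. This is a minimal sketch, not the shard's own code: `publish_and_drain` and its parameters are hypothetical names, the `producer` argument is assumed to respond to `produce(topic, payload)` and `flush(timeout)` as documented on this page, and the timeout is assumed to be in milliseconds, as it is for `rd_kafka_flush`.

```crystal
# Hypothetical helper, not part of Kafka::Producer.
# `producer` is left untyped; any object with `produce(String, Bytes)`
# and `flush(timeout)` works, including a Kafka::Producer.
def publish_and_drain(producer, topic : String, lines : Array(String), flush_timeout = 5000)
  lines.each do |line|
    # String#to_slice yields the Bytes expected by the payload overload.
    producer.produce(topic, line.to_slice)
  end

  # Block until queued and in-flight messages are handled, or the
  # timeout (assumed to be milliseconds) expires.
  producer.flush(flush_timeout)
end
```

With a real producer this might be called as `publish_and_drain(producer, "events", ["a", "b"])` just before the program exits; the topic name is only an illustration.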


def poll(timeout = 500)

Polls the Kafka handle for events.

An application should make sure to call #poll at regular intervals to serve any queued callbacks waiting to be called.


def produce(topic : String, key : Bytes, payload : Bytes, timestamp : Int64)

Produces a single message with the given key, payload, and timestamp, and sends it to the broker.

Raises a Kafka::ProducerException when produce fails. Calls the rd_kafka_producev C function.
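
Callers usually start from strings and wall-clock time rather than `Bytes` and `Int64`. The sketch below shows one way to build the three arguments using only the Crystal standard library. Whether `timestamp` is expected in milliseconds since the Unix epoch is an assumption (it matches librdkafka's message timestamp convention, but this page does not state the unit); `build_record` and the sample values are hypothetical.

```crystal
# Hypothetical helper: converts plain strings into the argument types
# taken by produce(topic, key, payload, timestamp).
def build_record(key : String, payload : String, at : Time = Time.utc) : Tuple(Bytes, Bytes, Int64)
  # to_slice gives a Bytes view of each string's UTF-8 encoding;
  # to_unix_ms returns an Int64 (the millisecond unit is an assumption).
  {key.to_slice, payload.to_slice, at.to_unix_ms}
end

key, payload, timestamp = build_record("user-42", %({"event":"login"}), Time.unix(1_700_000_000))
# producer.produce("events", key, payload, timestamp)
```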


def produce(topic : String, key : Bytes, payload : Bytes)

Produces a single message with the given key and payload, and sends it to the broker.

Raises a Kafka::ProducerException when produce fails. Calls the rd_kafka_producev C function.


def produce(topic : String, payload : Bytes)

Produces a single message with the given payload and sends it to the broker.

Raises a Kafka::ProducerException when produce fails. Calls the rd_kafka_producev C function.


def produce(topic : String, msg : Message)

Produces a single message from the given Message and sends it to the broker.

Raises a Kafka::ProducerException when produce fails. Calls the rd_kafka_produce C function.


def produce_batch(topic : String, batch : Array(NamedTuple(key: Array(UInt8), msg: Array(UInt8))))

Produces multiple messages and sends them to the broker.

Raises a Kafka::ProducerException when produce fails. Calls the rd_kafka_produce C function.
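
The `batch` parameter takes arrays of bytes (`Array(UInt8)`) rather than `Bytes` slices, so string data has to be converted entry by entry. A minimal sketch of that conversion, using only the standard library; `BatchEntry` and `build_batch` are hypothetical names introduced here, not part of the shard.

```crystal
# Hypothetical alias for the element type in produce_batch's signature.
alias BatchEntry = NamedTuple(key: Array(UInt8), msg: Array(UInt8))

# Hypothetical helper: turns key/value string pairs into batch entries.
def build_batch(pairs : Hash(String, String)) : Array(BatchEntry)
  pairs.map do |key, value|
    # String#bytes copies the UTF-8 bytes into a new Array(UInt8).
    {key: key.bytes, msg: value.bytes}
  end
end

batch = build_batch({"user-1" => "login", "user-2" => "logout"})
# producer.produce_batch("events", batch)
```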

