AsyncProducer (Select) in Go
This example shows how to use the producer while simultaneously reading the Errors channel to know about any failures.
package main
import (
"log"
"os"
"os/signal"
)
func main() {
producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
panic(err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
// Trap SIGINT to trigger a shutdown.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
var enqueued, errors int
ProducerLoop:
for {
select {
case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
enqueued++
case err := <-producer.Errors():
log.Println("Failed to produce message", err)
errors++
case <-signals:
break ProducerLoop
}
}
log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}