AsyncProducer (Goroutines) in Go
This example shows how to use the AsyncProducer with separate goroutines that read from its Successes and Errors channels. Note that the Successes channel is only populated if you set config.Producer.Return.Successes to true.
package main
import (
"log"
"os"
"os/signal"
"sync"
)
// main publishes a fixed test message to "my_topic" in a loop until the
// process receives an interrupt signal. Two background goroutines drain the
// producer's Successes and Errors channels and keep running tallies; once the
// producer has been asked to shut down and both goroutines have returned, the
// tallies are logged.
func main() {
	cfg := NewConfig()
	// Required for the Successes channel to be populated.
	cfg.Producer.Return.Successes = true

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}

	// Trap SIGINT to trigger a graceful shutdown.
	interrupts := make(chan os.Signal, 1)
	signal.Notify(interrupts, os.Interrupt)

	var drainers sync.WaitGroup
	var sent, delivered, failed int

	// Both drain goroutines are registered up front; each one signals Done
	// when its range loop ends.
	// NOTE(review): the range loops only end if the producer closes these
	// channels after AsyncClose — confirm against the producer's docs.
	drainers.Add(2)

	// Count acknowledgements.
	go func() {
		defer drainers.Done()
		for range producer.Successes() {
			delivered++
		}
	}()

	// Log and count failures.
	go func() {
		defer drainers.Done()
		for produceErr := range producer.Errors() {
			log.Println(produceErr)
			failed++
		}
	}()

	// Keep enqueueing until an interrupt arrives. A fresh message is built on
	// every iteration before the select, exactly one select case runs per pass.
	for stopping := false; !stopping; {
		msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
		select {
		case producer.Input() <- msg:
			sent++
		case <-interrupts:
			producer.AsyncClose() // Trigger a shutdown of the producer.
			stopping = true
		}
	}

	// Block until both drain goroutines have finished so the tallies are final
	// before they are read here.
	drainers.Wait()
	log.Printf("Successfully produced: %d; errors: %d\n", delivered, failed)
}