Kafka
kafka
go get -v github.com/confluentinc/confluent-kafka-go
func KafkaConsumer(consumerGroup string) {
broker := Broker
group := consumerGroup
topics := Topic
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": broker,
"group.id": group,
"auto.offset.reset": "latest"})
defer c.Close()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
os.Exit(1)
}
fmt.Printf("Created Consumer %v\n", c)
err = c.SubscribeTopics([]string{topics}, nil)
run := true
for run {
ev := c.Poll(0)
if ev == nil {
continue
}
switch e := ev.(type) {
case *kafka.Message:
fmt.Printf("%s Message on %s:\n%s\n",
group, e.TopicPartition, string(e.Value))
if e.Headers != nil {
fmt.Printf("%% Headers: %v\n", e.Headers)
}
case kafka.Error:
fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
fmt.Printf("Ignored %v\n", e)
}
}
}