Kafka

kafka

go get -v github.com/confluentinc/confluent-kafka-go
func KafkaConsumer(consumerGroup string) {
    broker := Broker
    group := consumerGroup
    topics := Topic

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": broker,
        "group.id":          group,
        "auto.offset.reset": "latest"})
    defer c.Close()
    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Consumer %v\n", c)

    err = c.SubscribeTopics([]string{topics}, nil)

    run := true

    for run {
        ev := c.Poll(0)
        if ev == nil {
            continue
        }

        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("%s Message on %s:\n%s\n",
                group, e.TopicPartition, string(e.Value))
            if e.Headers != nil {
                fmt.Printf("%% Headers: %v\n", e.Headers)
            }
        case kafka.Error:
            fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
            if e.Code() == kafka.ErrAllBrokersDown {
                run = false
            }
        default:
            fmt.Printf("Ignored %v\n", e)
        }
    }
}