Próbuję napisać konsument Kafka z Groupid foo, który subskrybuje pewnego tematu i odczytuje od samego początku (, nawet jeśli istnieje poprzedni przesunięcie ). Próbowałem użyć Subscribe za pomocą zwrotnego zrównoważenia, ale nigdy nie wydaje się być nazywane (ustawienie go.application).

Czy jest jakiś przykład, jak to zrobiłoby tę pracę?


Edytuj : Dodano więcej szczegółów

1
abergmeier 23 luty 2019, 00:09

2 odpowiedzi

Najlepsza odpowiedź

Poszliśmy teraz z ustawieniem enable.auto.commit do false. W ten sposób nie będzie przechowywanych offsetów i spożywamy od początku od każdego biegu w porządku.

0
abergmeier 25 luty 2019, 14:09

Ten przykład pochodzi z Confluent Kafka Go Github , możesz jedynie ustawić wartość auto.offset.reset do kafka.OffsetBeginning.String():

package main

/**
 * Copyright 2016 Confluent Inc.
 */

// consumer_example implements a consumer using the non-channel Poll() API
// to retrieve messages and events.

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "os"
    "os/signal"
    "syscall"
)

func main() {

    broker := "YOUR_BROKER"
    group := "YOUR_GROUP"
    topics := "YOUR_TOPICS"
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  broker,
        "group.id":           group,
        "session.timeout.ms": 6000,
        "auto.offset.reset":  kafka.OffsetBeginning.String()})

    if err != nil {
        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
        os.Exit(1)
    }

    fmt.Printf("Created Consumer %v\n", c)

    err = c.SubscribeTopics(topics, nil)

    run := true

    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("Caught signal %v: terminating\n", sig)
            run = false
        default:
            ev := c.Poll(100)
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                fmt.Printf("%% Message on %s:\n%s\n",
                    e.TopicPartition, string(e.Value))
                if e.Headers != nil {
                    fmt.Printf("%% Headers: %v\n", e.Headers)
                }
            case kafka.Error:
                // Errors should generally be considered as informational, the client will try to automatically recover
                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
            default:
                fmt.Printf("Ignored %v\n", e)
            }
        }
    }

    fmt.Printf("Closing consumer\n")
    c.Close()
}
1
Amin 25 luty 2019, 14:02