Staram się ustawić scenę dokera z Kafka @ Wurstmeister.

Scenariusz : Rozwijam architekturę wielu mikrofonów. W betonie: mam aplikację na wiosnę, która wysyła JSON do mojego brokera Kafka. Usługa kolby zużywa dane. Działa to podczas prowadzenia całego myślenia na zewnątrz dokera. Jestem również w stanie wysłać dane do tematu Kafka w Docker.

kod: Kolba:

KafkaHost = "kafka:9092"
def initkafka():
    # connect to Kafka server and pass the topic we want to consume
    consumer = KafkaConsumer("TEST",
                             group_id='view',
                             bootstrap_servers=[Constants.KafkaHost]
                             )
    KafkaConsumer(auto_offset_reset='latest',
                  enable_auto_commit=False)

    KafkaConsumer(value_deserializer=lambda m: json.loads(m.dedoce('utf-8')))
    KafkaConsumer(consumer_timeout_ms=1000)
    return consumer

Docker Compose:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    networks:
      - test-net

  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      #KAFKA_ADVERTISED_HOST_NAME: 172.17.0.1
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "TEST:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zookeeper
    networks:
      - test-net

BŁĄD

Traceback (most recent call last):
  File "run.py", line 1, in <module>
    from controller import Controller
  File "/app/controller/Controller.py", line 27, in <module>
    consumer = KafkaConfig.initkafka()
  File "/app/config/KafkaConfig.py", line 16, in initkafka
    enable_auto_commit=False)
  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py", line 324, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 221, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line 826, in check_version
    raise Errors.NoBrokersAvailable()
kafka.errors.NoBrokersAvailable: NoBrokersAvailable

Myślę, że jest to problem konfiguracji środowiska. Przeczytałem Wurstmeister Doc, ale nie mogę dowiedzieć się, co muszę skonfigurować, aby mój serwis flask znaleźć broker Kafka. Dzienniki twierdzą, że Kafka jest w stanie uruchomić się, tworzony jest temat "Test". Czy muszę skonfigurować słuchacze na e.g. Powiedz, że IP i Port w mojej sieci będzie słuchać Kafki? Ponieważ w Dokumenty Kafka reklamowane.Listeners opisano jako

Słuchacze do publikowania zookeeper dla klientów do użycia, jeśli różni się niż właściwość Słuchajna. W środowiskach IAAS może trzeba różnić od interfejsu, do którego wiąże się broker. Jeśli nie zostanie to ustawione, zostanie użyta wartość słuchaczy. W przeciwieństwie do słuchaczy Nie jest ważne, aby reklamować adres Meta-Meta-Meta-Metare.0.

1
FishingIsLife 27 luty 2019, 16:50

2 odpowiedzi

Najlepsza odpowiedź

Chyba że jestem pomylony, KAFKA_ADVERTISED_LISTENERS musi mieć tę samą wartość, co host Kafka, który definiujesz w kliencie kolby. Tak więc, jeśli podłączasz się do Kafki z kontenera dokującego, powinieneś mieć KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092. Jeśli łączą się z hosta, powinno być KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092.

Alternatywnie, możesz pominąć ustawienie KAFKA_ADVERTISED_LISTENERS i zdefiniować zamiast KAFKA_ADVERTISED_HOST_NAME: kafka.

1
Régis B. 27 luty 2019, 14:28

Przeszedłem więc To wspomniane przez @ Cricket_007. Teraz jest trochę wyraźniejszy, ale nadal walczę z connection. Jako podsumowanie dla mojego scenariusza: prowadzę wszystkie moje usługi i pośrednik wiadomości w tej samej sieci dokującej. Więc nie ma połączenia zewnętrznego. W tym blogu znajduje się przykład podany przykład:

KAFKA_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka0:29092,LISTENER_FRED://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT,LISTENER_FRED:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB

Myślę, że wiem, co oznacza to konfiguracja. W moim przypadku myślałem, że muszę to zmienić w ten sposób:

      KAFKA_LISTENERS: LISTENER_PY://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: LISTENER_PY://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_PY:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_PY

Myślę, że kafka_inter_broker_listener_name nie jest potrzebny, ponieważ dostałem tylko jeden broker. Ale czy nazwa słuchacza (nasłuchiwa_py) zależy od mojej nazwy serwisowej lub dowolnej innej nieruchomości? O ile rozumiem, mogę użyć "Kafka" jako IP, ponieważ prowadzę Kafkę jako usługę o nazwie "Kafka" w moim zadokometrze. Próbowałem tej konfiguracji, ale nadal nie działa. Zastanawiam się, jak działa z serwisu mojej wiosny, aby połączyć się jako producent bez definiowania dowolnej konfiguracji słuchacza.

0
medTech 28 luty 2019, 07:38