KafkaコンシューマツールがDockerのクラスタに接続されていない

KafkaコンシューマツールがDockerのクラスタに接続されていない

KafkaコンシューマツールがDockerのクラスタに接続されていない

注文する

docker-compose -f docker-compose.yaml up --force-recreate --remove-orphans 

Docker Composeコマンドは3つの仮想マシン(zookeeper、、、kafka)を起動しますkafka-create-topics。 3番目の仮想マシンは新しいトピックを作成topicし、ここにメッセージをプッシュしてから使用しようとします。最初の2つのコマンドは機能し、エラーが発生しないようです。ところがkafka-console-consumer失敗したようだ。どの設定パラメータを使用しても関係ありません

  • cub kafka-ready- 成功
  • kafka-topics- 成功
  • kafka-console-producer- 成功
  • kafka-console-consumer- 毎回失敗

間違い

kafka-create-topics_1  | [2019-10-07 21:03:00,556] WARN [Consumer clientId=consumer-1, groupId=console-consumer-58118] Connection to node -1 (/172.21.0.3:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)     
kafka-create-topics_1  | [2019-10-07 20:58:24,047] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)                                                         
kafka-create-topics_1  | org.apache.kafka.common.errors.TimeoutException                                                                                                                                 
kafka-create-topics_1  | Processed a total of 0 messages  

2つのファイル

kafka-sample-generator.sh
docker-compose.yaml

ドッカーファイルdocker-compose.yaml

version: '2'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: kafka
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:9092
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  kafka-create-topics:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    depends_on:
      - zookeeper
      - kafka
    hostname: kafka-create-topics
    volumes:
      - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh'
    command: "/bin/kafka-sample-generator.sh"
    environment:
      KAFKA_TOPIC: topic
      KAFKA_BROKER: kafka

サンプルジェネレータスクリプトkafka-sample-generator.sh

#!/bin/sh

HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'`

## Wait until Kafka is ready then create demo topic
echo 'Waiting for Kafka to be ready...'
cub kafka-ready -b $HOST:9092 1 20 && \
sleep 1

echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
kafka-topics --create                   \
             --topic $KAFKA_TOPIC       \ 
             --if-not-exists            \
             --zookeeper zookeeper:2181 \
             --partitions 1             
             #--replication-factor 1
sleep 1

echo "Availalbe Topics"
kafka-topics --list --zookeeper zookeeper:2181
sleep 1

## Emit sample data stream
while true
    do echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
    for i in `seq 1 10`;
        do echo "$HOST"
        DATA="{\"data\":\"sample-data-$i\"}"
        echo "$DATA"
        kafka-console-producer       \
            --broker-list $HOST:9092 \
            --topic $KAFKA_TOPIC     \
            "$DATA"
    done
    sleep 1.0


    echo ''
    echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"

    ## FAILES HERE
    ## FAILES HERE
    ## FAILES HERE
    kafka-console-consumer                     \
        --bootstrap-server $HOST:9092          \
        --topic $KAFKA_TOPIC                  #\
        #--partition 0                         \
        #--from-beginning                      \
        #--max-messages 1                      \
        #--timeout-ms 45000                    \
        #--skip-message-on-error
done

答え1

動作するように変更した内容は次のとおりです。

  • インジケーターを無効にします(複製引数が3のトピックを生成しようとしました)。
  • レプリケーションファクタ1を使用するようにkafka-topics createコマンドを変更しました。
  • STDIN経由でkafka-console-Producerにメッセージデータを渡す
  • kafka-console-consumerは最初から最大10個のメッセージを受信するため、新しいメッセージを待つことをブロックしません。
version: '2'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.1
    hostname: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    hostname: kafka
    ports:
      - '9092:9092'
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: LISTENER_BOB://kafka:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_BOB:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_BOB  
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-create-topics:
    image: confluentinc/cp-enterprise-kafka:5.3.1
    depends_on:
      - zookeeper
      - kafka
    hostname: kafka-create-topics
    volumes:
      - './kafka-sample-generator.sh:/bin/kafka-sample-generator.sh'
    command: "/bin/kafka-sample-generator.sh"
    environment:
      KAFKA_TOPIC: topic
      KAFKA_BROKER: kafka
#!/bin/sh

HOST=`host $KAFKA_BROKER | awk '/has address/ { print $4 ; exit }'`

## Wait until Kafka is ready then create demo topic
echo 'Waiting for Kafka to be ready...'
cub kafka-ready -b $HOST:9092 1 20 && \
sleep 1

echo "Creating Topic [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
kafka-topics --create  --topic $KAFKA_TOPIC   --if-not-exists --zookeeper zookeeper:2181 --partitions 1  --replication-factor 1
sleep 1

echo "Availalbe Topics"
kafka-topics --list --zookeeper zookeeper:2181
sleep 1

## Emit sample data stream
while true
    do echo "Sending Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"
    for i in `seq 1 10`;
    do

        echo "$HOST"
        echo "$DATA"

        DATA="{\"data\":\"sample-data-$i\"}"

        echo "$DATA" | kafka-console-producer   \
            --broker-list $HOST:9092            \
            --topic $KAFKA_TOPIC     
    done
    sleep 1.0

    echo ''
    echo "Receiving Data [$HOST:9092 <topic:'$KAFKA_TOPIC'>]"

kafka-console-consumer              \
    --bootstrap-server $HOST:9092   \
    --topic $KAFKA_TOPIC            \
    --from-beginning                \
    --max-messages 10

done

関連情報