Потребителям Apache Beam KafkaIO в группе потребителей назначается уникальный идентификатор группы

Я запускаю несколько экземпляров Apache Beam KafkaIO с помощью DirectRunner, которые читаются из той же темы. Но сообщение доставляется во все запущенные экземпляры. После просмотра конфигурации Kafka, которую я нашел, к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы.

  1. group.id = Reader-0_offset_consumer_559337182_ моя_группа
  2. group.id = Reader-0_offset_consumer_559337345_ my_group

Таким образом, каждому экземпляру назначен уникальный group.id, и поэтому сообщения доставляются во все экземпляры.

pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read().withReadCommitted()
            .withConsumerConfigUpdates(
                    new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
                            .put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
                            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build())
            .withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
            .withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()

Итак, какую конфигурацию я должен предоставить, чтобы все потребители в группе не читали одно и то же сообщение

В чем причина для запуска нескольких экземпляров KafkaIO с DirectRunner и чтения из одной и той же темы?   —  person Aditya    schedule 21.07.2020

@AlexeyRomanenko, мы не используем GCP и запускаем его на собственном голом железе. поэтому мы не можем использовать поток данных. Итак, мы хотим масштабироваться за счет развертывания в модуле k8s и увеличения количества модулей. Но проблема здесь в том, что я вижу, так как каждому экземпляру назначается уникальный groupId, когда когда-либо я отправляю сообщение, сообщение отправляется во все группы / экземпляры. Надеюсь, это проясняет проблему   —  person Aditya    schedule 23.07.2020

Я бы не рекомендовал вам использовать DirectRunner в производстве для значительного объема данных, поскольку этот бегун должен использоваться в основном для тестирования, он содержит и выполняет множество дополнительных проверок во время работы конвейера, поэтому он может быть довольно медленным по сравнению с другими бегуны. Можно ли использовать бегуны Spark или Flink поверх распределенных кластеров Spark или Flink?   —  person Aditya    schedule 23.07.2020

См. также:  Возникли проблемы с перемещением объекта на экране в Python с помощью pygame

@AlexeyRomanenko Нет, на данный момент у нас нет возможности использовать Spark of Flink. Кроме того, отмените отрицательный голос, так как это допустимый сценарий.   —  person Aditya    schedule 27.07.2020

Я не голосовал отрицательно, но поставил +1 к вашему посту. Я ожидаю, что у людей могут быть разные случаи, я просто рекомендую, как их лучше использовать.   —  person Aditya    schedule 27.07.2020

@Aditya, вы когда-нибудь придумывали решение этой проблемы? У меня похожая ситуация. В моем случае я хочу, чтобы тот же groupId оставался после перезапуска на случай, если Beam Job выйдет из строя. Будем признательны за любые выводы с вашей стороны. Спасибо   —  person Aditya    schedule 18.12.2020

@ user3693309 Мы перешли в поток данных.   —  person Aditya    schedule 19.12.2020

Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. Aditya

    Да, это происходит потому, что к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы. Из-за этого кафка не знает, запускаете ли вы еще один экземпляр. Следовательно, всем потребителям доставляются одни и те же сообщения.

    Следовательно, я мог придумать один способ решения проблемы: вместо того, чтобы указывать тему и позволять лучу определять количество потребителей для всех разделов, вы можете явно указать разделы темы для каждого экземпляра apache beam KafkaIO с помощью DirectRunner.

    Вам нужно будет передать List типа TopicPartition методу withTopicPartitions.

    KafkaIO.<String, String>read()
                    .withCreateTime(Duration.standardMinutes(1))
                    .withReadCommitted()
                    .withBootstrapServers(endPoint)
                    .withConsumerConfigUpdates(new ImmutableMap.Builder<String, Object>()
                            .put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
                            .put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5)
                            .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
                            .build())
                    .withTopicPartitions(Arrays.asList(new TopicPartition(topicName, 0)))
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata();
    

    Приведенный выше код будет читать сообщения только от partition 0. Следовательно, таким образом вы можете запускать несколько экземпляров одной и той же программы, не доставляя одинаковые сообщения всем потребителям.

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: