Я запускаю несколько экземпляров Apache Beam KafkaIO с помощью DirectRunner, которые читаются из той же темы. Но сообщение доставляется во все запущенные экземпляры. После просмотра конфигурации Kafka, которую я нашел, к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы.
- group.id = Reader-0_offset_consumer_559337182_ моя_группа
- group.id = Reader-0_offset_consumer_559337345_ my_group
Таким образом, каждому экземпляру назначен уникальный group.id, и поэтому сообщения доставляются во все экземпляры.
pipeline.apply("ReadFromKafka", KafkaIO.<String, String>read().withReadCommitted()
.withConsumerConfigUpdates(
new ImmutableMap.Builder<String, Object>().put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group")
.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5).build())
.withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class)
.withBootstrapServers(servers).withTopics(Collections.singletonList(topicName)).withoutMetadata()
Итак, какую конфигурацию я должен предоставить, чтобы все потребители в группе не читали одно и то же сообщение
В чем причина для запуска нескольких экземпляров KafkaIO с DirectRunner и чтения из одной и той же темы? — person Aditya schedule 21.07.2020
@AlexeyRomanenko, мы не используем GCP и запускаем его на собственном голом железе. поэтому мы не можем использовать поток данных. Итак, мы хотим масштабироваться за счет развертывания в модуле k8s и увеличения количества модулей. Но проблема здесь в том, что я вижу, так как каждому экземпляру назначается уникальный groupId, когда когда-либо я отправляю сообщение, сообщение отправляется во все группы / экземпляры. Надеюсь, это проясняет проблему — person Aditya schedule 23.07.2020
Я бы не рекомендовал вам использовать DirectRunner в производстве для значительного объема данных, поскольку этот бегун должен использоваться в основном для тестирования, он содержит и выполняет множество дополнительных проверок во время работы конвейера, поэтому он может быть довольно медленным по сравнению с другими бегуны. Можно ли использовать бегуны Spark или Flink поверх распределенных кластеров Spark или Flink? — person Aditya schedule 23.07.2020
@AlexeyRomanenko Нет, на данный момент у нас нет возможности использовать Spark of Flink. Кроме того, отмените отрицательный голос, так как это допустимый сценарий. — person Aditya schedule 27.07.2020
Я не голосовал отрицательно, но поставил +1 к вашему посту. Я ожидаю, что у людей могут быть разные случаи, я просто рекомендую, как их лучше использовать. — person Aditya schedule 27.07.2020
@Aditya, вы когда-нибудь придумывали решение этой проблемы? У меня похожая ситуация. В моем случае я хочу, чтобы тот же groupId оставался после перезапуска на случай, если Beam Job выйдет из строя. Будем признательны за любые выводы с вашей стороны. Спасибо — person Aditya schedule 18.12.2020
@ user3693309 Мы перешли в поток данных. — person Aditya schedule 19.12.2020
Да, это происходит потому, что к имени группы добавляется уникальный префикс, и каждый экземпляр имеет уникальное имя группы. Из-за этого кафка не знает, запускаете ли вы еще один экземпляр. Следовательно, всем потребителям доставляются одни и те же сообщения.
Следовательно, я мог придумать один способ решения проблемы: вместо того, чтобы указывать тему и позволять лучу определять количество потребителей для всех разделов, вы можете явно указать разделы темы для каждого экземпляра apache beam KafkaIO с помощью DirectRunner.
Вам нужно будет передать
List
типаTopicPartition
методуwithTopicPartitions
.Приведенный выше код будет читать сообщения только от
partition 0
. Следовательно, таким образом вы можете запускать несколько экземпляров одной и той же программы, не доставляя одинаковые сообщения всем потребителям.