Я пытаюсь запустить конвейер Apache Beam в проекте Spring Boot в Google Data Flow, но эта ошибка не исчезает Failed to construct instance from factory method DataflowRunner#fromOptions(interfaceorg.apache.beam.sdk.options.PipelineOptions
Пример, который я пытаюсь запустить, — это базовое количество слов, указанное в официальной документации https://beam.apache.org/get-started/wordcount-example/. Проблема в том, что в этом примере используются разные классы для каждого примера, и каждый пример имеет свою собственную основную функцию, но я пытался запустить этот пример в проекте загрузки Spring с классом, реализующим CommandLineRunner.
Основной класс Spring загрузки:
@SpringBootApplication
public class BeamApplication {
public static void main(String[] args) {
SpringApplication.run(BeamApplication.class, args);
}}
CommandLineRunner:
@Component
public class Runner implements CommandLineRunner {
@Override
public void run(String[] args) throws Exception {
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
runWordCount(options);
}
static void runWordCount(WordCountOptions options) throws InterruptedException {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}}
Вариант Wordcount:
public interface WordCountOptions extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);
@Description("path of output file")
// @Validation.Required
// @Default.String("./target/ts_output/extracted_words")
@Default.String("Path of the file to write to")
String getOutput();
void setOutput(String value);
}
Выделите слова:
public class ExtractWordsFn extends DoFn<String, String> {
public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";
@ProcessElement
public void processElement(ProcessContext c) {
for (String word : c.element().split(TOKENIZER_PATTERN)) {
if (!word.isEmpty()) {
c.output(word);
}}}}
CountWords:
public class CountWords extends PTransform<PCollection<String>,PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines){
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(
ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts =
words.apply(Count.perElement());
return wordCounts;
}}
Когда я использую средство запуска Direct, проект работает должным образом и генерирует файлы в корневом каталоге проекта, но когда я пытаюсь использовать средство запуска потока данных Google, передавая эти аргументы --runner=DataflowRunner --project=datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output
(при использовании java -jar или Intellij). я получаю ошибку, упомянутую в начале моего сообщения.
Я использую Java 11 и после просмотра этого Не удалось создать экземпляр из фабричного метода DataflowRunner # fromOptions в beamSql, apache beam. Я попытался перенести свой код в новый загрузочный проект Java 8 Spring, но ошибка осталась той же.
При запуске проекта, представленного в документации Apache Beam (классы с разными сетями), он отлично работает с потоком данных Google, и я могу видеть сгенерированный результат в ведре Google. и мой WordCountOptions
интерфейс такой же, как и в официальной документации.
Может ли проблема быть вызвана CommandLineRunner
? Я думал, что аргументы не принимаются приложением, но когда я отладил эту строку,
WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
Переменная options
имеет правильные значения, которые равны --runner=DataflowRunner --project=target-datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output
.
РЕДАКТИРОВАТЬ:
Я выяснил, что причиной ошибки является проблема с аутентификацией gcloud и доступом к облачному ведру Google (Anonymous caller does not have storage.buckets.list access to project 961543751
). Я дважды проверил доступ, и он должен быть установлен правильно, поскольку он отлично работает в примере проекта Beam по умолчанию. Я отозвал весь доступ и снова настроил его, но проблема осталась. я взглянул на эти https://github.com/googleapis/google-cloud-node/issues/2456 https://github.com/googleapis/google-cloud-ruby/issues/1588, и я все еще пытаюсь определить проблему, но на данный момент это похоже на проблему с зависимой версией.
Да, на основе этой ошибки похоже на проблему аутентификации. Для работы Dataflow необходимо установить переменную среды GOOGLE_APPLICATION_CREDENTIALS, как указано здесь: cloud .google.com / dataflow / docs / quickstarts / Возможно ли, что вы запускаете оболочку, где это не установлено? — person med.b schedule 06.08.2019
Да, я уже выполнил шаги, указанные в этой ссылке, и файл GOOGLE_APPLICATION_CREDENTIALS JSON
должен быть установлен правильно (я дважды пытался его сгенерировать). Я больше думаю, что это проблема зависимости. Мой вопрос: может ли зависимость в Maven, отличная от beam-runners-direct-java
или beam-runners-google-cloud-dataflow-java
, не использоваться где-либо в коде, но по-прежнему необходима для правильной работы проекта? потому что в проекте Beam Example (созданном в разделе Get the WordCount code размещенной вами ссылки) есть много других зависимостей, и я не уверен, нужны ли они. — person med.b schedule 07.08.2019
Да, все баночки, от которых зависит Beam, тоже нужны. При нормальном исполнении все jar-файлы и jar-файлы Beam зависят от того, будут ли они размещены в GCS для выполнения конвейера. — person med.b schedule 07.08.2019