Запуск конвейера Apache Beam в проекте Spring Boot в потоке данных Google

Я пытаюсь запустить конвейер Apache Beam в проекте Spring Boot в Google Data Flow, но эта ошибка не исчезает Failed to construct instance from factory method DataflowRunner#fromOptions(interfaceorg.apache.beam.sdk.options.PipelineOptions

Пример, который я пытаюсь запустить, — это базовое количество слов, указанное в официальной документации https://beam.apache.org/get-started/wordcount-example/. Проблема в том, что в этом примере используются разные классы для каждого примера, и каждый пример имеет свою собственную основную функцию, но я пытался запустить этот пример в проекте загрузки Spring с классом, реализующим CommandLineRunner.

Основной класс Spring загрузки:

 @SpringBootApplication
  public class BeamApplication {
public static void main(String[] args) {
    SpringApplication.run(BeamApplication.class, args);
}}  

CommandLineRunner:

@Component
public class Runner implements CommandLineRunner {
@Override
public void run(String[] args) throws Exception {

    WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class);
    runWordCount(options);
}

static void runWordCount(WordCountOptions options) throws InterruptedException {

    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
            .apply(new CountWords())
            .apply(MapElements.via(new FormatAsTextFn()))
            .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
}}

Вариант Wordcount:

public interface WordCountOptions extends PipelineOptions {

@Description("Path of the file to read from")
@Default.String("./src/main/resources/input.txt")
String getInputFile();
void setInputFile(String value);

@Description("path of output file")
// @Validation.Required
// @Default.String("./target/ts_output/extracted_words")
@Default.String("Path of the file to write to")
String getOutput();
void setOutput(String value);
}

Выделите слова:

public class ExtractWordsFn extends DoFn<String, String> {
   public static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

@ProcessElement
public void processElement(ProcessContext c) {
    for (String word : c.element().split(TOKENIZER_PATTERN)) {
        if (!word.isEmpty()) {
            c.output(word);
        }}}}

CountWords:

public  class CountWords extends    PTransform<PCollection<String>,PCollection<KV<String, Long>>> {

@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines){
    // Convert lines of text into individual words.
    PCollection<String> words = lines.apply(
            ParDo.of(new ExtractWordsFn()));

    // Count the number of times each word occurs.
    PCollection<KV<String, Long>> wordCounts =
            words.apply(Count.perElement());

    return wordCounts;
}}

Когда я использую средство запуска Direct, проект работает должным образом и генерирует файлы в корневом каталоге проекта, но когда я пытаюсь использовать средство запуска потока данных Google, передавая эти аргументы --runner=DataflowRunner --project=datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output (при использовании java -jar или Intellij). я получаю ошибку, упомянутую в начале моего сообщения.

См. также:  Обработка примитивных типов объединения в (де) сериализации JSON с помощью Джексона в Java

Я использую Java 11 и после просмотра этого Не удалось создать экземпляр из фабричного метода DataflowRunner # fromOptions в beamSql, apache beam. Я попытался перенести свой код в новый загрузочный проект Java 8 Spring, но ошибка осталась той же.

При запуске проекта, представленного в документации Apache Beam (классы с разными сетями), он отлично работает с потоком данных Google, и я могу видеть сгенерированный результат в ведре Google. и мой WordCountOptions интерфейс такой же, как и в официальной документации.

Может ли проблема быть вызвана CommandLineRunner? Я думал, что аргументы не принимаются приложением, но когда я отладил эту строку,

WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(WordCountOptions.class); 

Переменная options имеет правильные значения, которые равны --runner=DataflowRunner --project=target-datalake-ng --stagingLocation=gs://data_transformer/staging/ --output=gs://data_transformer/output.

РЕДАКТИРОВАТЬ:

Я выяснил, что причиной ошибки является проблема с аутентификацией gcloud и доступом к облачному ведру Google (Anonymous caller does not have storage.buckets.list access to project 961543751). Я дважды проверил доступ, и он должен быть установлен правильно, поскольку он отлично работает в примере проекта Beam по умолчанию. Я отозвал весь доступ и снова настроил его, но проблема осталась. я взглянул на эти https://github.com/googleapis/google-cloud-node/issues/2456 https://github.com/googleapis/google-cloud-ruby/issues/1588, и я все еще пытаюсь определить проблему, но на данный момент это похоже на проблему с зависимой версией.

Да, на основе этой ошибки похоже на проблему аутентификации. Для работы Dataflow необходимо установить переменную среды GOOGLE_APPLICATION_CREDENTIALS, как указано здесь: cloud .google.com / dataflow / docs / quickstarts / Возможно ли, что вы запускаете оболочку, где это не установлено?   —  person med.b    schedule 06.08.2019

Да, я уже выполнил шаги, указанные в этой ссылке, и файл GOOGLE_APPLICATION_CREDENTIALS JSON должен быть установлен правильно (я дважды пытался его сгенерировать). Я больше думаю, что это проблема зависимости. Мой вопрос: может ли зависимость в Maven, отличная от beam-runners-direct-java или beam-runners-google-cloud-dataflow-java, не использоваться где-либо в коде, но по-прежнему необходима для правильной работы проекта? потому что в проекте Beam Example (созданном в разделе Get the WordCount code размещенной вами ссылки) есть много других зависимостей, и я не уверен, нужны ли они.   —  person med.b    schedule 07.08.2019

См. также:  Как блокчейн проверяет действительность суммы транзакции

Да, все баночки, от которых зависит Beam, тоже нужны. При нормальном исполнении все jar-файлы и jar-файлы Beam зависят от того, будут ли они размещены в GCS для выполнения конвейера.   —  person med.b    schedule 07.08.2019

Понравилась статья? Поделиться с друзьями:
IT Шеф
Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: