Вопросы

Луч — Прочтите AVRO и преобразуйте

Мне нужно прочитать файл AVRO из облачного хранилища, а затем записать запись в большую таблицу с ключом строки и AVRO в виде байтов в ячейке столбца .. Я использую AVROIO.read для чтения данных как GenericRecord .. Как сделать я применяю функцию pardo, чтобы преобразовать данные во что-то, что можно записать в bigtable

// Read AVRO from GCS

pipeline
  .apply("Read from Avro",
    AvroIO
       .readGenericRecords(schema)
       .from(options.getInputFilePattern()))

//.apply - pardo transformation 

.apply("Write to Bigtable", write);

Любая помощь на втором этапе конвейера будет очень признательна.

Обновление:

Спасибо, Антон за быструю помощь, теперь я понимаю, что мне нужно делать, и придумал ниже для pardo

 pipeline
   .apply("Read from Avro",
               AvroIO
                 .readGenericRecords(schema)
                 .from(options.getInputFilePattern()))
   .apply(ParDo.of(new DoFn<GenericRecord,  Iterable<Mutation> >() {
       @ProcessElement
       public void processElement(ProcessContext c) {
            GenericRecord gen = c.element();
            byte[] fieldNameByte = null;
            byte[] fieldValueByte = null;

            // ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
            for (Schema.Field field : fields) {

                try {
                   String fieldName = field.name();
                   fieldNameByte = fieldName.getBytes("UTF-8");
                   String value = String.valueOf(gen.get(fieldName));
                   fieldValueByte = value.getBytes("UTF-8");
                } catch (Exception e) {
                   e.printStackTrace();
                }

                Iterable<Mutation> mutations =
                  ImmutableList.of(
                     Mutation.newBuilder()
                         .setSetCell(
                           Mutation.SetCell.newBuilder()
                              .setValue(
                                   ByteString.copyFrom(fieldValueByte))
                               .setFamilyName(COLUMN_FAMILY_NAME))
                         .build());
                c.output(,mutations));
              }
          }
       }))
   .apply("Write to Bigtable", write);
 return pipeline.run();

Это всего лишь псевдокод, и я только учусь и пробую. Мне нужна помощь по добавлению мутаций в ProcessContext и написанию. Пожалуйста, взгляните и дайте мне знать, в правильном ли я направлении и как это сделать. я добавляю мутацию в контекст

Читать:
Установите зависимости apt-get в Google Dataflow с помощью Beam Java SDK

Похожие записи

Различные регионы для Firestore DB и Firebase Cloud Functions

admin

как переопределить шаблон генератора драгоценных камней в приложении Rails

admin

Flutter / Dart — Парсинг JSON на модель

admin

Примечание: неопределенный индекс: родительский при добавлении множественного выбора в админке Sonata

admin

Есть ли размер операнда по умолчанию в архитектуре x86-64 (AMD64)?

admin

Интеграция Wix с MSBuild

admin