Луч — Прочтите AVRO и преобразуйте

Мне нужно прочитать файл AVRO из облачного хранилища, а затем записать запись в большую таблицу с ключом строки и AVRO в виде байтов в ячейке столбца .. Я использую AVROIO.read для чтения данных как GenericRecord .. Как сделать я применяю функцию pardo, чтобы преобразовать данные во что-то, что можно записать в bigtable

// Read AVRO from GCS

pipeline
  .apply("Read from Avro",
    AvroIO
       .readGenericRecords(schema)
       .from(options.getInputFilePattern()))

//.apply - pardo transformation 

.apply("Write to Bigtable", write);

Любая помощь на втором этапе конвейера будет очень признательна.

Обновление:

Спасибо, Антон за быструю помощь, теперь я понимаю, что мне нужно делать, и придумал ниже для pardo

 pipeline
   .apply("Read from Avro",
               AvroIO
                 .readGenericRecords(schema)
                 .from(options.getInputFilePattern()))
   .apply(ParDo.of(new DoFn<GenericRecord,  Iterable<Mutation> >() {
       @ProcessElement
       public void processElement(ProcessContext c) {
            GenericRecord gen = c.element();
            byte[] fieldNameByte = null;
            byte[] fieldValueByte = null;

            // ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
            for (Schema.Field field : fields) {

                try {
                   String fieldName = field.name();
                   fieldNameByte = fieldName.getBytes("UTF-8");
                   String value = String.valueOf(gen.get(fieldName));
                   fieldValueByte = value.getBytes("UTF-8");
                } catch (Exception e) {
                   e.printStackTrace();
                }

                Iterable<Mutation> mutations =
                  ImmutableList.of(
                     Mutation.newBuilder()
                         .setSetCell(
                           Mutation.SetCell.newBuilder()
                              .setValue(
                                   ByteString.copyFrom(fieldValueByte))
                               .setFamilyName(COLUMN_FAMILY_NAME))
                         .build());
                c.output(,mutations));
              }
          }
       }))
   .apply("Write to Bigtable", write);
 return pipeline.run();

Это всего лишь псевдокод, и я только учусь и пробую. Мне нужна помощь по добавлению мутаций в ProcessContext и написанию. Пожалуйста, взгляните и дайте мне знать, в правильном ли я направлении и как это сделать. я добавляю мутацию в контекст

См. также:  Как создать конвейер шаблона потока данных с помощью Beam 2.0?
Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 1
  1. Joe

    Что-то в этом роде:

    Pipeline p = Pipeline.create(options);
    p.apply(GenerateSequence.from(0).to(numRows))
     .apply(
         ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
                 int index = c.element().intValue();
    
                 Iterable<Mutation> mutations =
                    ImmutableList.of(
                       Mutation.newBuilder()
                               .setSetCell(Mutation.SetCell.newBuilder()
                               .setValue(testData.get(index).getValue())
                               .setFamilyName(COLUMN_FAMILY_NAME))
                               .build());
                 c.output(KV.of(testData.get(index).getKey(), mutations));
             }
         }))
     .apply(
        BigtableIO
          .write()
          .withBigtableOptions(bigtableOptions)
          .withTableId(tableId));
    

    Скопировано с Тест интеграции Bigtable.

    Также здесь Beam doc по ParDo в целом и вот javadoc для BigtableIO есть объяснение.

    Спасибо, Антон .. Я посмотрел на пример и все еще не понимаю, как мне перебрать общую запись AVRO и преобразовать значения как мутации, которые затем можно записать в BigTable. public void processElement (ProcessContext ctx) {GenericRecord genericRecord = ctx.element (); Схема схемы = новый Schema.Parser (). Parse (schemaJson); Мне нужна помощь в понимании преобразования genericRecord в мутации (извлечение байтов из записи avro), которые можно вставить в столбцы BigTable. person Joe; 12.12.2018

    Я не уверен, что полностью понимаю. Чтобы получить значения из общей записи, вы используете genericRecord.get("field_name"), который дает вам объект. Затем вам нужно преобразовать его в байтовые строки в зависимости от того, что вы храните в BigTable. И эта часть — ваша бизнес-логика, вы сами определяете, как вы хотите, чтобы ваши объекты были сериализованы. Вы можете попробовать использовать вспомогательные классы, которые используют другие люди, если они подходят вашему варианту использования, например: github.com/apache/beam/blob/ person Joe; 12.12.2018

    Если у вас есть вопросы о том, как сериализовать объекты в целом, вы должны прочитать эту тему, например вот пример того, как преобразовать объект в массив байтов: stackoverflow.com/questions/2836646/ (это пример, вы выбираете, как будет представлен ваш объект). Затем вы можете использовать ByteStrings.copyFrom(byteArray), если вам это нужно. person Joe; 12.12.2018

    Антон, я отредактировал свой исходный вопрос с обновлением .. Пожалуйста, посмотрите и оставьте отзыв person Joe; 13.12.2018

    Ваш подход в целом имеет смысл. Следующий шаг зависит от того, как должен выглядеть ваш BigTable. Однако следует отметить, что в вашем примере для каждого поля вашего входного объекта Avro вы испускаете отдельную мутацию ячейки — c.output(,mutations), не уверен, что это то, что вы намереваетесь. Другое дело, что вам нужно создать пару ключ-значение из вашего ParDo, ключ байтовой строки и коллекцию значений мутаций, поэтому вы должны выбрать ключ (например, одно из полей вашего входного объекта Avro) и построить KV, как в моем примере. person Joe; 13.12.2018

    И вы, вероятно, захотите выдать один KV для всего ввода Avro, например. вместо того, чтобы испускать KV с мутацией для каждого поля, соберите мутации для всех полей, а затем отправьте в один KV. Это может быть не то, что вы намереваетесь, это зависит от вашей бизнес-логики. person Joe; 13.12.2018

    Спасибо, Антон .. Единый KV со всеми мутациями, этот ключ байтовой строки является ключом строки большой таблицы? person Joe; 13.12.2018

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: