Мне нужно прочитать файл AVRO из облачного хранилища, а затем записать запись в большую таблицу с ключом строки и AVRO в виде байтов в ячейке столбца .. Я использую AVROIO.read для чтения данных как GenericRecord .. Как сделать я применяю функцию pardo, чтобы преобразовать данные во что-то, что можно записать в bigtable
// Read AVRO from GCS
pipeline
.apply("Read from Avro",
AvroIO
.readGenericRecords(schema)
.from(options.getInputFilePattern()))
//.apply - pardo transformation
.apply("Write to Bigtable", write);
Любая помощь на втором этапе конвейера будет очень признательна.
Обновление:
Спасибо, Антон за быструю помощь, теперь я понимаю, что мне нужно делать, и придумал ниже для pardo
pipeline
.apply("Read from Avro",
AvroIO
.readGenericRecords(schema)
.from(options.getInputFilePattern()))
.apply(ParDo.of(new DoFn<GenericRecord, Iterable<Mutation> >() {
@ProcessElement
public void processElement(ProcessContext c) {
GenericRecord gen = c.element();
byte[] fieldNameByte = null;
byte[] fieldValueByte = null;
// ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
for (Schema.Field field : fields) {
try {
String fieldName = field.name();
fieldNameByte = fieldName.getBytes("UTF-8");
String value = String.valueOf(gen.get(fieldName));
fieldValueByte = value.getBytes("UTF-8");
} catch (Exception e) {
e.printStackTrace();
}
Iterable<Mutation> mutations =
ImmutableList.of(
Mutation.newBuilder()
.setSetCell(
Mutation.SetCell.newBuilder()
.setValue(
ByteString.copyFrom(fieldValueByte))
.setFamilyName(COLUMN_FAMILY_NAME))
.build());
c.output(,mutations));
}
}
}))
.apply("Write to Bigtable", write);
return pipeline.run();
Это всего лишь псевдокод, и я только учусь и пробую. Мне нужна помощь по добавлению мутаций в ProcessContext и написанию. Пожалуйста, взгляните и дайте мне знать, в правильном ли я направлении и как это сделать. я добавляю мутацию в контекст
Что-то в этом роде:
Скопировано с Тест интеграции Bigtable.
Также здесь Beam doc по
ParDo
в целом и вот javadoc дляBigtableIO
есть объяснение.Спасибо, Антон .. Я посмотрел на пример и все еще не понимаю, как мне перебрать общую запись AVRO и преобразовать значения как мутации, которые затем можно записать в BigTable. public void processElement (ProcessContext ctx) {GenericRecord genericRecord = ctx.element (); Схема схемы = новый Schema.Parser (). Parse (schemaJson); Мне нужна помощь в понимании преобразования genericRecord в мутации (извлечение байтов из записи avro), которые можно вставить в столбцы BigTable. — person Joe; 12.12.2018
Я не уверен, что полностью понимаю. Чтобы получить значения из общей записи, вы используете
genericRecord.get("field_name")
, который дает вам объект. Затем вам нужно преобразовать его в байтовые строки в зависимости от того, что вы храните в BigTable. И эта часть — ваша бизнес-логика, вы сами определяете, как вы хотите, чтобы ваши объекты были сериализованы. Вы можете попробовать использовать вспомогательные классы, которые используют другие люди, если они подходят вашему варианту использования, например: github.com/apache/beam/blob/ — person Joe; 12.12.2018Если у вас есть вопросы о том, как сериализовать объекты в целом, вы должны прочитать эту тему, например вот пример того, как преобразовать объект в массив байтов: stackoverflow.com/questions/2836646/ (это пример, вы выбираете, как будет представлен ваш объект). Затем вы можете использовать
ByteStrings.copyFrom(byteArray)
, если вам это нужно. — person Joe; 12.12.2018Антон, я отредактировал свой исходный вопрос с обновлением .. Пожалуйста, посмотрите и оставьте отзыв — person Joe; 13.12.2018
Ваш подход в целом имеет смысл. Следующий шаг зависит от того, как должен выглядеть ваш BigTable. Однако следует отметить, что в вашем примере для каждого поля вашего входного объекта Avro вы испускаете отдельную мутацию ячейки —
c.output(,mutations)
, не уверен, что это то, что вы намереваетесь. Другое дело, что вам нужно создать пару ключ-значение из вашегоParDo
, ключ байтовой строки и коллекцию значений мутаций, поэтому вы должны выбрать ключ (например, одно из полей вашего входного объекта Avro) и построитьKV
, как в моем примере. — person Joe; 13.12.2018И вы, вероятно, захотите выдать один
KV
для всего ввода Avro, например. вместо того, чтобы испускатьKV
с мутацией для каждого поля, соберите мутации для всех полей, а затем отправьте в одинKV
. Это может быть не то, что вы намереваетесь, это зависит от вашей бизнес-логики. — person Joe; 13.12.2018Спасибо, Антон .. Единый KV со всеми мутациями, этот ключ байтовой строки является ключом строки большой таблицы? — person Joe; 13.12.2018