Я изо всех сил пытаюсь заставить запускать триггеры на основе времени события для моего конвейера лучей apache, но, похоже, могу запускать запуск окна со временем обработки.
Мой конвейер довольно прост:
-
Я получаю пакеты точек данных, которые включают отметки времени миллисекундного уровня от чтения pubsub с отметкой времени немного раньше, чем самая ранняя точка пакетных данных. Данные группируются, чтобы уменьшить рабочую нагрузку на стороне клиента и расходы на pubsub.
-
Я извлекаю временные метки второго уровня и применяю временные метки к отдельным точкам данных
-
Я закрываю данные для обработки и избегаю использования глобального окна.
-
Я группирую данные по секундам для последующей категоризации по секундам потоковых данных.
-
В конце концов, я использую скользящие окна в отсортированных по категориям секундах, чтобы условно отправлять одно из двух сообщений в pubsub один раз в секунду.
Моя проблема, похоже, находится на шаге 3.
Я пытаюсь использовать ту же стратегию работы с окнами на этапе 3, которую я в конечном итоге буду использовать на этапе 5, чтобы выполнить расчет скользящего среднего на категоризированных секундах.
Я пробовал возиться с параметрами withTimestampCombiner (TimestampCombiner.EARLIEST), но, похоже, это не решает проблему.
Я читал о методе .withEarlyFirings, используемом для времени события, но похоже, что он имитирует мою существующую работу. В идеале я мог бы полагаться на водяной знак, проходящий через конец окна, и включать позднее срабатывание.
// De-Batching The Pubsub Message
static public class UnpackDataPoints extends DoFn<String,String>{
@ProcessElement
public void processElement(@Element String c, OutputReceiver<String> out) {
JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
for (JsonElement acDataPoint: packedData){
String hereData = acDataPoint.toString();
DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
Instant eventTimeStamp = date.toInstant();
out.outputWithTimestamp(hereData,eventTimeStamp);
}
}
}
// Extracting The Second
static public class ExtractTimeStamp extends DoFn<String,KV<String,String>> {
@ProcessElement
public void processElement(ProcessContext ctx ,@Element String c, OutputReceiver<KV<String,String>> out) {
JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
String milliString = accDataObject.get("Timestamp").getAsString();
String secondString = StringUtils.left(milliString,24);
accDataObject.addProperty("noMiliTimeStamp", secondString);
String updatedAccData = accDataObject.toString();
KV<String,String> outputKV = KV.of(secondString,updatedAccData);
out.output(outputKV);
}
}
// The Pipeline & Windowing
Pipeline pipeline = Pipeline.create(options);
PCollection<String> dataPoints = pipeline
.apply("Read from Pubsub", PubsubIO.readStrings()
.fromTopic("projects/????/topics/???")
.withTimestampAttribute("messageTimestamp"))
.apply("Extract Individual Data Points",ParDo.of(new UnpackDataPoints()));
/// This is the event time window that doesn't fire for some reason
/*
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
// .triggering(AfterWatermark.pastEndOfWindow())
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(TWO_MINUTES))
//.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
*/
///// Temporary Work Around, this does fire but data is out of order
PCollection<String> windowedDataPoints = dataPoints.apply(
Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
.triggering(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(5)))
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.standardSeconds(1)));
PCollection<KV<String, String>> TimeStamped = windowedDataPoints
.apply( "Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));
PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped.apply("Group By Key",GroupByKey.create());
PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped.apply("testingIsh", ParDo.of(new LogKVIterable()));
Когда я использую первую оконную стратегию, которая закомментирована, мой конвейер работает бесконечно, и получение данных и LogKVIterable ParDo никогда ничего не возвращает, когда я использую время обработки, LogKVIterable запускается и регистрируется в консоли.
Вы проверили, правильно ли установлены ваши временные метки? Возможно, ваш водяной знак не продвигается должным образом, если временные метки не анализируются должным образом … — person Sam-U_L-L-L schedule 28.09.2019
Вы видите, что показатели актуальности данных / системного лага не отстают от вашего конвейера? — person Sam-U_L-L-L schedule 30.09.2019
Это действительно похоже на то, что метка времени, которую вы добавляете к своим данным, может быть неправильной / поврежденной. Я бы посоветовал вам проверить следующее:
Метка времени в ваших элементах добавляется правильно. Добавьте запись в трансформации до и после и тщательно протестируйте этот код.
Показатели «Свежесть данных» и «Системное отставание» в вашем конвейере развиваются так, как вы ожидаете. Если актуальность данных меняется не так, как ожидалось, это явный признак того, что ваша временная метка установлена неправильно.
Запуск по времени обработки отличается от срабатывания по времени события. Во времени обработки нет такого понятия, как поздние данные. Во время событий реальная проблема — обработка запаздывающих данных. Поздние данные в обработке времени события обрабатываются с помощью водяных знаков и триггеров. Чтобы получить отличное руководство по этому поводу, я рекомендую проверить эти две статьи Googler Tyler Akidau: a, b </ а>.
Поскольку в оконном режиме Время обработки нет такой вещи, как поздние данные, имеет смысл, чтобы ваш конвейер Время обработки Apache Beam работал без каких-либо проблем.
Между тем, при работе с окнами Event Time могут появляться запаздывающие данные, и ваши окна и триггеры должны обрабатывать эти сценарии при правильной разработке.
Скорее всего, ваш код конвейера обработки времени не запускается из-за неправильной конфигурации! Я не могу воспроизвести вашу проблему, поскольку ваш водяной знак (для источника Pub / Sub) определяется эвристически. Хотя я рекомендую вам отлаживать свой код следующим образом: Во-первых, увеличивая allowedLatness. Например: до 1 часа. Если это сработает — отлично! Если нет, см. Второе. Второй — комментарий с EarlyFirings. Если это сработает — отлично! Если нет, раскомментируйте и прочтите «Три три — Используйте окна фиксированного времени вместо окон скользящего времени».
Продолжайте отладку, пока не сможете изолировать проблему