Почему срабатывает мое окно времени обработки, а время события — нет

Я изо всех сил пытаюсь заставить запускать триггеры на основе времени события для моего конвейера лучей apache, но, похоже, могу запускать запуск окна со временем обработки.

Мой конвейер довольно прост:

  1. Я получаю пакеты точек данных, которые включают отметки времени миллисекундного уровня от чтения pubsub с отметкой времени немного раньше, чем самая ранняя точка пакетных данных. Данные группируются, чтобы уменьшить рабочую нагрузку на стороне клиента и расходы на pubsub.

  2. Я извлекаю временные метки второго уровня и применяю временные метки к отдельным точкам данных

  3. Я закрываю данные для обработки и избегаю использования глобального окна.

  4. Я группирую данные по секундам для последующей категоризации по секундам потоковых данных.

  5. В конце концов, я использую скользящие окна в отсортированных по категориям секундах, чтобы условно отправлять одно из двух сообщений в pubsub один раз в секунду.

Моя проблема, похоже, находится на шаге 3.

Я пытаюсь использовать ту же стратегию работы с окнами на этапе 3, которую я в конечном итоге буду использовать на этапе 5, чтобы выполнить расчет скользящего среднего на категоризированных секундах.

Я пробовал возиться с параметрами withTimestampCombiner (TimestampCombiner.EARLIEST), но, похоже, это не решает проблему.

Я читал о методе .withEarlyFirings, используемом для времени события, но похоже, что он имитирует мою существующую работу. В идеале я мог бы полагаться на водяной знак, проходящий через конец окна, и включать позднее срабатывание.

// De-Batching The Pubsub Message

  static public class UnpackDataPoints extends DoFn<String,String>{
    @ProcessElement
        public  void processElement(@Element String c, OutputReceiver<String> out) {
            JsonArray packedData = new JsonParser().parse(c).getAsJsonArray();
            DateTimeFormatter dtf = DateTimeFormat.forPattern("EEE dd MMM YYYY HH:mm:ss:SSS zzz");
            for (JsonElement acDataPoint: packedData){
                String hereData = acDataPoint.toString();
                DateTime date = dtf.parseDateTime(acDataPoint.getAsJsonObject().get("Timestamp").getAsString());
                Instant eventTimeStamp = date.toInstant();
                out.outputWithTimestamp(hereData,eventTimeStamp);
            }
        }
        }
// Extracting The Second
 static public class ExtractTimeStamp extends DoFn<String,KV<String,String>> {
    @ProcessElement
        public void processElement(ProcessContext ctx ,@Element String c, OutputReceiver<KV<String,String>> out) {
            JsonObject accDataObject = new JsonParser().parse(c).getAsJsonObject();
            String milliString = accDataObject.get("Timestamp").getAsString();
            String secondString = StringUtils.left(milliString,24);
            accDataObject.addProperty("noMiliTimeStamp", secondString);
            String updatedAccData = accDataObject.toString();
            KV<String,String> outputKV = KV.of(secondString,updatedAccData);
                    out.output(outputKV);
    }
    }
// The Pipeline & Windowing
   Pipeline pipeline = Pipeline.create(options);

 PCollection<String> dataPoints = pipeline
    .apply("Read from Pubsub", PubsubIO.readStrings()
                    .fromTopic("projects/????/topics/???")
                    .withTimestampAttribute("messageTimestamp"))
   .apply("Extract Individual Data Points",ParDo.of(new UnpackDataPoints()));


 /// This is the event time window that doesn't fire for some reason
        /*
        PCollection<String> windowedDataPoints = dataPoints.apply(
                Window.<String>into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))
               // .triggering(AfterWatermark.pastEndOfWindow())
               .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TWO_MINUTES))
                //.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
                .discardingFiredPanes()
                .withTimestampCombiner(TimestampCombiner.EARLIEST)
                .withAllowedLateness(Duration.standardSeconds(1)));
        */
     ///// Temporary Work Around, this does fire but data is out of order

        PCollection<String> windowedDataPoints = dataPoints.apply(
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(120)))
                .triggering(
                        AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardSeconds(5)))
                .discardingFiredPanes()
                .withTimestampCombiner(TimestampCombiner.EARLIEST)
                        .withAllowedLateness(Duration.standardSeconds(1)));

  PCollection<KV<String, String>> TimeStamped = windowedDataPoints
                .apply( "Pulling Out The Second For Aggregates", ParDo.of(new ExtractTimeStamp()));

        PCollection<KV<String, Iterable<String>>> TimeStampedGrouped = TimeStamped.apply("Group By Key",GroupByKey.create());

        PCollection<KV<String, Iterable<String>>> testing = TimeStampedGrouped.apply("testingIsh", ParDo.of(new LogKVIterable()));

Когда я использую первую оконную стратегию, которая закомментирована, мой конвейер работает бесконечно, и получение данных и LogKVIterable ParDo никогда ничего не возвращает, когда я использую время обработки, LogKVIterable запускается и регистрируется в консоли.

См. также:  Невозможно импортировать torch.distributed.rpc

Вы проверили, правильно ли установлены ваши временные метки? Возможно, ваш водяной знак не продвигается должным образом, если временные метки не анализируются должным образом …   —  person Sam-U_L-L-L    schedule 28.09.2019

Вы видите, что показатели актуальности данных / системного лага не отстают от вашего конвейера?   —  person Sam-U_L-L-L    schedule 30.09.2019

Понравилась статья? Поделиться с друзьями:
IT Шеф
Комментарии: 2
  1. Sam-U_L-L-L

    Это действительно похоже на то, что метка времени, которую вы добавляете к своим данным, может быть неправильной / поврежденной. Я бы посоветовал вам проверить следующее:

    1. Метка времени в ваших элементах добавляется правильно. Добавьте запись в трансформации до и после и тщательно протестируйте этот код.

    2. Показатели «Свежесть данных» и «Системное отставание» в вашем конвейере развиваются так, как вы ожидаете. Если актуальность данных меняется не так, как ожидалось, это явный признак того, что ваша временная метка установлена ​​неправильно.

  2. Sam-U_L-L-L

    Запуск по времени обработки отличается от срабатывания по времени события. Во времени обработки нет такого понятия, как поздние данные. Во время событий реальная проблема — обработка запаздывающих данных. Поздние данные в обработке времени события обрабатываются с помощью водяных знаков и триггеров. Чтобы получить отличное руководство по этому поводу, я рекомендую проверить эти две статьи Googler Tyler Akidau: a, b </ а>.

    Поскольку в оконном режиме Время обработки нет такой вещи, как поздние данные, имеет смысл, чтобы ваш конвейер Время обработки Apache Beam работал без каких-либо проблем.

    Между тем, при работе с окнами Event Time могут появляться запаздывающие данные, и ваши окна и триггеры должны обрабатывать эти сценарии при правильной разработке.

    Скорее всего, ваш код конвейера обработки времени не запускается из-за неправильной конфигурации! Я не могу воспроизвести вашу проблему, поскольку ваш водяной знак (для источника Pub / Sub) определяется эвристически. Хотя я рекомендую вам отлаживать свой код следующим образом: Во-первых, увеличивая allowedLatness. Например: до 1 часа. Если это сработает — отлично! Если нет, см. Второе. Второй — комментарий с EarlyFirings. Если это сработает — отлично! Если нет, раскомментируйте и прочтите «Три три — Используйте окна фиксированного времени вместо окон скользящего времени».

    Продолжайте отладку, пока не сможете изолировать проблему

    :) :)

Добавить комментарий

;-) :| :x :twisted: :smile: :shock: :sad: :roll: :razz: :oops: :o :mrgreen: :lol: :idea: :grin: :evil: :cry: :cool: :arrow: :???: :?: :!: