Обработка ошибок в Apache NiFi: исключения и что с ними делать

Автор Категория ,
Обработка ошибок в Apache NiFi: исключения и что с ними делать

Недавно мы рассказывали про стратегии обработки ошибок в потоковых конвейерах данных на Apache NiFi. В продолжении этой темы, сегодня более детально разберем, с какими исключениями может столкнуться дата-инженер, о чем они говорят и как их обойти.

Виды исключений Apache NiFi

При разработке собственного процессора может возникнуть несколько различных неожиданных ситуаций. Поэтому дата-инженеру нужно понимать внутренние механизмы NiFi, если процессоры сами не обрабатывают ошибки, и знать, какая обработка ошибок ожидается от процессоров. В частности, во время выполнения метода onTrigger() могут случиться следующие проблемы:

  • входные данные поступают не в том формате;
  • сетевые подключения к внешним службам завершаются неудачно;
  • возник сбой чтения или записи данных на диск;
  • произошла ошибка в процессоре или зависимой библиотеке.

Любое из этих условий может привести к возникновению исключения процессора. С точки зрения фреймворка существует два типа исключений, которые могут избежать процессора: ProcessException и все остальные.

В случае ProcessException инфраструктура предполагает, что это сбой, результат которого известен. Более того, это условие, при котором повторная попытка обработки данных позже может быть успешной. В результате фреймворк откатит обрабатываемый сеанс и обрабатываемые потоковые файлы.

Однако, если какое-либо другое исключение выходит за пределы процессора, инфраструктура предполагает, что это сбой, который не был учтен разработчиком. В этом случае платформа также откатит сеанс и потоковые файлы. Но процессор может продолжать работать, вхолостую потребляя системные ресурсы без предоставления полезного выхода. Это часто случается, когда постоянно генерируется NullPointerException. Избежать этого позволит настройка повторного запуска в конфигурационном файле nifi.properties. По умолчанию время повторного запуска процессора составляет 10 секунд.

Однако, чаще всего исключения процессора происходит из обратного вызова, т.е. InputStreamCallback, OutputStreamCallback или StreamCallback, во время обработки содержимого FlowFile. Обратные вызовы могут вызывать либо RuntimeException, либо IOException. В случае RuntimeException это исключение будет передаваться обратно методу onTrigger(). В случае IOException исключение будет заключено в ProcessException, которое затем будет выброшено фреймворком.

Поэтому процессорам, использующим обратные вызовы, рекомендуется делать это в блоке try/catch и перехватывать ProcessException, а также другие исключения RuntimeException, которые сделают обратный вызов. Однако, не рекомендуется, чтобы процессоры перехватывали общие случаи Exception или Throwable по следующим причинам:

  • неожиданное исключение RuntimeException чаще всего является ошибкой, и разрешение фреймворку откатить сеанс гарантирует сохранность данных и то, что диспетчеры потоков данных смогут обрабатывать их по своему усмотрению, сохранив в очереди.
  • когда IOException вызывается из обратного вызова, исключения вызываются из кода процессора, например, данные не в том формате или сбой сетевого подключения. Если случилась проблема с репозиторием контента, где хранится содержимое потокового файла, платформа поймает это исключение IOException и завернет его в исключение FlowFileAccessException, которое расширяет RuntimeException. Это сделано явным образом, чтобы миновать метод onTrigger(), и фреймворк мог соответствующим образом обработать это условие. Перехват общего исключения предотвращает это.

Методы работы с исключениями процессора: штраф или уступка

Когда проблема возникает во время обработки, NiFi предоставляет два метода, позволяющих разработчикам процессоров избежать выполнения ненужной работы: «штраф» (penalization) и «уступка» (yielding). Разработчик может вызвать метод penalize(FlowFile) класса ProcessSession. Это приводит к тому, что сам FlowFile становится недоступным для нижестоящих процессоров в течение определенного периода времени. Время, в течение которого файл FlowFile недоступен, определяется диспетчером потока данных путем установки параметра Penalty Duration  в веб-интерфейсе фреймворка, в диалоговом окне конфигурации процессора. Значение этого параметра по умолчанию составляет 30 секунд.

Обычно кастомизация этого параметра выполняется, когда обработчик определяет, что данные не могут быть обработаны из-за причин, которые разрешатся сами собой. Например, процессор PutSFTP, который отправляет FlowFile на SFTP-сервер. Если потоковый файл с таким же именем уже существует на этом сервере SFTP, процессор штрафует FlowFile и направляет его к сбою (failure). Затем диспетчер потока данных (DataFlow Manager) может перенаправить ошибку обратно на тот же процессор PutSFTP. Таким образом, если FlowFile с таким же именем уже существует, процессор не будет пытаться отправить файл снова в течение 30 секунд или любого другого заданного периода, но может продолжать обрабатывать другие потоковые файлы.

Уступка позволяет разработчику процессора указать NiFi, что он не сможет выполнять какие-либо полезные функции в течение некоторого периода времени. Это обычно полезно при работе с процессором, который взаимодействует с удаленным ресурсом. Если процессор не может подключиться к удаленному ресурсу или если ожидается, что удаленный ресурс предоставит данные, но сообщает, что у него их нет, процессор должен вызвать yield для объекта ProcessContext, а затем вернуться. Делая это, процессор сообщает фреймворку, что он не должен тратить ресурсы на запуск этого процессора, а вместо этого использовать ресурсы для других обработчиков.

Откат сеанса

Хотя ProcessSession можно рассматривать как механизм доступа к потоковым файлам, он предоставляет еще одну очень важную возможность — транзакционность. Все методы, вызываемые в ProcessSession, выполняются как транзакции. Например, завершить транзакцию, можно вызвав commit() или rollback(). Обычно это обрабатывается классом AbstractProcessor: если метод onTrigger() генерирует исключение, AbstractProcessor перехватывает его, вызывает session.rollback(), а затем повторно генерирует исключение. Иначе AbstractProcessor вызовет commit() для ProcessSession.

Однако, бывают случаи, когда дата-инженеру нужно хотят явно откатить сеанс. Это можно сделать в любое время, вызвав метод rollback() или rollback(boolean). При использовании последнего логическое значение указывает, должны ли быть оштрафованы те потоковые файлы, которые были извлечены из очередей через методы получения ProcessSession, прежде чем они будут добавлены обратно в свои очереди.

При вызове отката любые изменения, произошедшие с потоковыми файлами в этом сеансе, отбрасываются, включая изменения содержимого и атрибутов. Кроме того, откатываются все события происхождения за исключением любого события SEND, созданного путем передачи значения true для аргумента force. Файлы FlowFiles, которые были извлечены из входных очередей, затем передаются обратно во входные очереди и штрафуются, чтобы их можно было снова обработать.

С другой стороны, когда вызывается метод фиксации, новое состояние FlowFile сохраняется в репозитории FlowFile, а любые произошедшие события Provenance сохраняются в репозитории Provenance. Предыдущее содержимое уничтожается, если другой FlowFile не ссылается на тот же фрагмент содержимого, а сами потоковые файлы передаются в исходящие очереди, чтобы следующие процессоры могли работать с данными.

Важно отметить, как на это поведение влияет использование аннотации org.apache.nifi.annotation.behavior.SupportsBatching. Если процессор использует эту аннотацию, вызовы ProcessSession.commit() могут не вступить в силу немедленно. Эти фиксации будут объединены в пакеты, чтобы обеспечить более высокую пропускную способность. Однако, если в какой-то момент процессор откатит ProcessSession, все изменения с момента последнего вызова фиксации будут отменены, и все пакетные фиксации вступят в силу. Эти пакетные коммиты не откатываются.

Узнайте больше про тонкости администрирования и эксплуатации Apache NiFi  для построения эффективных ETL-конвейеров потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://dzone.com/articles/best-practices-for-data-pipeline-error-handling-in
  2. https://nifi.apache.org/developer-guide.html