Устраняем дубли в потоковых данных с Apache Flink SQL

дедупликация Apache Flink SQL примеры курсы обучение, потоковая обработка данных Apache Flink SQL, Apache Flink SQL для дата-инженеров примеры курсы обучение, как удалить дубли в потоковых данных Apache Flink SQL, курсы Apache Flink для разработчиков, обучение большим данным, инженерия Big Data Apache Flink SQL, Школа Больших Данных Учебный Центр Коммерсант

Чем опасны дубли данных при их потоковой обработке и как реализовать дедупликацию в Apache Flink SQL. Смотрим на практическом примере для обучения дата-инженеров и разработчиков распределенных приложений.

Потоковая дедупликация данных в Apache Flink SQL

Apache Flink можно назвать уникальный фреймворком для разработки распределенных приложений в области Big Data, который унифицирует пакетную и потоковую обработку, сохраняя поддержку ANSI SQL и предоставляя широкий набор функций для сценариев реального времени. В потоковой обработке данных приложения-продюсеры, которые генерируют события, иногда могут создавать дубли – записи с одним и тем же идентификатором. Или восходящие ETL-задания не реализуют гарантию строго однократной доставки, что также приводит к дублированию записей в приемнике в случае отработки отказа. Дубликаты записей повлияют на правильность последующих аналитических заданий, особенно в агрегатных функциях, таких как суммирование, подсчет количества значений и пр. Поэтому для обеспечения высокого качества данных необходимо удалить дубли, т.е. выполнить дедупликацию. Цель этого процесса  — обеспечить обработку только уникальных записей и избежать любых проблем, которые могут возникнуть из-за дублирования. В потоковой обработке дедупликация данных может повысить производительность всей Big Data системы.

Таким образом, дедупликация нужна, чтобы:

  • повысить производительность системы за счет уменьшения объема обрабатываемых данных через удаление повторов;
  • освободить место в хранилище, т.к. дубли занимают пространство жесткого диска;
  • улучшить точность анализа и повысить эффективность обработки данных.

Дедупликация данных работает путем выявления и удаления повторяющихся записей из потока данных. Обычно это делается путем сравнения данных в потоке с эталонным набором данных. При обнаружении повторяющейся записи она удаляется из потока. Apache Flink использует SQL-функцию ROW_NUMBER() для удаления дубликатов, как и в случае запроса Top-N. Теоретически дедупликация — это частный случай Top-N, в котором N равно единице и упорядочено по времени обработки или времени события. Ниже показан синтаксис оператора дедупликации:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

Рассмотрим параметры этого запроса:

  • ROW_NUMBER() – функция, которая присваивает каждой строке уникальный порядковый номер, начиная с единицы;
  • PARTITION BY col1[, col2…] – ключ дедупликации, т.е. один или несколько столбцов раздела;
  • ORDER BY time_attr [asc|desc] — столбец сортировки, атрибут времени обработки или события. Аналогично стандартным опциям SQL-оператора ORDER BY, ASC означает сохранение первой строки, а DESC – сохранение последней строки.
  • WHERE rownum = 1 – условие уникальности каждой строки, что и означает дедупликацию.

Этот шаблон необходимо точно соблюдать, иначе оптимизатор Apache Flink не сможет преобразовать запрос. Как это работает, далее рассмотрим на практическом примере.

Потоковая обработка данных с помощью Apache Flink

Код курса
FLINK
Ближайшая дата курса
1 августа, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Практический пример

В качестве примера возьмем систему обработки заказов, которая каждые 5 секунд генерирует строки в памяти, которые означают повторяющиеся события с одним и тем же идентификатором order_id.

CREATE TABLE orders (
  id INT,
  order_time AS CURRENT_TIMESTAMP,
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='10',
  'fields.id.kind'='random',
  'fields.id.min'='1',
  'fields.id.max'='100'
);

Предположим, по правилам бизнес-логики для последующей обработки нужно сохранить только самое последнее событие. Для этого сперва используем комбинацию функции COUNT и предложения HAVING, чтобы проверить, в каких заказах есть более одного события.

--Check for duplicates in the `orders` table
SELECT id AS order_id,
       COUNT(*) AS order_cnt
FROM orders o
GROUP BY id
HAVING COUNT(*) > 1;

Затем отфильтруем эти события с помощью функции ROW_NUMBER():

--Use deduplication to keep only the latest record for each `order_id`
SELECT
  order_id,
  order_time
FROM (
  SELECT id AS order_id,
         order_time,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY order_time) AS rownum
  FROM orders
     )
WHERE rownum = 1;

Про другой вариант устранения дублей в потоковой обработке данных с помощью Flink SQL читайте в нашей новой статье. А на практике освоить этот прием и другие тонкости использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.ververica.com/blog/flink-sql-deduplication
  2. https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/queries/deduplication/
  3. https://github.com/ververica/flink-sql-cookbook/blob/main/aggregations-and-analytics/06_dedup/06_dedup.md
Поиск по сайту