Что не так с Delta Lake на Apache Spark: 7 основных проблем и их решения

Spark, архитектура, обработка данных, большие данные, Big Data, Hadoop, Data Lake, Hive, SQL, NoSQL, MLOps, DataOps, Delta Lake, обучение Apache Spark, курсы по Spark

При всех своих достоинствах Delta Lake, включая коммерческую реализацию этой Big Data технологии от Databricks, оно обладает рядом особенностей, которые могут расцениваться как недостатки. Сегодня мы рассмотрим, чего не стоит ожидать от этого быстрого облачного хранилище для больших данных на Apache Spark и как можно обойти эти ограничения. Читайте далее, как реализовать потоковое чтение и запись данных при отсутствии Sqoop, что делать для изменения типа столбца и при чем тут Hive.

Не все так просто или ключевые особенности Delta Lake

Подчеркнем, что облачное Delta Lake располагается поверх корпоративной системы хранения данных, но не заменяет ее, а добавляет возможность работать с ACID-транзакциями в Hadoop HDFS, BLOB-объекты Azure и Amazon S3. Загрузка информации в Delta Lake возможна из любой системы хранения, которая поддерживает источники данных Apache Spark, и записывать в столбцовом формате Parquet [1].

Однако, на практике при переходе к Delta Lake от традиционных озер данных на Apache Hadoop можно столкнуться со следующими особенностями этой облачной Big Data технологии:

  • Коммерческая реализация Delta Lake от Databricks, наиболее популярная в мире enterprise, не поддерживает Apache Sqoop — инструмент для передачи данных между Hadoop и реляционными СУБД или мэйнфреймам. Обойти это ограничение можно, используя другие средства загрузки реляционных данных в облачное хранилище, например, специализированный JDBC-коннектор [2].
  • Поскольку SQL-представление данных в Hadoop реализуется через Apache Hive, то при переходе к Delta Lake информация должна быть в виде Hive-таблиц [2].
  • Delta Lake не поддерживает транзакции с несколькими таблицами и внешние ключи, работая с ACID-транзакциями на уровне таблицы. А для изменения типа столбца или удаления столбца необходимо переписать всю таблицу [3].
  • При использовании HBase на Hadoop следует перейти на NoSQL-сервисы, такие как, например, Azure Cosmos DB или DynamoDB на AWS с помощью соответствующих коннекторов для обработки данных [2].
  • Delta Lake поддерживает не все функции DDL (Data Definition Language, язык или набор SQL-операторов для определения структуры таблицы) и DML (Data Manipulation Language, язык или набор SQL-операторов для манипулирования данными). В части DDL отсутствует поддержка ANALYZE TABLE PARTITION, ALTER TABLE [ADD|DROP] PARTITION, ALTER TABLE RECOVER PARTITIONS, ALTER TABLE SET SERDEPROPERTIES, CREATE TABLE LIKE, INSERT OVERWRITE DIRECTORY, LOAD DATA. В части DML отсутствуют операторы INSERT INTO [OVERWRITE] для таблицы со статическими секциями, INSERT OVERWRITE TABLE для таблицы с динамическими секциями, Группирование, Указание схемы при чтении из таблицы и указание целевых секций с помощью PARTITION (part_spec) вTRUNCATE TABLE таблицу [3].

Еще одним существенным ограничением Delta Lake является отсутствие поддержки API DStream, о чем мы поговорим далее.

Что использовать вместо DStream для потоковой передачи, чтения и записи

При том, что Delta Lake основан на Apache Spark, оно не поддерживает API DStream для потоковой передачи данных, а также операций чтения и записи. Вместо этого используется Structured Streaming – масштабируемый и отказоустойчивый механизм структурированной потоковой передачи на базе Spark SQL. Важно, что Structured Streaming не обрабатывает входные данные, которые не являются дополнением, и создает исключение, если в таблице-источнике данных происходят какие-либо изменения [4]. Как это принято в Spark, запросы структурированной потоковой передачи обрабатываются в рамках микропакетного подхода (micro-batch), который обрабатывает потоки данных как серию небольших пакетных заданий, достигая сквозных задержек до 100 миллисекунд и строго однократной (exactly-once) гарантированной отказоустойчивости [5].

Поэтому следует контролировать максимальный размер любого микропакета, задав параметр maxFilesPerTrigger, который указывает максимальное число новых файлов в каждом триггере (по умолчанию это значение равно1000). Также имеет смысл ограничить объем данных, обрабатываемых в каждом микро-пакете, задав параметр maxBytesPerTrigger . При использовании параметра Trigger.Once в сочетании с maxFilesPerTrigger микро-пакет обрабатывает данные до тех пор, пока не будет достигнут предел. А чтобы работать с изменениями, которые не могут быть автоматически распространены по нисходящей, например, удалить из таблицы пользовательские данные из-за GDPR, можно сделать следующее [4]:

  • удалить выходные данные и контрольную точку и перезапустить поток с самого начала;
  • игнорировать транзакции, удаляющие данные на границах разделов, с помощью ignoreDeletes;
  • повторно обработать обновления, если файлы должны быть перезаписаны в исходной таблице из-за UPDATE, MERGE INTO, DELETE (в разделах) или OVERWRITE, с помощью ignoreChanges. Неизмененные строки могут по-прежнему выдаваться, поэтому нисходящие потребители должны иметь возможность обрабатывать дубликаты. 

Благодаря этому поток не будет нарушаться удалением или обновлением исходной таблицы. Например, из таблицы user_events со столбцами user_email, секционированными по дате (date), необходимо удалить сведения о пользовательских почтовых ящиках.

Следующий код на Scala иллюстрирует этот кейс:

events.readStream

  .format(«delta»)

  .option(«ignoreChanges», «true»)

  .load(«/mnt/delta/user_events»)

При обновлении user_email с помощью инструкции UPDATE файл с этими данными перезаписывается. При использовании ignoreChanges новая запись передается дальше со всеми другими неизмененными записями, которые были в том же файле. При этом логика программы должна иметь возможность обрабатывать эти входящие дубликаты [4].

Подводя итог особенностям быстрого облачного хранилища на Apache Spark, подчеркнем, что, несмотря на все вышеперечисленные ограничения, эта технология полностью соответствует современным тенденциям в мире Big Data, поддерживая концепции DataOps и MLOps за счет облачных сервисов.

Как адаптировать лучшее из Delta Lake в Apache Spark и Hadoop для аналитики больших данных в проекты цифровизации своего бизнеса, а также государственных и муниципальных предприятий, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

Источники

  1. https://www.tadviser.ru/index.php/Продукт:Databricks_Cloud
  2. https://blogs.informatica.com/2019/10/04/how-to-go-hadoop-less-with-informatica-data-engineering-and-databricks/
  3. https://docs.microsoft.com/ru-ru/azure/databricks/delta/delta-faq
  4. https://docs.microsoft.com/ru-ru/azure/databricks/delta/delta-streaming
  5. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html