Как безопасно читать данные из AWS S3 с Apache Spark и boto3

Python boto3 AWS S3 Apache Spark примеры курсы обучение, безопасность и защита данных в Big Data, шифрование Apache Spark примеры курсы обучение, курсы Apache Spark для разработчиков и дата-инженеров, анализ данных с Apache Spark, аналитика больших данных курсы, криптография в Apache Spark, обучение большим данным для разработчиков и инженеров данных примеры Spark, Школа Больших Данных Учебный Центр Коммерсант

Чтобы сделать наши курсы по Apache Spark для дата-инженеров еще более полезными, сегодня рассмотрим, как PySpark-задания могут считывать данные из корзин объектного хранилища AWS S3, используя Python-пакет boto3. Читайте далее, что представляет собой этот SDK, как использовать его вместе с IAM-ролями, а также как обеспечить безопасность конфиденциальных данных с помощью типовых или пользовательских функций PySpark.

Чтение данных из AWS S3 с Apache Spark и boto3

Озеро данных (Data Lake) часто организуется на базе облачного объектного хранилища AWS S3. Чтобы использовать эти данные в задачах прогнозирования с помощью ML-моделей или анализировать их, необходимо сперва считать эту информацию. Это можно сделать с помощью заданий PySpark и учетных записей в IAM (Identity and Access Management) – службе Amazon, которая позволяет получить доступ к другим сервисам AWS. IAM дает возможность указать, кто или что может получить доступ к сервисам и ресурсам AWS, централизованно управлять мелкими разрешениями и анализировать доступ для уточнения разрешений.

Роли IAM позволяют предоставить права доступа пользователям или сервисам, у которых обычно нет доступа к корпоративным AWS-ресурсам. Пользователям IAM или сервисам AWS можно присвоить роли для получения временных данных для доступа к данным и вызовов API. Это исключает необходимость предоставления долгосрочных данных для доступа или назначения разрешений для каждого объекта, которому требуется доступ к определенному ресурсу.

Разрешения предоставляют доступ к ресурсам AWS и предоставляются объектам IAM (пользователям, группам и ролям), которые не могут выполнять никаких действий на платформе AWS без необходимых разрешений. Чтобы предоставить объектам разрешения, можно назначить правило, определяющее тип доступа, допустимые действия и ресурсы для их выполнения. Также можно указать любые условия, которые должны выполняться для разрешения или запрещения доступа.

Чтобы считать данные из AWS S3 средствами Apache Spark, напишем небольшой PySpark-скрипт в интерактивной веб-среде Google Colab или Jupyter Notebook. Сперва установим необходимые библиотеки, используя менеджер пакетов pip. Помимо пакета pyspark следует установить библиотеку boto3, которая непосредственно позволяет обращаться к сервисам AWS. Вообще boto3 – это SDK-пакет от Amazon, дающий возможность разработчикам Python писать код, использующий AWS-сервисы S3 и EC2. Используя boto3, можно получить набор учетных данных AWS для чтения данных с помощью PySpark. При этом важно убедиться, что используемая IAM-роль имеет доступ к корзине S3. Для этого создадим набор временных учетных данных S3, доступных в течение 1 часа. Пример кода на PySpark выглядит следующим образом:

pip install pyspark
pip istall boto3

from pyspark import SparkConf
from pyspark.sql import SparkSession
import boto3session = boto3.session.Session()
sts_connection = session.client('sts')response = sts_connection.assume_role(RoleArn="arn:aws:iam::{YOUR_AWS_ACCOUNT_ID}:role/pyspark-iam-role", RoleSessionName='pyspark-s3-role',DurationSeconds=3600)
credentials = response['Credentials']

Далее можно считать данные из S3, используя пользовательские учетные данные, добавив их в конфигурацию задания Apache Spark. При этом важно иметь правильные JAR-файлы в качестве зависимостей для используемой версии PySpark.

builder = SparkSession.builder.appName("pyspark-demo")builder = builder.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
.config("spark.hadoop.com.amazonaws.services.s3.enableV4", "true")\
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.2,com.amazonaws:aws-java-sdk-bundle:1.11.888")\
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")\
.config("spark.hadoop.fs.s3a.access.key", credentials['AccessKeyId'])\
.config("spark.hadoop.fs.s3a.secret.key", credentials['SecretAccessKey'])\
.config("spark.hadoop.fs.s3a.session.token", credentials['SessionToken'])spark = builder.getOrCreate()

Далее можно прочитать данные из AWS S3 в датафрейм PySpark:

df = spark.read.csv("s3a://your/csv/path")
df.show()

Как обеспечить конфиденциальность прочитанных данных, используя типовые и пользовательские функции PySpark, рассмотрим далее.

Безопасность данных с PySpark

Обеспечить конфиденциальность данных можно, применив маскирование или шифрование личной информации на уровне столбца в датасете, используя несколько функций PySpark. В качестве примера рассмотрим, как маскировать или шифровать адреса электронной почты, которые являются персональными данными и должны охраняться согласно 152-ФЗ и GDPR.  Самый простой способ замаскировать или зашифровать данные — это использовать функцию lit(), заменив значение электронной почты заданным строковым значением, в данном случае «***Masked***». В следующем примере значения NULL не маскируются с помощью условия when():

conditions_mask = when(col("email").isNotNull(), lit("***Masked***")).otherwise(col("email"))
df_emails = df_emails.withColumn("email", conditions_mask)df_emails.show(5, False)
маскирование данных Apache Spark PySpark примеры курсы обучение
Маскирование данных в PySpark c функцией lit()

При всей простоте этого метода, он очень быстрый и гибкий, позволяет применять различные условия к данным, например, маскировать только адреса электронной почты, которые содержат символ @. Можно сделать маскирование более сложным, например, отображать первые 2 символа адреса электронной почты и последние 6 символов, заменив остальные на ****. Например, su****il.com и jo****il.com. Это достигается с помощью функции expr():

conditions_mask = (when(col("email")=="None", col("email"))
    .when(col("email").isNotNull(), expr('concat(LEFT(email, 2) ,"****", RIGHT(email, 6))'))
    .otherwise(col("email"))
)
df_emails = df_emails.withColumn("email", conditions_mask)df_emails.show(5, False)

Также можно функцию expr() на substring(), которая также есть в PySpark. Оба эти метода отлично работают, если не нужно снова получить исходные значения зашифрованных данных. В таком случае следует использовать более сложные техники, например, шифрование и дешифрование AES. Это симметричный тип шифрования, где используется один и тот же ключ как для шифрования, так и для расшифровки данных. Какие еще алгоритмы шифрования используются в Apache Spark, читайте в нашей новой статье.

В следующем фрагменте кода снова применяется PySpark-функция expr(), позволяющая выполнять функции aes_encrypt() и aes_decrypt(), предоставляя 16-битный ключ и режим по умолчанию GCM.

df_emails = df_emails.withColumn("encrypted_email", expr("hex(aes_encrypt(email, '1234567890abcdef', 'GCM'))"))df_emails.show(5, False)
df_emails = df_emails.withColumn("decrypted_email", expr("aes_decrypt(unhex(encrypted_email), '1234567890abcdef', 'GCM')").cast(StringType()))df_emails.show(5, False)
AES encrypting Python Spark, шифрование данных Apache Spark PySpark примеры курсы обучение
Шифрование и дешифрование данных в Apache Spark

Наконец, можно написать собственную UDF-функцию Python, чтобы применить ее к каждой строке. Но этот метод будет менее эффективным с точки зрения производительности, особенно если речь идет об обработке миллиардов строк. В любом случае, после применения маскирования или шифрования к значениям столбцов с конфиденциальными данными, их можно сохранить, например, в формате файла Parquet, следующим образом:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit, col, when, exprschema = StructType([
 StructField("id",IntegerType(),True),
 StructField("name",StringType(),True),
 StructField("email",StringType(),True)])data_emails=[(1, "Brooke", "brooke@yahoo.com"),
 (2, "Jon", "jon@hotmail.com"),
 (3, "Susan", "susan@gmail.com"),
 (4, "Axl", "None"),
 (5, "Adam", None)]df_emails = spark.createDataFrame(data=data_emails, schema=schema)df_emails.show(5, False)

В заключение отметим возможность использования методов криптографических Python-библиотек, например, PyCrypto и PyCryptodome, о которых мы писали здесь.

Про применение Apache Spark при переходе от озера данных на HDFS в облачное хранилище объектов Google, читайте в нашей новой статье.

Освойте использование Apache Spark в аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@bogdan.cojocar/how-to-read-data-s3-using-pyspark-and-a-temporary-iam-role-1b1bd3055f0a
  2. https://github.com/boto/boto3
  3. https://aws.amazon.com/ru/iam/
  4. https://che-kulhan.medium.com/reduce-data-breaches-with-pyspark-398e057378d3
Поиск по сайту