Spark Helpers

Verify if a path exists

python
from pyspark.sql import SparkSession


def path_exists(path: str, spark: SparkSession) -> bool:
    """
    Verify if a path exists in S3
    """
    sc = spark.sparkContext
    # Build a Hadoop FileSystem handle for the bucket extracted from the path
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create("s3://" + path.split("/")[2]),
        sc._jsc.hadoopConfiguration(),
    )
    return fs.exists(sc._jvm.org.apache.hadoop.fs.Path(path))
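
A usage sketch (the bucket and prefix below are hypothetical): guard a read so the job skips partitions that were never written.

python
# Hypothetical path; replace with a real bucket/prefix
landing_path = "s3://my-bucket/landing/table/year=2023/"
if path_exists(landing_path, spark):
    df = spark.read.json(landing_path)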

List all files inside a bucket path

python
def get_files(spark: SparkSession, path: str) -> list:
    """
    List all files inside the provided path
    """
    sc = spark.sparkContext

    # Get the FileSystem for the path's scheme and list its entries
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(
        sc._jvm.java.net.URI.create(path),
        sc._jsc.hadoopConfiguration(),
    )
    files = fs.listStatus(sc._jvm.org.apache.hadoop.fs.Path(path))

    # Keep only the entry names, not the full FileStatus objects
    return [file.getPath().getName() for file in files]
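
A quick usage sketch (the prefix below is hypothetical): list the hour partitions under a given day.

python
# Returns entry names such as ['hour=00', 'hour=01', ...]
partitions = get_files(spark, "s3://my-bucket/landing/table/year=2023/month=01/day=15/")
print(partitions)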

Generate list of partitions to read

python
from datetime import datetime, timedelta


def create_date_list(begin_date: str, end_date: str) -> list:
    """
    Build the list of hourly datetimes between begin_date and end_date
    """
    begin_date_obj = datetime.strptime(begin_date, '%y/%m/%d %H:%M:%S')
    end_date_obj = datetime.strptime(end_date, '%y/%m/%d %H:%M:%S')
    
    date_list = []
    date_time = begin_date_obj

    while date_time < end_date_obj:
        date_list.append(date_time)
        date_time += timedelta(hours=1)
    
    return date_list

def create_spark_load_list(spark: SparkSession, basePath: str, date_list: list) -> list:
    """
    Build the list of existing partition paths for the given datetimes
    """
    load_paths = []

    for date in date_list:
        load_path = f"{basePath}year={date.year}/month={date.month:02d}/day={date.day:02d}/hour={date.hour:02d}/"

        # Keep only partitions that actually exist in the bucket
        if path_exists(load_path, spark):
            load_paths.append(load_path)

    return load_paths

def get_data(spark:SparkSession, base_path:str, date_list_load_path:list):
    """
    Load a DataFrame from the reference bucket, filtered to the
    list of partition paths to be read
    """
    df = ( 
        spark.read
            .format('json')
            .option('basePath', base_path)
            .load(date_list_load_path)
    )

    return df

# list of datetimes to import
date_list_to_import = create_date_list('BEGIN_DATE', 'END_DATE')

# list of paths to spark .load()
date_list_load_path = create_spark_load_list(spark, 'base_path', date_list_to_import)

logger.info('Reading landing data')
df_table = get_data(spark, 'base_path', date_list_load_path)
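
Putting it together, a sketch with hypothetical dates and bucket (the date strings follow the two-digit-year format the helper expects):

python
# Hypothetical values for illustration only
base_path = "s3://my-bucket/landing/table/"
dates = create_date_list('23/01/01 00:00:00', '23/01/02 00:00:00')
paths = create_spark_load_list(spark, base_path, dates)
df_table = get_data(spark, base_path, paths)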

Rename all columns

python
from pyspark.sql import functions as F

# Add a prefix and a suffix to every column name
df = df.select(
    [F.col(c).alias('prefix' + c + 'suffix') for c in df.columns]
)
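
A common variant (DataFrame and column names below are hypothetical) is to add only a prefix, for example to disambiguate columns before a join:

python
# Prefix every column of the right-hand DataFrame before joining
df_right = df_right.select([F.col(c).alias('right_' + c) for c in df_right.columns])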

Convert a STRING column to JSON with automatic schema inference

python
from pyspark.sql import functions as F

# Load the DataFrame as plain text; each row has a single string column named "value"
df = spark.read.format("text").load(path)

# Auto-infer the schema from the string column
# "value" is the name of the column of type str holding the JSON data
schema_df = spark.read.json(df.limit(1).rdd.map(lambda r: r.value)).schema

# Parse the JSON and explode its fields into top-level columns
exploded_df = (
    df.withColumn(
        "value",
        F.from_json("value", schema_df)
    )
    .select(F.col('value.*'))
)

# Show data
exploded_df.show(1, vertical=True, truncate=False)
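
One caveat: inferring the schema from a single row can miss fields that only appear in other records. Sampling more rows gives a more robust schema, at the cost of an extra pass over the sample:

python
# Infer the schema from a larger sample instead of a single row
schema_df = spark.read.json(df.limit(1000).rdd.map(lambda r: r.value)).schema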

Feel free to use any content here.