Enabling HDF5 Support in AWS EMR

Overview

Hierarchical Data Format (HDF) is a file format designed to store large, complex datasets together with rich metadata. SnapLogic eXtreme supports HDF version 5 (HDF5). Use the steps in this article to enable HDF5 support on your AWS EMR cluster. After setting up your AWS EMR cluster, you can use the PySpark Script Snap to read HDF5 files in your AWS S3 buckets and convert them into JSON files.

Key Steps

  1. Create a virtual environment locally and compress the environment into a ZIP file.

  2. Upload the ZIP file, along with a PySpark script that converts HDF5 files to JSON, to an S3 bucket accessible by the AWS EMR cluster.

  3. Configure and execute the PySpark Script Snap to read HDF5 files and parse them into JSON.

Creating the Virtual Environment

Prerequisites 

You need to have a Linux machine with Python installed. In this example, we are using Python 2.
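
If the virtualenv module is not already available for your Python interpreter, you can typically install it with pip before proceeding (this assumes pip is installed on the machine):

    # Confirm the interpreter version (this example assumes Python 2)
    python --version
    # Install the virtualenv module if it is not already present
    pip install virtualenv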

Steps

To create a virtual environment: 

  1. Launch the command line interface and navigate to the working directory where you want to create the virtual environment. Then create it using the following command:

    python -m virtualenv venv

    This creates a directory, venv, in your current working directory.

  2. Verify that the directory was created:

    ls

  3. Verify that the new directory has Python in it:

    ls venv/bin/python

  4. Set up dependencies using a requirements.txt file. In this file, specify the modules you want in your virtual environment. For HDF5 support, we recommend s3fs, pandas, and h5py:

    vi requirements.txt
    cat requirements.txt
    s3fs
    pandas
    h5py

    • h5py: To read HDF5 files.

    • s3fs: S3 filesystem interface for reading HDF5 files stored in S3.

    • pandas: To convert each dataset in an HDF5 file into a DataFrame.

     

  5. After setting up the virtual environment, you can activate it by entering the following command:  

    source venv/bin/activate

    When the virtual environment is active, its name appears in parentheses, for example (venv), before your shell prompt. This indicates that you are now working inside the virtual environment.

  6. Install the dependencies you listed in the requirements.txt file in step 4:

    pip install -r requirements.txt

  7. Verify that all the modules were installed.

    pip list

  8. Exit the virtual environment.

    deactivate

  9. Navigate to the virtual environment directory, venv:

    pushd venv

  10. Compress the contents of the virtual environment directory into a ZIP file, venv.zip, which is placed in the working directory:

    zip -rq ../venv.zip *

  11. Optional. Return to the working directory and verify that the environment's ZIP file, venv.zip, is there:

    popd
    ls -l

Uploading Files to S3 Bucket

You must upload two files to your S3 bucket:

  • The virtual environment ZIP file.

  • A PySpark script to convert the HDF5 files to JSON format.

Note the file paths for the above files. You will need them when configuring a PySpark Script Snap to read HDF5 files. For example, "s3://mybucket/virtualenv/venv.zip" and "s3://mybucket/script/hdf5_to_json.py".
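
For example, if you use the AWS CLI, the uploads could look like the following (the bucket, folder names, and the script file name hdf5_to_json.py mirror the example paths above; substitute your own):

    # Upload the virtual environment archive
    aws s3 cp venv.zip s3://mybucket/virtualenv/venv.zip
    # Upload the HDF5-to-JSON conversion script
    aws s3 cp hdf5_to_json.py s3://mybucket/script/hdf5_to_json.py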

Your AWS EMR cluster is now ready to read HDF5 files. See Read and Parse HDF5 Files for details.

While you can create your own PySpark script to convert HDF5 files to JSON format, you can also use the following:

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, col
import s3fs
import h5py
import pandas as pd
import numpy as np
import boto3
import sys
import copy
import json
import gzip
from io import BytesIO

print("Provided S3 path in argument....")
print("Source folder for reading HDF5 files: ", sys.argv[1])
print("Target folder for writing JSON files: ", sys.argv[2])

s3 = s3fs.S3FileSystem()
s3_res = boto3.resource('s3')

src_hdf_files = sys.argv[1].split("://")
target_json_files = sys.argv[2].split("://")
if (len(src_hdf_files) != 2):
    print("Invalid source path in S3")
if (len(target_json_files) != 2):
    print("Invalid target path in S3")

target_json_files = sys.argv[2]
if (target_json_files.endswith('/') == False):
    target_json_files = target_json_files + '/'

src_index = src_hdf_files[1].find('/')
src_bucket = src_hdf_files[1][:src_index]
src_folder_path_after_bucket = src_hdf_files[1][src_index+1:]
s3_src_bucket = s3_res.Bucket(src_bucket)

target_path = target_json_files.split("://")
target_index = target_path[1].find('/')
target_bucket = target_path[1][:target_index]
target_folder_path_after_bucket = target_path[1][target_index+1:]


def main():
    print("======================================= Started =======================================")
    sconf = SparkConf().setAppName("HDF5 to JSON")
    sc = SparkContext(appName="HDF5 to JSON", conf=sconf)
    sqlContext = SQLContext(sc)
    my_hdf_files = []
    print("\n")
    print("S3 file paths....")
    for file in s3_src_bucket.objects.filter(Prefix=src_folder_path_after_bucket):
        print(file.key)
        if (file.key.endswith('.h5') or file.key.endswith('.hdf5') or file.key.endswith('.gh5')):
            my_hdf_files.append(file.key)
    row = Row("hdf_file_paths")
    all_hdf_files = sc.parallelize(my_hdf_files)
    sparkdf_hdf = all_hdf_files.map(row).toDF()
    sparkdf_hdf.show()
    print("Columns: ", sparkdf_hdf.columns)
    converthdffile_udf = udf(lambda filename: convert_hdf_json(filename))
    sparkdf_hdf.withColumn("processed_status", converthdffile_udf(col("hdf_file_paths"))).show()
    print("======================================= Done =======================================")
    sc.stop()


def convert_hdf_json(filename):
    s3 = s3fs.S3FileSystem()
    s3_res = boto3.resource('s3')
    dct = []
    allmetadata = {}
    json_folder_pre = filename.replace("/", "_")
    indx = json_folder_pre.rfind('.')
    json_folder_pre = json_folder_pre[:indx]
    print("\n\n")
    full_path = 's3://' + src_bucket + '/' + filename
    print("Full path for the HDF5 file: ", full_path)
    hf = h5py.File(s3.open(full_path), "r")

    def getallmetadata(name, obj):
        if isinstance(obj, h5py.Dataset):
            dct.append(name)
        else:
            indx_slash = name.rfind("/")
            find_metadata = name[:indx_slash]
            metadata_dict = {}
            if (indx_slash != -1):
                metadata_dict = copy.deepcopy(allmetadata[find_metadata])
            for i in obj.attrs.keys():
                updated_i = convert_key_spark_equivalent(i)
                metadata_dict[updated_i] = convert_numpy_python_dtypes(obj.attrs[i])
            allmetadata[name] = metadata_dict

    hf.visititems(getallmetadata)
    json_folder = target_folder_path_after_bucket + json_folder_pre + "/"
    json_dataset_pre = "dataset_"
    json_metadata_pre = "metadata_"
    json_dataset_metadata_num = 1
    for dataset in dct:
        print("\n")
        print("Dataset: ", dataset)
        json_dataset_file = json_folder + json_dataset_pre + str(json_dataset_metadata_num) + ".json.gz"
        json_metadata_file = json_folder + json_metadata_pre + str(json_dataset_metadata_num) + ".json.gz"
        df_f = pd.DataFrame(hf[dataset][:])
        df_f.columns = df_f.columns.astype(str)
        indx_slash = dataset.rfind("/")
        find_metadata = dataset[:indx_slash]
        output = []
        if (find_metadata in allmetadata.keys()):
            metadata_dict = allmetadata[find_metadata]
        else:
            metadata_dict = {"found": False}
        id_num = 0
        for jdict in df_f.to_dict(orient='records'):
            jdict['id'] = id_num
            output.append(jdict)
            id_num = id_num + 1
        buf_data = BytesIO()
        compressed_data = gzip.GzipFile(fileobj=buf_data, mode="wb")
        compressed_data.write(bytes(json.dumps(output).encode('UTF-8')))
        compressed_data.close()
        buf_metadata = BytesIO()
        compressed_metadata = gzip.GzipFile(fileobj=buf_metadata, mode="wb")
        compressed_metadata.write(bytes(json.dumps(metadata_dict).encode('UTF-8')))
        compressed_metadata.close()
        s3_res.Object(target_bucket, json_dataset_file).put(Body=buf_data.getvalue(), ContentEncoding='gzip')
        s3_res.Object(target_bucket, json_metadata_file).put(Body=buf_metadata.getvalue(), ContentEncoding='gzip')
        json_dataset_file = "s3://" + target_bucket + "/" + json_dataset_file
        json_metadata_file = "s3://" + target_bucket + "/" + json_metadata_file
        json_dataset_metadata_num = json_dataset_metadata_num + 1
        print("Created the following JSON file for the corresponding dataset: ", json_dataset_file)
        print("Created the following JSON file for the corresponding metadata: ", json_metadata_file)
        del output[:]
    del dct[:]
    allmetadata.clear()
    return "Processed"


def convert_numpy_python_dtypes(metadata_val):
    if (np.string_ == type(metadata_val)):
        new_value = str(metadata_val)
    elif (np.float32 == type(metadata_val)):
        new_value = np.float32(metadata_val).item()
    elif (np.float64 == type(metadata_val)):
        new_value = np.float64(metadata_val).item()
    elif (np.int32 == type(metadata_val)):
        new_value = np.int32(metadata_val).item()
    else:
        new_value = metadata_val
    return new_value


def convert_key_spark_equivalent(key):
    updated_key = key.replace(" ", "_")
    updated_key = updated_key.replace(",", "_")
    updated_key = updated_key.replace(";", "_")
    updated_key = updated_key.replace("{", "_")
    updated_key = updated_key.replace("}", "_")
    updated_key = updated_key.replace("(", "_")
    updated_key = updated_key.replace(")", "_")
    return updated_key


if __name__ == "__main__":
    main()
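
Based on the naming logic in the script above, a source file such as s3://mybucket/hdf_source_files/sample.h5 (a hypothetical file name) containing a single dataset would produce output along these lines:

    s3://mybucket/json_files/hdf_source_files_sample/dataset_1.json.gz
    s3://mybucket/json_files/hdf_source_files_sample/metadata_1.json.gz

Each dataset in an HDF5 file produces one numbered dataset/metadata pair of gzip-compressed JSON files under a folder derived from the source file's key.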



Read and Parse HDF5 Files

  1. Log in to your AWS console and upload the HDF5 files to your S3 bucket. Note the location of this source folder. For example, "s3://mybucket/hdf_source_files/".

  2. Create a target folder in the S3 bucket where you want to store the parsed JSON files. Note the location of this target folder. For example, "s3://mybucket/json_files/".

  3. Configure and execute the PySpark Script Snap to read the HDF5 files and parse them into JSON format (example field values are shown after this list).

    1. Specify the path of the Python script to be used for converting HDF5 files into JSON format in the PySpark Script path field.

    2. Specify the path of the HDF5 files and the target location for saving the JSON files as script arguments in the Script args field.

    3. Specify the path of the virtual environment's ZIP file in the Virtual environment path field.

  4. Validate and execute the Snap. The HDF5 files in the source folder are converted into JSON files. You can create separate Pipelines to process the JSON files, or add the appropriate Snaps to this Pipeline.
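
    For example, using the sample paths from this article, the Snap settings for step 3 could look like this (substitute your own bucket and folder names):

    PySpark Script path:      s3://mybucket/script/hdf5_to_json.py
    Script args:              s3://mybucket/hdf_source_files/ s3://mybucket/json_files/
    Virtual environment path: s3://mybucket/virtualenv/venv.zip

    The script reads the first argument as the source folder containing HDF5 files and the second as the target folder for the generated JSON files.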


See Also