Tutorial
Intro
The goal of the tutorial is to import JSON data, do a couple of transformations on the data and save the data to a location.
The tutorial uses the dataset downloaded from https://apps.who.int/gho/data/node.main.A995?lang=en (JSON simple)
The JSON schema used is:
{
"type": "struct",
"fields": [
{
"name": "Value",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "dims",
"type": {
"type": "struct",
"fields": [
{
"name": "COUNTRY",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "YEAR",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "DATASOURCE",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "GHO",
"type": "string",
"nullable": true,
"metadata": {}
}
]
},
"nullable": true,
"metadata": {}
}
]
}
Reading data
We define an InputData block to load the json data.
We define 2 parameters, Path and Schema, to send to the lift function as well.
yaml_str = """
Parameters:
Path:
Description: Path to import data from
Schema:
Description: The schema describing the json
LiftJob:
InputData:
Type: load::batch_json
Properties:
Path: ${Path}
JsonSchemaPath: ${Schema}
"""
history = lift(
spark=spark,
lift_def=yaml_str,
parameters={
"Path": "/path/data.json",
"Schema": "/path/schema.json",
}
)
Displaying the InputData can be done as follows:
display(history.get("InputData"))
Cleaning up the Value
The Value column is of type string and contains spaces, to remove them we write a custom python codeblock.
yaml_str = """
Parameters:
Path:
Description: Path to import data from
Schema:
Description: The schema describing the json
CleanValuePython:
Description: Clean value python function
LiftJob:
InputData:
Type: load::batch_json
Properties:
Path: ${Path}
JsonSchemaPath: ${Schema}
CleanValue:
Type: custom::python_codeblock
Input:
- InputData
Properties:
CustomFunction: ${CleanValuePython}
"""
def clean_value_python(params):
df = params["dataframes"]["InputData"]
return df.withColumn("Value", F.regexp_replace(F.col("Value"), "\s+", ""))
history = lift(
spark=spark,
lift_def=yaml_str,
parameters={
"Path": "/path/data.json",
"Schema": "/path/schema.json",
"CleanValuePython": clean_value_python,
}
)
Transforming and casting
Now the Value column can be cast to an integer.
Only Value, dims.YEAR and dims.COUNTRY are relevant, so select the columns.
Rows where Year is null can be filtered out as well.
yaml_str = """
Parameters:
Path:
Description: Path to import data from
Schema:
Description: The schema describing the json
CleanValuePython:
Description: Clean value python function
LiftJob:
InputData:
Type: load::batch_json
Properties:
Path: ${Path}
JsonSchemaPath: ${Schema}
CleanValue:
Type: custom::python_codeblock
Input:
- InputData
Properties:
CustomFunction: ${CleanValuePython}
Trans:
Type: transform::generic
Input: CleanValue
Properties:
Functions:
- cast_column:
col: Value
new_type: integer
- select:
cols:
- col: Value
- { col: dims.YEAR, alias: Year, cast: integer }
- { col: dims.COUNTRY, alias: Country }
- where:
predicate: ["Year", "!=", "null"]
"""
def clean_value_python(params):
df = params["dataframes"]["InputData"]
return df.withColumn("Value", F.regexp_replace(F.col("Value"), "\s+", ""))
history = lift(
spark=spark,
lift_def=yaml_str,
parameters={
"Path": "/path/data.json",
"Schema": "/path/schema.json",
"CleanValuePython": clean_value_python,
}
)
Save to delta files
Lastly, we save down the Trans block to delta files.
yaml_str = """
Parameters:
Path:
Description: Path to import data from
Schema:
Description: The schema describing the json
CleanValuePython:
Description: Clean value python function
WritePath:
Description: Destination path
LiftJob:
InputData:
Type: load::batch_json
Properties:
Path: ${Path}
JsonSchemaPath: ${Schema}
CleanValue:
Type: custom::python_codeblock
Input:
- InputData
Properties:
CustomFunction: ${CleanValuePython}
Trans:
Type: transform::generic
Input: CleanValue
Properties:
Functions:
- cast_column:
col: Value
new_type: integer
- select:
cols:
- col: Value
- { col: dims.YEAR, alias: Year, cast: integer }
- { col: dims.COUNTRY, alias: Country }
- where:
predicate: ["Year", "!=", "null"]
Save:
Type: write::batch_delta
Input: Trans
Properties:
Path: ${WritePath}
Mode: overwrite
"""
def clean_value_python(params):
df = params["dataframes"]["InputData"]
return df.withColumn("Value", F.regexp_replace(F.col("Value"), "\s+", ""))
history = lift(
spark=spark,
lift_def=yaml_str,
parameters={
"Path": "/path/data.json",
"Schema": "/path/schema.json",
"CleanValuePython": clean_value_python,
"WritePath": "/path/vehicles/",
}
)