Tutorial

Intro

The goal of this tutorial is to import JSON data, apply a couple of transformations, and save the result to a target location.

The tutorial uses the dataset downloaded from https://apps.who.int/gho/data/node.main.A995?lang=en (exported as "JSON simple").

The JSON schema used is:

{
  "type": "struct",
  "fields": [
    {
      "name": "Value",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "dims",
      "type": {
        "type": "struct",
        "fields": [
          {
            "name": "COUNTRY",
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": "YEAR",
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": "DATASOURCE",
            "type": "string",
            "nullable": true,
            "metadata": {}
          },
          {
            "name": "GHO",
            "type": "string",
            "nullable": true,
            "metadata": {}
          }
        ]
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}
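
This is the standard JSON representation of a Spark schema, so it can also be loaded into a StructType directly with PySpark. A minimal sketch, assuming the schema above is saved to /path/schema.json:

import json
from pyspark.sql.types import StructType

# Parse the JSON schema file into a Spark StructType.
with open("/path/schema.json") as f:
    schema = StructType.fromJson(json.load(f))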

Reading data

We define an InputData block to load the JSON data, along with two parameters, Path and Schema, whose values are supplied to the lift function.

yaml_str = """
Parameters:
  Path:
    Description: Path to import data from
  Schema:
    Description: The schema describing the json

LiftJob:
  InputData:
    Type: load::batch_json
    Properties:
      Path: ${Path}
      JsonSchemaPath: ${Schema}
"""

history = lift(
    spark=spark,
    lift_def=yaml_str,
    parameters={
        "Path": "/path/data.json",
        "Schema": "/path/schema.json",
    }
)

The result of the InputData block can be displayed as follows:

display(history.get("InputData"))
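
Note that display is a notebook helper (provided by Databricks notebooks, for example); in a plain PySpark session the DataFrame's own show method can be used instead:

history.get("InputData").show(5, truncate=False)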

Cleaning up the Value

The Value column is of type string and contains spaces. To remove them, we write a custom Python code block.

yaml_str = """
Parameters:
  Path:
    Description: Path to import data from
  Schema:
    Description: The schema describing the json
  CleanValuePython:
    Description: Clean value python function

LiftJob:
  InputData:
    Type: load::batch_json
    Properties:
      Path: ${Path}
      JsonSchemaPath: ${Schema}

  CleanValue:
    Type: custom::python_codeblock
    Input:
      - InputData
    Properties:
      CustomFunction: ${CleanValuePython}
"""

from pyspark.sql import functions as F

def clean_value_python(params):
    # Strip all whitespace from Value so the column can later be cast to integer.
    df = params["dataframes"]["InputData"]
    return df.withColumn("Value", F.regexp_replace(F.col("Value"), r"\s+", ""))

history = lift(
    spark=spark,
    lift_def=yaml_str,
    parameters={
        "Path": "/path/data.json",
        "Schema": "/path/schema.json",
        "CleanValuePython": clean_value_python,
    }
)
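
As with InputData, the cleaned result can be inspected, assuming each block's output is recorded in history under its block name:

display(history.get("CleanValue"))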

Transforming and casting

Now the Value column can be cast to an integer. Only Value, dims.YEAR and dims.COUNTRY are relevant, so we select just those columns, casting dims.YEAR to an integer along the way, and filter out rows where Year is null.

yaml_str = """
Parameters:
  Path:
    Description: Path to import data from
  Schema:
    Description: The schema describing the json
  CleanValuePython:
    Description: Clean value python function

LiftJob:
  InputData:
    Type: load::batch_json
    Properties:
      Path: ${Path}
      JsonSchemaPath: ${Schema}

  CleanValue:
    Type: custom::python_codeblock
    Input:
      - InputData
    Properties:
      CustomFunction: ${CleanValuePython}

  Trans:
    Type: transform::generic
    Input: CleanValue
    Properties:
      Functions:
        - cast_column:
            col: Value
            new_type: integer
        - select:
            cols:
              - col: Value
              - { col: dims.YEAR, alias: Year, cast: integer }
              - { col: dims.COUNTRY, alias: Country }
        - where:
            predicate: ["Year", "!=", "null"]
"""

from pyspark.sql import functions as F

def clean_value_python(params):
    # Strip all whitespace from Value so the column can later be cast to integer.
    df = params["dataframes"]["InputData"]
    return df.withColumn("Value", F.regexp_replace(F.col("Value"), r"\s+", ""))

history = lift(
    spark=spark,
    lift_def=yaml_str,
    parameters={
        "Path": "/path/data.json",
        "Schema": "/path/schema.json",
        "CleanValuePython": clean_value_python,
    }
)
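
For reference, the Trans block is roughly equivalent to the following plain PySpark operations. This is a sketch of the semantics only, not lift's actual implementation, and it assumes the where predicate drops rows with a null Year:

from pyspark.sql import functions as F

def trans_equivalent(df):
    # df is the output of the CleanValue block.
    return (
        df.withColumn("Value", F.col("Value").cast("integer"))
        .select(
            F.col("Value"),
            F.col("dims.YEAR").cast("integer").alias("Year"),
            F.col("dims.COUNTRY").alias("Country"),
        )
        .where(F.col("Year").isNotNull())
    )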

Saving to Delta files

Lastly, we write the output of the Trans block to Delta files.

yaml_str = """
Parameters:
  Path:
    Description: Path to import data from
  Schema:
    Description: The schema describing the json
  CleanValuePython:
    Description: Clean value python function
  WritePath:
    Description: Destination path

LiftJob:
  InputData:
    Type: load::batch_json
    Properties:
      Path: ${Path}
      JsonSchemaPath: ${Schema}

  CleanValue:
    Type: custom::python_codeblock
    Input:
      - InputData
    Properties:
      CustomFunction: ${CleanValuePython}

  Trans:
    Type: transform::generic
    Input: CleanValue
    Properties:
      Functions:
        - cast_column:
            col: Value
            new_type: integer
        - select:
            cols:
              - col: Value
              - { col: dims.YEAR, alias: Year, cast: integer }
              - { col: dims.COUNTRY, alias: Country }
        - where:
            predicate: ["Year", "!=", "null"]

  Save:
    Type: write::batch_delta
    Input: Trans
    Properties:
      Path: ${WritePath}
      Mode: overwrite
"""

from pyspark.sql import functions as F

def clean_value_python(params):
    # Strip all whitespace from Value so the column can later be cast to integer.
    df = params["dataframes"]["InputData"]
    return df.withColumn("Value", F.regexp_replace(F.col("Value"), r"\s+", ""))

history = lift(
    spark=spark,
    lift_def=yaml_str,
    parameters={
        "Path": "/path/data.json",
        "Schema": "/path/schema.json",
        "CleanValuePython": clean_value_python,
        "WritePath": "/path/vehicles/",
    }
)
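
To verify the write, the Delta output can be read back. A minimal sketch, assuming the Delta Lake package is configured on the Spark session:

result = spark.read.format("delta").load("/path/vehicles/")
result.show(5)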