Lift job blocks

The lift job is defined by one or more lift blocks. The types of supported blocks are the following:

  • custom - Custom code blocks that can contain any kind of Spark code
  • load - Blocks that load data such as xml, json, delta etc.
  • transform - Transform data with functions such as alias, where and concat
  • write - Write data as delta or to databases

The following holds for every block:

  • has a sub-block, annotated as block::sub-block
  • outputs one dataframe
  • needs an input dataframe (the load block is the exception)

LiftJob:
  {BlockName}:
    Type: {block::sub-block}
    Input: {BlockInput}
    Properties:
      {Prop}: {Prop}
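
The structure above can be checked mechanically. The following is a minimal sketch, assuming the definition has already been parsed into a plain dict; `check_lift_job` and `SUPPORTED_BLOCKS` are hypothetical names used for illustration and are not part of the library.

```python
# Hypothetical helper, not part of the library: checks the block structure
# described above on an already parsed lift definition.
SUPPORTED_BLOCKS = {"custom", "load", "transform", "write"}


def check_lift_job(lift_job: dict) -> list:
    """Return a list of human readable problems, empty when none are found."""
    problems = []
    for name, block in lift_job.items():
        # Every Type is annotated as block::sub-block
        kind, sep, sub_block = block.get("Type", "").partition("::")
        if not sep or not sub_block or kind not in SUPPORTED_BLOCKS:
            problems.append(f"{name}: Type must be block::sub-block")
        # Input can be a single name or a list of names
        inputs = block.get("Input", [])
        if isinstance(inputs, str):
            inputs = [inputs]
        for ref in inputs:
            # "DateSplit.Pre2020" refers to the block "DateSplit"
            if ref.split(".")[0] not in lift_job:
                problems.append(f"{name}: unknown Input {ref}")
    return problems


job = {
    "RawFiles": {
        "Type": "load::batch_parquet",
        "Properties": {"Path": "s3://bucket/path/to/data"},
    },
    "TransformBlock": {
        "Type": "transform::generic",
        "Input": "MissingBlock",  # deliberately wrong to show the check
        "Properties": {"Functions": []},
    },
}
problems = check_lift_job(job)
```

Running such a check before calling lift can catch a misspelled Input name early.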

Example:

LiftJob:

  RawFiles:
    Type: load::batch_parquet
    Properties:
      Path: s3://bucket/path/to/data

  TransformBlock:
    Type: transform::generic
    Input: RawFiles
    Properties:
      Functions:
        - where:
            predicate: [date, '>=', '2020-01-01']
        - select:
          - { col: name, alias: firstName, cast: string }
          - { col: age, cast: integer }
          - { col: gender }

custom

Entrypoint for the custom code block.

custom::python_codeblock

Execute external python function.

Either CustomFunction or CustomCodePath must be set, not both.

Parameters
  • CustomFunction (str) – the python function to execute, always given as a parameter string such as ${PyFunction}
  • CustomCodePath (str) – path to the python file with a resolve function
  • CustomProps (dict, optional) – dictionary that will be sent to the function
  • Packages (list, optional) – list of extra packages that will be installed
  • Output (list, optional) – list of names of the dataframes that the function outputs

The function takes one argument of type dict. The argument contains a key dataframes of type dict, where each key is an Input name and each value is the corresponding dataframe. The argument also contains all CustomProps keys and values.
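
To make the shape of that argument concrete, here is a minimal sketch that uses plain lists of dicts as stand-ins for dataframes. The helper `build_function_argument` is hypothetical and only mirrors the description above; it is not the library's implementation.

```python
def build_function_argument(inputs: dict, custom_props: dict) -> dict:
    # Hypothetical helper: "dataframes" maps each Input name to its dataframe,
    # and every CustomProps key is added next to it at the top level.
    return {"dataframes": dict(inputs), **custom_props}


def resolve(params: dict) -> list:
    # Stand-in for a custom function: keep the rows that match the "date" prop
    rows = params["dataframes"]["LoadInput"]
    return [row for row in rows if row["date"] == params["date"]]


load_input = [
    {"id": 1, "date": "2020-01-01"},
    {"id": 2, "date": "2020-01-02"},
]
argument = build_function_argument(
    inputs={"LoadInput": load_input},
    custom_props={"date": "2020-01-01"},
)
result = resolve(argument)
```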

Examples:

There are 2 ways to specify the python code:

1. specify a python function

yml_string = """
Parameters:
    PyFunction:
        Description: custom python function

LiftJob:
    LoadInput:
        Type: load::batch_json
        Properties:
            Path: s3://bucket/folder/with/data

    CustomPythonFunction:
        Type: custom::python_codeblock
        Input:
            - LoadInput
        Properties:
            CustomFunction: ${PyFunction}
            CustomProps:
                date: '2020-01-01'
"""


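from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def my_python_function(params: dict) -> DataFrame:
    # "dataframes" maps each Input name to its dataframe
    dataframe = params["dataframes"]["LoadInput"]
    # CustomProps keys (here "date") are available directly on params
    return dataframe.where(F.col("date") == params["date"])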


lift(
    spark,
    lift_def=yml_string,
    parameters={
        "PyFunction": my_python_function,
    },
)

2. specify a python file

yml_string = """
LiftJob:
    LoadInput:
        Type: load::batch_json
        Properties:
            Path: s3://bucket/folder/with/data

    CustomPythonFunction:
        Type: custom::python_codeblock
        Input:
            - LoadInput
        Properties:
            CustomCodePath: s3://bucket/path/to/python/module.py
            CustomProps:
                date: '2020-01-01'
            Packages:
                - pytz==2020.1
"""


lift(
    spark,
    lift_def=yml_string,
    parameters={},
)

Multiple outputs

When the custom python function returns a dictionary of dataframes and the Output parameter lists their names, each output can be referenced by later blocks as {BlockName}.{OutputName}.

Example:

yml_string = """
Parameters:
    PyFunction:
        Description: custom python function

LiftJob:
    LoadInput:
        Type: load::batch_json
        Properties:
            Path: s3://bucket/folder/with/data

    DateSplit:
        Type: custom::python_codeblock
        Input:
            - LoadInput
        Properties:
            CustomFunction: ${PyFunction}
            Output:
                - Pre2020
                - Post2020

    SavePre2020:
        Type: write::batch_delta
        Input: DateSplit.Pre2020
        Properties:
            Path: s3://bucket/folder/old-data/
            Mode: overwrite

    SavePost2020:
        Type: write::batch_delta
        Input: DateSplit.Post2020
        Properties:
            Path: s3://bucket/folder/new-data/
            Mode: overwrite
"""


from pyspark.sql import functions as F


def my_python_function(params: dict) -> dict:
    dataframe = params["dataframes"]["LoadInput"]
    # The keys correspond to the names listed under Output
    return {
        "Pre2020": dataframe.where(F.col("date") < "2020-01-01"),
        "Post2020": dataframe.where(F.col("date") >= "2020-01-01"),
    }


lift(
    spark,
    lift_def=yml_string,
    parameters={
        "PyFunction": my_python_function,
    },
)
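
How the named outputs become addressable as DateSplit.Pre2020 and DateSplit.Post2020 can be pictured with the sketch below. It is an illustration only: `register_outputs` and the registry dict are hypothetical, and lists of dicts stand in for dataframes.

```python
def register_outputs(block_name, result, output_names, registry):
    """Store a block result under the names later blocks use as Input."""
    if output_names:
        # Multiple outputs: one entry per name, addressed as Block.Output
        for output_name in output_names:
            registry[f"{block_name}.{output_name}"] = result[output_name]
    else:
        # Single output: addressed by the block name alone
        registry[block_name] = result
    return registry


rows = [{"date": "2019-06-01"}, {"date": "2020-03-01"}]
split = {
    "Pre2020": [r for r in rows if r["date"] < "2020-01-01"],
    "Post2020": [r for r in rows if r["date"] >= "2020-01-01"],
}
registry = register_outputs("DateSplit", split, ["Pre2020", "Post2020"], {})
```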

custom::sql

Execute SQL statement.

Parameters
  • Statement (str) – the SQL statement to execute

Example:

yml_string = """
Parameters:
    DbName:
        Description: Database name
    TableName:
        Description: Table name

LiftJob:
    OptimizeDb:
        Type: custom::sql
        Properties:
            Statement: OPTIMIZE ${DbName}.${TableName} WHERE date >= current_date() - INTERVAL 7 DAYS ZORDER BY (id)
"""


lift(
    spark,
    lift_def=yml_string,
    parameters={
        "DbName": "default",
        "TableName": "mowers",
    },
)
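
The ${DbName} and ${TableName} placeholders use the same syntax as Python's string.Template, so the substitution can be previewed with the standard library. This is only a sketch of the idea; it does not claim that lift resolves parameters this way, and the statement is shortened for readability.

```python
from string import Template

# Shortened version of the statement from the example above
statement = "OPTIMIZE ${DbName}.${TableName} ZORDER BY (id)"

# substitute() raises KeyError when a placeholder has no value,
# which makes a forgotten parameter easy to spot
resolved = Template(statement).substitute(DbName="default", TableName="mowers")
```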

load

Entrypoint for the load block.

load::batch_csv

Load csv data in batch.

Parameters
  • Path (str) – path to the csv files
  • Options (dict) – options to be passed to the csv reader
  • Alias (str, optional) – an alias for the dataframe that is loaded

SectionName:
    Type: load::batch_csv
    Properties:
        Path: s3://bucket-name/trusted/live
        Options:
            header: True
            inferSchema: True
        Alias: settings

load::batch_delta

Load delta data in batch.

Parameters
  • Path (str) – path to the data files
  • Alias (str, optional) – an alias for the dataframe that is loaded
  • ReadChangeFeed (bool, optional, default: False) – Read delta table change feed
  • StartingTimestamp (str, optional) – read changes starting from this timestamp (inclusive); only has an effect when ReadChangeFeed: True is set

For details on the Change Data Feed feature, see https://docs.databricks.com/delta/delta-change-data-feed.html

SectionName:
    Type: load::batch_delta
    Properties:
        Path: s3://bucket-name/trusted/live
        Alias: settings
        ReadChangeFeed: True
        StartingTimestamp: 2021-11-15
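
As a rough illustration of how these two properties relate to the Delta reader, the sketch below maps them to the reader options readChangeFeed and startingTimestamp. The helper `change_feed_options` is hypothetical; how the block passes the options internally is an assumption.

```python
def change_feed_options(properties: dict) -> dict:
    """Translate block properties into Delta reader options (sketch)."""
    options = {}
    if properties.get("ReadChangeFeed", False):
        options["readChangeFeed"] = "true"
        # StartingTimestamp is ignored unless the change feed is enabled
        if "StartingTimestamp" in properties:
            options["startingTimestamp"] = str(properties["StartingTimestamp"])
    return options


options = change_feed_options(
    {
        "Path": "s3://bucket-name/trusted/live",
        "ReadChangeFeed": True,
        "StartingTimestamp": "2021-11-15",
    }
)
# With PySpark and Delta available, the options could be applied like this:
# spark.read.format("delta").options(**options).load(path)
```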

load::batch_json

Load json data in batch.

Parameters
  • Path (str) – path to the data files
  • FileRegistry (str, optional) – name of the fileregistry to use
  • Alias (str, optional) – an alias for the dataframe that is loaded
  • Suffix (str, optional, default: .json) – the suffix of the file
  • JsonSchemaPath (str, optional) – path to a file with schema in json format, if no path is submitted, inferSchema will be set to true
  • JsonSchema (dict, optional) – the file schema in json format, if no schema is submitted, inferSchema will be set to true
  • PySparkSchema (StructType, optional) – the file schema in PySpark StructType format, if no schema is submitted, inferSchema will be set to true
  • Options (dict) – options to be passed to the reader

SectionName:
    Type: load::batch_json
    Properties:
        Path: s3://bucket-name/trusted/live
        FileRegistry: S3DatePrefixScan
        Alias: settings
        Suffix: .json
        JsonSchemaPath: s3://bucket-name/schema.json
        JsonSchema: dict
        PySparkSchema: StructType

load::batch_parquet

Load parquet data in batch.

Parameters
  • Path (str) – path to the data files
  • FileRegistry (str, optional) – name of the fileregistry to use
  • Alias (str, optional) – an alias for the dataframe that is loaded

SectionName:
    Type: load::batch_parquet
    Properties:
        Path: s3://bucket-name/trusted/live
        FileRegistry: S3DatePrefixScan
        Alias: settings

load::batch_xml

Load xml data in batch.

Parameters
  • Path (str) – path to the data files
  • RowTag (str) – the row tag that indicates a new item in the xml structure
  • FileRegistry (str, optional) – name of the fileregistry to use
  • Alias (str, optional) – an alias for the dataframe that is loaded
  • Suffix (str, optional, default: .xml) – the suffix of the file
  • BatchSize (int, optional, default: 200) – the amount of data to process in one go
  • JsonSchemaPath (str, optional) – path to a file with schema in json format, if no path is submitted, inferSchema will be set to true
  • JsonSchema (dict, optional) – the file schema in json format, if no schema is submitted, inferSchema will be set to true
  • PySparkSchema (StructType, optional) – the file schema in PySpark StructType format, if no schema is submitted, inferSchema will be set to true

SectionName:
    Type: load::batch_xml
    Properties:
        Path: s3://bucket-name/trusted/live
        FileRegistry: S3DatePrefixScan
        Alias: settings
        RowTag: employee
        Suffix: .xml
        BatchSize: 200
        JsonSchemaPath: s3://bucket-name/schema.json
        JsonSchema: dict
        PySparkSchema: StructType

load::jdbc

Load data from RDS

Parameters
  • Driver (str) – database driver
  • ConnUrl (str) – connection url to the database server
  • Table (str) – table name with the data
  • User (str) – database user
  • Password (str) – database password
  • Query (str) – SQL query to access the data
  • Alias (str, optional) – an alias for the dataframe that is loaded

SectionName:
    Type: load::jdbc
    Properties:
        Driver: 'org.sqlite.JDBC'
        ConnUrl: ${DBUrl}
        Table: ${DBTable}
        User: ${DBUser}
        Password: ${DBPassword}
        Alias: settings
        Query: >-
           SELECT * FROM TABLE
           WHERE name = 'me'

load::stream_json

Load json data as a stream.

Parameters
  • Path (str) – path to the data files
  • SchemaPath (str) – path to the schema
  • Alias (str, optional) – an alias for the dataframe that is loaded

SectionName:
    Type: load::stream_json
    Properties:
        SchemaPath: ${PathToSchema}
        Path: ${PathToRawFiles}
        Alias: settings

transform

Entrypoint for the generic transform block.

transform::generic

Resolve the transform block.

Parameters
  • Functions (list) – the transform functions to apply to the input dataframe

Example:

SectionName:
    Type: transform::generic
    Input: OtherSectionName
    Properties:
        Functions:
            - add_column.date.unixtime_to_utcz:
                from_column: timestamp
                to_column: utcTimestamp
            - add_column.date.year:
                from_column: utcTimestamp
                to_column: year

write

Entrypoint for the write block.

write::batch_delta

Write delta data down to some location.

Parameters
  • Path (str) – location to write to
  • Mode (str) – the mode to write with such as append or overwrite
  • PartitionBy.Columns (list[str], optional) – partition the delta table on one or multiple columns
  • Optimize.Enabled (bool, optional, default: False) – enable optimize on the delta table (only works on Databricks)
  • Optimize.ZorderBy (str, optional, default: None) – the column names to optimize on
  • Vacuum.Enabled (bool, optional, default: False) – enable vacuum on the delta table (only works on Databricks)
  • Vacuum.RetainHours (int, optional, default: 168) – number of hours to retain old versions; the default is 168 hours (7 days) and cannot be set lower
  • Upsert.MergeStatement (str, optional) – statement used to merge the new data (updates.{col}) with the existing data (source.{col}); only has an effect when Mode: upsert is set
  • HiveTable.DatabaseName (str) – name of the hive database
  • HiveTable.TableName (str) – name of the hive table
  • HiveTable.Schema (str, optional) – schema of the hive table
  • MergeSchema (bool, optional, default: False) – enable delta table mergeSchema option

SectionName:
    Type: write::batch_delta
    Input: OtherSectionName
    Properties:
        Path: s3://path/to/files
        Mode: upsert
        PartitionBy:
            Columns: [year, month, day]
        Optimize:
            Enabled: False
            ZorderBy: column_name, column_name_2
        Vacuum:
            Enabled: False
            RetainHours: 168
        Upsert:
            MergeStatement: source.eventId == updates.eventId
        HiveTable:
            DatabaseName: dbname
            TableName: dbtable
            Schema: >-
                file_path STRING NOT NULL
                date_lifted TIMESTAMP
        MergeSchema: True
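
The effect of Mode: upsert with a merge condition such as source.eventId == updates.eventId can be pictured with plain Python. The sketch assumes the usual upsert meaning, where matching rows are replaced and unmatched rows are inserted; it is not the library's implementation, and `upsert` is a hypothetical helper working on lists of dicts.

```python
def upsert(source, updates, key):
    """Merge updates into source on a single key column (sketch)."""
    merged = {row[key]: row for row in source}
    for row in updates:
        # A matching key replaces the existing row, a new key is inserted
        merged[row[key]] = row
    return list(merged.values())


source = [
    {"eventId": 1, "status": "old"},
    {"eventId": 2, "status": "old"},
]
updates = [
    {"eventId": 2, "status": "new"},
    {"eventId": 3, "status": "new"},
]
result = upsert(source, updates, key="eventId")
```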

write::batch_jdbc

Batch save data with jdbc driver.

Parameters
  • Mode (str) – the mode to write with such as append or overwrite
  • Driver (str) – the driver
  • ConnUrl (str) – the connection url
  • Table (str) – table to write to
  • User (str) – username to database
  • Password (str) – password to database
  • NumPartitions (str) – number of partitions to write with

SectionName:
    Type: write::batch_jdbc
    Input: OtherSectionName
    Properties:
        Mode: append
        Driver: 'org.postgresql.Driver'
        ConnUrl: 'jdbc:postgresql://localhost:5432/productapi'
        Table: 'table_name'
        User: 'username'
        Password: 'password'
        NumPartitions: 10

write::batch_json

Write json data down to some location.

Parameters
  • Path (str) – location to write to
  • Mode (str) – the mode to write with such as append or overwrite
  • PartitionBy.Columns (list[str], optional) – partition the output on one or multiple columns

SectionName:
    Type: write::batch_json
    Input: OtherSectionName
    Properties:
        Path: s3://path/to/files
        Mode: overwrite
        PartitionBy:
            Columns: [year, month, day]

write::batch_mysql_upsert

Batch upsert data with mysql-connector-python python package. This package can be installed with the mysql extra:

pip install getl[mysql]

Parameters
  • Host (str) – the host
  • Port (int) – the port
  • Database (str) – the database
  • User (str) – username to database
  • Password (str) – password to database
  • Table (str) – table to write to
  • Columns (list[str]) – the columns to try and insert from the dataframe, the database columns and dataframe columns must match
  • ConflictColumns (list[str]) – when a conflict on these columns occurs, update instead
  • UpdateColumns (list[str], optional) – columns to update in case of a conflict, the default value is all the Columns excluding the ConflictColumns

SectionName:
    Type: write::batch_mysql_upsert
    Input: OtherSectionName
    Properties:
        Host: localhost
        Port: 3306
        Database: productapi
        User: 'username'
        Password: 'password'
        Table: 'table_name'
        Columns: ['file_path', 'count']
        ConflictColumns: ['file_path']
        UpdateColumns: ['count']
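
To show how Columns, ConflictColumns and UpdateColumns fit together, the sketch below builds a MySQL INSERT ... ON DUPLICATE KEY UPDATE statement from them. The statement the block actually generates is not shown in the source, so `mysql_upsert_statement` is a hypothetical helper and the SQL is only one plausible form.

```python
def mysql_upsert_statement(table, columns, conflict_columns, update_columns=None):
    """Build a parameterized MySQL upsert statement (sketch)."""
    if update_columns is None:
        # Documented default: all Columns excluding the ConflictColumns
        update_columns = [c for c in columns if c not in conflict_columns]
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c} = VALUES({c})" for c in update_columns)
    # MySQL detects the conflict through the unique keys of the table,
    # so conflict_columns is only used for the default above
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )


mysql_statement = mysql_upsert_statement(
    table="table_name",
    columns=["file_path", "count"],
    conflict_columns=["file_path"],
)
```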

write::batch_postgres_upsert

Batch upsert data with psycopg2-binary python package. This package can be installed with the postgres extra:

pip install getl[postgres]

Parameters
  • ConnUrl (str) – the connection url
  • Table (str) – table to write to
  • User (str) – username to database
  • Password (str) – password to database
  • Columns (list[str]) – the columns to try and insert from the dataframe, the database columns and dataframe columns must match
  • ConflictColumns (list[str]) – when a conflict on these columns occurs, update instead
  • UpdateColumns (list[str], optional) – columns to update in case of a conflict, the default value is all the Columns excluding the ConflictColumns

SectionName:
    Type: write::batch_postgres_upsert
    Input: OtherSectionName
    Properties:
        ConnUrl: 'postgresql://localhost:5432/productapi'
        Table: 'table_name'
        User: 'username'
        Password: 'password'
        Columns: ['file_path', 'count']
        ConflictColumns: ['file_path']
        UpdateColumns: ['count']
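
In PostgreSQL the conflict columns appear in the statement itself. The sketch below builds an INSERT ... ON CONFLICT ... DO UPDATE statement from the same three properties. As with any generated SQL here, this is one plausible form under stated assumptions: `postgres_upsert_statement` is a hypothetical helper, not the statement the block is known to produce.

```python
def postgres_upsert_statement(table, columns, conflict_columns, update_columns=None):
    """Build a parameterized PostgreSQL upsert statement (sketch)."""
    if update_columns is None:
        # Documented default: all Columns excluding the ConflictColumns
        update_columns = [c for c in columns if c not in conflict_columns]
    placeholders = ", ".join(["%s"] * len(columns))
    # EXCLUDED refers to the row that was proposed for insertion
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in update_columns)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(conflict_columns)}) "
        f"DO UPDATE SET {updates}"
    )


postgres_statement = postgres_upsert_statement(
    table="table_name",
    columns=["file_path", "count"],
    conflict_columns=["file_path"],
    update_columns=["count"],
)
```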

write::stream_delta

Write data as a stream to delta files.

Parameters
  • Path (str) – Where to write the data
  • OutputMode (str) – How to write the data

SectionName:
    Type: write::stream_delta
    Input: OtherSectionName
    Properties:
        Path: s3://path/to/files
        OutputMode: overwrite