Lift job blocks
The lift job is defined by one or more lift blocks. The following block types are supported:
- custom - Custom code blocks that can contain any kind of Spark code
- load - Blocks that load data such as xml, json, delta, etc.
- transform - Transform data with functions such as alias, where and concat
- write - Write data down as delta files or to databases
Every block:
- has a sub-block, annotated as block::sub-block
- outputs one dataframe
- needs an input dataframe (the load block is the exception)
LiftJob:
    {BlockName}:
        Type: {block::sub-block}
        Input: {BlockInput}
        Properties:
            {Prop}: {Value}
Example:
LiftJob:
    RawFiles:
        Type: load::batch_parquet
        Properties:
            Path: s3://bucket/path/to/data

    TransformBlock:
        Type: transform::generic
        Input: RawFiles
        Properties:
            Functions:
                - where:
                    predicate: [date, '>=', '2020-01-01']
                - select:
                    - { col: name, alias: firstName, cast: string }
                    - { col: age, cast: integer }
                    - { col: gender }
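A job definition like the one above is executed with the lift entrypoint, as shown in the examples further down. A minimal sketch, assuming yml_string holds the YAML definition above and spark is an active SparkSession:

# Minimal sketch: yml_string is assumed to contain the LiftJob definition
# above, and spark is assumed to be an active SparkSession.
lift(
    spark,
    lift_def=yml_string,
    parameters={},
)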
custom
Entrypoint for the custom code block.
custom::python_codeblock
Execute external python function.
Either CustomFunction or CustomCodePath must be set, not both.
Parameters:
- CustomFunction (str) – this will always be a parameter string, e.g. ${PyFunction}
- CustomCodePath (str) – path to the python file containing a resolve function
- CustomProps (dict, optional) – dictionary that will be sent to the function
- Packages (list, optional) – list of extra packages that will be installed
- Output (list, optional) – list of names of the dataframes that will be outputted
The function requires one argument of type dict. The argument will contain a key dataframes of type dict, where the key is the Input name and the value is the dataframe. The argument will also contain all CustomProps keys and values.
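For instance, with a single Input named LoadInput and a CustomProps of date: '2020-01-01', the argument handed to the function would look roughly like this (an illustrative sketch, not an exact dump):

# Rough shape of the dict passed to the custom function (illustrative only):
params = {
    "dataframes": {
        "LoadInput": loaded_dataframe,  # one entry per Input name
    },
    "date": "2020-01-01",  # all CustomProps keys and values are merged in
}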
Examples:
There are two ways of specifying the python code:
1. specify a python function
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

yml_string = """
Parameters:
    PyFunction:
        Description: custom python function

LiftJob:
    LoadInput:
        Type: load::batch_json
        Properties:
            Path: s3://bucket/folder/with/data

    CustomPythonFunction:
        Type: custom::python_codeblock
        Input:
            - LoadInput
        Properties:
            CustomFunction: ${PyFunction}
            CustomProps:
                date: '2020-01-01'
"""


def my_python_function(params: dict) -> DataFrame:
    dataframe = params["dataframes"]["LoadInput"]
    return dataframe.where(F.col("date") == params["date"])


lift(
    spark,
    lift_def=yml_string,
    parameters={
        "PyFunction": my_python_function,
    },
)
2. specify a python file
yml_string = """
LiftJob:
    LoadInput:
        Type: load::batch_json
        Properties:
            Path: s3://bucket/folder/with/data

    CustomPythonFunction:
        Type: custom::python_codeblock
        Input:
            - LoadInput
        Properties:
            CustomCodePath: s3://bucket/path/to/python/module.py
            CustomProps:
                date: '2020-01-01'
            Packages:
                - pytz==2020.1
"""

lift(
    spark,
    lift_def=yml_string,
    parameters={},
)
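The file referenced by CustomCodePath is expected to contain a resolve function. A minimal sketch of such a module, assuming resolve receives the same params dict that a CustomFunction receives (dataframes plus CustomProps); that signature is an assumption based on the description above:

# module.py - minimal sketch; the resolve signature is assumed to mirror a
# CustomFunction: it receives the dataframes and CustomProps in one dict.
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def resolve(params: dict) -> DataFrame:
    dataframe = params["dataframes"]["LoadInput"]
    return dataframe.where(F.col("date") == params["date"])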
Multiple outputs
When the custom python function returns a dictionary of dataframes and the Output param lists their names, the block produces multiple outputs that downstream blocks can reference as {BlockName}.{OutputName}.
Example:
from typing import Dict

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

yml_string = """
Parameters:
    PyFunction:
        Description: custom python function

LiftJob:
    LoadInput:
        Type: load::batch_json
        Properties:
            Path: s3://bucket/folder/with/data

    DateSplit:
        Type: custom::python_codeblock
        Input:
            - LoadInput
        Properties:
            CustomFunction: ${PyFunction}
            Output:
                - Pre2020
                - Post2020

    SavePre2020:
        Type: write::batch_delta
        Input: DateSplit.Pre2020
        Properties:
            Path: s3://bucket/folder/old-data/
            Mode: overwrite

    SavePost2020:
        Type: write::batch_delta
        Input: DateSplit.Post2020
        Properties:
            Path: s3://bucket/folder/new-data/
            Mode: overwrite
"""


def my_python_function(params: dict) -> Dict[str, DataFrame]:
    dataframe = params["dataframes"]["LoadInput"]
    return {
        "Pre2020": dataframe.where(F.col("date") < "2020-01-01"),
        "Post2020": dataframe.where(F.col("date") >= "2020-01-01"),
    }


lift(
    spark,
    lift_def=yml_string,
    parameters={
        "PyFunction": my_python_function,
    },
)
custom::sql
Execute SQL statement.
Parameters:
- Statement (str) – the SQL statement to execute
Example:
yml_string = """
Parameters:
    DbName:
        Description: Database name
    TableName:
        Description: Table name

LiftJob:
    OptimizeDb:
        Type: custom::sql
        Properties:
            Statement: OPTIMIZE ${DbName}.${TableName} WHERE date >= current_date() - INTERVAL 7 DAYS ZORDER BY (id)
"""

lift(
    spark,
    lift_def=yml_string,
    parameters={
        "DbName": "default",
        "TableName": "mowers",
    },
)
load
Entrypoint for the load block.
load::batch_csv
Load csv data in batch.
Parameters:
- Path (str) – path to the csv files
- Options (dict) – options to be passed to the csv reader
- Alias (str, optional) – an alias for the dataframe that is loaded
SectionName:
    Type: load::batch_csv
    Properties:
        Path: s3://bucket-name/trusted/live
        Options:
            header: True
            inferSchema: True
        Alias: settings
load::batch_delta
Load delta data in batch.
Parameters:
- Path (str) – path to the data files
- Alias (str, optional) – an alias for the dataframe that is loaded
- ReadChangeFeed (bool, optional, default: False) – Read delta table change feed
- StartingTimestamp (str, optional) – read the changes starting from this timestamp; the start timestamp is inclusive in the query. This option only has an effect when ReadChangeFeed: True has been set.
For details on the Change Data Feed feature, see https://docs.databricks.com/delta/delta-change-data-feed.html
SectionName:
    Type: load::batch_delta
    Properties:
        Path: s3://bucket-name/trusted/live
        Alias: settings
        ReadChangeFeed: True
        StartingTimestamp: 2021-11-15
load::batch_json
Load json data in batch.
Parameters:
- Path (str) – path to the data files
- FileRegistry (str, optional) – name of the fileregistry to use
- Alias (str, optional) – an alias for the dataframe that is loaded
- Suffix (str, optional, default: .json) – the suffix of the file
- JsonSchemaPath (str, optional) – path to a file with schema in json format, if no path is submitted, inferSchema will be set to true
- JsonSchema (dict, optional) – the file schema in json format, if no schema is submitted, inferSchema will be set to true
- PySparkSchema (StructType, optional) – the file schema in PySpark StructType format, if no schema is submitted, inferSchema will be set to true
- Options (dict) – options to be passed to the reader
SectionName:
    Type: load::batch_json
    Properties:
        Path: s3://bucket-name/trusted/live
        FileRegistry: S3DatePrefixScan
        Alias: settings
        Suffix: .json
        JsonSchemaPath: s3://bucket-name/schema.json
        JsonSchema: dict
        PySparkSchema: StructType
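As a sketch, a PySparkSchema value is a standard PySpark StructType, and the JsonSchema dict is assumed to be PySpark's own JSON schema representation (StructType.jsonValue()); both the exact format and how the StructType is injected (presumably via the parameters mechanism, like ${PyFunction} above) are assumptions, not confirmed by this reference:

# Sketch only: build a PySpark schema; its jsonValue() dict is assumed to be
# the format JsonSchema expects (an assumption, not a documented guarantee).
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

json_schema = schema.jsonValue()  # dict form of the same schema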
load::batch_parquet
Load parquet data in batch.
Parameters:
- Path (str) – path to the data files
- FileRegistry (str, optional) – name of the fileregistry to use
- Alias (str, optional) – an alias for the dataframe that is loaded
SectionName:
    Type: load::batch_parquet
    Properties:
        Path: s3://bucket-name/trusted/live
        FileRegistry: S3DatePrefixScan
        Alias: settings
load::batch_xml
Load xml data in batch.
Parameters:
- Path (str) – path to the data files
- RowTag (str) – the row tag that indicates a new item in the xml structure
- FileRegistry (str, optional) – name of the fileregistry to use
- Alias (str, optional) – an alias for the dataframe that is loaded
- Suffix (str, optional, default: .xml) – the suffix of the file
- BatchSize (int, optional, default: 200) – the amount of data to process in one go
- JsonSchemaPath (str, optional) – path to a file with schema in json format, if no path is submitted, inferSchema will be set to true
- JsonSchema (dict, optional) – the file schema in json format, if no schema is submitted, inferSchema will be set to true
- PySparkSchema (StructType, optional) – the file schema in PySpark StructType format, if no schema is submitted, inferSchema will be set to true
SectionName:
    Type: load::batch_xml
    Properties:
        Path: s3://bucket-name/trusted/live
        FileRegistry: S3DatePrefixScan
        Alias: settings
        RowTag: employee
        Suffix: .xml
        BatchSize: 200
        JsonSchemaPath: s3://bucket-name/schema.json
        JsonSchema: dict
        PySparkSchema: StructType
load::jdbc
Load data from RDS
Parameters:
- Driver (str) – database driver
- ConnUrl (str) – connection url to the database server
- Table (str) – table name with the data
- User (str) – database user
- Password (str) – database password
- Query (str) – SQL query to access the data
- Alias (str, optional) – an alias for the dataframe that is loaded
SectionName:
    Type: load::jdbc
    Properties:
        Driver: 'org.sqlite.JDBC'
        ConnUrl: ${DBUrl}
        Table: ${DBTable}
        User: ${DBUser}
        Password: ${DBPassword}
        Alias: settings
        Query: >-
            SELECT * FROM TABLE
            WHERE name == me
load::stream_json
Load json data as a stream.
Parameters:
- Path (str) – path to the data files
- SchemaPath (str) – path to the schema
- Alias (str, optional) – an alias for the dataframe that is loaded
SectionName:
    Type: load::stream_json
    Properties:
        SchemaPath: ${PathToSchema}
        Path: ${PathToRawFiles}
        Alias: settings
transform
Entrypoint for the generic transform block.
transform::generic
Resolve the transform block.
Parameters:
- Functions (list) – a list of transform operators defined in transform definitions
Example:
SectionName:
    Type: transform::generic
    Input: OtherSectionName
    Properties:
        Functions:
            - add_column.date.unixtime_to_utcz:
                from_column: timestamp
                to_column: utcTimestamp
            - add_column.date.year:
                from_column: utcTimestamp
                to_column: year
write
Entrypoint for the write block.
write::batch_delta
Write delta data down to some location.
Parameters:
- Path (str) – location to write to
- Mode (str) – the mode to write with such as append or overwrite
- PartitionBy.Columns (list[str], optional) – partition the delta table on one or multiple columns
- Optimize.Enabled (bool, optional, default: False) – enable optimize on the delta table (only works on Databricks)
- Optimize.ZorderBy (str, optional, default: None) – the column names to optimize on
- Vacuum.Enabled (bool, optional, default: False) – enable vacuum on the delta table (only works on Databricks)
- Vacuum.RetainHours (int, optional, default: 168) – number of hours to retain old versions; the default is 168 hours (7 days) and it cannot be set lower
- Upsert.MergeStatement (str, optional) – statement to merge the new data (updates.{col}) with the old data (source.{col}); this option only has an effect when Mode: upsert has been set
- HiveTable.DatabaseName (str) – name of the hive database
- HiveTable.TableName (str) – name of the hive table
- HiveTable.Schema (str, optional) – schema of the hive table
- MergeSchema (bool, optional, default: False) – enable delta table mergeSchema option
SectionName:
    Type: write::batch_delta
    Input: OtherSectionName
    Properties:
        Path: s3://path/to/files
        Mode: upsert
        PartitionBy:
            Columns: [year, month, day]
        Optimize:
            Enabled: False
            ZorderBy: column_name, column_name_2
        Vacuum:
            Enabled: False
            RetainHours: 168
        Upsert:
            MergeStatement: source.eventId == updates.eventId
        HiveTable:
            DatabaseName: dbname
            TableName: dbtable
            Schema: >-
                file_path STRING NOT NULL
                date_lifted TIMESTAMP
        MergeSchema: True
write::batch_jdbc
Batch save data with jdbc driver.
Parameters:
- Mode (str) – the mode to write with such as append or overwrite
- Driver (str) – the driver
- ConnUrl (str) – the connection url
- Table (str) – table to write to
- User (str) – username to database
- Password (str) – password to database
- NumPartitions (str) – number of partitions to write with
SectionName:
    Type: write::batch_jdbc
    Input: OtherSectionName
    Properties:
        Mode: append
        Driver: 'org.sqlite.JDBC'
        ConnUrl: 'jdbc:postgresql://localhost:5432/productapi'
        Table: 'table_name'
        User: 'username'
        Password: 'password'
        NumPartitions: 10
write::batch_json
Write json data down to some location.
Parameters:
- Path (str) – location to write to
- Mode (str) – the mode to write with such as append or overwrite
- PartitionBy.Columns (list[str], optional) – partition the output on one or multiple columns
SectionName:
    Type: write::batch_json
    Input: OtherSectionName
    Properties:
        Path: s3://path/to/files
        Mode: overwrite
        PartitionBy:
            Columns: [year, month, day]
write::batch_mysql_upsert
Batch upsert data with mysql-connector-python python package. This package can be installed with the mysql extra:
pip install getl[mysql]
Parameters:
- Host (str) – the host
- Port (int) – the port
- Database (str) – the database
- User (str) – username to database
- Password (str) – password to database
- Table (str) – table to write to
- Columns (list[str]) – the columns to try to insert from the dataframe; the database columns and dataframe columns must match
- ConflictColumns (list[str]) – when a conflict occurs on these columns, update instead
- UpdateColumns (list[str], optional) – columns to update in case of a conflict, the default value is all the Columns excluding the ConflictColumns
SectionName:
    Type: write::batch_mysql_upsert
    Input: OtherSectionName
    Properties:
        Host: localhost
        Port: 3306
        Database: productapi
        User: 'username'
        Password: 'password'
        Table: 'table_name'
        Columns: ['file_path', 'count']
        ConflictColumns: ['file_path']
        UpdateColumns: ['count']
write::batch_postgres_upsert
Batch upsert data with psycopg2-binary python package. This package can be installed with the postgres extra:
pip install getl[postgres]
Parameters:
- ConnUrl (str) – the connection url
- Table (str) – table to write to
- User (str) – username to database
- Password (str) – password to database
- Columns (list[str]) – the columns to try to insert from the dataframe; the database columns and dataframe columns must match
- ConflictColumns (list[str]) – when a conflict occurs on these columns, update instead
- UpdateColumns (list[str], optional) – columns to update in case of a conflict, the default value is all the Columns excluding the ConflictColumns
SectionName:
    Type: write::batch_psycopg2_upsert
    Input: OtherSectionName
    Properties:
        ConnUrl: 'postgresql://localhost:5432/productapi'
        Table: 'table_name'
        User: 'username'
        Password: 'password'
        Columns: ['file_path', 'count']
        ConflictColumns: ['file_path']
        UpdateColumns: ['count']
write::stream_delta
Write data as a stream to delta files.
Parameters:
- Path (str) – Where to write the data
- OutputMode (str) – How to write the data
SectionName:
    Type: write::stream_delta
    Input: OtherSectionName
    Properties:
        Path: s3://path/to/files
        OutputMode: overwrite