File registry

A file registry keeps track of which files have already been processed and returns only the files that have not been processed yet.

The following holds for every file registry:

  • it has a sub-block, annotated as block::sub-block
  • it outputs a list of unprocessed files

Only some load blocks can be configured with a file registry.

FileRegistry:
  {BlockName}:
    Type: {block::sub-block}
    Properties:
      {Prop}: {Value}

Example:

FileRegistry:
  S3FullScan:
    Type: fileregistry::s3_full_scan
    Properties:
      BasePath: s3://datalake/file-registry/dataset-a
      UpdateAfter: WriteToService
      HiveDatabaseName: file_registry
      HiveTableName: dataset-a

LiftJob:
  TrustedFiles:
    Type: load::batch_parquet
    Properties:
      Path: ${Path}
      FileRegistry: S3FullScan

  WriteToService:
    Type: write::batch_delta
    Input: TrustedFiles
    Properties:
      Path: s3://path/to/prefix
      Mode: overwrite 
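
The flow in the example above can be sketched in plain Python. This is only an illustration of the idea, not the library's implementation: the class InMemoryFileRegistry, its methods unprocessed and mark_processed, and the helper run_lift are hypothetical names, and an in-memory set stands in for the registry state that would normally live under BasePath.

```python
class InMemoryFileRegistry:
    """Hypothetical stand-in for a file registry; state lives in memory only."""

    def __init__(self):
        self._processed = set()

    def unprocessed(self, available_files):
        """Return the files not yet marked as processed, keeping their order."""
        return [f for f in available_files if f not in self._processed]

    def mark_processed(self, files):
        """Record files as processed."""
        self._processed.update(files)


def run_lift(registry, available_files, write):
    """Load the new files, write them, then update the registry."""
    new_files = registry.unprocessed(available_files)
    write(new_files)                    # plays the role of the UpdateAfter block
    registry.mark_processed(new_files)  # not reached if write() raises
    return new_files


registry = InMemoryFileRegistry()
written = []
first_run = run_lift(registry, ["a.parquet", "b.parquet"], written.extend)
second_run = run_lift(
    registry, ["a.parquet", "b.parquet", "c.parquet"], written.extend
)
```

In this sketch the second run only picks up c.parquet, because the first two files were marked as processed once the write step had finished.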

AWS S3

The following file registries can only be used with the AWS S3 service.

fileregistry::delta_diff

Retrieves a dataset containing only the rows that are new compared with the version from the previous lift.

Parameters
  • BasePath (str) – S3 prefix under which the file registry is stored
  • UpdateAfter (str) – The lift block after which the new files are marked as processed
  • DefaultStartDate (str) – The date from which the file registry starts searching for new files (format %Y-%m-%d %H:%M:%S)
  • JoinOnFields (list) – Fields to join on in order to exclude rows that already exist in the previous version

Only available for PySpark 3.0 and above.

DeltaDiff:
    Type: fileregistry::delta_diff
    Properties:
        BasePath: s3://datalake/file-registry/dataset-a
        UpdateAfter: WriteToDatabase
        DefaultStartDate: 2019-01-01 00:00:00
        JoinOnFields: [id, timestamp]
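
One way to read JoinOnFields is as the key of an anti-join: a row counts as new when no row in the previous version has the same values for those fields. The sketch below shows that row-level logic in plain Python under this assumption. The function new_rows and the sample rows are hypothetical, and the real block runs on Spark rather than on Python lists.

```python
def new_rows(current_rows, previous_rows, join_on_fields):
    """Return rows from current_rows whose key is absent from previous_rows."""

    def key(row):
        # The key is the tuple of values for the join fields, in order.
        return tuple(row[field] for field in join_on_fields)

    seen = {key(row) for row in previous_rows}
    return [row for row in current_rows if key(row) not in seen]


# Hypothetical sample data: the previous version and the current version.
previous = [
    {"id": 1, "timestamp": "2019-01-01 00:00:00", "value": "a"},
    {"id": 2, "timestamp": "2019-01-01 00:00:00", "value": "b"},
]
current = previous + [
    {"id": 2, "timestamp": "2019-01-02 00:00:00", "value": "b2"},
    {"id": 3, "timestamp": "2019-01-02 00:00:00", "value": "c"},
]

diff = new_rows(current, previous, ["id", "timestamp"])
```

The choice of fields matters. Joining on id and timestamp treats the updated row for id 2 as new, whereas joining on id alone would exclude it because id 2 already exists in the previous version.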

fileregistry::s3_date_prefix_scan

Finds all new files in S3 under prefixes that follow a date partition format, e.g. YYYY/MM/DD.

With the PartitionFormat parameter you can choose between different date formats for scanning an S3 prefix.

For example, suppose files are stored on S3 as YYYY/MM/DD/HH. PartitionFormat takes Python strftime codes: with the value %Y/%m the registry scans files on a monthly basis, and with %Y/%m/%d/%H it scans files on an hourly basis instead.
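
To make the effect of PartitionFormat concrete, the sketch below lists the distinct prefixes that a given strftime format produces between two points in time. It is an illustration only: the function date_prefixes and the one-hour step are assumptions made for this example, not the registry's actual scanning code.

```python
from datetime import datetime, timedelta


def date_prefixes(start, end, partition_format):
    """Return the distinct prefixes partition_format yields from start to end."""
    prefixes = []
    current = start
    while current <= end:
        prefix = current.strftime(partition_format)
        # Consecutive hours often map to the same prefix; keep each one once.
        if not prefixes or prefixes[-1] != prefix:
            prefixes.append(prefix)
        current += timedelta(hours=1)  # assumed finest granularity
    return prefixes


start = datetime(2019, 1, 30, 22)
end = datetime(2019, 2, 1, 1)

monthly = date_prefixes(start, end, "%Y/%m")
hourly = date_prefixes(start, end, "%Y/%m/%d/%H")
```

For this range the monthly format yields the two prefixes 2019/01 and 2019/02, while the hourly format yields one prefix per hour, starting at 2019/01/30/22. A coarser format means fewer, larger prefixes to scan.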

Parameters
  • BasePath (str) – S3 prefix under which the file registry is stored
  • UpdateAfter (str) – The lift block after which the new files are marked as processed
  • DefaultStartDate (str) – The date from which the file registry starts searching for new files
  • PartitionFormat (str) – Describes how the files are partitioned and how the file registry searches for new files
  • HiveDatabaseName (str) – The Hive database name for the file registry
  • HiveTableName (str) – The Hive table name for the file registry

S3DatePrefixScan:
    Type: fileregistry::s3_date_prefix_scan
    Properties:
        BasePath: s3://datalake/file-registry
        UpdateAfter: WriteToDatabase
        DefaultStartDate: 2019-01-01
        PartitionFormat: "%Y/%m/%d"
        HiveDatabaseName: file_registry
        HiveTableName: dataset-a

fileregistry::s3_full_scan

Performs a full scan for new files under a prefix in S3.

Parameters
  • BasePath (str) – S3 prefix under which the file registry is stored
  • UpdateAfter (str) – The lift block after which the new files are marked as processed
  • HiveDatabaseName (str) – The Hive database name for the file registry
  • HiveTableName (str) – The Hive table name for the file registry

S3FullScan:
    Type: fileregistry::s3_full_scan
    Properties:
        BasePath: s3://datalake/file-registry/dataset-a
        UpdateAfter: WriteToDatabase
        HiveDatabaseName: file_registry
        HiveTableName: dataset-a