Automated workflows for Apache Airflow

Data Warehouse Automation is much broader than just the generation and deployment of DDL and ELT code. One of the areas that should also be automated is the running and monitoring of loads. In this area, VaultSpeed chose to integrate with Apache Airflow. Airflow is one of the most extensive and popular workflow and scheduling tools available, and VaultSpeed generates the workflows (or DAGs) to run and monitor the execution of loads using Airflow.

Intro

The ultimate goal of building a data hub or data warehouse is to store data and make it accessible to users throughout the organisation. To do that, you need to start loading data into it. One of the advantages of Data Vault 2.0, and the use of hash keys in particular, is that objects have almost no loading dependencies. This means that with Data Vault, your loading process can reach very high degrees of parallelism.
Another advantage is that data can be loaded at multiple speeds. You might want to load some objects in (near) real time, while hourly, daily or even monthly load cycles might be satisfactory for other data sources.
In every case, you need to set up the workflow to load each object, and you need to be able to schedule, host and monitor this loading process.
In this article we explain how to leverage automation for another important part of the data warehouse: developing, scheduling and monitoring your workflows.

 

VaultSpeed setup

VaultSpeed generates DDL and ELT code for your data warehouse, data hub or other integration platform. Once deployed on the target environment, these components form the backbone of your data ingestion process. The next step is to organize these objects into a proper workflow and to build Flow Management Control (FMC) around them.

Parameters

The first step in this process is to set up some parameters inside VaultSpeed. These settings influence how the Airflow DAGs are built:

 

FMC parameters

 

  • USE_FMC: Whether or not you will use Flow Management Control. Enabling it causes some extra objects to be generated with your DDL.
  • FMC_DYNAMIC_LOADING_WINDOW: Determines whether the beginning of the loading window is set by the end of the last successful run, or whether the load uses a static loading window but waits for the successful execution of the previous run.
  • SCHEMA_PL_PROCEDURES: The schema where your Presentation Layer procedures are stored; this is used when generating code for loading the Presentation Layer.
  • FMC_GENERATE_SRC_LOADING_SCRIPTS: If enabled, extra tasks are added to the generated workflow to transfer data from your source system to the data warehouse.
  • FMC_SKIP_COMPLETED_TASKS: In case of an error you need to restart your load. This parameter indicates whether tasks that already completed successfully are skipped on restart, or whether all tasks are rerun.
  • SOURCE_PL_DEP: Determines whether the load of the Business Vault and Presentation Layer should wait for the load of a source to be successful, or simply to be completed.

Add workflows

Once you have set up the parameters, it is time to generate Airflow code. In the Flow Management Control screen, you can find all workflows and settings. Adding an FMC workflow is quite easy:

 

Adding an FMC Workflow

 

You can generate flows for your Raw Data Vault or Business Vault.
You have to select a load type, a data vault and one of your sources. VaultSpeed always builds a single flow for a single source; you can add dependencies between flows in Airflow.

Choose a DAG name (a Directed Acyclic Graph, or DAG, is the name Airflow uses for a workflow). This name should be unique to this workflow, e.g. <dv name>_<source name>_<load type>. Each DAG should also have a description.

We also offer a feature to enable task grouping. When using SQL code and mini-batches, this reduces the overhead of creating connections to the database. So if connecting takes a lot of time compared to the actual mapping execution, enabling this option should save you some load time.

Choose a start date; this is the date and time at which your initial load will run, and the start of your first incremental load window.

By setting the concurrency, you can indicate how many tasks in this workflow are allowed to execute at the same time. This depends on the scalability of the target platform and the resource usage (CPU) of your Airflow scheduler and workers (it can easily be changed later).

Finally, you enter a name for your source and target database connections; these connections will be defined in Airflow later.

When you choose the incremental flow type, you need to set a schedule interval. This is the time between incremental loads, and it can be one of the following (a minimal DAG sketch using these settings follows below):

"@hourly" : Run once an hour at the beginning of the hour
"@daily" : Run once a day at midnight
"@weekly" : Run once a week at midnight on Sunday morning
"@monthly" : Run once a month at midnight of the first day of the month
"@yearly" : Run once a year at midnight of January 1
a cron expression (e.g. "0 0 * * *")
a Python timedelta (e.g. timedelta(minutes=15))
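
To make these settings concrete, below is a minimal, hand-written sketch of how they typically map onto an Airflow DAG definition. This is not the code VaultSpeed generates; it assumes Airflow 2.x, and the DAG name, task names and source name are hypothetical placeholders.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator  # placeholder tasks, for illustration only

with DAG(
    dag_id="dv_moto_sales_incr",              # hypothetical <dv name>_<source name>_<load type>
    description="Incremental load for the MOTO_SALES source",
    start_date=datetime(2021, 1, 1),          # start of the first incremental loading window
    schedule_interval=timedelta(minutes=15),  # could also be "@hourly", "@daily" or a cron string
    concurrency=4,                            # max tasks running at once (max_active_tasks in newer Airflow versions)
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start_load_cycle")
    load_raw_vault = DummyOperator(task_id="load_raw_data_vault")
    load_business_vault = DummyOperator(task_id="load_business_vault")

    start >> load_raw_vault >> load_business_vault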

Generate code

To generate code for a workflow, you need to pick a version of your generated ELT code. Select the one for which you want to build your workflow. Hitting "Start Generation" launches the code generation.

A few files are generated in the background: first, a Python script containing the actual DAG; in addition, some JSON files are added containing the rest of the setup.

 

Generating workflows

 

You can always have a look at previous workflow generations: their settings at that time, the data vault and source release, when they were generated, and the name of the zip file containing the generated Python code.

Airflow Setup

To get started with Airflow, you need to set up a metadata database and install Airflow. Once you have a running Airflow environment, some additional setup remains. Our VaultSpeed FMC plugin for Airflow does most of this for you.

After setting up database connections and adding some variables, you can import your newly generated DAGs. You can do this manually, but automated deployment from VaultSpeed straight to Airflow (or through a versioning system) is of course also possible. A small example of registering a connection programmatically follows below.
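
As an illustration of the connection setup, the sketch below registers a target database connection in the Airflow metadata database from Python, so that the generated tasks can reference it by its connection name. It assumes Airflow 2.x; the connection id, host and credentials are hypothetical placeholders, and the same result can be achieved through the Airflow web UI or the "airflow connections add" CLI command.

from airflow import settings
from airflow.models import Connection

target_conn = Connection(
    conn_id="dwh_target",        # must match the target connection name entered in VaultSpeed
    conn_type="postgres",
    host="dwh.example.com",
    schema="moto_dwh",
    login="loader",
    password="change_me",
    port=5432,
)

session = settings.Session()
# Only create the connection if it does not exist yet
if not session.query(Connection).filter(Connection.conn_id == target_conn.conn_id).first():
    session.add(target_conn)
    session.commit()
session.close()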

 

DAGs in Airflow

 

To start executing loads, start the scheduler and the workers.
When you unpause the freshly imported workflow, your DAG will start running.
For the initial load, the current date and time need to be past the start date; the first incremental load starts after start date + interval. For example, a daily DAG with start date 2021-01-01 00:00 triggers its first incremental run shortly after 2021-01-02 00:00, covering the loading window of the previous day.

The picture below shows the DAG we generated in this example. You can clearly see that we used the grouping function, as it has grouped the tasks into four parallel loading sets per layer.

 

Generated Airflow DAG

 

Airflow offers an intuitive interface in which you can monitor workflows, view logs and more. You can get a good overview of execution times and search for outliers. Upon failure you can inspect logs of separate tasks and start debugging.

 

Conclusion

Building a data warehouse, data hub or any other type of integration system requires building data structures and ELT code. But those two components are useless without workflows. You can build them manually, but VaultSpeed offers a solution that is less time-consuming and more cost-effective.

Automation in VaultSpeed is not limited to code generation and deployment. VaultSpeed also automates your workflows. When considering a standard workflow solution, Apache Airflow is our tool of choice.

About Apache Airflow: it is very well established in the market and is open source. It features an intuitive interface and makes it easy to scale out workers horizontally when you need to execute lots of tasks in parallel. It is easy to run both locally and in a (managed) cloud environment.

With a few steps, you can set up workflow generation, versioning and auto-deployment into your Airflow environment. Once completed, you can start loading data.

VaultSpeed’s FMC workflow generation is sold as a separate module in addition to the standard environment. It targets clients looking for a proper solution to organize workflows in their integration platforms. More details on pricing for this automation module are available upon request.

 

 

 

 


Multi-Active Satellites and Related Links

by Dirk Vermeiren, CTO - VaultSpeed

Introduction

The power of VaultSpeed lies in its extended pre-defined templates that generate code based on a number of configuration parameters. If you work with a template-based solution you build yourself, you have to solve these extended issues inside the templates you build, and you also have to test and maintain them.
In a number of blogs we want to address some complex challenges and show how VaultSpeed provides a solution out of the box.

This blog describes the challenges and the solution for the implementation of Links between Multi-Active Satellites with subsequence source attribute(s).

The challenge described in this post is twofold. First, we need to define the level of detail for the link. Second, we need to decide what happens when a delete occurs on the source: do we want the link record to be “virtually” deleted or not?

What is a Multi-Active Satellite?

A Multi-Active Satellite is a Satellite that can have multiple active records for the same business key. There can be different types of Multi-Active Satellites:

A Multi-Active Satellite with subsequence source attribute(s):

In this case there is an attribute in the source object that makes the data in the Satellite unique again. The combination of the Business Key(s), the subsequence attribute(s) and the load date will be the unique key of the Satellite (illustrated with a small example after the two types below).
A language code, for example, can be a subsequence attribute in a source system.

A Multi-Active Satellite without a subsequence source attribute

This is a less common type: there is no unique source attribute. In this case we introduce a generated sequence number per Business Key to ensure that the combination of the Business Key(s), the generated subsequence attribute and the load date is unique.
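
As a small, hand-written illustration of the resulting unique key (the business key, descriptions and language codes below are made up):

# Multi-active satellite rows: the same business key has several active records,
# distinguished by the subsequence attribute (here a language code).
sat_rows = [
    ("PF-100", "EN", "2021-01-01", "Heated grips"),
    ("PF-100", "FR", "2021-01-01", "Poignees chauffantes"),
    ("PF-100", "NL", "2021-01-01", "Verwarmde handvatten"),
]

# The unique key is the combination of business key, subsequence attribute and load date.
unique_keys = {(bk, seq, load_date) for bk, seq, load_date, _ in sat_rows}
assert len(unique_keys) == len(sat_rows)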

This blog describes one of the scenarios possible when combining Multi-Active Satellites with Links, and it explains how VaultSpeed’s logic solves these challenges as standard functionality, just by setting a few parameters.

The case uses data models describing a fictive motor bike company, an example that is commonly used in VaultSpeed demos and documentation.

The Case

Sources

The challenges are explained using two Multi-Active objects within the VaultSpeed MOTO SALES demo source: the Product_Features and Product_Features_Category tables, as shown in Figure 1. Example records are shown in Figure 2 for Product Features and Figure 3 for Product Feature Categories.

 

Figure 1
The language codes in both objects are the source subsequence attributes, and there is a foreign key from the product feature object to the product feature category object, based on the product_feature_cat_id attribute.

 

Figure 2: Product Features
Figure 3: Product Feature Categories

 

The source case has the following characteristics:

  • The subsequence attribute is a source attribute.
    • In both tables, the Business Key attribute (indicated in red) is not the same as the Primary Key attribute (indicated in blue).
      This means that the relationship is not based on the Business Key but on other attributes, so a lookup is needed to build the Link hash key, as described in a previous blog on our website.
  • The Primary Key is at a lower level of detail than the Business Key in both source objects, product_feature and product_feature_cat.
    • The rows indicated in blue show the Primary Key; the rows indicated in red contain the Business Key.
      As you can see, the same Business Key value has multiple Primary Key values.
      This means that a lookup via the Business Key will return multiple Primary Keys, and the logic must be able to cope with that.
  • Both objects in the relationship are Multi-Active objects.
    • Both the product_feature object and the product_feature_cat object are multi-active, so a lookup of a Business Key for the link can result in multiple records being returned.

Raw Data Vault

  • The Raw Data Vault uses the INSERT ONLY LOGIC approach.
    With INSERT ONLY LOGIC, there are no load_end_dates and the only actions allowed are inserts. This means that a DELETE results in the insert of a new record with the same attribute values as the previous record and the DELETE_FLAG set to 'Y'.

Setup in VaultSpeed

The setup of the source and the Raw Data Vault for the above implementation is very simple: it requires changing one parameter, setting one switch per object and indicating one subsequence attribute per source object.

  • Build the Raw Data Vault using INSERT ONLY LOGIC.
    VaultSpeed system parameter: INSERT_ONLY_LOGIC = Y
  • The Product_Features and Product_Features_Category source objects are Multi-Active.
    Set the VaultSpeed Multi-Active switch to YES using the Graphical Source Editor. This indicates that the source object is loaded into a Multi-Active Satellite (Figure 4).

    • By right-clicking on the Language attributes of product feature and product feature category, you can set their attribute type to “Subsequence Attribute” to indicate that they are source-based subsequence attributes (Figure 5).

 

Figure 4
Figure 5

Links between Multi-Active Objects

We encounter a number of specific choices and challenges that need to be covered by the loading logic for the Links between Multi-Active Objects:

Subsequence in Link or Satellite on Link?

Link

No: the subsequence attribute is part of the key of the Satellite, but it is not part of the Business Key. So when building the Link, you just load the link object with the unique combinations of the hash on the business keys of the relationship.

Satellite On Link

Here you could choose to add the subsequence number to the Satellite on Link. In our example, we created a link between two Hubs with Multi-Active Satellites.

In this case, if you took the subsequence attribute into account in the Satellite on Link, you would get the cartesian product of all combinations of the subsequence attribute values of a Link key, as shown in Figure 8. The example case would have been even more compelling if the Multi-Active data had had different subsequence attributes.

Considering this, VaultSpeed has chosen not to include the Subsequence Attributes in the Satellite on Link.

 

Figure 8

What about the delete logic?

When no subsequence attribute is present in the Satellite on Link, some special handling is required for deletes on the Link Satellite.

The delete flag for the Satellite on Link can only be set to 'Y' when all of the multi-active records have been deleted, not when only part of the multi-active records are deleted.

We need to build a join with the Satellite that owns the data at the foreign key side of the relationship to ensure that all the records are really deleted. Only then can the delete record with delete_flag = 'Y' be inserted into the Satellite on Link (it is an insert because of the INSERT ONLY logic).

The logic explained


,dist_del_stg as
(
SELECT 
distinct product_features_hkey
FROM moto_stg_ms_tst_ref_scn1.stg_product_features stg1 
WHERE operation = 'D'
)

DIST_DEL_STG selects the delete records arriving from staging. They are used to limit the staging content and the Satellite content to the new records that have deletes.



,find_del_and_related_stg_rec as 
(
SELECT 
stg1.product_features_hkey,
stg1.pro_fea_lan_code_seq,
stg1.load_date,
stg1.load_cycle_id,
stg1.trans_timestamp,
stg1.operation,
stg1.error_code_fkprfe_pfca 
FROM moto_stg_ms_tst_ref_scn1.stg_product_features stg1
     JOIN dist_del_stg dds 
          ON dds.product_features_hkey = stg1.product_features_hkey
)

FIND_DEL_AND_RELATED_STG_REC selects the Delete record data from staging.



,find_del_related_sat_rec as 
(
SELECT 
sat.product_features_hkey,
sat.pro_fea_lan_code_seq, 
sat.load_date, 
sat.load_cycle_id,
sat.trans_timestamp,
sat.delete_flag,
sat.load_end_date,
max(load_end_date) 
          over (partition by sat.product_features_hkey,
                             sat.pro_fea_lan_code_seq
                order by     sat.load_date) 
       as max_load_end_date 
FROM moto_fl_tst_ref_scn1.sat_ms_product_features sat 
     JOIN dist_del_stg dds 
          ON dds.product_features_hkey = sat.product_features_hkey 
     LEFT OUTER JOIN moto_stg_ms_tst_ref_scn1.stg_product_features stg1 
          ON stg1.product_features_hkey = sat.product_features_hkey 
             AND stg1.pro_fea_lan_code_seq = sat.pro_fea_lan_code_seq 
             AND stg1.load_date = sat.load_date 
WHERE stg1.product_features_hkey is NULL 
      AND stg1.pro_fea_lan_code_seq IS NULL 
      AND stg1.load_date IS NULL
      AND sat.delete_flag = 'N'
) 

FIND_DEL_RELATED_SAT_REC selects the records from the foreign-key-related Satellite that have deletes in the incoming data. We need the MAX(load_date) to find the active record in an INSERT ONLY Satellite. Also notice that the PARTITION BY is on the hash key AND the subsequence attribute, as this combination together with the load_date is the unique key of a record.



,find_active_del_related_sat_rec as 
(
SELECT
sat.product_features_hkey,
sat.pro_fea_lan_code_seq,
sat.load_date,
sat.load_cycle_id,
sat.trans_timestamp,
sat.delete_flag,
sat.load_end_date
FROM find_del_related_sat_rec sat 
WHERE sat.load_end_date = sat.max_load_end_date
) 

FIND_ACTIVE_DEL_RELATED_SAT_REC selects only the active records, based on the MAX_LOAD_END_DATE calculated by the analytical function in FIND_DEL_RELATED_SAT_REC.



,not_deleted_records_lks AS 
(
SELECT 
COALESCE(fdsat.product_features_hkey, fdstg.product_features_hkey) product_features_hkey,
COUNT(1) count_existing_active 
FROM find_active_del_related_sat_rec fdsat
     FULL OUTER JOIN find_del_and_related_stg_rec fdstg 
          ON fdstg.product_features_hkey = fdsat.product_features_hkey 
             AND fdstg.pro_fea_lan_code_seq = fdsat.pro_fea_lan_code_seq 
WHERE (fdstg.product_features_hkey is null 
       OR fdsat.product_features_hkey is null)
GROUP BY COALESCE(fdsat.product_features_hkey, fdstg.product_features_hkey)
)


NOT_DELETED_RECORDS_LKS then calculates how many active records remain in the Satellite, given the incoming delete records.

The FULL OUTER JOIN is very important, as there could be INSERT records after the DELETE in staging that need to be taken into account.

The logic for the delete flag that is used further on is then as shown below.

CASE WHEN COALESCE (NOT_DELETED_RECORDS_LKS.count_existing_active,0) = 0 
AND stg.operation = 'D' THEN 'Y'::text ELSE 'N'::text END delete_flag

If all the incoming delete records match the still-active records in the Satellite, the count is 0 and the delete flag of the Satellite on Link can be set to 'Y'; otherwise the count is greater than 0 and the delete_flag is set to 'N'.

Conclusion

Through some simple parameterisation and a few manipulations inside the Graphical Source Editor in VaultSpeed, you get full support for this feature in the Raw Data Vault layer of your integration system.

The choices made in this setup and their logical consequences were carefully analysed, and all logic needed to support the possible cases was subsequently implemented inside the standard templates of VaultSpeed.

The complexity is hidden from customers, delivering high value with easy, out-of-the-box configuration and implementation.

 

 


Alternative To The Driving Key Implementation In Data Vault 2.0

Back in 2017 we introduced the link structure with an example of a Data Vault model in the banking industry. We showed what the model looks like when a link represents either a relationship or a transaction between two business objects. A link can also connect more than two hubs. Furthermore, there is a special case in which part of the hub references stored in a link can change without describing a different relation. This has a great impact on the link satellites. What is the alternative to the Driving Key implementation in Data Vault 2.0?

THE DRIVING KEY

A relation or transaction is often identified by a combination of business keys in one source system. In Data Vault 2.0 this is modelled as a normal link connecting multiple hubs, each containing a business key. A link also contains its own hash key, which is calculated over the combination of all parent business keys. So when the link connects four hubs and one business key changes, the new record will show a new link hash key. There is a problem when four business keys describe the relation but only three of them identify it uniquely: we cannot identify the business object by using only the hash key of the link. This is not a modeling error, but we do have to identify the correct record in the related satellite when querying the data. In Data Vault 2.0 this is solved with a driving key: a consistent key in the relationship, often the primary key in the source system.
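
To make the hash key behaviour concrete, here is a minimal, hand-written Python sketch (not VaultSpeed-generated code) of a link hash key calculated over the parent business keys. The delimiter, upper-casing and the choice of MD5 are illustrative assumptions only.

import hashlib

def link_hash_key(*business_keys, delimiter="|"):
    # Normalize each parent business key, concatenate them, and hash the result.
    normalized = delimiter.join(str(key).strip().upper() for key in business_keys)
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()

# Changing any of the parent business keys yields a different link hash key.
print(link_hash_key("E1001", "DEP-SALES"))
print(link_hash_key("E1001", "DEP-MARKETING"))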

The following tables demonstrate the relationship between an employee and a department from a source system.

Table 1: employee-department relationship

 

The following Data Vault model can be derived from this source structure.

Figure 1: Data Vault model

 

The link table “Empl_Dep” is derived from the table “Employee” in the source system. The driving key in this example is the Employee_Number, as it is the primary key in the source table and an employee can work in only one department at the same time. This means the true driving key “lives” in the satellite of the employee. If an employee switches departments, there is no additional record in the employee’s satellite table, but there is a new record in the link table, which is legitimate.

Table 2: link data

 

To query the most recent delta, you have to query it from the link table, grouped by the driving key.
To sum up: you will always have a new link hash key when a business key changes in a relation. The challenge is to identify the driving key, which is a unique business key (or a combination of business keys) for the relationship between the connected hubs. Sometimes you would have to add an additional attribute to get a unique identifier.

Both present an issue for power users with access to the Data Vault model. Without naming conventions, there is a risk that a GROUP BY is performed on more attributes than just the driving key, which would lead to unexpected and incorrect aggregate values, even though the data itself is correctly modelled.

When dealing with transactions (e.g. the flight data from our book), there is a better solution available than the driving key: we typically prefer to model such data as a non-historized link and to insert technical counter-transactions when a hub reference changes.
In the case of a modified record in the source, we insert two records into the non-historized link: one for the new version of the modified record in the source, and one for the old version that still exists in the target (non-historized link) but needs to be countered now, the technical counter record. To distinguish the records from the source from the counter transactions, a new column is added, often called “Counter”.

The standard value for this counter attribute is 1 for records from the source and -1 for the technical counter transactions. Important: we do not perform any update statements; we only insert the new counter records. When querying the measures from the satellite, you just multiply the measures by the counter value (see the small illustration after Table 3).

Table 3: Link Connection with counter attribute

 

Table 3 shows a link with a counter attribute. When a record changes in the source system, the original value is inserted with a counter value of -1 into the link table of the data warehouse. For the changed value there is a new link hash key, which is also calculated over the descriptive attribute ‘Salary’. The counter value of the new record is 1.
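
As a minimal illustration of how the counter attribute behaves at query time, the following hand-written Python sketch (with made-up business keys and values, not VaultSpeed code) nets out the measures by multiplying them with the counter value:

# Hypothetical non-historized link rows with a counter attribute.
rows = [
    {"employee_bk": "E1001", "department_bk": "SALES",     "salary": 45000, "counter": 1},   # original record
    {"employee_bk": "E1001", "department_bk": "SALES",     "salary": 45000, "counter": -1},  # technical counter record
    {"employee_bk": "E1001", "department_bk": "MARKETING", "salary": 45000, "counter": 1},   # new version after the change
]

# Net salary per department: multiplying by the counter cancels out the old version.
net_per_department = {}
for row in rows:
    key = row["department_bk"]
    net_per_department[key] = net_per_department.get(key, 0) + row["salary"] * row["counter"]

print(net_per_department)  # {'SALES': 0, 'MARKETING': 45000}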

CONCLUSION

Because identifying the driving key of a relation can be a problem in some situations, you can use an alternative solution that avoids the driving key: all changes and deletes are tracked using a counter attribute in a non-historized link table. This link also stores the descriptive attributes, and the link hash key is calculated over all attributes.


More advanced Link Object Staging Logic

When the source relationship is not based on the business keys

Note: this is a fictive example, created to explain a case. It is used only to illustrate the more advanced logic that is necessary in the staging layer. It explains how to resolve the relationship and load the link table when the foreign/primary key relationship between the source objects is not based on the Business Key.

The relational source model in the image above shows the relationship between an Invoice source table and a Customer table.

The Business Keys are indicated by the U-sign in front of the column. If they do not exist, then the Business Key is indicated by the Unique key. In this model, this means:

  • For invoices, the Business Key is INVOICE_NUMBER
  • For customers, the Business Key is a composite key that consists of these columns:
    • FIRST_NAME
    • LAST_NAME
    • BIRTH_DATE
    • GENDER

The relationship between the two tables in this model is based on customer_id, which is the technical key (surrogate key). It is not based on the business key, i.e. the combination of FIRST_NAME, LAST_NAME, BIRTH_DATE and GENDER.

As a result:

  • The link table can't be loaded by using the Invoices table only, where the relationship exists. A lookup is needed to find the Business Keys in the Customers table.
  • Not all the records needed to resolve the relationships will be found during that lookup, because the data in the landing area holds only incremental data. This means that a lookup in the Customer Hub is necessary to find the missing related records and their Business Keys.

The following image illustrates the lookups that are needed to resolve the link between Invoices and Customers. The Invoices landing table contains two incremental records, with 5 and 7 as the customer id. The Customers landing table only holds the record with Customer_Number = 5, meaning that the record with Customer_Number = 7 is not in the landing area during this incremental load.

The only way to solve this is to do a lookup to the Satellite, on the condition that the Satellite holds the primary key of the Customers table (Customer_Number). Through a lookup to the Hub using the hash key, the Business Keys can then be found, resulting in a fully resolved link table. The sketch below illustrates this lookup order.
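
Purely as an illustration of that lookup order (not the generated SQL, which is shown in the image referenced below), here is a small, hand-written Python sketch with made-up keys and values:

# Hypothetical in-memory illustration of the lookup order described above.
landing_customers = {5: ("John", "Doe", "1980-01-01", "M")}   # incremental load only
sat_customer = {5: "hk_5", 7: "hk_7"}                          # technical key -> hub hash key (satellite holds the technical key)
hub_customer = {
    "hk_5": ("John", "Doe", "1980-01-01", "M"),
    "hk_7": ("Jane", "Smith", "1975-06-15", "F"),
}                                                              # hash key -> business key

def resolve_customer_business_key(customer_id):
    # 1. Try the Customers landing table delivered in the same incremental load.
    if customer_id in landing_customers:
        return landing_customers[customer_id]
    # 2. Fall back to the Satellite (technical key -> hash key) and the Hub (hash key -> business key).
    hash_key = sat_customer.get(customer_id)
    return hub_customer.get(hash_key)

for invoice_customer_id in (5, 7):
    print(invoice_customer_id, resolve_customer_business_key(invoice_customer_id))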

The following image shows the SQL used to load the Invoices staging table and to resolve the link table between Invoices and Customers, in an Oracle database prior to 12c.

Now, what if this technical key were also put into the Hub? This would save one lookup (from the Satellite to the Hub). However, Hubs should only contain Business Keys. It is good practice to put the technical key in the Satellite, but to make it a special attribute that does not take part in change history, meaning it is not part of the Hash Difference attribute calculation. The small sketch below illustrates this.
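
A minimal, hand-written sketch (attribute names are hypothetical, MD5 and the delimiter are illustrative assumptions) of a hash difference that excludes such a technical key from change detection:

import hashlib

def hash_diff(descriptive_attributes, exclude=("customer_number",)):
    # Attributes excluded from change tracking (such as the technical key) do not
    # enter the hash difference, so a change in them never creates a new satellite record.
    parts = [
        str(value).strip().upper()
        for name, value in sorted(descriptive_attributes.items())
        if name not in exclude
    ]
    return hashlib.md5("|".join(parts).encode("utf-8")).hexdigest()

row = {"first_name": "John", "last_name": "Doe", "customer_number": 5}
same_row_new_technical_key = {"first_name": "John", "last_name": "Doe", "customer_number": 9}
assert hash_diff(row) == hash_diff(same_row_new_technical_key)  # technical key change is ignored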

The target Data Vault model for this article is shown below.