Optimize ETL Jobs by Clustering and Partitioning in PDI
Pentaho+ Data Integration – Clustering and Partitioning
Problem
Every Pentaho+ ETL administrator deals with some kind of data processing headache at some point in their career.
As data volumes grow, Pentaho+ ETL processes start to take longer to complete. Cue the complaints from business users:
- “Why do our data loads take so long to run?”
- “Why can’t we get our reports out earlier?”
- “Why can’t I see results when I get into the office?”
Performance issues are something that many organizations start to see when scaling up their platforms to handle more data.
These issues can be attributed to many different things. Sub-optimal database configurations and indexing strategies are common culprits that don’t show themselves until larger data volumes are processed.
However, performance issues can often be attributed to Pentaho+ ETL code starting to struggle under the load. Ensuring that the design of your Pentaho+ ETL processes is scalable from the beginning greatly lowers the chances that the ETL component of the equation is the issue.
Imagine we have to perform some very complex transformations and finally load a huge amount of data into a target warehouse. How can we make Pentaho+ ETL process this huge volume of data and load it faster?
Solution
You have two solutions to handle this task:
SCALE UP:
Build a single powerful Pentaho+ Data Integration (PDI) server with a lot of RAM and CPU. This one server handles all the work by itself.
SCALE OUT:
Create an array of smaller servers. Each of them will handle a small part of the work.
Clustering is scaling out. You divide the global workload and distribute it across many nodes, and these smaller tasks are processed in parallel. The overall performance is limited by the slowest node in your cluster.
If we consider PDI, a Pentaho+ cluster is composed of:
ONE MASTER:
this node acts as a conductor, assigning the sub-tasks to the slaves and merging the results that come back from the slaves when the sub-tasks are done.
SLAVES:
from one to many. The slaves are the nodes that actually do the work: they process the tasks and then send the results back to the master for reconciliation.
The more Pentaho+ Data Integration (PDI) slaves we implement, the better the performance.
Partitioning simply splits a data set into a number of subsets according to a rule that is applied to each row of data. This rule can be anything you can come up with, including no rule at all. However, if no rule is applied we simply call it (round robin) row distribution.
The reason for partitioning data is invariably linked to parallel processing, since it makes it possible to execute certain tasks in parallel where this would otherwise not be possible.
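As a rough illustration of the difference (outside of PDI), the following shell sketch splits a hypothetical input.csv into two subsets, first with no rule (plain round-robin distribution by row number) and then with a simple rule applied to the first field; the file name and the length-based rule are placeholders chosen only for illustration:
# round-robin distribution: no rule, rows simply alternate between the two subsets
awk 'NR % 2 == 1 { print > "subset_0.csv" } NR % 2 == 0 { print > "subset_1.csv" }' input.csv
# rule-based partitioning: every row whose first field (e.g. city) maps to the same partition number lands in the same subset
awk -F',' '{ print > ("subset_" (length($1) % 2) ".csv") }' input.csv
With the rule-based version, all rows for a given key always end up in the same subset, which is what makes keyed operations safe to run in parallel.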
Steps
Assuming all slave servers are up and running as per the steps mentioned below.
Carte Cluster
A Carte cluster consists of two or more Carte slave servers and a Carte master server.
When you run a transformation, the different parts of it are distributed across Carte slave server nodes for processing, while the Carte master server node tracks the progress.
Configure a Static Carte Cluster
Follow the directions below to set up static Carte slave servers.
Copy over any required JDBC drivers and PDI plugins from your development instances of PDI to the Carte instances.
Run the Carte script with an IP address, hostname, or domain name of this server, and the port number you want it to be available on.
./carte.sh 127.0.0.1 8081
If you will be executing content stored in a DI Repository, copy the repositories.xml file from the .kettle directory on your workstation to the same location on your Carte slave. Without this file, the Carte slave will be unable to connect to the DI Repository to retrieve content.
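For example, assuming the Carte slave is reachable as carte-slave-1 and runs under a pentaho user (both names are just placeholders), the file could be copied like this:
scp ~/.kettle/repositories.xml pentaho@carte-slave-1:~/.kettle/repositories.xml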
Ensure that the Carte service is running as intended, accessible from your primary PDI development machines, and that it can run your jobs and transformations.
To start this slave server every time the operating system boots, create a startup or init script to run Carte at boot time with the same options you tested with.
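On a systemd-based Linux distribution, one possible way to do this is a small unit file; the installation path /opt/pentaho/data-integration, the pentaho user, and the address/port below are assumptions that you should adjust to match the options you tested with:
# /etc/systemd/system/carte-slave.service (a minimal sketch, adjust paths and options)
[Unit]
Description=Pentaho Carte slave server
After=network.target

[Service]
Type=simple
User=pentaho
WorkingDirectory=/opt/pentaho/data-integration
ExecStart=/opt/pentaho/data-integration/carte.sh 127.0.0.1 8081
Restart=on-failure

[Install]
WantedBy=multi-user.target
Then enable it so Carte starts at boot:
sudo systemctl enable --now carte-slave.service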
Configure a Dynamic Carte Cluster
This procedure is only necessary for dynamic cluster scenarios in which one Carte server will control multiple slave Carte instances.
The following instructions explain how to create carte-master-config.xml and carte-slave-config.xml files.
You can rename these files if you want, but you must specify the content in the files as per the instructions.
Configure Carte Master Server
Follow the process below to configure the Carte Master Server.
Copy over any required JDBC drivers from your development instances of PDI to the Carte instances.
Create a carte-master-config.xml configuration file using the following example as a template:
<slave_config>
  <!-- on a master server, the slaveserver node contains information about this Carte instance -->
  <slaveserver>
    <name>Master</name>
    <hostname>yourhostname</hostname>
    <port>8077</port>
    <username>cluster</username>
    <password>cluster</password>
    <master>Y</master>
  </slaveserver>
</slave_config>
The name of the Master server must be unique among all Carte instances in the cluster.
Run the Carte script with the carte-master-config.xml parameter. Note that if you placed the carte-master-config.xml file in a different directory than the Carte script, you will need to add the path to the file to the command.
./carte.sh carte-master-config.xml
Ensure that the Carte service is running as intended.
To start this master server every time the operating system boots, create a startup or init script to run Carte at boot time.
You now have a Carte master server to use in a dynamic cluster. Next, configure the Carte slave servers.
Configure Carte Slave Servers
Follow the directions below to set up the Carte slave servers.
Follow the process to configure the Carte Master Server.
Make sure the Master server is running.
Copy over any required JDBC drivers from your development instances of PDI to the Carte instances.
In the /pentaho/design-tools/ directory, create a carte-slave-config.xml configuration file using the following example as a template:
<slave_config>
  <!-- the masters node defines one or more load balancing Carte instances that will manage this slave -->
  <masters>
    <slaveserver>
      <name>Master</name>
      <hostname>yourhostname</hostname>
      <port>8077</port>
      <!-- uncomment the next line if you want the DI Server to act as the load balancer -->
      <!-- <webAppName>pentaho-di</webAppName> -->
      <username>cluster</username>
      <password>cluster</password>
      <master>Y</master>
    </slaveserver>
  </masters>
  <report_to_masters>Y</report_to_masters>
  <!-- the slaveserver node contains information about this Carte slave instance 1 -->
  <slaveserver>
    <name>Slave a</name>
    <hostname>yourhostname</hostname>
    <port>8078</port>
    <username>cluster</username>
    <password>cluster</password>
    <master>N</master>
  </slaveserver>
</slave_config>
Create the same file on the second slave instance, changing only its slaveserver node:
  <!-- the slaveserver node contains information about this Carte slave instance 2 -->
  <slaveserver>
    <name>Slave b</name>
    <hostname>yourhostname</hostname>
    <port>8079</port>
    <username>cluster</username>
    <password>cluster</password>
    <master>N</master>
  </slaveserver>
The name of the slave server must be unique among all Carte instances in the cluster.
Save and close the file.
Run the Carte script with the carte-slave-config.xml parameter. Note that if you placed the carte-slave-config.xml file in a different directory than the Carte script, you will need to add the path to the file to the command.
./carte.sh carte-slave-config.xml
If you will be executing content stored in a DI Repository, copy the repositories.xml file from the .kettle directory on your workstation to the same location on your Carte slave. Without this file, the Carte slave will be unable to connect to the DI Repository to retrieve PDI content.
It is easy to verify that the instances are running by pointing a browser at them. The following URLs can be used to connect to the Carte instances started above.
http://yourhostname:8077
http://yourhostname:8078
http://yourhostname:8079
The default username and password for Carte are cluster/cluster.
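You can also check the instances from the command line with curl; /kettle/status/ is Carte's status page, and the credentials below are the defaults mentioned above:
curl -u cluster:cluster http://yourhostname:8077/kettle/status/
curl -u cluster:cluster http://yourhostname:8078/kettle/status/
curl -u cluster:cluster http://yourhostname:8079/kettle/status/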
Define Kettle Cluster Schema
Create a new transformation.
Click on the View tab on the left-hand side and right click on Slave server and choose New.
Add the Carte servers we started earlier one by one, and mark the master instance as the master. Note the default Carte user is cluster and the default password is cluster.
Next right click on Kettle cluster schemas and choose New.
Provide a Schema name and then click on Select slave servers. Mark all of them in the pop-up window and select OK.
Next, we want to make sure that Kettle can connect to all of the carte servers.
Right click on the cluster schema you just created and choose Monitor all slave servers:
For each of the servers Spoon will open a monitoring tab/window. Check the log in each monitoring window for error messages.
Define Clustering For Step
Add a Text input step, for example.
Right-click on the Text input step and choose Clustering.
In the Cluster schema dialog choose the cluster schema you created earlier on:
Click OK.
Note that the Text input step has a clustering indicator now:
Note: Only the steps that you assign the cluster schema this way will be run on the slave servers.
All other ones will be run on the master server.
Our input dataset:
In this example, all the steps will be executed on the slaves (as indicated by the Cx2).
To run the transformation on our local environment, click the execute button and choose “Execute clustered”
The last option, Show transformations, is not necessary for running the transformation, but it helps to understand how Kettle creates individual transformations for your slave servers and master server in the background.
How to create a partitioning schema
Create a new transformation (or open an existing one). Click on the View tab on the left-hand side and right click on Partition schemas. Choose New:
We want to define a dynamic schema, so tick Dynamically create the schema definition and set the Number of partitions by slave server to 1:
How to assign the partition schema
Right-click on the step that we want to assign the partition schema to and choose Partitioning.
We will be given following options:
For our purposes we want to choose Remainder of division. In the next dialog choose the partitioning schema you created earlier on:
Next specify which field should be used for partitioning. In our case this is the city field:
That’s it. Now partitioning will be dynamically applied to this step.
Why apply data partitioning on distributed ETL transformation?
As we have 2 slave servers running, the data will be dynamically partitioned into 2 sets based on the city field.
If we don’t use partitioning in our transformation, each slave server would receive data in a round-robin fashion (randomly), so each data set could contain records for New York, for example.
Each slave creates an aggregate, and when we combine the data on the master we can possibly end up with two aggregates for New York.
This would then require an additional sort and aggregation step on the master to arrive at a final, clean aggregate.
To avoid this kind of scenario, it is best to define data partitioning, so that each slave server receives a “unique” set of data.
Note, this is just one reason why you should apply partitioning.
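To make the problem concrete, here is a small sketch outside of PDI. It assumes a hypothetical sales.csv with city,amount rows and simulates two slaves receiving rows round robin; each "slave" produces its own partial total, so a city such as New York can appear in both outputs and would still need a final merge on the master:
# 'slave 1' receives the odd rows, 'slave 2' the even rows (round-robin distribution)
awk -F',' 'NR % 2 == 1 { sum[$1] += $2 } END { for (c in sum) print "slave1:", c, sum[c] }' sales.csv
awk -F',' 'NR % 2 == 0 { sum[$1] += $2 } END { for (c in sum) print "slave2:", c, sum[c] }' sales.csv
With partitioning on the city field, all rows for a given city go to the same slave, so each aggregate comes out complete the first time.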
No partitioning schema applied:
With partitioning schema applied:
Notice the difference between the two output datasets!
Also note the additional red icon [Dx1] in the above screenshot of the transformation. This indicates that a partitioning schema is applied to this particular step.