Tuesday, March 20, 2012

A Generic Normalizer step for Kettle

UPDATE: if you're interested in this article you might be interested in the follow-up article as well: A Generic Normalizer for Pentaho Data integration - Revisited.

Abstract

Kettle (a.k.a. Pentaho Data Integration) offers the standard Row Normalizer step to "unpivot" columns to rows. However, this step requires manual configuration, which presumes that the structure of its input stream is static and known in advance. In this post, I explain how to construct a simple User Defined Java Class (UDJC) step that implements a generic Row Normalizer, one that can unpivot an arbitrary input stream without manual configuration.

The Row Normalizer step


Kettle (a.k.a. Pentaho Data Integration) offers a standard step to "unpivot" columns to rows. This step is called the Row Normalizer. You can see it in action in the screenshot below:

In the screenshot, the input is a table of columns id, first name, and last name. The output is a table of columns id, fieldname, and value. The id column is preserved, but for each row coming from the input stream, two rows are created in the output stream: 1 for the first name field, and 1 for the last name field.

Essentially, the Row Normalizer step in this example is configured to treat the first name and last name fields as a repeating group. The repeating group is untangled by dumping all values for either field in the value column. The fieldname column is used to mark the kind of value: some values are of the "first name field" kind (when they came from the original first name input field), and some are of the "last name field" kind (when they derive from the last name input field).
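
To make the effect of that configuration concrete, here is a minimal plain-Java sketch. It is not Kettle code and not how the Row Normalizer is implemented: the class name ConfiguredNormalizerSketch, the method normalize, and the sample values are all made up for illustration. Rows are modelled as maps, and the key field and the list of fields to unpivot stand in for the manual step configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: mimics the effect of a configured Row Normalizer.
final class ConfiguredNormalizerSketch {

    // Emits one {key, fieldname, value} row for each configured field of each input row.
    // keyField and unpivotFields play the role of the manual step configuration.
    static List<Object[]> normalize(List<Map<String, Object>> rows,
                                    String keyField,
                                    List<String> unpivotFields) {
        List<Object[]> output = new ArrayList<Object[]>();
        for (Map<String, Object> row : rows) {
            for (String field : unpivotFields) {
                output.add(new Object[] { row.get(keyField), field, row.get(field) });
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Made-up sample row; the real data lives in the screenshot.
        Map<String, Object> row = new LinkedHashMap<String, Object>();
        row.put("id", Long.valueOf(1L));
        row.put("first name", "Ada");
        row.put("last name", "Lovelace");
        List<Map<String, Object>> rows = new ArrayList<Map<String, Object>>();
        rows.add(row);

        for (Object[] out : normalize(rows, "id", Arrays.asList("first name", "last name"))) {
            System.out.println(Arrays.toString(out));
        }
    }
}
```

Note that the field list has to be spelled out by hand, which is exactly the part that becomes a burden when the input structure is not known up front.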

There are several use cases for the operation performed by the Row Normalizer step. It could be used to break down a genuine repeating group in order to create a more normalized dataset. Or you might need to convert a relational dataset into a graph consisting of subject-predicate-object tuples for loading a triple store. Or maybe you want to turn a table into a fine-grained stream of changes for auditing.

The problem

The Row Normalizer step works great for streams whose structure is known in advance. It has to be known, because the step configuration must list exactly those fields that form the repeating group, so that they can be broken out into separate kinds.

Sometimes, you don't know the structure of the input stream in advance, or it is just too inconvenient to specify it manually. In these cases, you'd really wish you could somehow unpivot any field that happens to be part of the input stream. In other words, you'd need a generic Row Normalizer step.

The Solution

In Kettle, there's always a solution, and often more than one. Here, I'd like to present a solution to dynamically unpivot an arbitrary input stream using a User Defined Java Class (UDJC) step.

Below is a screenshot of the step configuration:

This configuration allows the step to take an arbitrary input stream and normalize it into a collection of triples consisting of:
  1. A rownum column. This column delivers generated integer values, and each distinct value uniquely identifies a single input row.
  2. A fieldnum column. This is a generated integer value that uniquely identifies a field within each input row.
  3. A value column. This is a string column that contains the value that appears in the field identified by the fieldnum column within the row identified by the rownum value.
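
The shape of that output can be sketched in plain Java, independent of Kettle. This is only an approximation under stated assumptions: the class GenericNormalizerSketch and its normalize method are hypothetical names, rows are modelled as Object arrays, and String.valueOf stands in for whatever string conversion Kettle applies to each field.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: unpivots every field of every row without any configuration.
final class GenericNormalizerSketch {

    // Turns each field of each input row into a {rownum, fieldnum, value} triple.
    // rownum and fieldnum are generated and 1-based; value is the field as a string.
    static List<Object[]> normalize(List<Object[]> rows) {
        List<Object[]> triples = new ArrayList<Object[]>();
        long rownum = 0;
        for (Object[] row : rows) {
            rownum++;
            for (int i = 0; i < row.length; i++) {
                // Assumption: a plain string conversion is good enough for this sketch.
                String value = row[i] == null ? null : String.valueOf(row[i]);
                triples.add(new Object[] { Long.valueOf(rownum), Long.valueOf(i + 1L), value });
            }
        }
        return triples;
    }

    public static void main(String[] args) {
        // Made-up sample rows with three fields each.
        List<Object[]> rows = new ArrayList<Object[]>();
        rows.add(new Object[] { Long.valueOf(1L), "Ada", "Lovelace" });
        rows.add(new Object[] { Long.valueOf(2L), "Alan", "Turing" });

        for (Object[] triple : normalize(rows)) {
            System.out.println(Arrays.toString(triple));
        }
    }
}
```

Because nothing about the input is configured, the same method works for rows of any width.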

The Row Normalizer versus the UDJC generic Normalizer

For the input data set mentioned in the initial example, the output generated by this UDJC step is shown below:

There are a few differences with regard to the output of Kettle's Row Normalizer step:
  1. One obvious difference is that the Row Normalizer step has the ability to attach names to the values, whereas the UDJC step only delivers a generated field position. On the one hand, it's really nice to have field names. On the other hand, this is also one of the weaknesses of the Row Normalizer step, because providing the names must be done manually.
  2. Another difference is that the UDJC step delivers 3 output rows for each input row, instead of the 2 rows delivered by the Row Normalizer step. The "extra" row is due to the id column. Because the id column is the key of the original input data, the Row Normalizer step was configured to only unpivot the first name and last name fields, keeping the id field unscathed: this allows any downstream steps to see which fields belong to which row. The UDJC step however does not know which field or fields form the key of the input stream. Instead, it generates its own key, the rownum field, and the id field is simply treated like any other field and unpivoted, just like the first name and last name fields. So the main difference is that the downstream steps need to use the generated rownum field to see which fields belong to which row.
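
As a rough idea of what "using the generated rownum field" could look like downstream, the plain-Java sketch below groups triples back together by their rownum. It is not Kettle code: the class RegroupByRownumSketch and the method regroup are hypothetical, and the sketch assumes the triples arrive in fieldnum order within each row.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: reassembles unpivoted values per original input row.
final class RegroupByRownumSketch {

    // Groups {rownum, fieldnum, value} triples by rownum.
    // Assumption: triples for one row arrive in fieldnum order, so appending keeps field order.
    static Map<Long, List<String>> regroup(List<Object[]> triples) {
        Map<Long, List<String>> valuesByRow = new LinkedHashMap<Long, List<String>>();
        for (Object[] triple : triples) {
            Long rownum = (Long) triple[0];
            List<String> values = valuesByRow.get(rownum);
            if (values == null) {
                values = new ArrayList<String>();
                valuesByRow.put(rownum, values);
            }
            values.add((String) triple[2]);
        }
        return valuesByRow;
    }

    public static void main(String[] args) {
        // Made-up triples for two input rows of two fields each.
        List<Object[]> triples = new ArrayList<Object[]>();
        triples.add(new Object[] { Long.valueOf(1L), Long.valueOf(1L), "1" });
        triples.add(new Object[] { Long.valueOf(1L), Long.valueOf(2L), "Ada" });
        triples.add(new Object[] { Long.valueOf(2L), Long.valueOf(1L), "2" });
        triples.add(new Object[] { Long.valueOf(2L), Long.valueOf(2L), "Alan" });

        System.out.println(regroup(triples));
    }
}
```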

The Code

The code and comments are pretty straightforward:
// Per-instance state: the row counter and the metadata cached from the first row.
private long rownum = 0;
private RowMetaInterface inputRowMeta;
private int numFields;
private String[] fieldNames;

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
{
    // Get the current row.
    Object[] r = getRow();

    // If the row object is null, we are done processing.
    if (r == null) {
        setOutputDone();
        return false;
    }

    // If this is the first row, cache some metadata.
    // We will reuse this metadata in processing the rest of the rows.
    if (first) {
        first = false;
        inputRowMeta = getInputRowMeta();
        fieldNames = inputRowMeta.getFieldNames();
        numFields = fieldNames.length;
    }

    // Generate a new id number for the current row.
    rownum += 1;

    // Generate one output row for each field in the input stream.
    for (int fieldnum = 0; fieldnum < numFields; fieldnum++) {
        Object[] outputRow = new Object[3];
        // Kettle's Integer type is backed by java.lang.Long, hence the Long values.
        outputRow[0] = Long.valueOf(rownum);
        // Assign the 1-based field id.
        outputRow[1] = Long.valueOf(fieldnum + 1L);
        // Fetch the field value as a string.
        outputRow[2] = inputRowMeta.getString(r, fieldnum);
        putRow(data.outputRowMeta, outputRow);
    }

    return true;
}

Getting Field information

So the UDJC step only generates a number to identify the field. For some purposes it may be useful to pull in other information about the fields, like their name, data type or data format. While we could also do this directly in the UDJC step by writing more Java code, it is easier and more flexible to use some of Kettle's built-in steps:
  1. The Get Metadata Structure step. This step takes an input stream, and generates one row for each distinct field. Each of these rows has a number of columns that describe the field from the input stream. One of the fields is a Position field, which uniquely identifies each field from the input stream using a generated integer, just like the fieldnum field from our UDJC step does.
  2. The Stream lookup step. This step allows us to combine the output stream of our UDJC step with the output of the Get Metadata Structure step. By matching the Position field of the Get Metadata Structure step with the fieldnum field of the UDJC step, we can look up any metadata fields that we happen to find useful.
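
Conceptually, the lookup amounts to a join on the field position. The plain-Java sketch below shows that idea only; it is not how the Kettle steps are implemented. The class FieldMetadataLookupSketch, the method addFieldNames, and the position-to-name map (standing in for a small part of the Get Metadata Structure output) are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: enriches {rownum, fieldnum, value} triples with field names.
final class FieldMetadataLookupSketch {

    // Looks up the field name for each triple by matching fieldnum against the position key.
    // Produces {rownum, fieldnum, fieldname, value}; fieldname is null when no match exists.
    static List<Object[]> addFieldNames(List<Object[]> triples, Map<Long, String> nameByPosition) {
        List<Object[]> enriched = new ArrayList<Object[]>();
        for (Object[] triple : triples) {
            String fieldName = nameByPosition.get((Long) triple[1]);
            enriched.add(new Object[] { triple[0], triple[1], fieldName, triple[2] });
        }
        return enriched;
    }

    public static void main(String[] args) {
        // Stand-in for the metadata stream: position -> field name.
        Map<Long, String> nameByPosition = new HashMap<Long, String>();
        nameByPosition.put(Long.valueOf(1L), "id");
        nameByPosition.put(Long.valueOf(2L), "first name");
        nameByPosition.put(Long.valueOf(3L), "last name");

        // Made-up triples for a single input row.
        List<Object[]> triples = new ArrayList<Object[]>();
        triples.add(new Object[] { Long.valueOf(1L), Long.valueOf(1L), "1" });
        triples.add(new Object[] { Long.valueOf(1L), Long.valueOf(2L), "Ada" });
        triples.add(new Object[] { Long.valueOf(1L), Long.valueOf(3L), "Lovelace" });

        for (Object[] row : addFieldNames(triples, nameByPosition)) {
            System.out.println(Arrays.toString(row));
        }
    }
}
```

Other metadata columns, such as the data type, could be looked up the same way by widening the map's value.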


Below is a screenshot that shows how all these steps work together:
And here endeth the lesson.

14 comments:

Sean said...

Great lesson! For someone like me who is not good at Java, this is very useful. I know I can tweak the code if needed (though cannot write from scratch).

Q: Can this be used as a plug-in? So it's available always as a step that can be selected?

Thank you!
Sean

rpbouman said...

Hi Sean, thanks for the kind words!

Actually I think it would be pretty easy to turn this into a plugin. Usually the hard part with that is to come up with a sensible GUI - in this case that could be pretty minimal.

I guess the best advice I can give you on writing a plugin is to check out kettle's source code and examine the existing steps and the dummy plugin that is shipped with kettle. Also, Pentaho Kettle Solutions (http://www.amazon.com/Pentaho-Kettle-Solutions-Building-Integration/dp/0470635177) has a chapter devoted to writing plugins.

AnG said...

Isn't it much simpler to use

outputRow[1] = fieldNames[fieldnum];
instead of additional complications with field name lookup

rpbouman said...

@AnG: good point! I didn't test it but I think you're right.

AnG said...

I have problem with the following:

outputRow[2] = inputRowMeta.getString(r, fieldnum);

if the input data type is not a String then you get empty value.

What can be done for a generic solution for all data types?

rpbouman said...

@AnG: I don't know why the getString method doesn't work - I would've thought it would simply return the string representation appropriate for the value and type.

However, I think you could work around it: if you add an output field for each Kettle type (there are only a few), you could use something like:

inputRowMeta.getValueMeta(fieldnum).getTypeDesc()

to get the data type name for a particular field, and depending on that you could call one of the other getXXX methods to retrieve the value in that same data type.

See: http://javadoc.pentaho.com/kettle/org/pentaho/di/core/row/RowMetaInterface.html#getValueMeta(int)
and
http://javadoc.pentaho.com/kettle/org/pentaho/di/core/row/ValueMetaInterface.html

AnG said...

I've implemented the same solution, just wanted to know if there might be something better. Thanks a lot, your example was very helpful.

Pradeep said...

Hi Roland,

I have a requirement where I need to change the data types of incoming rows. To be more specific, I need to change the Boolean data type to number. We don't know the number of incoming columns or their data types in advance, hence we cannot use a Select values step here.

Could you please help me here? I need a User Defined Java Class step.

I don't have an in-depth knowledge of Java.

vak said...

This comment has been removed by the author.

rpbouman said...

@vak, it looks like this comment does not belong to this post?

Anonymous said...

Roland - I want to use this method to do field level auditing but I want to encapsulate it in a sub transformation to which I can pass the result rows from any step. In your image of "how all these steps work together", instead of a data grid, the input would need to be dynamic in terms of the number of fields/columns and the data types. Could you possibly provide a hint how to make the input to these steps (in your example, the datagrid) dynamic?

rpbouman said...

@Anonymous, this is an excellent question. I just posted a new article revisiting the normalizer, introducing some improvements and showing how to call the normalizer from another transformation.

Please leave any new comments pertaining to these questions on the new post:

http://rpbouman.blogspot.nl/2015/03/a-generic-normalizer-for-pentaho-data.html

anamika said...

Hello, thanks for the explanation. If I want to discard the empty rows in the resulting output file, can we do this with a custom Java expression?

if (r == null) {
//here can we do anything?
setOutputDone();
return false;
}

rpbouman said...

@anamika,

the call to putRow() is what pushes rows out of the step. So if you decide you do not want to output a particular row, don't call putRow() in that case.

I hope this helps!

kind regards,

Roland.
