Chapter 151. DQP for service developers

151.1. Writing DQP scalar functions
151.2. Writing DQP aggregate functions
151.2.1. Average function including support for BigDecimal
151.3. Deploying a DQP function

One of the key aims of the design of OGSA-DAI's DQP functionality is to make it as extensible as possible to allow developers to customise it to their own requirements.

Key extensibility points include:

Unfortunately it has not been possible to document all these features prior to the release of OGSA-DAI 3.2.2. Anybody wishing to use this functionality is strongly encouraged to contact the OGSA-DAI team for support (Chapter 4, Information, help and support) and to visit our SourceForge site, where this user documentation is provided online and will be updated as we write additional material. The online user documentation is at: http://sourceforge.net/apps/trac/ogsa-dai/wiki/UserDocumentation

151.1. Writing DQP scalar functions

A scalar function is a function that takes a number of attribute values as input and returns a single attribute value. This section shows how to write a scalar function by writing a rather pointless function that adds two numbers (you can always do this in SQL by writing a + b).

To write a scalar function, write a class that extends uk.org.ogsadai.dqp.lqp.udf.LogicalExecutableFunctionBase.

We now have a number of methods to write. The first of these is the default constructor. This must call the superclass constructor and specify the number of input parameters. For our function, called Add, this will be 2.

public Add()
{
   super(2);
}   

Next we have to write getName which returns the name of the function. This will be the name as it must be used in the SQL.

public String getName()
{
    return "Add";
}   

Next we write a function to say what type of function we are implementing. Here it is a scalar function:

public FunctionType getType()
{
    return FunctionType.UDF_SCALAR;
}   

Next we write the configure method, which is given the types of the parameters and throws an exception if the types are incorrect. Here we accept all the numeric types for both inputs:

public void configure(int... types) throws TypeMismatchException
{
    for( int i=0; i<2; ++i)
    {
        switch(types[i])
        {
            case TupleTypes._SHORT:
            case TupleTypes._LONG:
            case TupleTypes._INT:
            case TupleTypes._DOUBLE:
            case TupleTypes._FLOAT:
            case TupleTypes._BIGDECIMAL:
                break;
            default:  
                throw new TypeMismatchException(types[i]);
        }
    }
}   

Next we must write another constructor. This constructor takes one argument which is another instance of the class being written, in this case Add. This constructor is used to make copies of the function when constructing and optimising query plans. This constructor must generate a new instance that is in the same state as the parameter instance was immediately after the configure method was called. In the case of Add the configure method does not alter the instance state so our constructor is very simple:

public Add(Add add)
{
    this();
}

Next we write a function that returns the output type. This method will be called after configure has been called so it is possible to have an output type that depends on the input types. But here we will always use double:

public int getOutputType()
{
    return TupleTypes._DOUBLE;
}   

Next we have to write the put method which will receive the values. We must remember to handle Null values. Note this method does not return the result. There is another method for that, so we store the result in the mResult member variable:

public void put(Object... parameters)
{
    if (parameters[0] == Null.VALUE || parameters[1] == Null.VALUE)
    {
        mResult = Null.VALUE;
    }
    else
    {
        mResult = new Double( ((Number)parameters[0]).doubleValue() + 
                              ((Number)parameters[1]).doubleValue() );
    }
}   

Finally we need the method to return the result:

public Object getResult()
{
    return mResult;
}   

For completeness here are the OGSA-DAI imports we require:

import uk.org.ogsadai.dqp.converters.TypeMismatchException;
import uk.org.ogsadai.dqp.lqp.udf.FunctionType;
import uk.org.ogsadai.dqp.lqp.udf.LogicalExecutableFunctionBase;
import uk.org.ogsadai.tuple.Null;
import uk.org.ogsadai.tuple.TupleTypes;   

The class needs one member variable:

private Object mResult;   
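The put and getResult logic can be tried out without an OGSA-DAI installation. The following is a minimal stand-alone sketch, not OGSA-DAI code: AddSketch is a hypothetical class that copies only the two methods above, and NULL_VALUE is a hypothetical stand-in for Null.VALUE. It assumes the caller invokes put and then getResult for each pair of input values.

```java
// Stand-alone sketch: mirrors only the put/getResult logic of Add.
// NULL_VALUE is a hypothetical stand-in for uk.org.ogsadai.tuple.Null.VALUE.
class AddSketch
{
    static final Object NULL_VALUE = new Object();

    private Object mResult;

    public void put(Object... parameters)
    {
        if (parameters[0] == NULL_VALUE || parameters[1] == NULL_VALUE)
        {
            // A null input produces a null result.
            mResult = NULL_VALUE;
        }
        else
        {
            mResult = Double.valueOf(
                ((Number) parameters[0]).doubleValue()
                + ((Number) parameters[1]).doubleValue());
        }
    }

    public Object getResult()
    {
        return mResult;
    }

    public static void main(String[] args)
    {
        AddSketch add = new AddSketch();

        add.put(2, 3.5);                     // Integer and Double inputs
        System.out.println(add.getResult()); // prints 5.5

        add.put(1L, NULL_VALUE);             // one null input
        System.out.println(add.getResult() == NULL_VALUE); // prints true
    }
}
```

Because every input is cast to Number and converted with doubleValue, mixed numeric types can be added without separate code paths, which is why the output type can always be double.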

151.2. Writing DQP aggregate functions

An aggregation function is a function that aggregates a collection of values for an attribute and returns a single attribute value. The function to calculate an attribute's average value (AVG) is an example of an aggregation function.

Here we present the implementation of the AVG aggregation function to demonstrate how to write aggregation functions. In this implementation we will support short, long, integer, double and float values but, for simplicity, will not support BigDecimal values. Section 151.2.1, “Average function including support for BigDecimal” describes the complexities that arise when we additionally support BigDecimal values.

To write an aggregation function we must write a class that extends uk.org.ogsadai.dqp.lqp.udf.aggregate.SQLAggregateFunction.

import uk.org.ogsadai.dqp.lqp.udf.aggregate.SQLAggregateFunction;

class Average extends SQLAggregateFunction
{
}   

To calculate the average we wish to store the sum of all the attribute values and also the number of values we have processed. To store these values we add two member variables:

private long   mCount;
private double mSum;

We now have a number of methods to write. Let's go through them one at a time.

The default constructor must call the super class.

public Average()
{
   super();
}

Next we have to write getName which returns the name of the function. This will be the name as it must be used in the SQL.

public String getName()
{
    return "AVG";
}

Next we write the configure method, which is given the type of the parameter and throws an exception if the type is incorrect. Here we accept all the numeric types except BigDecimal:

public void configure(int... types) throws TypeMismatchException
{
    switch(types[0])
    {
        case TupleTypes._SHORT:
        case TupleTypes._LONG:
        case TupleTypes._INT:
        case TupleTypes._DOUBLE:
        case TupleTypes._FLOAT:
            break;
        default:  
            throw new TypeMismatchException(types[0]);
    }
}   

Next we write a function that returns the output type. This method will be called after configure has been called so it is possible to have an output type that depends on the input types. But here we will always use double:

public int getOutputType()
{
    return TupleTypes._DOUBLE;
}   

Next we have to write the put method which will receive the values one at a time. We must remember to handle Null values.

public void put(Object... parameters)
{
    Object parameter = parameters[0];
    if (parameter != Null.VALUE)
    {
        mCount++;
        mSum += ((Number) parameter).doubleValue();
    }
}   

Next we must write a method that returns the result:

public Object getResult()
{
    if (mCount != 0)
    {
        return new Double(mSum / mCount);
    }
    else
    {
        return Null.VALUE;
    }
}   

That completes the basic implementation of the aggregation function, but we must now add methods that allow the aggregation function's state to be serialised, deserialised and merged. These methods allow the aggregation task to be executed in a scalable way.

The merge function must merge the results of a partial aggregation with the current aggregation. The parameter given to the merge function will always be another instance of the function being implemented:

public void merge(SerialisableFunction function)
{
    Average avg = (Average)function;
    mCount += avg.mCount;
    mSum += avg.mSum;
}   

The serialise function writes the function's state to disk:

public void serialise(DataOutputStream output) throws IOException
{
    output.writeDouble(mSum);
    output.writeLong(mCount);
}   

The deserialise function must construct a new version of the class from the state stored on the disk:

public SerialisableFunction deserialise(DataInputStream input) throws IOException
{
    Average avg = new Average(this);
    
    avg.mSum = input.readDouble();
    avg.mCount = input.readLong();
    return avg;
}   

Lastly we must write a constructor that takes another instance of our class as a parameter. The configure method will never be called on the instance being constructed, so this constructor must copy any state normally held as a result of the configure call. It must not copy any state held as a result of calls to the put method. In our implementation of the configure method we store no state, so this version of the constructor is very simple:

public Average(Average avg)
{
    this();
}   

For a more complex example of this constructor see Section 151.2.1, “Average function including support for BigDecimal”.

The full implementation is:

package my.functions;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import uk.org.ogsadai.dqp.converters.TypeMismatchException;
import uk.org.ogsadai.dqp.lqp.udf.aggregate.SQLAggregateFunction;
import uk.org.ogsadai.tuple.Null;
import uk.org.ogsadai.tuple.TupleTypes;
import uk.org.ogsadai.tuple.serialise.SerialisableFunction;

/**
 * Average function.
 *
 * @author The OGSA-DAI Project Team.
 */
public class Average extends SQLAggregateFunction
{
    /** Current count. */
    private long mCount;
    /** Current sum of values. */
    private double mSum;

    /**
     * Default constructor.
     */
    public Average()
    {
    }
    
    /**
     * Type state copy constructor. The current data values are not copied.
     * 
     * @param avg
     *            function to copy
     */
    public Average(Average avg)
    {
        this();
    }
    
    /**
     * {@inheritDoc}
     */
    public String getName()
    {
        return "AVG";
    }

    /**
     * {@inheritDoc}
     */
    public void configure(int... types) throws TypeMismatchException
    {
        switch(types[0])
        {
            case TupleTypes._SHORT:
            case TupleTypes._LONG:
            case TupleTypes._INT:
            case TupleTypes._DOUBLE:
            case TupleTypes._FLOAT:
                break;
            default:  
                throw new TypeMismatchException(types[0]);
        }
    }

    /**
     * {@inheritDoc}
     */
    public int getOutputType()
    {
        return TupleTypes._DOUBLE;
    }

    /**
     * {@inheritDoc}
     */
    public void put(Object... parameters)
    {
        Object parameter = parameters[0];
        if (parameter != Null.VALUE)
        {
            mCount++;
            mSum += ((Number) parameter).doubleValue();
        }
    }

    /**
     * {@inheritDoc}
     */
    public Object getResult()
    {
        if (mCount != 0)
        {
            return new Double(mSum / mCount);
        }
        else
        {
            return Null.VALUE;
        }
    }


    /**
     * {@inheritDoc}
     */
    public void merge(SerialisableFunction function)
    {
        Average avg = (Average)function;
        mCount += avg.mCount;
        mSum += avg.mSum;
    }
    
    /**
     * {@inheritDoc}
     */
    public SerialisableFunction deserialise(DataInputStream input) 
        throws IOException
    {
        Average avg = new Average(this);
        avg.mSum = input.readDouble();
        avg.mCount = input.readLong();
        return avg;
    }

    /**
     * {@inheritDoc}
     */
    public void serialise(DataOutputStream output) throws IOException
    {
        output.writeDouble(mSum);
        output.writeLong(mCount);
    }
}   
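To see how serialise, deserialise and merge fit together, the following stand-alone sketch aggregates two partitions separately, passes one partial result through a byte stream, and merges it into the other. It is an illustration only: PartialAverage and averageOfPartitions are hypothetical names, the class has no OGSA-DAI dependencies, null handling is omitted, and how DQP actually schedules partial aggregations is not described here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-alone sketch: same state and stream layout as Average, but with no
// OGSA-DAI dependencies. PartialAverage is a hypothetical name.
class PartialAverage
{
    private long mCount;
    private double mSum;

    void put(double value)
    {
        mCount++;
        mSum += value;
    }

    void merge(PartialAverage other)
    {
        mCount += other.mCount;
        mSum += other.mSum;
    }

    // Writes the sum first, then the count.
    void serialise(DataOutputStream output) throws IOException
    {
        output.writeDouble(mSum);
        output.writeLong(mCount);
    }

    // Reads the fields back in the order serialise wrote them.
    static PartialAverage deserialise(DataInputStream input) throws IOException
    {
        PartialAverage avg = new PartialAverage();
        avg.mSum = input.readDouble();
        avg.mCount = input.readLong();
        return avg;
    }

    double getResult()
    {
        return mSum / mCount;
    }

    // Aggregates two partitions separately, ships one as bytes, then merges.
    static double averageOfPartitions(double[] first, double[] second)
        throws IOException
    {
        PartialAverage a = new PartialAverage();
        for (double v : first) { a.put(v); }
        PartialAverage b = new PartialAverage();
        for (double v : second) { b.put(v); }

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes))
        {
            b.serialise(out);
        }
        PartialAverage restored = deserialise(
            new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        a.merge(restored);
        return a.getResult();
    }

    public static void main(String[] args) throws IOException
    {
        // (1 + 2 + 3 + 6) / 4
        System.out.println(averageOfPartitions(
            new double[] {1.0, 2.0}, new double[] {3.0, 6.0})); // prints 3.0
    }
}
```

The important detail is that deserialise reads the fields in exactly the order serialise wrote them. Merging sums and counts, rather than partial averages, is what keeps the final result correct when the partitions have different sizes.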

151.2.1. Average function including support for BigDecimal

The implementation of the Average function included in OGSA-DAI also supports data in BigDecimal objects. This requires the configure method to store state relating to the type of the data being processed. The fact that the configure method stores state means that the parameterised constructor must also copy this part of the object's state.

The object's member variables (state) are:

public class Average extends SQLAggregateFunction
{
    /** Current count. */
    private long mCount;
    /** Current sum of values. */
    private double mSum;
    /** Current sum of values in a big decimal or null. */
    private BigDecimal mSumBD;
    /** Are we using big decimals? */
    private boolean mIsBigDecimal;
}

The configure method is:

public void configure(int... types) throws TypeMismatchException
{
    switch(types[0])
    {
        case TupleTypes._SHORT:
        case TupleTypes._LONG:
        case TupleTypes._INT:
        case TupleTypes._DOUBLE:
        case TupleTypes._FLOAT:
            mIsBigDecimal = false;
            break;
        case TupleTypes._BIGDECIMAL:
            mIsBigDecimal = true;
            mSumBD = BigDecimal.ZERO;
            break;
        default:  
            throw new TypeMismatchException(types[0]);
    }
}   

The parameterised constructor is:

public Average(Average avg)
{
    this();
    mIsBigDecimal = avg.mIsBigDecimal;
    if (mIsBigDecimal)
    {
        mSumBD = BigDecimal.ZERO;
    }
}   

Note that the type state (mIsBigDecimal) is copied from the parameter but the data state (mCount, mSum, mSumBD) is not copied.
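The distinction between type state and data state can be checked with a small stand-alone sketch. TypeStateCopySketch is a hypothetical class that keeps only the fields needed to show the rule, and configureForBigDecimal is a hypothetical stand-in for a configure call that receives the BigDecimal type code.

```java
import java.math.BigDecimal;

// Stand-alone sketch; TypeStateCopySketch is a hypothetical class and is
// not part of OGSA-DAI.
class TypeStateCopySketch
{
    private long mCount;            // data state, changed by put
    private BigDecimal mSumBD;      // data state, changed by put
    private boolean mIsBigDecimal;  // type state, set by configure

    TypeStateCopySketch()
    {
    }

    // Copies the type state and starts with fresh data state.
    TypeStateCopySketch(TypeStateCopySketch other)
    {
        this();
        mIsBigDecimal = other.mIsBigDecimal;
        if (mIsBigDecimal)
        {
            mSumBD = BigDecimal.ZERO;
        }
    }

    // Hypothetical stand-in for configure(TupleTypes._BIGDECIMAL).
    void configureForBigDecimal()
    {
        mIsBigDecimal = true;
        mSumBD = BigDecimal.ZERO;
    }

    void put(BigDecimal value)
    {
        mCount++;
        mSumBD = mSumBD.add(value);
    }

    long count() { return mCount; }
    BigDecimal sum() { return mSumBD; }
    boolean isBigDecimal() { return mIsBigDecimal; }

    public static void main(String[] args)
    {
        TypeStateCopySketch original = new TypeStateCopySketch();
        original.configureForBigDecimal();
        original.put(new BigDecimal("2.5"));

        TypeStateCopySketch copy = new TypeStateCopySketch(original);
        System.out.println(copy.isBigDecimal()); // prints true
        System.out.println(copy.count());        // prints 0
        System.out.println(copy.sum());          // prints 0
    }
}
```

The copy behaves as if configure had just been called on it: it knows it is handling BigDecimal values, but it has not yet seen any data.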

For reference the full implementation of the BigDecimal version is shown here:

package my.functions;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.math.BigDecimal;

import uk.org.ogsadai.dqp.converters.TypeMismatchException;
import uk.org.ogsadai.dqp.lqp.udf.aggregate.SQLAggregateFunction;
import uk.org.ogsadai.tuple.Null;
import uk.org.ogsadai.tuple.TupleTypes;
import uk.org.ogsadai.tuple.serialise.SerialisableFunction;

/**
 * Average function.
 *
 * @author The OGSA-DAI Project Team.
 */
public class Average extends SQLAggregateFunction
{
    /** Current count. */
    private long mCount;
    /** Current sum of values. */
    private double mSum;
    /** Current sum of values in a big decimal or null. */
    private BigDecimal mSumBD;
    /** Are we using big decimals? */
    private boolean mIsBigDecimal;

    /**
     * Default constructor.
     */
    public Average()
    {
    }
    
    /**
     * Type state copy constructor. The current data values are not copied.
     * 
     * @param avg
     *            function to copy
     */
    public Average(Average avg)
    {
        this();
        mIsBigDecimal = avg.mIsBigDecimal;
        if (mIsBigDecimal)
        {
            mSumBD = BigDecimal.ZERO;
        }
    }
    
    /**
     * {@inheritDoc}
     */
    public String getName()
    {
        return "AVG";
    }

    /**
     * {@inheritDoc}
     */
    public void configure(int... types) throws TypeMismatchException 
    {
        switch(types[0])
        {
            case TupleTypes._SHORT:
            case TupleTypes._LONG:
            case TupleTypes._INT:
            case TupleTypes._DOUBLE:
            case TupleTypes._FLOAT:
                mIsBigDecimal = false;
                break;
            case TupleTypes._BIGDECIMAL:
                mIsBigDecimal = true;
                mSumBD = BigDecimal.ZERO;
                break;
            default:  
                throw new TypeMismatchException(types[0]);
        }
    }

    /**
     * {@inheritDoc}
     */
    public int getOutputType()
    {
        if (mIsBigDecimal)
        {
            return TupleTypes._BIGDECIMAL;
        }
        else
        {
            return TupleTypes._DOUBLE;
        }
    }

    /**
     * {@inheritDoc}
     */
    public void put(Object... parameters)
    {
        Object parameter = parameters[0];
        if (parameter != Null.VALUE)
        {
            mCount++;
            add((Number)parameter);
        }
    }

    /**
     * Adds the given number to the current sum.
     * 
     * @param number
     *            number to add
     */
    private void add(Number number)
    {
        if (mIsBigDecimal)
        {
            mSumBD = mSumBD.add((BigDecimal)number);
        }
        else
        {
            mSum += number.doubleValue();
        }
    }

    /**
     * {@inheritDoc}
     */
    public Object getResult()
    {
        if (mCount != 0)
        {
            if (mIsBigDecimal)
            {
                // we want scale of at least 5 in case we're dividing integers
                if (mSumBD.scale() < 5)
                {
                    return mSumBD.divide(
                        new BigDecimal(mCount), 5, BigDecimal.ROUND_HALF_UP);
                }
                else
                {
                    return mSumBD.divide(
                        new BigDecimal(mCount), BigDecimal.ROUND_HALF_UP);
                }
            }
            else
            {
                return new Double(mSum / mCount);
            }
        }
        else
        {
            return Null.VALUE;
        }
    }

    /**
     * {@inheritDoc}
     */
    public void merge(SerialisableFunction function)
    {
        Average avg = (Average)function;
        mCount += avg.mCount;
        mSum += avg.mSum;
        if (avg.mIsBigDecimal)
        {
            // BigDecimal is immutable, so the result of add must be assigned
            mSumBD = mSumBD.add(avg.mSumBD);
            mIsBigDecimal = true;
        }
        else
        {
            mIsBigDecimal = false;
        }
    }
    
    /**
     * {@inheritDoc}
     */
    public SerialisableFunction deserialise(DataInputStream input) 
        throws IOException
    {
        Average avg = new Average(this);
        avg.mIsBigDecimal = input.readBoolean();
        if (avg.mIsBigDecimal)
        {
            avg.mSumBD = new BigDecimal(input.readUTF());
        }
        else
        {
            avg.mSum = input.readDouble();
        }
        avg.mCount = input.readLong();
        return avg;
    }

    /**
     * {@inheritDoc}
     */
    public void serialise(DataOutputStream output) throws IOException
    {
        output.writeBoolean(mIsBigDecimal);
        if (mIsBigDecimal)
        {
            output.writeUTF(mSumBD.toString());
        }
        else
        {
            output.writeDouble(mSum);
        }
        output.writeLong(mCount);
    }
}   
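Two BigDecimal behaviours matter in this implementation: add returns a new object rather than modifying the receiver, and divide needs an explicit scale when the sum has few decimal places. The following stand-alone sketch uses only java.math to show both. BigDecimalAverageSketch is a hypothetical name, and the sketch uses RoundingMode.HALF_UP in place of the BigDecimal.ROUND_HALF_UP integer constant, which is deprecated in newer Java releases.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Stand-alone sketch using only java.math; BigDecimalAverageSketch is a
// hypothetical name and is not part of OGSA-DAI.
class BigDecimalAverageSketch
{
    // BigDecimal is immutable: add returns a new object, so the result
    // must be assigned back to the running total.
    static BigDecimal sum(BigDecimal... values)
    {
        BigDecimal total = BigDecimal.ZERO;
        for (BigDecimal value : values)
        {
            total = total.add(value);
        }
        return total;
    }

    // Same scale rule as getResult: keep the sum's scale, but never use
    // fewer than 5 decimal places.
    static BigDecimal average(BigDecimal sum, long count)
    {
        int scale = Math.max(sum.scale(), 5);
        return sum.divide(new BigDecimal(count), scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args)
    {
        BigDecimal total = sum(new BigDecimal("3"), new BigDecimal("7"));
        System.out.println(average(total, 3));                       // prints 3.33333
        System.out.println(average(new BigDecimal("10.000000"), 3)); // prints 3.333333
    }
}
```

Without the minimum scale, dividing a sum of whole numbers would round the average to a whole number, because divide with only a rounding mode keeps the scale of the dividend.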

151.3. Deploying a DQP function

For DQP to use a user-defined function, that function must be installed on every server in the federation. To install a user-defined function on an OGSA-DAI server:

  1. Add the jar containing your user defined functions to the OGSA-DAI server's jar directory. The server's JAR files are located in:

    $CATALINA_HOME/webapps/wsrf/WEB-INF/lib/
    

    if using Tomcat.

    $GLOBUS_LOCATION/lib/
    

    if using the Globus Toolkit container.

  2. Add the class name of your DQP functions to the DQP functions configuration file. To find the location of this file look in the JNDI file for the server. The server's JNDI file is located in:

    $CATALINA_HOME/webapps/wsrf/WEB-INF/etc/dai/jndi-config.xml
    

    if using Tomcat.

    $GLOBUS_LOCATION/etc/dai/jndi-config.xml
    

    if using the Globus Toolkit container.

    In the JNDI file there will be an entry for a resource called ogsadai/misc/uk.org.ogsadai.dqp.FUNCTION_REPOSITORY_CONFIG. Corresponding to this entry there will be a parameter called config. The value of this parameter is the path to the DQP functions configuration file.

    Find the DQP functions configuration file and add the class name of each of your function classes to the file.
  3. Restart the OGSA-DAI container.

Important

Remember to do this for every OGSA-DAI server in your federation.