Here we describe how to write an activity that outputs a simple "hello name" message, where "name" is a value provided as input to the activity. It is therefore a bit more complex than the "hello world" activity of the previous section. As you shall see, there are also two ways in which the activity can be implemented.
This is a specification of our "hello name" activity, written in the format we use to document activities in OGSA-DAI.
Summary:
Activity name: uk.org.ogsadai.tutorials.HelloName.
Server-side activity class: uk.org.ogsadai.tutorials.activity.HelloNameActivity.
Client proxy class: uk.org.ogsadai.tutorials.activity.client.HelloName.
Inputs:
input. Type: java.lang.String. A name.
Outputs:
output. Type: java.lang.String. A "hello name" message.
Configuration parameters: none.
Activity input/output ordering: none.
Activity contracts: none.
Target data resource: none. Our "hello name" activity is not associated with any data resource.
Behaviour:
There are two ways we can write our activity. The first is to sub-class the class:
uk.org.ogsadai.activity.ActivityBase
as we did for the "hello world" activity. The second is to extend the class:
uk.org.ogsadai.activity.MatchedIterativeActivity
We will do each in turn and explain the differences between the two approaches.
You will remember that it is our convention that all activity implementation classes end with Activity. So our activity class will be called HelloNameActivity.
Create the following directory structure:
$ mkdir uk
$ mkdir uk/org
$ mkdir uk/org/ogsadai
$ mkdir uk/org/ogsadai/tutorials
$ mkdir uk/org/ogsadai/tutorials/activity
Now create a file HelloNameActivity.java in this directory.
Add the following content to your file:
package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityBase;

public class HelloNameActivity extends ActivityBase {
}
You will recall that, by extending from ActivityBase, the only method we need to implement is the process method.
We need to place all the activity logic within the process method. For our activity, we have to get the output pipe for the activity and write the data (the "hello name" message) to it. We also need to get the input pipe, so we can get the name to use in the message.
We need to validate that the output pipe exists, as we did for our "hello world" activity. We also need to validate that our input pipe exists. This is done in the same way as for output pipes, using the validateInput method of ActivityBase. So we have:
super.validateInput("input");
super.validateOutput("output");
validateInput will cause the appropriate exception to be thrown if the client has not specified an input called input. The exception thrown is InvalidActivityInputsException. This exception extends ActivityUserException.
Now that we have validated that the output pipe exists, we need to get a block writer for the output pipe. We do this the same way as for the "hello world" activity. However it will be useful if we save a reference to the block writer in a member variable of the activity. So we have:
private BlockWriter mOutputBlockWriter;
And in our process method:
mOutputBlockWriter = getOutput("output");
We need a means of reading data from the input pipe. This is done via an interface called:
uk.org.ogsadai.activity.io.BlockReader
It will be useful if we save a reference to the block reader in a member variable of the activity. So we have:
private BlockReader mInputBlockReader;
To obtain the BlockReader for an input pipe, we use the getInput method of the ActivityBase super-class:
mInputBlockReader = getInput("input");
The BlockReader is used to read data blocks (any Java object) from the activity's input pipe using the read method. This method has the following signature:
/**
 * Reads a block of data from the pipe. This operation blocks until a
 * block of data is available or the pipe is closed.
 *
 * @return the data block or ControlBlock.NO_MORE_DATA if
 *         the pipe has been closed and there is no more data to read
 * @throws PipeIOException
 *             If a problem occurs preventing a data block from being read
 *             from the pipe.
 * @throws PipeTerminatedException
 *             If the pipe read operation is interrupted indicating that the
 *             request containing the pipe is being terminated.
 * @throws DataError
 *             If the producer has signaled an error and the last
 *             available block has been read by the consumer.
 */
public Object read()
    throws PipeIOException, PipeTerminatedException, DataError;
The read method will block if the input pipe is empty. If this is the case, another activity will have to write some data to the pipe before the read method will return. If a ControlBlock.NO_MORE_DATA object is received then this signals that the activity can expect no more data and so can complete.
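The blocking read and the end-of-data marker can be tried out without an OGSA-DAI installation. The following is only a minimal sketch of the idea: PipeSketch, its NO_MORE_DATA sentinel and the greetAll helper are hypothetical stand-ins built on java.util.concurrent.BlockingQueue, not OGSA-DAI classes, and the real pipe implementation may well differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for a pipe; not part of OGSA-DAI. */
class PipeSketch {
    /** Sentinel playing the role of ControlBlock.NO_MORE_DATA. */
    static final Object NO_MORE_DATA = new Object();

    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    /** Producer side: adds a block to the pipe. */
    void write(Object block) {
        queue.add(block);
    }

    /** Producer side: signals that no more blocks will follow. */
    void close() {
        queue.add(NO_MORE_DATA);
    }

    /** Consumer side: waits until a block or the sentinel is available. */
    Object read() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("read interrupted", e);
        }
    }

    /** Reads blocks until the sentinel arrives and builds a greeting for each. */
    static List<String> greetAll(PipeSketch pipe) {
        List<String> greetings = new ArrayList<>();
        Object block;
        while ((block = pipe.read()) != NO_MORE_DATA) {
            greetings.add("Hello " + block);
        }
        return greetings;
    }
}
```

The sentinel is compared by identity (!=), which mirrors the comparison against ControlBlock.NO_MORE_DATA used in the activity's read loop.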
We can now provide the functionality of the process method:
Object block;
// Pull data from the input block reader until
// ControlBlock.NO_MORE_DATA is received.
while ((block = mInputBlockReader.read()) != ControlBlock.NO_MORE_DATA) {
    if (block instanceof String) {
        String name = (String)block;
        mOutputBlockWriter.write("Hello " + name);
    } else {
        // User input error - this activity only supports Strings.
        throw new InvalidInputValueException(
            "input",           // Input name.
            String.class,      // Expected class.
            block.getClass()); // Actual class.
    }
}
Note that the activity only expects to receive objects of type String in the input. If the block is not of type String an exception is thrown.
And that's it. Our activity is complete! The implementation is:
package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityBase;
import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockReader;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.ControlBlock;
import uk.org.ogsadai.activity.io.InvalidInputValueException;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;

/**
 * An activity that outputs the string "Hello [name]" for
 * each name received.
 */
public class HelloNameActivity extends ActivityBase {
    /** Block reader used to read the activity's input. */
    private BlockReader mInputBlockReader;

    /** Block writer used to write the activity's output. */
    private BlockWriter mOutputBlockWriter;

    /**
     * Runs the activity.
     */
    public void process()
        throws ActivityUserException,
               ActivityProcessingException,
               ActivityTerminatedException {
        // Validate we have an input called "input".
        validateInput("input");
        // Validate we have an output called "output".
        validateOutput("output");

        // Get the block reader to read data from this input.
        mInputBlockReader = getInput("input");
        // Get the block writer to write data to this output.
        mOutputBlockWriter = getOutput("output");

        try {
            Object block;
            // Pull data from the input block reader until
            // ControlBlock.NO_MORE_DATA is received.
            while ((block = mInputBlockReader.read()) != ControlBlock.NO_MORE_DATA) {
                if (block instanceof String) {
                    String name = (String)block;
                    mOutputBlockWriter.write("Hello " + name);
                } else {
                    // User input error - this activity only supports Strings.
                    throw new InvalidInputValueException(
                        "input",           // Input name.
                        String.class,      // Expected class.
                        block.getClass()); // Actual class.
                }
            }
        } catch (PipeClosedException e) {
            // Consumer does not want any more data, just stop.
        } catch (PipeIOException e) {
            throw new ActivityPipeProcessingException(e);
        } catch (PipeTerminatedException e) {
            throw new ActivityTerminatedException();
        }
    }
}
Implementing a loop that reads each input and throws exceptions when inputs are of the wrong type can be a repetitive task. The activity base class
uk.org.ogsadai.activity.MatchedIterativeActivity
can be used instead of the ActivityBase class to remove some of this burden from you.
When we extend from this class we have to implement four methods: one to get and configure the inputs (getIterationInputs) and three to execute the functionality of the activity (preprocess, processIteration and postprocess). Together, these do what the process method did previously.
The MatchedIterativeActivity base class implements the following algorithm:
preprocess();
WHILE more processing to do
    processIteration(Object[] iterationInputs);
END-WHILE
postprocess();
cleanUp();
The first method, getIterationInputs, has the following signature:
/**
 * Gets the iteration inputs.
 *
 * @return an array of ActivityInput objects.
 */
protected abstract ActivityInput[] getIterationInputs();
This method is used to tell the super class about the activity's inputs. Here, we have a single input called input that expects to receive data of type String, so we implement this method as:
/**
 * Returns details about the inputs.
 */
protected ActivityInput[] getIterationInputs() {
    return new ActivityInput[] {
        new TypedActivityInput("input", String.class)};
}
There are a variety of classes that implement the ActivityInput interface. You should choose one appropriate to the type of input expected. Here we use a TypedActivityInput so that we can specify that every block received on the input pipe should be of type String. The base class will therefore take appropriate actions to notify the client of an error if any blocks received on this input pipe are not of type String.
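To make the idea of a typed input concrete, here is a small self-contained sketch of the kind of check such an input implies. TypedInputSketch and its check method are hypothetical names invented for this illustration; they are not the OGSA-DAI TypedActivityInput, and the real class reports errors through OGSA-DAI's own exception types rather than IllegalArgumentException.

```java
/** Hypothetical stand-in for a typed input; not the OGSA-DAI class. */
class TypedInputSketch {
    private final String name;
    private final Class<?> expected;

    TypedInputSketch(String name, Class<?> expected) {
        this.name = name;
        this.expected = expected;
    }

    /** Returns the block unchanged if it has the expected type, otherwise fails. */
    Object check(Object block) {
        if (!expected.isInstance(block)) {
            String actual = (block == null) ? "null" : block.getClass().getName();
            throw new IllegalArgumentException(
                "Input '" + name + "' expected " + expected.getName()
                + " but received " + actual);
        }
        return block;
    }
}
```

Centralising the check in one place is what lets the per-iteration code cast its input without repeating the instanceof test.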
The preprocess method is called when the activity is started. For our activity all we have to do in the preprocess method is validate the output and get the BlockWriter:
protected void preprocess()
    throws ActivityUserException,
           ActivityProcessingException,
           ActivityTerminatedException {
    validateOutput("output");
    mOutputBlockWriter = getOutput("output");
}
When the activity is running, the base class reads the input values from the input pipe and the processIteration method is called once for each input value. In the processIteration method we simply need to get the input, process it and write to the output:
protected void processIteration(Object[] iterationData)
    throws ActivityProcessingException,
           ActivityTerminatedException,
           ActivityUserException {
    try {
        // We have one input so it will be available at the 0th
        // element in the array.
        String name = (String)iterationData[0];
        mOutputBlockWriter.write("Hello " + name);
    } catch (PipeClosedException e) {
        // Consumer does not want any more data, just stop.
    } catch (PipeIOException e) {
        throw new ActivityPipeProcessingException(e);
    } catch (PipeTerminatedException e) {
        throw new ActivityTerminatedException();
    }
}
Note how the input value was obtained. The parameter to the method is an object array. Each object in this array corresponds to an input specified in the ActivityInput array returned from the getIterationInputs method. In that method we specified that this input should be of type String so it is safe to obtain it and cast it to a String with the line:
String name = (String)iterationData[0];
The rest of this method is simply error handling identical to that used in our first version of our activity.
When there is no more data the postprocess method is called. There is also an empty implementation of a cleanUp method provided in the base class that can be overridden if desired. The difference between postprocess and cleanUp is that postprocess will only be called if the previous stages have executed successfully but cleanUp will always be called even if earlier stages have thrown an exception.
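The ordering of the stages, and the difference between postprocess and cleanUp, can be sketched with a try/finally block. The classes below are hypothetical and written only for this illustration (IterativeSketch and GreetingSketch are not OGSA-DAI classes, and the real base class may be structured differently); they simply record which stages were called.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the iterative life cycle; not the OGSA-DAI base class. */
abstract class IterativeSketch {
    /** Records the order in which the stages were called. */
    final List<String> calls = new ArrayList<>();

    abstract void processIteration(Object input);

    void preprocess() { calls.add("preprocess"); }

    void postprocess() { calls.add("postprocess"); }

    void cleanUp() { calls.add("cleanUp"); }

    /** Runs all stages; cleanUp runs even if an earlier stage throws. */
    final void run(List<?> inputs) {
        try {
            preprocess();
            for (Object input : inputs) {
                processIteration(input);
            }
            // Only reached if every earlier stage succeeded.
            postprocess();
        } finally {
            cleanUp();
        }
    }
}

/** Records a greeting for each String input and fails on anything else. */
class GreetingSketch extends IterativeSketch {
    @Override
    void processIteration(Object input) {
        if (!(input instanceof String)) {
            throw new IllegalArgumentException("expected a String");
        }
        calls.add("Hello " + input);
    }
}
```

With a valid input the recorded stages are preprocess, the greeting, postprocess and cleanUp; with an invalid input only preprocess and cleanUp are recorded, because the exception skips postprocess.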
For our activity we have no post-processing to do so our method becomes:
/**
 * Post-processing.
 */
protected void postprocess()
    throws ActivityUserException,
           ActivityProcessingException,
           ActivityTerminatedException {
    // No post-processing.
}
And that's it. The implementation now is:
package uk.org.ogsadai.tutorials.activity;

import uk.org.ogsadai.activity.ActivityProcessingException;
import uk.org.ogsadai.activity.ActivityTerminatedException;
import uk.org.ogsadai.activity.ActivityUserException;
import uk.org.ogsadai.activity.MatchedIterativeActivity;
import uk.org.ogsadai.activity.io.ActivityInput;
import uk.org.ogsadai.activity.io.ActivityPipeProcessingException;
import uk.org.ogsadai.activity.io.BlockWriter;
import uk.org.ogsadai.activity.io.PipeClosedException;
import uk.org.ogsadai.activity.io.PipeIOException;
import uk.org.ogsadai.activity.io.PipeTerminatedException;
import uk.org.ogsadai.activity.io.TypedActivityInput;

/**
 * Simple activity that receives names and outputs "Hello [name]" for
 * each name received. In this case the activity is implemented by
 * extending MatchedIterativeActivity.
 */
public class HelloNameActivity extends MatchedIterativeActivity {
    /** Block writer used to write the activity's output. */
    private BlockWriter mOutputBlockWriter;

    /**
     * Returns the details about the inputs.
     */
    protected ActivityInput[] getIterationInputs() {
        return new ActivityInput[] {
            new TypedActivityInput("input", String.class)};
    }

    /**
     * Pre-processing.
     */
    protected void preprocess()
        throws ActivityUserException,
               ActivityProcessingException,
               ActivityTerminatedException {
        validateOutput("output");
        mOutputBlockWriter = getOutput("output");
    }

    /**
     * Process a single iteration.
     */
    protected void processIteration(Object[] iterationData)
        throws ActivityProcessingException,
               ActivityTerminatedException,
               ActivityUserException {
        try {
            String name = (String) iterationData[0];
            mOutputBlockWriter.write("Hello " + name);
        } catch (PipeClosedException e) {
            // Consumer does not want any more data, just stop.
        } catch (PipeIOException e) {
            throw new ActivityPipeProcessingException(e);
        } catch (PipeTerminatedException e) {
            throw new ActivityTerminatedException();
        }
    }

    /**
     * Post-processing.
     */
    protected void postprocess()
        throws ActivityUserException,
               ActivityProcessingException,
               ActivityTerminatedException {
        // No post-processing.
    }
}
As can be seen, using the MatchedIterativeActivity base class can make the development of activities that have inputs simpler than using ActivityBase as the base class.
We now need to deploy the activity onto an OGSA-DAI server and then configure a resource to expose the activity. Since our activity is not specific to any data resource we expose it via the data request execution resource.
Create a JAR containing your activity. You can do this as follows:
$ jar -cvf helloName.jar uk
Then put it in a temporary directory, for example:
$ mkdir tmp
$ mv helloName.jar tmp
Now we deploy the activity onto the server and then expose the activity via the data request execution resource. This is done as follows (for full information, please see Chapter 51, How to deploy an activity and Chapter 52, How to extend a resource's activities).
Write an OGSA-DAI configuration file, config.txt:
Activity add uk.org.ogsadai.tutorials.HelloName uk.org.ogsadai.tutorials.activity.HelloNameActivity "A hello name activity"
Resource addActivity DataRequestExecutionResource uk.org.ogsadai.tutorials.HelloName uk.org.ogsadai.tutorials.HelloName
Run:
$ ant -Dtomcat.dir=$CATALINA_HOME -Dconfig.file=config.txt -Djar.dir=tmp configure
You will now need to restart your container.
Having written an activity we should now write a client proxy class. Remember that, by convention, the client proxy class has the same name as the activity class except that it should not end with Activity. So our client proxy class will be called HelloName.
Create the following directory structure:
$ mkdir uk
$ mkdir uk/org
$ mkdir uk/org/ogsadai
$ mkdir uk/org/ogsadai/tutorials
$ mkdir uk/org/ogsadai/tutorials/activity
$ mkdir uk/org/ogsadai/tutorials/activity/client
Now create a file HelloName.java in this directory.
Remember that client proxy activity classes must implement the interface:
uk.org.ogsadai.client.toolkit.Activity
and that the easiest way to write a client proxy class is to inherit from:
uk.org.ogsadai.client.toolkit.activity.BaseActivity
Add the following to your file:
package uk.org.ogsadai.tutorials.activity.client;

import uk.org.ogsadai.client.toolkit.activity.BaseActivity;

/**
 * Client toolkit activity used to call the HelloName activity.
 */
public class HelloName extends BaseActivity {
}
The client proxy should contain a member variable for each input and output:
private ActivityInput mInput;
private ActivityOutput mOutput;
We now need to implement the constructor, which passes the default activity name to the base class and initialises the input and output variables:
public HelloName() {
    super(new ActivityName("uk.org.ogsadai.tutorials.HelloName"));
    mInput = new SimpleActivityInput("input");
    mOutput = new SimpleActivityOutput("output");
}
You will notice that the input is initialised in a similar way to the output.
We now need to implement the three abstract protected methods declared by the base class, which provide details of the activity's inputs and outputs and validate them. For inputs, the method is as follows:
protected ActivityInput[] getInputs() {
    return new ActivityInput[]{mInput};
}
You will notice that this is much the same as the method for outputs that we saw when developing the "hello world" activity. For the outputs, our method is the same as for the "hello world" activity, i.e.:
protected ActivityOutput[] getOutputs() {
    return new ActivityOutput[]{mOutput};
}
As for "hello world" there is no additional input or output validation required so our validation method is:
protected void validateIOState() throws ActivityIOIllegalStateException {
    // No further validation to do
}
We now have to implement the methods that will be used by client developers to connect client proxy activities together and so form a workflow. As for our "hello world" activity, for our output the method is:
/**
 * Gets the output so that it can be connected to the input of other
 * activities.
 *
 * @return the activity output.
 */
public SingleActivityOutput getOutput() {
    return mOutput.getSingleActivityOutputs()[0];
}
But for this activity we have an input, so we also need to provide a way for clients to provide inputs to this activity from their program either by providing values to the activity directly (as what we term input literals) or by allowing them to connect the input of this activity to the output of another activity.
To allow client developers to connect the input to the outputs of other activities we provide the following method:
/**
 * Connects the input to the given output.
 *
 * @param output
 *            Output to connect to.
 */
public void connectInput(SingleActivityOutput output) {
    mInput.connect(output);
}
All inputs should have a method such as this. The method is generally called connectXInput where X is the name of the input. But when there is only a single input called input this method can be called connectInput as it is here.
Finally, we should also provide a method that allows the client developer to add values to the input directly rather than connect it to an output. This method is:
/**
 * Adds a value to the input.
 *
 * @param name
 *            Name to add to the input.
 */
public void addInput(String name) {
    mInput.add(new StringData(name));
}
Note how the string is wrapped in the uk.org.ogsadai.data.StringData wrapper. This ensures that the client toolkit marks it up as a string so it can be correctly interpreted by the OGSA-DAI server.
We now need to provide our hasNextX and nextX methods to allow client developers to access any data. These are as for our "hello world" activity.
/**
 * Gets if the activity has a next output value.
 *
 * @return true if there is another output value, false otherwise.
 * @throws DataStreamErrorException
 *             If there is an error on the data stream.
 * @throws UnexpectedDataValueException
 *             If there is an unexpected data value on the data stream.
 * @throws DataSourceUsageException
 *             If there is an error reading from a data source.
 */
public boolean hasNextOutput()
    throws DataStreamErrorException,
           UnexpectedDataValueException,
           DataSourceUsageException {
    return mOutput.getDataValueIterator().hasNext();
}

/**
 * Gets the next output value.
 *
 * @return the next output value.
 * @throws DataStreamErrorException
 *             If there is an error on the data stream.
 * @throws UnexpectedDataValueException
 *             If there is an unexpected data value on the data stream.
 * @throws DataSourceUsageException
 *             If there is an error reading from a data source.
 */
public String nextOutput()
    throws DataStreamErrorException,
           UnexpectedDataValueException,
           DataSourceUsageException {
    return mOutput.getDataValueIterator().nextAsString();
}
And that's us done. The full client proxy activity implementation is:
package uk.org.ogsadai.tutorials.activity.client;

import uk.org.ogsadai.activity.ActivityName;
import uk.org.ogsadai.client.toolkit.ActivityOutput;
import uk.org.ogsadai.client.toolkit.SingleActivityOutput;
import uk.org.ogsadai.client.toolkit.activity.ActivityInput;
import uk.org.ogsadai.client.toolkit.activity.BaseActivity;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityInput;
import uk.org.ogsadai.client.toolkit.activity.SimpleActivityOutput;
import uk.org.ogsadai.client.toolkit.exception.ActivityIOIllegalStateException;
import uk.org.ogsadai.client.toolkit.exception.DataSourceUsageException;
import uk.org.ogsadai.client.toolkit.exception.DataStreamErrorException;
import uk.org.ogsadai.client.toolkit.exception.UnexpectedDataValueException;
import uk.org.ogsadai.data.StringData;

/**
 * Client toolkit activity used to call the HelloName activity.
 */
public class HelloName extends BaseActivity {
    /** Default activity name */
    public final static ActivityName DEFAULT_ACTIVITY_NAME =
        new ActivityName("uk.org.ogsadai.tutorials.HelloName");

    /** Activity input. */
    private ActivityInput mInput;

    /** Activity output. */
    private ActivityOutput mOutput;

    /**
     * Constructor.
     */
    public HelloName() {
        super(DEFAULT_ACTIVITY_NAME);
        mInput = new SimpleActivityInput("input");
        mOutput = new SimpleActivityOutput("output");
    }

    /**
     * Adds a value to the input.
     *
     * @param name
     *            Name to add to the input.
     */
    public void addInput(String name) {
        mInput.add(new StringData(name));
    }

    /**
     * Connects the input to the given output.
     *
     * @param output
     *            Output to connect to.
     */
    public void connectInput(SingleActivityOutput output) {
        mInput.connect(output);
    }

    /**
     * Gets the output so that it can be connected to the input of
     * other activities.
     *
     * @return the activity output.
     */
    public SingleActivityOutput getOutput() {
        return mOutput.getSingleActivityOutputs()[0];
    }

    /**
     * Gets if the activity has a next output value.
     *
     * @return true if there is another output value, false otherwise.
     * @throws DataStreamErrorException
     *             If there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             If there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             If there is an error reading from a data source.
     */
    public boolean hasNextOutput()
        throws DataStreamErrorException,
               UnexpectedDataValueException,
               DataSourceUsageException {
        return mOutput.getDataValueIterator().hasNext();
    }

    /**
     * Gets the next output value.
     *
     * @return the next output value.
     * @throws DataStreamErrorException
     *             If there is an error on the data stream.
     * @throws UnexpectedDataValueException
     *             If there is an unexpected data value on the data stream.
     * @throws DataSourceUsageException
     *             If there is an error reading from a data source.
     */
    public String nextOutput()
        throws DataStreamErrorException,
               UnexpectedDataValueException,
               DataSourceUsageException {
        return mOutput.getDataValueIterator().nextAsString();
    }

    /**
     * Gets the activity inputs.
     */
    protected ActivityInput[] getInputs() {
        return new ActivityInput[]{mInput};
    }

    /**
     * Gets the activity outputs.
     */
    protected ActivityOutput[] getOutputs() {
        return new ActivityOutput[]{mOutput};
    }

    /**
     * Validates the data of the inputs and outputs.
     */
    protected void validateIOState() throws ActivityIOIllegalStateException {
        // No further validation to do
    }
}
Here is a simple client that you can use to test the "hello name" activity. In the sample code the server URL is
http://localhost:8080/dai/services/
When writing your client you should, of course, use the URL of your OGSA-DAI server.
import java.net.URL;

import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.exception.RequestExecutionException;
import uk.org.ogsadai.resource.ResourceID;
import uk.org.ogsadai.tutorials.activity.client.HelloName;

/**
 * Application for the HelloName activity tutorial.
 */
public class HelloNameClient {
    /**
     * Main method.
     *
     * @param args
     *            Unused.
     * @throws Exception if an unexpected error occurs
     */
    public static void main(String[] args) throws Exception {
        URL serverBaseUrl = new URL("http://localhost:8080/dai/services/");
        ResourceID drerID = new ResourceID("DataRequestExecutionResource");

        // Get server proxy.
        ServerProxy serverProxy = new ServerProxy();
        serverProxy.setDefaultBaseServicesURL(serverBaseUrl);

        // Get DRER proxy.
        DataRequestExecutionResource drer =
            serverProxy.getDataRequestExecutionResource(drerID);

        // Create the activities.
        HelloName helloName = new HelloName();
        // Add some names as input literals to the HelloName activity.
        helloName.addInput("Rod");
        helloName.addInput("Jane");
        helloName.addInput("Freddy");

        DeliverToRequestStatus deliverToRequestStatus =
            new DeliverToRequestStatus();
        // Connect the output of HelloName to DeliverToRequestStatus.
        deliverToRequestStatus.connectInput(helloName.getOutput());

        // Create the workflow.
        PipelineWorkflow pipeline = new PipelineWorkflow();
        pipeline.add(helloName);
        pipeline.add(deliverToRequestStatus);

        // Execute the workflow.
        try {
            drer.execute(pipeline, RequestExecutionType.SYNCHRONOUS);
        } catch (RequestExecutionException e) {
            System.out.println("There was an error executing the workflow");
            System.out.println(e.getRequestResource().getRequestStatus());
            throw e;
        }

        // Get the result and display it.
        while (helloName.hasNextOutput()) {
            System.out.println(helloName.nextOutput());
        }
    }
}
To compile the client just do:
$ javac HelloNameClient.java
And running it should give the result:
$ java HelloNameClient
Hello Rod
Hello Jane
Hello Freddy