Chapter 90. Writing effective clients - using data sinks for delivery

90.1. Introduction
90.2. Scenarios
90.3. Operations
90.4. Examples

90.1. Introduction

OGSA-DAI data sinks are OGSA-DAI resources. They are created using the CreateDataSink activity. A request containing the ReadFromDataSink activity can then stream data out of the sink and into a workflow. A client can push data into the sink via the OGSA-DAI data sink service, or can ask another server to stream data into it by submitting a request that contains the DeliverToDataSink activity. Data sinks therefore support a "push" mode of data delivery.

90.2. Scenarios

Delivery via data sinks is appropriate for:

  • Scenarios involving the transfer of large data sets, e.g. 1,000,000 rows and above.
  • Scenarios where there is no access to an FTP server.

Delivery via data sinks is not appropriate for:

90.3. Operations

When pushing data via a data sink service the client can push the data a block at a time, N blocks at a time or all the blocks at once. The best option depends on the scenario, but pushing all the blocks at once will typically not be feasible for large datasets. By tuning the number of blocks pushed at a time, the best performance for a given scenario can be found.

The client toolkit provides variations of the following methods for pushing data to a data sink:

  • void putValues(DataValue[] values): puts the given data values (blocking call).
  • int putValuesNB(DataValue[] values, int start, int length): puts up to length data values, starting at index start, and returns the number of values actually put (non-blocking call).
  • void putValue(DataValue value): puts a single data value.

Of these methods, putValues and putValuesNB are the preferred methods. The putValues method is the simplest to use but blocks until all of the given values have been put into the data sink. This allows the possibility that the client request will time out and cause errors. It is therefore best to limit the use of this method to cases where the data flow is fast enough to be confident that the client request will never time out.
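
One way to reduce this risk is to push the data in smaller chunks, so that no single blocking call runs for too long. The sketch below is illustrative only: the dataSink proxy and the getDataFromSomewhere() placeholder stand for the set-up shown in the examples of Section 90.4, and the chunk size of 100 is an arbitrary starting point for tuning.

import java.util.Arrays;

import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.data.DataValue;

DataSinkResource dataSink = ...;
DataValue[] values = getDataFromSomewhere();

// Push the data in fixed-size chunks so that each blocking call
// returns reasonably quickly.
int chunkSize = 100;   // tune this for your scenario
for (int start = 0; start < values.length; start += chunkSize)
{
    int end = Math.min(start + chunkSize, values.length);
    dataSink.putValues(Arrays.copyOfRange(values, start, end));
}
// Tell the sink that there is no more data.
dataSink.close();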

The putValuesNB method offers a more robust alternative as it is unlikely to time out unless the server is inaccessible. It can be trickier to use, however, as calling it repeatedly in a tight loop may lead to performance problems. This is because zero (or only a few) data values will be accepted if the data sink cannot write all of the data given to it. Consider, for example, a slow data sink that cannot accept data as fast as the client is sending it. This results in many calls to putValuesNB putting no data at all, and the client then has to re-send the data. Putting such an operation inside a tight loop can degrade both client and server performance. Besides unnecessary CPU consumption, it may also result in too many ephemeral sockets being created on the client side, eventually causing the client to run out of available sockets and crash. For this reason it is useful to use a call controller to control the frequency of calls.
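
To make the problem concrete, the sketch below throttles the loop by hand, sleeping briefly whenever a putValuesNB call accepts nothing. The dataSink proxy and the 10 millisecond pause are illustrative assumptions; the call controllers described next achieve the same effect in a reusable way.

import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.data.DataValue;

DataSinkResource dataSink = ...;
DataValue[] values = getDataFromSomewhere();

int start = 0;
while (start < values.length)
{
    // Non-blocking put: returns how many values the sink accepted.
    int accepted = dataSink.putValuesNB(values, start, values.length - start);
    start += accepted;
    if (accepted == 0)
    {
        // The sink is not keeping up - back off briefly rather than
        // spinning in a tight loop.
        try
        {
            Thread.sleep(10);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            break;
        }
    }
}
// Tell the sink that there is no more data.
dataSink.close();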

The non-blocking call putValuesNB calls a call controller before and after it puts data. An example implementation is:

uk.org.ogsadai.client.toolkit.PausingResourceCallController

This controller pauses when a value is about to be put, hence delaying the call to a data sink. It starts pausing when the number of requests crosses a threshold. The amount of time to pause for and the threshold are parametrised. The default call controller used in the client toolkit classes is,

uk.org.ogsadai.client.toolkit.NullResourceCallController

which doesn't do anything to control the frequency. There are two reasons behind this choice.

  • A call controller may not be required at all. It might well be the case that a non-blocking call works well without regulating the call in any way.
  • If a controller is required, how to regulate the frequency and what parameters to use (if any) are problem-specific questions. If PausingResourceCallController is not suitable, users are encouraged to write their own call controller by implementing the uk.org.ogsadai.client.toolkit.DataSourceResourceCallController interface.
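
If throttling is required, a client can swap in a configured PausingResourceCallController in place of the default. The snippet below is a minimal sketch: the parameter values are arbitrary and the setter calls follow the fuller example in Section 90.4.

import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.client.toolkit.PausingResourceCallController;

DataSinkResource dataSink = ...;

// Start pausing once more than 1000 puts have been made; the values
// here are illustrative - tune them for your own scenario.
PausingResourceCallController controller = new PausingResourceCallController();
controller.setPauseAfterRequests(1000L);
controller.setSleepInterval(10L);
dataSink.setDataSinkResourceController(controller);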

90.4. Examples

Here is an example of using a data sink to push SQL queries to a request that executes those queries.

Before you can use a data sink it is necessary to create one. The CreateDataSink activity creates a data sink.

import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.activities.management.CreateDataSink;
import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.RequestResource;
import uk.org.ogsadai.resource.ResourceID;

DataRequestExecutionResource drer = ...;

// Create 2 activities...
CreateDataSink createDataSink = new CreateDataSink();

DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(createDataSink.getResultOutput());

// Create workflow...
PipelineWorkflow workflow = new PipelineWorkflow();
workflow.add(createDataSink);
workflow.add(deliverToRequestStatus);

// Submit request...
RequestResource requestResource =
    drer.execute(workflow, RequestExecutionType.SYNCHRONOUS);

// Get data returned - the ID of the new data sink.
ResourceID dataSinkID = createDataSink.nextResult();
[Tip]
As an alternative to submitting a workflow that invokes CreateDataSink, the client toolkit can create the data sink for you. The much shorter sequence of steps is:
DataSinkResource dataSink =
    ResourceFactory.createDataSink(serverProxy, drer);

Now we can submit our workflow to run SQL queries where the query comes via the data sink.

import uk.org.ogsadai.client.toolkit.activities.sql.SQLQuery;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.activities.delivery.ReadFromDataSink;
import uk.org.ogsadai.client.toolkit.activities.transform.TupleToWebRowSetCharArrays;
import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.RequestResource;

DataRequestExecutionResource drer = ...;

// Create 4 activities...
ReadFromDataSink readFromDataSink = new ReadFromDataSink();
readFromDataSink.setResourceID(dataSinkID);

SQLQuery query = new SQLQuery();
query.setResourceID("MySQLDataResource");
query.connectExpressionInput(readFromDataSink.getOutput());

TupleToWebRowSetCharArrays tupleToWebRowSet =
    new TupleToWebRowSetCharArrays();
tupleToWebRowSet.connectDataInput(query.getDataOutput());

DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(tupleToWebRowSet.getResultOutput());

// Create workflow...
PipelineWorkflow workflow = new PipelineWorkflow();
workflow.add(readFromDataSink);
workflow.add(query);
workflow.add(tupleToWebRowSet);
workflow.add(deliverToRequestStatus);

// Submit request...
RequestResource requestResource = 
    drer.execute(workflow, RequestExecutionType.ASYNCHRONOUS);

// Get results...
if (tupleToWebRowSet.hasNextResult())
{
    ResultSet resultSet = tupleToWebRowSet.nextResultAsResultSet();
    ...
}

The server-side workflow above blocks waiting for data to be pushed into the data sink, so the above client will see no results until that data arrives. We can therefore execute another client to provide the data. The example below shows one that uses a blocking call.

import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.data.DataValue;

ServerProxy serverProxy = ...;
// Get data sink client-side proxy.
DataSinkResource dataSink = serverProxy.getDataSinkResource(dataSinkID);
// Get data
DataValue[] values = getDataFromSomewhere();
// Put data
dataSink.putValues(values);
// Tell the sink that there is no more data.
dataSink.close();

An alternative implementation using the non-blocking putValuesNB method is:

import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.client.toolkit.PausingResourceCallController;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.data.DataValue;

ServerProxy serverProxy = ...;

// Get data sink client-side proxy.
DataSinkResource dataSink = serverProxy.getDataSinkResource(dataSinkID);
PausingResourceCallController controller = new PausingResourceCallController();
controller.setPauseAfterRequests(1000L);
controller.setSleepInterval(10L);
dataSink.setDataSinkResourceController(controller);
// Get data
DataValue[] values = getDataFromSomewhere();
// Put data
int start = 0;
while(start < values.length)
{
    start += dataSink.putValuesNB(values, start, (values.length-start));
}
// Tell the sink that there is no more data.
dataSink.close();

On receiving the data - in this example, the SQL query text - the original workflow unblocks and continues to execute. Closing the data sink from this client tells the data sink that it can expect no more data, and the associated ReadFromDataSink activity will then complete.

If the data sink service is busy (or has even failed for some reason) then the client needs a mechanism to re-submit the request. This can be done by using the optional sequence numbers:

  long sequenceNumber = 0;    // can submit integers 0, 1, 2, and so on
  dataSink.putValue(new StringData("SELECT * FROM littleblackbook"), sequenceNumber);

Now, if the call with the sequence number takes too long, the request can be re-submitted with the same sequence number. If the data sink resource was merely busy and then receives two consecutive requests with the same sequence number, the first request will be executed as expected and the second will be ignored. If the second request also appears to hang, the client can keep re-submitting with the same sequence number until it is safe to assume that the service has failed.
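
As a rough sketch of this retry pattern - and only a sketch, since the two-argument putValue variant, the exception caught and the retry limit below are assumptions rather than confirmed toolkit behaviour - a client might do something like:

import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.data.StringData;

DataSinkResource dataSink = ...;

final int MAX_ATTEMPTS = 3;   // placeholder retry limit
long sequenceNumber = 0;
boolean delivered = false;
for (int attempt = 0; attempt < MAX_ATTEMPTS && !delivered; attempt++)
{
    try
    {
        // Re-submitting with the same sequence number is safe: if an
        // earlier attempt did get through, the duplicate is ignored.
        dataSink.putValue(
            new StringData("SELECT * FROM littleblackbook"), sequenceNumber);
        delivered = true;
    }
    catch (Exception e)   // placeholder - catch whatever your deployment throws on timeout
    {
        // Log the failure and retry with the same sequence number.
    }
}
if (!delivered)
{
    // It is now reasonable to assume the data sink service has failed.
}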