OGSA-DAI data sinks are OGSA-DAI resources. They are created using the CreateDataSink activity, after which a request containing the ReadFromDataSink activity can be submitted to stream data from them. A client can then stream data into the sink, and so into a workflow, via the OGSA-DAI data sink service. Alternatively, a client can ask another server to stream the data by sending a request that contains the DeliverToDataSink activity. Data sinks support a "push" mode of data delivery.
Delivery via data sinks is appropriate for:
Delivery via data sinks is not appropriate for:
When pushing data via a data sink service the client can push the data a block at a time, N blocks at a time, or all the blocks at once. The best option depends on the scenario, but pushing all the blocks at once is typically not feasible for large datasets. By tuning the number of blocks pushed at a time, the best performance for a given scenario can be identified.
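The N-blocks-at-a-time trade-off can be sketched with a simple chunked loop. This is an illustrative sketch only: `MockSink` below is a hypothetical stand-in for a data sink proxy, not an OGSA-DAI class; the real client toolkit calls appear later in this section.

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkedPushDemo {

    // Hypothetical stand-in for a data sink that accepts blocks of data.
    static class MockSink {
        final List<String> received = new ArrayList<String>();
        void put(String[] blocks, int start, int length) {
            for (int i = start; i < start + length; i++) {
                received.add(blocks[i]);
            }
        }
    }

    // Push blocks to the sink n at a time, returning how many were sent.
    // Larger n means fewer round trips; smaller n bounds the size of
    // each call, which matters for large datasets.
    public static int pushInChunks(String[] blocks, int n) {
        MockSink sink = new MockSink();
        int sent = 0;
        while (sent < blocks.length) {
            int length = Math.min(n, blocks.length - sent);
            sink.put(blocks, sent, length);
            sent += length;
        }
        return sent;
    }

    public static void main(String[] args) {
        int sent = pushInChunks(new String[] {"b1", "b2", "b3", "b4", "b5"}, 2);
        System.out.println("Blocks sent: " + sent);
    }
}
```

Varying `n` in a loop like this is one way to benchmark which chunk size suits a given scenario.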
The client toolkit provides variations of the following methods for pushing data to a data sink:

void putValues(DataValue[] values)
: puts the given data values (blocking call).

int putValuesNB(DataValue[] values, int start, int length)
: puts up to length data values (non-blocking call), returning the number actually accepted.

void putValue(DataValue value)
: puts the next data value.
Of these methods, putValues and putValuesNB are preferred. The putValues method is the simplest to use but blocks until the specified number of blocks has been put into the data sink. This allows the possibility that the client request times out and causes errors. It is therefore best to limit the use of this method to cases where the data flow is fast enough to be confident that the client request will never time out.
The putValuesNB method offers a more robust alternative as it is unlikely to time out unless the server is inaccessible. This method can be trickier to use, however, as calling it repeatedly in a tight loop may lead to performance problems. This is because zero (or too few) data values will be accepted if the data sink cannot write all the data given to it. Consider, for example, a slow data sink that is unable to accept data as fast as the client is sending it. This results in many calls to putValuesNB putting no data at all, and the client then has to re-send the data. Putting such an operation inside a tight loop might degrade both client and server performance. Besides unnecessary CPU consumption, it may also result in too many ephemeral sockets being created on the client side, eventually causing the client to run out of available sockets and crash. For this reason it is useful to use a call controller to control the frequency of calls.
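The recommended pattern can be sketched as a retry loop that backs off whenever the sink accepts nothing, instead of spinning. This is a sketch only: `SlowSink` is a hypothetical stand-in mimicking a slow server, not an OGSA-DAI class.

```java
public class NonBlockingPushDemo {

    // Hypothetical sink that accepts nothing on every other call,
    // mimicking a server that cannot keep up with the client.
    static class SlowSink {
        private boolean busy = false;
        int putValuesNB(String[] values, int start, int length) {
            busy = !busy;
            if (busy || length == 0) {
                return 0;   // nothing accepted this time
            }
            return 1;       // accepted one value
        }
    }

    // Pushes all values; returns how many times we paused because the
    // sink accepted no data.
    public static int pushAll(String[] values) throws InterruptedException {
        SlowSink sink = new SlowSink();
        int start = 0;
        int pauses = 0;
        while (start < values.length) {
            int n = sink.putValuesNB(values, start, values.length - start);
            if (n == 0) {
                pauses++;
                Thread.sleep(1);  // back off instead of spinning in a tight loop
            }
            start += n;
        }
        return pauses;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Paused " + pushAll(new String[] {"a", "b", "c"}) + " times");
    }
}
```

The `Thread.sleep` is the crucial line: without it, the zero-return calls would consume CPU and ephemeral sockets as described above. A call controller packages exactly this kind of throttling behind the put methods.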
The non-blocking call putValuesNB calls a call controller before and after it puts data. An example implementation is:
uk.org.ogsadai.client.toolkit.PausingResourceCallController
This controller pauses when a value is about to be put, hence delaying the call to the data sink. It starts pausing when the number of requests crosses a threshold; the pause duration and the threshold are both parameterised. The default call controller used in the client toolkit classes is
uk.org.ogsadai.client.toolkit.NullResourceCallController
which does nothing to control the frequency of calls. There are two reasons behind this choice.
If PausingResourceCallController is not suitable, users are encouraged to write their own call controller by implementing the
uk.org.ogsadai.client.toolkit.DataSourceResourceCallController
interface.
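The shape of a custom controller might look as follows. Note this is a sketch under assumptions: the exact method names of the real call controller interface are not reproduced here, so `CallController`, `beforeCall` and `afterCall` below are hypothetical names standing in for hooks invoked before and after each put.

```java
public class CustomControllerDemo {

    // Hypothetical stand-in for the call controller interface.
    interface CallController {
        void beforeCall();  // hypothetical name
        void afterCall();   // hypothetical name
    }

    // Pauses before every call once a threshold number of calls is passed,
    // mirroring the behaviour described for PausingResourceCallController.
    static class ThresholdPausingController implements CallController {
        private final long threshold;
        private final long sleepMillis;
        private long calls = 0;

        ThresholdPausingController(long threshold, long sleepMillis) {
            this.threshold = threshold;
            this.sleepMillis = sleepMillis;
        }

        public void beforeCall() {
            calls++;
            if (calls > threshold) {
                try {
                    Thread.sleep(sleepMillis);  // throttle once past threshold
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        public void afterCall() {
            // Nothing to do after the call in this controller.
        }

        long getCalls() {
            return calls;
        }
    }

    public static void main(String[] args) {
        ThresholdPausingController controller =
            new ThresholdPausingController(1000L, 10L);
        for (int i = 0; i < 5; i++) {
            controller.beforeCall();
            // ... put data to the sink here ...
            controller.afterCall();
        }
        System.out.println("Calls seen: " + controller.getCalls());
    }
}
```

A real implementation would implement the toolkit's interface instead of the hypothetical one above, but the throttling logic would be of this kind.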
Here is an example of using a data sink to push SQL queries to a request that executes those queries. Before a data sink can be used, it must be created. The CreateDataSink activity creates a data sink.
```java
import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.RequestResource;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.activities.management.CreateDataSink;
import uk.org.ogsadai.resource.ResourceID;

DataRequestExecutionResource drer = ...;

// Create 2 activities...
CreateDataSink createDataSink = new CreateDataSink();
DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(createDataSink.getResultOutput());

// Create workflow...
PipelineWorkflow workflow = new PipelineWorkflow();
workflow.add(createDataSink);
workflow.add(deliverToRequestStatus);

// Submit request...
RequestResource requestResource =
    drer.execute(workflow, RequestExecutionType.SYNCHRONOUS);

// Get data returned - the ID of the new data sink.
ResourceID dataSinkID = createDataSink.nextResult();
```
Tip: As an alternative to submitting a workflow that invokes CreateDataSink to create the data sink, the client toolkit can do this for you. The much shorter sequence of steps is:

```java
DataSinkResource dataSink = ResourceFactory.createDataSink(serverProxy, drer);
```
Now we can submit our workflow to run SQL queries where the query comes via the data sink.
```java
import java.sql.ResultSet;

import uk.org.ogsadai.client.toolkit.DataRequestExecutionResource;
import uk.org.ogsadai.client.toolkit.PipelineWorkflow;
import uk.org.ogsadai.client.toolkit.RequestExecutionType;
import uk.org.ogsadai.client.toolkit.RequestResource;
import uk.org.ogsadai.client.toolkit.activities.delivery.DeliverToRequestStatus;
import uk.org.ogsadai.client.toolkit.activities.delivery.ReadFromDataSink;
import uk.org.ogsadai.client.toolkit.activities.sql.SQLQuery;
import uk.org.ogsadai.client.toolkit.activities.transform.TupleToWebRowSetCharArrays;

DataRequestExecutionResource drer = ...;

// Create 4 activities...
ReadFromDataSink readFromDataSink = new ReadFromDataSink();
readFromDataSink.setResourceID(dataSinkID);

SQLQuery query = new SQLQuery();
query.setResourceID("MySQLDataResource");
query.connectExpressionInput(readFromDataSink.getOutput());

TupleToWebRowSetCharArrays tupleToWebRowSet = new TupleToWebRowSetCharArrays();
tupleToWebRowSet.connectDataInput(query.getDataOutput());

DeliverToRequestStatus deliverToRequestStatus = new DeliverToRequestStatus();
deliverToRequestStatus.connectInput(tupleToWebRowSet.getResultOutput());

// Create workflow...
PipelineWorkflow workflow = new PipelineWorkflow();
workflow.add(readFromDataSink);
workflow.add(query);
workflow.add(tupleToWebRowSet);
workflow.add(deliverToRequestStatus);

// Submit request...
RequestResource requestResource =
    drer.execute(workflow, RequestExecutionType.ASYNCHRONOUS);

// Get results...
if (tupleToWebRowSet.hasNextResult())
{
    ResultSet resultSet = tupleToWebRowSet.nextResultAsResultSet();
    ...
}
```
The above client would block as it waits for the server to complete the request, and the server in turn would block waiting for data to be pushed to the data sink. So we can run a second client to provide this data. The example below shows one that uses a blocking call.
```java
import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.data.DataValue;

ServerProxy serverProxy = ...;

// Get data sink client-side proxy.
DataSinkResource dataSink = serverProxy.getDataSinkResource(dataSinkID);

// Get data.
DataValue[] values = getDataFromSomewhere();

// Put data.
dataSink.putValues(values);

// Tell the sink that there is no more data.
dataSink.close();
```
An alternative implementation using the non-blocking putValuesNB method is:
```java
import uk.org.ogsadai.client.toolkit.DataSinkResource;
import uk.org.ogsadai.client.toolkit.PausingResourceCallController;
import uk.org.ogsadai.client.toolkit.ServerProxy;
import uk.org.ogsadai.data.DataValue;

ServerProxy serverProxy = ...;

// Get data sink client-side proxy.
DataSinkResource dataSink = serverProxy.getDataSinkResource(dataSinkID);

// Configure a pausing call controller to throttle the calls.
PausingResourceCallController controller = new PausingResourceCallController();
controller.setPauseAfterRequests(1000L);
controller.setSleepInterval(10L);
dataSink.setDataSinkResourceController(controller);

// Get data.
DataValue[] values = getDataFromSomewhere();

// Put data.
int start = 0;
while (start < values.length)
{
    start += dataSink.putValuesNB(values, start, values.length - start);
}

// Tell the sink that there is no more data.
dataSink.close();
```
On receiving the data - in this example, SQL queries - the original workflow unblocks and continues to execute. Closing the data sink from this client tells the data sink that it can expect no more data; the associated ReadFromDataSink activity, on being informed of this, will then complete.
If the data sink service is busy (or has even failed for some reason) then the client needs a mechanism to re-submit the request. This can be done using the optional sequence numbers:
```java
long sequenceNumber = 0; // clients can submit integers 0, 1, 2, and so on
dataSink.putValue(new StringData("SELECT * FROM littleblackbook", sequenceNumber));
```
Now, if the call with the sequence number takes too long, another request can be submitted with the same sequence number. If the data sink resource was merely busy and then receives two consecutive requests with the same sequence number, the first request is executed as expected and the second is ignored. However, if the second request also appears to hang, the client can keep re-submitting with the same sequence number until it is reasonable to assume that the service has failed.
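The reason re-submission is safe can be sketched as follows: the sink only needs to remember which sequence numbers it has already executed and to ignore duplicates. `DedupSink` here is a hypothetical illustration of that idea, not an OGSA-DAI class.

```java
import java.util.HashSet;
import java.util.Set;

public class SequenceNumberDemo {

    // Hypothetical sink that executes each sequence number at most once.
    static class DedupSink {
        private final Set<Long> seen = new HashSet<Long>();
        private int executed = 0;

        // Returns true if the value was executed, false if it was a
        // duplicate sequence number and therefore ignored.
        boolean putValue(String value, long sequenceNumber) {
            if (!seen.add(sequenceNumber)) {
                return false;   // already handled - ignore the re-send
            }
            executed++;
            return true;
        }

        int getExecutedCount() {
            return executed;
        }
    }

    public static void main(String[] args) {
        DedupSink sink = new DedupSink();
        sink.putValue("SELECT * FROM littleblackbook", 0L);
        // Client times out and re-submits with the same sequence number;
        // the duplicate is ignored, so the query runs only once.
        sink.putValue("SELECT * FROM littleblackbook", 0L);
        System.out.println("Queries executed: " + sink.getExecutedCount());
    }
}
```

This is why the client can re-submit freely: at most one of the identically numbered requests is ever executed.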