Hi,
I've been working on an implementable design for the asynch WPS
work and would like to present it to the dev community.
To recap, when executing a process we have a number of phases:
- Request check (does it even make sense?). This is quick.
- Input parsing via PPIO. If the input is remote and big this is going
to take a _long_ time.
- Execution (quick or long, depends on how the process is implemented)
- Output encoding. If the process is streaming and/or the output is big, this one
too will take its sweet time
The output is available in multiple variations (long live flexibility...):
- single output, directly written out (synch process, raw output)
- outputs wrapped in an XML document, directly written out
(synch process, no raw, no store)
- outputs wrapped in an XML document, stored (no status, the
document either is the response or says the process has been
accepted)
- outputs referenced by the response XML document, stored (no status, the
document either is the response or says the process has been
accepted), the outputs are stored in their own files
- the above two, but with status support, in which the response document
has to be updated according to the current process state
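The variations above mostly boil down to a few request flags (raw output, store, outputs as reference, status). Here's a minimal sketch of that mapping; OutputMode and OutputModes are hypothetical names used only for illustration, and status is treated as an add-on that only applies to the stored variants:

```java
// Hypothetical sketch: maps the request flags to the output variations listed above.
enum OutputMode {
    RAW,               // single output written out directly (synch, raw)
    INLINE_DOCUMENT,   // outputs wrapped in an XML document, written out directly
    STORED_DOCUMENT,   // outputs wrapped in a stored XML document
    STORED_REFERENCES  // stored XML document referencing separately stored outputs
}

final class OutputModes {
    private OutputModes() {
    }

    static OutputMode classify(boolean rawOutput, boolean store, boolean asReference) {
        if (rawOutput) {
            return OutputMode.RAW;
        }
        if (!store) {
            return OutputMode.INLINE_DOCUMENT;
        }
        return asReference ? OutputMode.STORED_REFERENCES : OutputMode.STORED_DOCUMENT;
    }

    /** Status updates only make sense when the response document is stored. */
    static boolean needsStatusUpdates(OutputMode mode, boolean status) {
        return status
                && (mode == OutputMode.STORED_DOCUMENT || mode == OutputMode.STORED_REFERENCES);
    }
}
```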
Requirements I've tried to address:
- possible to control the max number of processes executing (both
synch and asynch)
- give synch processes higher priority since the http connection won't
last forever
- allow pluggable executor controllers, so that people can use them to interact
with a remote execution system with reattachment after a WPS crash/restart,
possibly support restartable processes with checkpoints, allow round robin
handling of WPS server installations (in this case the executor manager would be
clustered and use some sharing technology so that all nodes know about the process
and its state), and possibly even create hollow processes that do not do squat
in GeoServer, but have a way to directly submit the inputs to a remote system
without having to keep a live thread talking to them in GeoServer
- have a way to list and kill processes, so that one day (not this week) we can
show a WPS management console where the admin can see what is running
and possibly terminate it
- at the very least include input parsing in the overall execution time, with the
possibility of also including output encoding for async processes (the synch ones
will stream out instead)
To get there I thought about a two-level executor manager setup.
The inner level, pluggable, is as follows:
import java.util.List;
import java.util.Map;

import org.geotools.process.ProcessException;
import org.opengis.feature.type.Name;

public interface ProcessManager {

    /**
     * Returns true if this process manager can handle the submission
     */
    boolean canHandle(Name processName);

    /**
     * Asynchronous submission, not blocking. Returns an id that can be used to get the
     * process status later.
     *
     * @param processName The name of the process
     * @param inputs The process inputs
     * @param priority The process priority, the higher it is, the sooner the process
     *        should be executed. This is just a hint, the implementation is free to
     *        disregard it.
     * @return The execution id
     * @throws ProcessException
     */
    String submit(Name processName, Map<String, Object> inputs, int priority)
            throws ProcessException;

    /**
     * Returns the status of an asynch call if the id is known, null otherwise
     */
    ExecutionStatus getStatus(String executionId);

    /**
     * Gets the process output. Will block the caller for at most "timeout" if the process
     * execution is not complete. Once the output is retrieved the process will be marked
     * as terminated and it won't be possible to get its outputs or status anymore.
     *
     * @param executionId The execution id returned by submit(...)
     * @param timeout The maximum time to wait for the execution to complete
     * @return The process outputs
     */
    Map<String, Object> getOutput(String executionId, long timeout)
            throws ProcessException;

    /**
     * Cancels the process execution. If the process is queued for execution it will be
     */
    void cancel(String executionId);

    /**
     * Lists the processes currently running (might include, or not, ids for the
     * synchronous submissions as well)
     *
     * @return The status of each running process
     */
    List<ExecutionStatus> getRunningProcesses();
}
As you can see, it's quite close to Gabriel's suggestion; unlike the GeoTools one,
it does not require implementing odd methods and does not assume the use of
an ExecutorService.
The submission is always asynch: for synch processes the caller will just call
getOutput(...) right away and block on it, while the asynch mode will use the
execution id to build a response right away.
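To make the two modes concrete, here's a minimal sketch. It uses a hypothetical, trimmed-down SimpleProcessManager (plain string process names, processes modelled as functions, unchecked exceptions instead of ProcessException) and a hypothetical WpsCalls helper with made-up priority values; it is an illustration, not the GeoServer code:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical, trimmed-down version of ProcessManager: process names are plain
// strings and a "process" is just a function from inputs to outputs.
interface SimpleProcessManager {
    String submit(String processName, Map<String, Object> inputs, int priority);

    Map<String, Object> getOutput(String executionId, long timeoutMillis);
}

final class LocalProcessManager implements SimpleProcessManager {
    private final Map<String, Function<Map<String, Object>, Map<String, Object>>> processes;
    private final ExecutorService executor = Executors.newFixedThreadPool(2);
    private final ConcurrentMap<String, Future<Map<String, Object>>> executions =
            new ConcurrentHashMap<>();

    LocalProcessManager(Map<String, Function<Map<String, Object>, Map<String, Object>>> processes) {
        this.processes = processes;
    }

    @Override
    public String submit(String processName, Map<String, Object> inputs, int priority) {
        // The priority is only a hint; this sketch ignores it.
        Function<Map<String, Object>, Map<String, Object>> process = processes.get(processName);
        if (process == null) {
            throw new IllegalArgumentException("Unknown process " + processName);
        }
        String executionId = UUID.randomUUID().toString();
        executions.put(executionId, executor.submit(() -> process.apply(inputs)));
        return executionId;
    }

    @Override
    public Map<String, Object> getOutput(String executionId, long timeoutMillis) {
        Future<Map<String, Object>> future = executions.get(executionId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown execution " + executionId);
        }
        try {
            Map<String, Object> outputs = future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            executions.remove(executionId); // outputs can be retrieved only once
            return outputs;
        } catch (ExecutionException e) {
            executions.remove(executionId); // a failed execution is terminated as well
            throw new IllegalStateException("Process failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException("Still running after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for outputs", e);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}

// How the two request modes would use the same manager (priority values are made up).
final class WpsCalls {
    static final int SYNCH_PRIORITY = 10;
    static final int ASYNCH_PRIORITY = 1;

    /** Synch mode: submit, then block the request thread on getOutput(...). */
    static Map<String, Object> executeSynch(SimpleProcessManager pm, String name,
            Map<String, Object> inputs, long timeoutMillis) {
        String executionId = pm.submit(name, inputs, SYNCH_PRIORITY);
        return pm.getOutput(executionId, timeoutMillis);
    }

    /** Asynch mode: submit and return the id, used to build the response right away. */
    static String executeAsynch(SimpleProcessManager pm, String name, Map<String, Object> inputs) {
        return pm.submit(name, inputs, ASYNCH_PRIORITY);
    }
}
```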
The priority can be used by implementors to give synch processes a higher
execution priority. The default implementation will actually use two executor
services, so that asynch processes can never fully stop the synch ones, and will
run asynch processes in threads at a lower priority than the synch ones.
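A minimal sketch of the two-executor idea, assuming a hypothetical TwoPoolScheduler and an arbitrary threshold separating synch from asynch priorities. Fixed-size pools also cover the "max number of processes executing" requirement. Java thread priorities are only a hint to the OS scheduler, so the real protection for synch requests comes from the separate pools:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: synch and asynch work go to separate bounded pools, so
// asynch requests can never take all the threads synch requests need.
final class TwoPoolScheduler {
    // Assumed convention: priorities at or above this value mean "synch".
    static final int SYNCH_THRESHOLD = 5;

    private final ExecutorService synchPool;
    private final ExecutorService asynchPool;

    TwoPoolScheduler(int maxSynch, int maxAsynch) {
        // Fixed pool sizes also cap the number of processes running at once.
        synchPool = Executors.newFixedThreadPool(maxSynch,
                threadFactory("wps-synch", Thread.NORM_PRIORITY));
        asynchPool = Executors.newFixedThreadPool(maxAsynch,
                threadFactory("wps-asynch", Thread.MIN_PRIORITY));
    }

    <T> Future<T> submit(Callable<T> task, int priority) {
        ExecutorService pool = priority >= SYNCH_THRESHOLD ? synchPool : asynchPool;
        return pool.submit(task);
    }

    void shutdown() {
        synchPool.shutdown();
        asynchPool.shutdown();
    }

    private static ThreadFactory threadFactory(String prefix, int threadPriority) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setPriority(threadPriority); // only a hint to the OS scheduler
            thread.setDaemon(true);
            return thread;
        };
    }
}
```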
The "canHandle" method is there because I envision setups in which there might
be more than one ProcessManager, with some managers dedicated to locally running
processes and others dedicated to talking to external execution facilities.
In these setups a composite process manager will take care of unifying the work
done by the various implementations.
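The composite could be as simple as the sketch below, which uses a hypothetical trimmed-down Manager contract with string process names. It delegates each submission to the first manager whose canHandle(...) returns true and remembers which manager owns each execution id, so later calls get routed to it. This assumes execution ids are unique across managers (e.g., UUIDs); StubManager is only there for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical trimmed-down manager contract (string names, no status/cancel).
interface Manager {
    boolean canHandle(String processName);

    String submit(String processName, Map<String, Object> inputs, int priority);

    Map<String, Object> getOutput(String executionId, long timeout);
}

// Routes each call to the delegate that handles the process or owns the execution.
final class CompositeManager implements Manager {
    private final List<Manager> delegates;
    private final Map<String, Manager> owners = new ConcurrentHashMap<>();

    CompositeManager(List<Manager> delegates) {
        this.delegates = List.copyOf(delegates);
    }

    @Override
    public boolean canHandle(String processName) {
        return delegates.stream().anyMatch(m -> m.canHandle(processName));
    }

    @Override
    public String submit(String processName, Map<String, Object> inputs, int priority) {
        for (Manager delegate : delegates) {
            if (delegate.canHandle(processName)) {
                String executionId = delegate.submit(processName, inputs, priority);
                owners.put(executionId, delegate); // remember who runs it
                return executionId;
            }
        }
        throw new IllegalArgumentException("No process manager can handle " + processName);
    }

    @Override
    public Map<String, Object> getOutput(String executionId, long timeout) {
        Manager owner = owners.get(executionId);
        if (owner == null) {
            throw new IllegalArgumentException("Unknown execution " + executionId);
        }
        Map<String, Object> outputs = owner.getOutput(executionId, timeout);
        owners.remove(executionId); // outputs can be retrieved only once
        return outputs;
    }
}

// Illustrative delegate: handles names with a given prefix, returns a fixed output.
final class StubManager implements Manager {
    private final String prefix;
    private final String label;

    StubManager(String prefix, String label) {
        this.prefix = prefix;
        this.label = label;
    }

    @Override
    public boolean canHandle(String processName) {
        return processName.startsWith(prefix);
    }

    @Override
    public String submit(String processName, Map<String, Object> inputs, int priority) {
        return UUID.randomUUID().toString();
    }

    @Override
    public Map<String, Object> getOutput(String executionId, long timeout) {
        return Map.<String, Object>of("handledBy", label);
    }
}
```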
The ExecutionStatus is as follows:
import org.opengis.feature.type.Name;

public class ExecutionStatus {

    enum ProcessState {
        QUEUED, RUNNING, COMPLETED, CANCELLED
    }

    /**
     * The process being executed
     */
    Name processName;

    /**
     * The execution id, can be used to retrieve the process results
     */
    String executionId;

    /**
     * Current execution status
     */
    ProcessState phase;
}
I did not include a FAILED state since once you know it's complete you get
the output and that will result in an exception in case of failure.
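A minimal sketch of why a separate FAILED state is not needed, assuming the execution is tracked with a CompletableFuture (an implementation choice, not something the design mandates): a failed execution simply reports COMPLETED, and the failure surfaces when the outputs are requested. TrackedExecution is a hypothetical name and the enum just mirrors the ProcessState above:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Mirrors the ProcessState enum above: note there is no FAILED value.
enum ProcessState {
    QUEUED, RUNNING, COMPLETED, CANCELLED
}

// Hypothetical per-execution bookkeeping backed by a CompletableFuture.
final class TrackedExecution {
    private final CompletableFuture<Map<String, Object>> outputs = new CompletableFuture<>();
    private volatile boolean started;

    void markStarted() {
        started = true;
    }

    void succeed(Map<String, Object> result) {
        outputs.complete(result);
    }

    void fail(Throwable cause) {
        outputs.completeExceptionally(cause);
    }

    void cancel() {
        outputs.cancel(false);
    }

    ProcessState state() {
        if (outputs.isCancelled()) {
            return ProcessState.CANCELLED;
        }
        if (outputs.isDone()) {
            return ProcessState.COMPLETED; // success and failure look the same here
        }
        return started ? ProcessState.RUNNING : ProcessState.QUEUED;
    }

    /** Blocks until done; a failed execution surfaces as an exception here. */
    Map<String, Object> getOutput() {
        try {
            return outputs.join();
        } catch (CompletionException e) {
            throw new IllegalStateException("Process failed", e.getCause());
        }
    }
}
```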
At first glance the above does not account for input parsing and output
encoding.
Input-parsing-wise, I'll use the fact that the inputs are a Map, an interface, to
plug in a Map implementation that does the parsing on demand, basically forcing
the parsing time into the process execution.
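The on-demand parsing Map could look roughly like the sketch below. It assumes each input comes with a parser wrapped in a java.util.function.Supplier (a stand-in for the actual PPIO decoding); LazyParsingMap is a hypothetical name. Each input is parsed on first access, inside whatever thread runs the process, and the result is cached:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch: input values are parsed only when the process first
// reads them, so parsing time is charged to the process execution.
final class LazyParsingMap extends AbstractMap<String, Object> {
    private final Map<String, Supplier<Object>> parsers;
    private final Map<String, Object> parsed = new HashMap<>();

    LazyParsingMap(Map<String, Supplier<Object>> parsers) {
        this.parsers = new LinkedHashMap<>(parsers);
    }

    @Override
    public synchronized Object get(Object key) {
        if (!parsers.containsKey(key)) {
            return null;
        }
        if (!parsed.containsKey(key)) {
            // First access: run the (possibly slow) parser and cache the result.
            parsed.put((String) key, parsers.get(key).get());
        }
        return parsed.get(key);
    }

    @Override
    public boolean containsKey(Object key) {
        return parsers.containsKey(key);
    }

    @Override
    public int size() {
        return parsers.size(); // does not trigger any parsing
    }

    @Override
    public synchronized Set<Entry<String, Object>> entrySet() {
        // Iterating over the entries forces parsing of every input.
        Set<Entry<String, Object>> entries = new LinkedHashSet<>();
        for (String name : parsers.keySet()) {
            entries.add(new SimpleImmutableEntry<>(name, get(name)));
        }
        return entries;
    }
}
```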
Output-wise, it depends. Non-stored processes will encode the outputs
in a streaming way, using the request thread.
Stored processes will instead be handled by a WPSExecutionManager
that will wait for the process to be executed and then write out the
response document(s), altering the progress state and values of the
underlying process manager to account for that.
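A rough sketch of the stored-response path, assuming the process outputs are exposed as a CompletableFuture and the response encoder is a plain function producing the XML text (both assumptions; StoredResponseJob is a hypothetical name). The execution is only reported complete once the response file has been written, so encoding time counts toward the execution:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: once the underlying process completes, a background step
// encodes the outputs and writes the response file.
final class StoredResponseJob {
    private final CompletableFuture<Path> responseFile;

    StoredResponseJob(CompletableFuture<Map<String, Object>> processOutputs,
            Function<Map<String, Object>, String> encoder, Path target) {
        this.responseFile = processOutputs.thenApplyAsync(outputs -> {
            String document = encoder.apply(outputs); // encoding counts as execution time
            try {
                Files.writeString(target, document, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return target;
        });
    }

    /** Still "running" while the outputs are being encoded and written. */
    boolean isComplete() {
        return responseFile.isDone();
    }

    /** Blocks until the response file has been written. */
    Path getStoredResponse() {
        return responseFile.join();
    }
}
```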
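import java.io.File;

import net.opengis.wps10.ExecuteResponseType;
import net.opengis.wps10.ExecuteType;
import org.geotools.process.ProcessException;

// Design skeleton: method bodies are placeholders.
public class WPSExecutionManager {

    /**
     * Asynchronous submission, not blocking. Returns an id that can be used to get the
     * process status later.
     *
     * @param request The request to be executed
     * @return The execution id
     * @throws ProcessException
     */
    String submit(ExecuteType request) throws ProcessException {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    /**
     * Returns the status of an asynch call if the id is known, null otherwise
     */
    ExecutionStatus getStatus(String executionId) {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    /**
     * Returns the file containing the final response for this execution id
     *
     * @param executionId The execution id returned by submit(...)
     * @return The stored response file
     */
    File getStoredResponse(String executionId) {
        throw new UnsupportedOperationException("Not implemented yet");
    }

    /**
     * Returns the execute response for synch requests. This call is blocking.
     *
     * @param executionId The execution id returned by submit(...)
     * @return The execute response
     */
    ExecuteResponseType getSynchResponse(String executionId) {
        throw new UnsupportedOperationException("Not implemented yet");
    }
}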
Well, this is it, I'm likely going to implement it starting from Wednesday
(tomorrow is a holiday in Italy) unless there is some major pushback.
Feedback welcome as usual.
Cheers
Andrea
--
-------------------------------------------------------
Ing. Andrea Aime
GeoSolutions S.A.S.
Tech lead
Via Poggio alle Viti 1187
55054 Massarosa (LU)
Italy
phone: +39 0584 962313
fax: +39 0584 962313
http://www.geo-solutions.it
http://geo-solutions.blogspot.com/
http://www.youtube.com/user/GeoSolutionsIT
http://www.linkedin.com/in/andreaaime
http://twitter.com/geowolf
-------------------------------------------------------