Dear all,
we would like to push a new community module to the GeoServer repository, aimed at improving the catalog and data notification mechanism. Please find more technical details below.
Can I proceed with that?
Best Regards,
Alessio Fabiani.
====================================================
Notification community module
The notification community module is meant to be a pluggable system that listens to, summarizes, and notifies events triggered by the manipulation of GeoServer data and configuration to some external destination, in an agreed-upon format.
The potential events of interest are:
- Catalog configuration changes (insert/update/removal of layers, styles, workspaces, stores, groups and so on)
- Data changes via WFS-T (anything that can affect the precise bounding box of the data)
- Service configuration changes (insert/update/removal of global or per-workspace service configuration)
- OGC requests themselves (overlapping with monitoring)
For the initial implementation only catalog configuration changes and data changes are of interest; more can be added later.
The system is required to have a minimal impact on GeoServer's own activities, so all events need to be queued and notified to the external system asynchronously. This also places some constraints on event listening, as some events contain information that can only be used while the event is being notified and will become invalid or unreachable at a later time.
The system is also required to be completely pluggable in terms of notification destinations: potential targets are direct HTTP calls to external systems, message queues, log files, and email.
The message format can also vary depending on the target and intended usage, both in terms of contents (e.g., it could be full of details or simply indicate what changed) and encoding (e.g., XML, JSON, text, HTML).
Overall architecture
The overall architecture is depicted in the following diagram:
The system generates a set of events and uses a configuration to match each of them with the tool that sends the message out (the processor). The processor can be conceived as the combination of an “encoder”, which generates the message payload, and a “sender”; however, some real-world cases might require the two to be combined in a single entity, so the design does not enforce the split but suggests leveraging it where sensible.
Each message is combined with its processor and sent to a destination queue, where a thread pool picks up the events and runs their processor. For some types of events, such as catalog ones, the thread pool will have to be configured with a single thread to make sure the events reach the destinations in the right order.
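The ordering argument can be made concrete with plain java.util.concurrent. The sketch below assumes a destination queue is simply a bounded work queue drained by a fixed-size ThreadPoolExecutor, and that "processing a notification" is a Runnable; the class name `DestinationQueue` and its methods are hypothetical, not part of the proposed module. With one thread, tasks run strictly in submission order.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical destination queue: a bounded work queue drained by a fixed-size pool.
// With a single thread, tasks run strictly in submission order (as catalog events require).
class DestinationQueue {
    private final ThreadPoolExecutor executor;

    DestinationQueue(int queueSize, int threads) {
        this.executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize));
    }

    // Enqueues a notification together with its processing step and returns immediately;
    // with the default policy a full queue makes execute() throw RejectedExecutionException.
    void submit(Runnable processNotification) {
        executor.execute(processNotification);
    }

    // Stops accepting work and waits for the already queued notifications to be processed.
    boolean shutdown(long timeoutSeconds) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
    }

    // Small demo: three catalog events processed by a single-threaded queue.
    static List<String> demo() {
        List<String> delivered = new CopyOnWriteArrayList<>();
        DestinationQueue catalogQueue = new DestinationQueue(1000, 1);
        for (String event : List.of("add workspace", "add store", "add layer")) {
            catalogQueue.submit(() -> delivered.add(event));
        }
        try {
            catalogQueue.shutdown(5);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered;
    }
}
```

Using more than one thread would raise throughput for destinations that do not care about ordering, at the cost of events possibly overtaking each other; a full queue is a separate policy decision (drop, block or log) left open here.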
The message multiplexer will be configured through an XML configuration file using a structure like the following:
1000
1000
1
type = catalog or type = transaction
…
Notice the filters: each queue can be the target of different types of messages. The filters are meant to be expressed in CQL and target the properties of a Notification. The first implementation of the system might ignore the filters.
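To show what per-queue filtering means for the multiplexer without pulling in a CQL parser, the sketch below uses `java.util.function.Predicate` as a stand-in for the CQL filters. `SimpleNotification` and `Multiplexer` are hypothetical names; a real implementation would build the filter from the CQL text in the configuration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical reduced notification carrying only the fields the filters inspect.
record SimpleNotification(String type, String action, String handle) {}

// Hypothetical multiplexer: copies each notification into every queue whose filter accepts it.
class Multiplexer {
    private final Map<String, Predicate<SimpleNotification>> filters = new LinkedHashMap<>();
    private final Map<String, List<SimpleNotification>> queues = new LinkedHashMap<>();

    // A null filter accepts everything, like a first implementation that ignores filters.
    void addQueue(String name, Predicate<SimpleNotification> filter) {
        Predicate<SimpleNotification> effective = filter != null ? filter : any -> true;
        filters.put(name, effective);
        queues.put(name, new ArrayList<>());
    }

    void dispatch(SimpleNotification notification) {
        filters.forEach((name, filter) -> {
            if (filter.test(notification)) {
                queues.get(name).add(notification);
            }
        });
    }

    List<SimpleNotification> queue(String name) {
        return queues.get(name);
    }
}
```

A notification matching several filters lands in several queues, which is how a single event can reach more than one destination.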
Main interfaces
“Notification” objects are built by the listeners and put in the main queue; they have the following basic structure:
import java.util.Map;

public interface Notification {

    /**
     * The type of event
     */
    public enum Type { Catalog, Data /*, Request, Service */ }

    /**
     * The event action, if applicable
     */
    public enum Action { Add, Remove, Update, None }

    /**
     * An event handle, identifying the event (it can come from an external system, to avoid
     * re-processing notifications for actions the external system has undertaken)
     */
    public String getHandle();

    /**
     * The event type
     */
    public Type getType();

    /**
     * The event action
     *
     * @return the event action
     */
    public Action getAction();

    /**
     * The "object" of the event: what has been created/inserted/modified, its container,
     * the request, and so on. Typically a catalog object, a service object, or a Request
     *
     * @return the object of the event
     */
    public Object getObject();

    /**
     * A set of "properties" attached to the event: the properties being changed, the bounds
     * being affected, and so on
     *
     * @return the event properties
     */
    public Map<String, Object> getActionProperties();

    /**
     * The user triggering the change, if any
     */
    public String getUser();
}
The multiplexer builds ProcessableNotifications, which are then put in target-specific queues by attaching a NotificationProcessor to them. Here are the related interfaces:
import java.io.IOException;

/**
 * Processes notifications in some way
 */
public interface NotificationProcessor {
    void process(Notification notification) throws IOException;
}

public class DefaultNotificationProcessor implements NotificationProcessor {

    NotificationEncoder encoder;

    NotificationSender sender;

    public DefaultNotificationProcessor(NotificationEncoder encoder, NotificationSender sender) {
        this.encoder = encoder;
        this.sender = sender;
    }

    @Override
    public void process(Notification notification) throws IOException {
        // encode the notification first, then hand the payload over to the sender
        byte[] payload = encoder.encode(notification);
        sender.send(notification, payload);
    }
}

/**
 * Encodes a notification into some payload format
 */
public interface NotificationEncoder {
    public byte[] encode(Notification notification);
}

/**
 * Sends an encoded payload to some destination
 */
public interface NotificationSender {
    public void send(Notification notification, byte[] payload) throws IOException;
}
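To show how an encoder and a sender compose through a processor, here is a self-contained sketch. The interfaces are re-declared in reduced form (string type/action, no object or properties) so it compiles on its own; `BasicNotification`, `EncodeThenSend`, `TextEncoder` and `InMemorySender` are hypothetical and only illustrate the encoder/sender split.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Reduced, hypothetical stand-ins for the interfaces above, so the sketch compiles alone.
interface Notification {
    String getType();
    String getAction();
    String getHandle();
}

interface NotificationEncoder {
    byte[] encode(Notification notification);
}

interface NotificationSender {
    void send(Notification notification, byte[] payload) throws IOException;
}

// Minimal immutable notification used to exercise the pipeline.
record BasicNotification(String type, String action, String handle) implements Notification {
    public String getType() { return type; }
    public String getAction() { return action; }
    public String getHandle() { return handle; }
}

// Same shape as DefaultNotificationProcessor: encode, then send.
class EncodeThenSend {
    private final NotificationEncoder encoder;
    private final NotificationSender sender;

    EncodeThenSend(NotificationEncoder encoder, NotificationSender sender) {
        this.encoder = encoder;
        this.sender = sender;
    }

    void process(Notification notification) throws IOException {
        sender.send(notification, encoder.encode(notification));
    }
}

// Hypothetical encoder producing a single line of UTF-8 text.
class TextEncoder implements NotificationEncoder {
    @Override
    public byte[] encode(Notification n) {
        String text = n.getType() + " " + n.getAction() + " handle=" + n.getHandle();
        return text.getBytes(StandardCharsets.UTF_8);
    }
}

// Hypothetical sender that keeps payloads in memory (handy for tests or as a log target).
class InMemorySender implements NotificationSender {
    final List<String> sent = new ArrayList<>();

    @Override
    public void send(Notification n, byte[] payload) {
        sent.add(new String(payload, StandardCharsets.UTF_8));
    }
}
```

Swapping `TextEncoder` for a JSON encoder, or `InMemorySender` for a message-queue sender, leaves the processor untouched, which is the point of suggesting the split.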
The configuration will be read and written via XStream; since the configuration is pluggable, the code will look for NotificationXStreamInitializer objects to set up XStream:
import com.thoughtworks.xstream.XStream;

/**
 * Initializes XStream for the notification subsystem
 */
public interface NotificationXStreamInitializer {
    void initialize(XStream xstream);
}
The catalog listener
The catalog listener implements CatalogListener and builds notification objects of type “Catalog”. In particular:
- the “handle” will be taken from the “handle” parameter of the REST request, if any, and will be null otherwise (e.g., if the modification happens via the admin UI or the REST request does not carry the handle parameter)
- the “object” of the notification will always be the CatalogEvent source
- the “action” will be the corresponding one (Add, Remove or Update)
- the “properties” will be null for add/remove events; for update events they will map the modified properties to their values after the modification
Since the catalog has no transactional support, each event will build and send a separate notification (e.g., creating a layer and associating a style with it will send two notifications).
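The rule for catalog “properties” can be sketched as a small helper. GeoServer's CatalogModifyEvent reports the changed property names together with their new values; here those two lists are passed in directly so the example runs without GeoServer on the classpath. `CatalogNotificationSketch` and its `properties` method are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CatalogNotificationSketch {

    enum Action { Add, Remove, Update, None }

    // Returns the "properties" of a catalog notification: null for add/remove events,
    // a map of modified property name -> value after modification for update events.
    static Map<String, Object> properties(Action action, List<String> names, List<Object> newValues) {
        if (action != Action.Update) {
            return null;
        }
        if (names.size() != newValues.size()) {
            throw new IllegalArgumentException("property names and values must be aligned");
        }
        Map<String, Object> props = new LinkedHashMap<>();
        for (int i = 0; i < names.size(); i++) {
            props.put(names.get(i), newValues.get(i));
        }
        return props;
    }
}
```

A LinkedHashMap keeps the properties in the order the catalog reported them and, unlike Map.of, accepts null values (e.g. a cleared title).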
The transaction listener
The transaction listener implements the TransactionPlugin interface in order to compose notifications only for successful transactions (a plain TransactionListener would receive events for all transactions instead).
A Transaction is a complex beast: a single request can perform multiple inserts/updates/deletes against several different layers.
The transaction listener will thus use the following notification build process:
- On beforeTransaction, it will allocate a map from layer to notification in a thread local
- Each transaction event will be used to accumulate information about the changes in a layer (count of features affected and overall bbox changed)
- On afterTransaction, one notification will be issued for each modified layer, but only if the commit was successful
Notification-wise:
- the “handle” will be fetched from the Transaction request (which has one)
- the “object” of the notification will be the modified FeatureType
- the “action” will be empty (detailed information is provided in the properties)
- the “properties” will contain four entries:
  - “bounds”, reporting the cumulative modified bounds across all transaction elements
  - “inserted”, reporting the number of inserted features
  - “updated”, reporting the number of updated features
  - “removed”, reporting the number of removed features
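The per-thread accumulation can be sketched in plain Java, assuming a simple axis-aligned `Bounds` record instead of a GeoTools envelope and layer names instead of FeatureType objects. The method names echo the TransactionPlugin callbacks, but `TransactionAccumulator`, `LayerChanges` and `Bounds` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class TransactionAccumulator {

    // Hypothetical axis-aligned bounds in east/north order.
    record Bounds(double minx, double miny, double maxx, double maxy) {
        Bounds union(Bounds o) {
            return o == null ? this : new Bounds(Math.min(minx, o.minx), Math.min(miny, o.miny),
                    Math.max(maxx, o.maxx), Math.max(maxy, o.maxy));
        }
    }

    // Per-layer running totals for the current transaction.
    static final class LayerChanges {
        Bounds bounds;
        int inserted, updated, removed;
    }

    private static final ThreadLocal<Map<String, LayerChanges>> CHANGES = new ThreadLocal<>();

    // beforeTransaction: allocate the layer -> changes map for this request thread.
    static void beforeTransaction() {
        CHANGES.set(new LinkedHashMap<>());
    }

    // Called for each transaction element: accumulate counts and the affected bounds.
    static void accumulate(String layer, int inserted, int updated, int removed, Bounds affected) {
        LayerChanges c = CHANGES.get().computeIfAbsent(layer, k -> new LayerChanges());
        c.inserted += inserted;
        c.updated += updated;
        c.removed += removed;
        c.bounds = c.bounds == null ? affected : c.bounds.union(affected);
    }

    // afterTransaction: one "properties" map per modified layer, only if committed.
    // The thread local is always cleared so pooled request threads do not leak state.
    static Map<String, Map<String, Object>> afterTransaction(boolean committed) {
        Map<String, LayerChanges> changes = CHANGES.get();
        CHANGES.remove();
        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        if (!committed || changes == null) {
            return result;
        }
        changes.forEach((layer, c) -> {
            Map<String, Object> props = new LinkedHashMap<>();
            props.put("bounds", c.bounds);
            props.put("inserted", c.inserted);
            props.put("updated", c.updated);
            props.put("removed", c.removed);
            result.put(layer, props);
        });
        return result;
    }
}
```

Clearing the thread local in afterTransaction regardless of the outcome matters because servlet containers reuse request threads; a rolled-back transaction simply yields no notifications.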
The GeoNode payload encoder
Each event will be notified to GeoNode via RabbitMQ and processed by GeoNode using Kombu. The messages will be encoded in JSON, one of the serialization formats Kombu supports.
The message will be a JSON object containing the following basic properties:
- “id”, a unique identifier for the notification (can be auto-generated)
- “type”, the type of the notification
- “action”, the notification action
- “generator”, fixed to “GeoServer”
- “timestamp”, an ISO 8601 encoded timestamp of when the notification was created
- “user”, the user triggering the change
- “originator”, the IP address or machine name of the host the message is coming from
- “source”, an object reporting the affected resource (each resource type will have a different encoding)
- “properties”, the map of properties from the notification, if any
Source encoding always contains:
- “id”, which is normally the catalog identifier
- “type”, which is the class name of the resource being modified (e.g., “DataStoreInfo”, “LayerInfo”)
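A dependency-free sketch of the envelope encoding follows, assuming hand-written JSON (a real encoder would more likely rely on a JSON library) and assuming the “source” and “properties” objects arrive already encoded as JSON fragments. `GeoNodeEnvelope` and its method signatures are hypothetical; `UUID.randomUUID()` provides the auto-generated id and `Instant.toString()` yields an ISO 8601 UTC timestamp.

```java
import java.time.Instant;
import java.util.UUID;

class GeoNodeEnvelope {

    // Minimal JSON string escaping for quotes, backslashes and control characters.
    static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (char ch : s.toCharArray()) {
            switch (ch) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (ch < 0x20) {
                        sb.append(String.format("\\u%04x", (int) ch));
                    } else {
                        sb.append(ch);
                    }
                }
            }
        }
        return sb.append('"').toString();
    }

    // Builds the message envelope; sourceJson and propertiesJson are pre-encoded JSON objects.
    // A null action (e.g. data notifications) or null properties are simply omitted.
    static String encode(String type, String action, String user, String originator,
            Instant timestamp, String sourceJson, String propertiesJson) {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":").append(quote(UUID.randomUUID().toString()));
        sb.append(",\"type\":").append(quote(type));
        if (action != null) {
            sb.append(",\"action\":").append(quote(action));
        }
        sb.append(",\"generator\":\"GeoServer\"");
        sb.append(",\"timestamp\":").append(quote(timestamp.toString()));
        sb.append(",\"user\":").append(quote(user));
        sb.append(",\"originator\":").append(quote(originator));
        sb.append(",\"source\":").append(sourceJson);
        if (propertiesJson != null) {
            sb.append(",\"properties\":").append(propertiesJson);
        }
        return sb.append('}').toString();
    }
}
```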
The following table reports the additional properties to be included in the encoding, depending on the source info type:

| Object type | Properties |
|---|---|
| WorkspaceInfo | name, namespaceURI |
| NamespaceInfo | Same as WorkspaceInfo |
| ResourceInfo (abstract) | name, workspace, nativeName, store (fully qualified name), geographicBounds, bounds |
| FeatureTypeInfo | Extends ResourceInfo, adds no extra properties |
| CoverageInfo | Extends ResourceInfo, adds no extra properties |
| WMSLayerInfo | Extends ResourceInfo, adds no extra properties |
| StoreInfo (and all subclasses) | name, workspace |
| PublishedInfo (abstract) | name, workspace |
| LayerInfo | Extends PublishedInfo, adding: defaultStyle (name), styles (list of names), resourceType (vector, raster, wms), geographicBounds, bounds |
| LayerGroupInfo | Extends PublishedInfo, adding: mode (group type), root layer (fully qualified name, if available), root layer style (fully qualified name, if available), layers (list of fully qualified names of the immediate children along with their style, which is omitted when the default is used) |
Bounds are always specified as minx/miny/maxx/maxy; their CRS is given in a “crs” property in the form “EPSG:xyzw”, and the axis order is forced to east/north:
{ "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" }
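Forcing east/north order means swapping ordinates whenever the source CRS is latitude-first, as the authority definition of EPSG:4326 is. The sketch assumes the caller already knows whether the ordinates are in north/east order (in GeoServer that information would come from the CRS itself); `BoundsJson.encode` and its boolean flag are hypothetical.

```java
import java.util.Locale;

class BoundsJson {

    // Encodes an envelope as {"minx","miny","maxx","maxy","crs"} in east/north axis order.
    // When the source ordinates are north/east (latitude first), the first and second
    // ordinates are swapped before encoding.
    static String encode(double min0, double min1, double max0, double max1,
            boolean northEastOrder, String epsgCode) {
        double minx = northEastOrder ? min1 : min0;
        double miny = northEastOrder ? min0 : min1;
        double maxx = northEastOrder ? max1 : max0;
        double maxy = northEastOrder ? max0 : max1;
        return String.format(Locale.ROOT,
                "{\"minx\": %s, \"miny\": %s, \"maxx\": %s, \"maxy\": %s, \"crs\": \"%s\"}",
                num(minx), num(miny), num(maxx), num(maxy), epsgCode);
    }

    // Prints integral values without a trailing ".0", matching the examples in this proposal.
    private static String num(double v) {
        return v == Math.rint(v) && !Double.isInfinite(v) ? String.valueOf((long) v) : String.valueOf(v);
    }
}
```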
Here are some examples of messages to be encoded.
Layer creation:
{
    "id": "123e4567-e89b-12d3-a456-426655440001",
    "type": "Catalog",
    "action": "add",
    "generator": "GeoServer",
    "timestamp": "2002-03-01T13:00:00Z",
    "user": "admin",
    "originator": "10.1.25.30",
    "source": {
        "id": "LayerInfoImpl--570ae188:124761b8d78:-7fc0",
        "type": "LayerInfo",
        "resourceType": "VECTOR",
        "name": "states",
        "nativeName": "states",
        "workspace": "topp",
        "defaultStyle": "polygon",
        "styles": ["line", "point"],
        "bounds": { "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" },
        "nativeBounds": { "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" }
    }
}
Layer group creation:
{
    "id": "123e4567-e89b-12d3-a456-426655440001",
    "type": "Catalog",
    "action": "add",
    "generator": "GeoServer",
    "timestamp": "2002-03-01T13:00:00Z",
    "user": "admin",
    "originator": "10.1.25.30",
    "source": {
        "id": "abc-d5r1232135",
        "type": "LayerGroupInfo",
        "mode": "Single",
        "name": "theGroup",
        "workspace": "topp",
        "layers": [ { "name": "ny:roads", "style": "line" }, { "name": "nestedGroup" } ],
        "bounds": { "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" },
        "nativeBounds": { "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" }
    }
}
Features modified in topp:states:
{
    "id": "123e4567-e89b-12d3-a456-426655440001",
    "type": "Data",
    "generator": "GeoServer",
    "timestamp": "2002-03-01T13:00:00Z",
    "user": "editor",
    "originator": "10.1.25.30",
    "source": {
        "id": "abc-d5r1232135",
        "type": "FeatureTypeInfo",
        "name": "states",
        "nativeName": "states",
        "workspace": "topp",
        "defaultStyle": "polygon",
        "styles": ["line", "point"],
        "bounds": { "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" },
        "nativeBounds": { "minx": -100, "miny": 40, "maxx": -55, "maxy": 80, "crs": "EPSG:4326" }
    },
    "properties": {
        "bounds": { "minx": -100, "miny": 40, "maxx": -80, "maxy": 80, "crs": "EPSG:4326" },
        "inserted": 15,
        "updated": 1,
        "removed": 5
    }
}
The RabbitMQ sender
This sender will take the JSON payloads and send them to the RabbitMQ server. It must be configurable and compliant with the RabbitMQ client APIs.
In particular, it must be possible to configure AMQP connection and channel properties such as:
- URI
- Host and port
- Credentials
- Exchange declaration (name and type)
- Queue declaration (name, exchange and routing key)
These options must be set through the notification XML configuration file, using XStream, as described earlier.
For the GeoNode use case, we are particularly interested in the “fanout” exchange type.
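When the URI is not configured directly and has to be assembled from host, port, credentials and virtual host, the variable parts need percent-encoding: a password containing “@” would otherwise break the URI, and RabbitMQ's default virtual host “/” must appear as “%2F” in the path. A stdlib-only sketch, where `AmqpUri` and its methods are hypothetical helpers:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

class AmqpUri {

    // Percent-encodes a URI component. URLEncoder targets HTML form encoding, so the "+"
    // it emits for spaces is replaced with "%20" (a literal "+" is already encoded as "%2B").
    static String encode(String component) {
        return URLEncoder.encode(component, StandardCharsets.UTF_8).replace("+", "%20");
    }

    // Builds amqp://user:password@host:port/vhost with credentials and vhost encoded.
    static String build(String username, String password, String host, int port, String virtualHost) {
        return "amqp://" + encode(username) + ":" + encode(password) + "@" + host + ":" + port
                + "/" + encode(virtualHost);
    }
}
```

The resulting string can be handed to the RabbitMQ client's `ConnectionFactory.setUri`, which decodes the components again; IPv6 host literals are not handled by this sketch.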
Example Class and Configuration
The sender must be built upon NotificationConfiguration. The XStreamSerializer will instantiate the class through the reflection-based serializer (in most cases) or through a custom one.
…
…
The sender is a class implementing the NotificationSender interface and containing the properties needed to connect and send the message to the destination.
An example of a possible implementation of the RabbitMQSender could be the following:
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Base class for senders publishing an encoded payload to a RabbitMQ broker
 */
public abstract class RabbitMQSender implements NotificationSender {

    protected String host;
    protected String virtualHost;
    protected int port;
    protected String username;
    protected String password;
    protected String uri;

    protected Connection conn;
    protected Channel channel;

    public void initialize() throws Exception {
        if (uri == null) {
            // build the AMQP URI from the individual connection properties
            this.uri = "amqp://" + this.username + ":" + this.password + "@" + this.host + ":"
                    + this.port + "/" + this.virtualHost;
        }
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(this.uri);
        conn = factory.newConnection();
        channel = conn.createChannel();
    }

    public void close() throws Exception {
        // close the channel before the connection that owns it
        if (this.channel != null && this.channel.isOpen()) {
            this.channel.close();
        }
        if (this.conn != null && this.conn.isOpen()) {
            this.conn.close();
        }
    }

    // Prepare the connection and channel, send the message, then release them
    @Override
    public void send(Notification notification, byte[] payload) throws IOException {
        try {
            this.initialize();
            this.sendMessage(notification, payload);
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException("Failed to send the notification to RabbitMQ", e);
        } finally {
            try {
                this.close();
            } catch (Exception e) {
                // a failure while releasing resources must not mask the outcome of the send
            }
        }
    }

    // Send the message to the exchange by using the channel
    public abstract void sendMessage(Notification notification, byte[] payload) throws IOException;
}

// Fanout exchange types are needed by consumers like GeoNode
public class FanoutRabbitMQSender extends RabbitMQSender {

    public static final String EXCHANGE_TYPE = "fanout";

    protected String exchangeName;
    protected String routingKey;

    @Override
    public void sendMessage(Notification notification, byte[] payload) throws IOException {
        channel.exchangeDeclare(exchangeName, EXCHANGE_TYPE);
        // fanout exchanges ignore the routing key and deliver to every bound queue
        channel.basicPublish(exchangeName, routingKey, null, payload);
    }
}
==
Ing. Alessio Fabiani
@alfa7691
Founder/Technical Lead
GeoSolutions S.A.S.
Via di Montramito 3/A
55054 Massarosa (LU)
Italy
phone: +39 0584 962313
fax: +39 0584 1660272
mob: +39 331 6233686
http://www.geo-solutions.it
http://twitter.com/geosolutions_it