feature: real-time layer #2270

Draft
steff-o wants to merge 3 commits into origo-map:master from ornskoldsvikskommun:realtime-layer

Conversation

@steff-o (Contributor) commented Dec 15, 2025

Resolves #2262.

Creating this as a draft since we are not ready with the backend, which may significantly change the requirements on the layer type. But it is working, for you all to enjoy during the Christmas holiday.

In order to test it you will need a backend sending SSE events. Updates and inserts should arrive as a named event update; both are treated like a PUT. The payload should be exactly one feature encoded as GeoJSON. For deletions a delete event should be sent, with an object containing one property, id, holding the id (as previously sent as id in the GeoJSON) of the feature to delete. The server should accept a query argument named layer identifying which layer to subscribe to.
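A minimal sketch of client-side handling of the events described above. The event names (update, delete), the one-GeoJSON-feature-per-event payload, and the { "id": ... } deletion payload come from this PR; the helper function, the Map-based store, and the subscribe URL are illustrative assumptions, not the actual Origo implementation.

```javascript
// Applies one realtime event to an in-memory store of features keyed by id
// (e.g. mirroring an OpenLayers vector source). Illustrative helper, not Origo code.
function applyRealtimeEvent(features, eventName, payload) {
  if (eventName === 'update') {
    // Inserts and updates are treated alike (PUT semantics): one GeoJSON feature per event.
    features.set(payload.id, payload);
  } else if (eventName === 'delete') {
    // Deletion payload is an object with a single `id` property.
    features.delete(payload.id);
  }
  return features;
}

// Browser-side wiring (URL and layer query param as described in this PR):
// const es = new EventSource('http://localhost:3003/subscribe?layer=punktlager');
// es.addEventListener('update', (e) => applyRealtimeEvent(store, 'update', JSON.parse(e.data)));
// es.addEventListener('delete', (e) => applyRealtimeEvent(store, 'delete', JSON.parse(e.data)));
```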

There is no fetching of an initial state, so the layer will be empty until events arrive. But the server may send a series of events on connect; there is nothing special about them, but they will emulate an initial state. I have also considered adding support for fetching an initial state through a WFS endpoint, but since the rest of the code does not use WFS, and the source may not be backed by a database, I thought it easier if the real-time server provides it as ordinary events. For large datasets it could possibly become slow, but by sending the events individually instead of as one giant combined initial event it will not hit buffer limits.

I have implemented a reference server to demonstrate the functionality without having to create a backend: https://github.com/ornskoldsvikskommun/origo-realtime-reference-api It is pretty simple and uses a PostGIS database with triggers as its source.

The implementation uses Server-Sent Events (SSE) as specified by the EventSource specification. SSE has some serious limitations over HTTP/1.1, as each layer will hog one of the six available connections per remote host. Normally this is not a problem, as most backends can be set up to support HTTP/2, either natively or by putting an HTTP/2-capable reverse proxy (HAProxy, Nginx, IIS or whatever) in front of them. The only situation where it can't be avoided is when using IIS with Windows Integrated Authentication; it will work, but the connection will be downgraded to HTTP/1.1. Other limitations are that it is not possible to send custom headers and that the connection can only be initiated with a GET request.

Features

  • Automatic update of changed features, attributes and geometries
  • Disconnect from events when not visible
  • Automatic reconnect when visible
  • Toaster when connection is lost
  • Toaster when connection is restored
  • Feature info, but the popup does not move when the object moves, as featureInfo is a can of worms.
  • Named sources, just like WFS layers.

Configuration

"source": {
    "realtime": {
      "url": "http://localhost:3003/subscribe", // A query param layer=id will be added. Other query params are possible in the base
      "projection": "EPSG:3006" // Defaults to 4326, so not needed if following spec. Can also be set on layer.
  },

"layers": [
{
      "name": "realtime punktlager", // Internal Origo name
      "id": "punktlager", // Sent as layer parameter to source. If not present name is used
      "title": "RealTime Punkt",
      "group": "root",
      "source": "realtime", // A named source 
      "type": "REALTIME", // Must be REALTIME
      "attributes": [
        {
          "title": "Fritext",
          "name": "fritext",
          "type": "text"
        }
      ],
      "projection": "EPSG:3006", // Defaults to 4326, so not needed if following spec. If present overrides source
      "geometryName": "the_geom", // name of geometry in OL for created features
      "visible": true
    }
]

Future Improvements

Here are some ideas. Some may be added to this PR, some will have to wait for someone to need it.

  • Support for websockets if it turns out that SSE isn't as good as I thought. It could easily be implemented as an alternative transport in the same source class
  • Configurable CORS. Right now it is always on
  • Update popup on events
  • Support for multiple popups would be cool
  • Better support for initial state
  • Configurable if layer should disconnect when hidden
  • Filters

@02JanDal
Copy link

Very interesting! I've been looking at something similar previously.

May I suggest aligning the API with the OGC API Publish-Subscribe Workflow? Note that while the specification uses MQTT as an example, it does not prescribe any delivery mechanism, so SSE or WebSockets can be used; it should mostly be a question of using the CloudEvents payload, and potentially adding a new OGC API Features layer type on which realtime can be enabled, rather than a dedicated realtime layer type. pygeoapi has an existing implementation of the standard, against which a client could be tested.

@steff-o (Contributor, Author) commented Feb 2, 2026

@02JanDal, interesting input. Before I wrote this implementation I did a quick survey to see if there were any standards to use before making my own format, as standards would be the way to go. Being a GIS application, the first place I looked was OGC, but I thought OGC API Features pub/sub only supported MQTT. I wanted to keep the implementation dependency-free with a very lightweight footprint, and since no clear standard seemed to stand out, I chose SSE with custom events.

After reading up on OGC API again, most of the OGC API Features spec seems to be about searchability and metadata, which in the end will give you a link to the broker where an actual stream can be found. My implementation cuts to the chase and goes straight to the SSE stream, as the author of the Origo configuration would most likely know the correct connection parameters.

My goal was from the beginning to implement a lightweight client which communicates with a custom server side that is easy to implement for different source data instead of making layer types for every possible type of real time source. The aim was to only make the server side a proxy to external real time data sources. As there are too many different kinds of real time sources, I thought that hiding implementation server side would make it easy to provide for any kind of source.

It is a good point that a real-time layer could be an OGC API Features layer with real-time support, but in theory any kind of layer could potentially return a link to a stream where future updates to the initially received data will be published. In fact one of my first ideas was to extend the WFS layer with real-time capabilities, but with static configuration instead of receiving a URL to the stream. WFS would have been used to get the initial state, followed by an SSE stream with updates.

When testing my implementation I have also identified the possible need for a "landing URL" before connecting to the actual stream. In my case it is mainly because SSE does not provide enough mechanisms for authentication and error handling. Taking inspiration from Trafikverket's real-time API (https://data.trafikverket.se/documentation/datacache/the-request), such an implementation would use a REST POST endpoint where the initial filters and authentication are sent. The initial request would then create a session and return a link to the stream for that session, using the session id as a parameter. When you think about it, that is pretty much like an OGC API Features request that returns a link to a stream.

So, your comment has really made me question my decisions and I'm totally uncertain where to go next. Implementing a complete OGC API Features layer seems like a bigger task than I expected, but in the end probably useful. We actually have an issue on creating an OGC API Features layer (#1977). Adding real-time capabilities to a layer is actually a pretty small task; my implementation is less than 100 relevant lines of code. Implementing the events in the OGC-proposed GeoJSON payload format would be no big task either. It's pretty much the same format as I use, although mine is not as formal: both are basically GeoJSON wrapped in a control object with some event information. It would add a bit of overhead though.

Maybe I could have a look at implementing a very minimal OGC API Features layer with an SSE stream using the GeoJSON or CloudEvents payload.

@steff-o (Contributor, Author) commented Feb 16, 2026

After looking into it in more detail, I still find the OGC API pub/sub road interesting, but I cannot see how to combine it with SSE while maintaining compatibility with the standards involved:

  • OGC API is in itself protocol agnostic, but there is no way of telling that a hub uses SSE as transport
    • The hub field can specify a URL to the hub. HTTP is mentioned as a protocol, but not how to use it, so it could basically be any protocol on top of HTTP
    • The service-desc field could return an AsyncAPI description, which can be used to further specify the capabilities of the hub, but I can't find any way to specify SSE as transport
  • CloudEvents has no specification for SSE
    • It has an HTTP binding, but that seems more like a response format for polling or PUTs, as it specifies a CloudEvents content type. SSE must return Content-Type: text/event-stream and have some explicit fields set at the top level of the message.
    • Technically it is easy to wrap a CloudEvent in an SSE event, promote id to the top level and embed the entire message in the data field, but it will be non-standard as there is no standard.
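The wrapping mentioned in the last point could look like the sketch below. To be clear, this is a non-standard convention invented for illustration: promoting the CloudEvent id to the SSE id: field and using the CloudEvent type as the SSE event name are assumptions, not part of any binding.

```javascript
// Wraps a CloudEvent object in an SSE frame (non-standard; no official
// CloudEvents/SSE binding exists). Field mapping choices are our own.
function cloudEventToSseFrame(cloudEvent) {
  const lines = [];
  // Promote the CloudEvent id so EventSource clients get Last-Event-ID support.
  lines.push(`id: ${cloudEvent.id}`);
  // Using the CloudEvent type as the SSE event name is an illustrative convention.
  lines.push(`event: ${cloudEvent.type}`);
  // Compact JSON keeps the payload to a single `data:` line.
  lines.push(`data: ${JSON.stringify(cloudEvent)}`);
  return lines.join('\n') + '\n\n'; // blank line terminates an SSE event
}
```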

All in all, it seems to me that following the hub link would require some knowledge of how the hub works before trying to connect with the EventSource class for SSE. We would still only actually support SSE, but as that is not discoverable it is probably not future proof, since there seems to be no standard way of advertising SSE as transport.

This has given me a lot of headache and it now leaves me with three options:

  1. Proceed with the current implementation, which will give us a super lightweight but completely proprietary implementation, and possibly some limitations regarding authentication.
  2. Meet halfway, and create an OGC API Features layer that follows the hub link and assumes an SSE stream with CloudEvent payloads wrapped in SSE events. This will require a custom backend. It would be easy to adjust by adding another transport if a standard product emerges.
  3. Implement OGC API Features using websockets as transport instead of SSE. CloudEvents has a specification for websockets, but it is still in draft (which the entire pub/sub spec also is anyway). Only support for the hub link is needed; no AsyncAPI service-desc support would be needed, as the protocol is explicit from the URL, and if we only implement CloudEvents and send that as the only supported sub-protocol we don't have to discover it. This would follow the (upcoming) standard, but I doubt that there will be a standard product that supports it anyway, as the backend would be pretty specific to proxy arbitrary real-time sources.

Right now I'm leaning towards 3. The main benefit over 2 is that websockets seem more likely to conform to standards. The drawback with websockets is that I find them a bit harder to implement robustly, and they could potentially cause more trouble in load balancers, reverse proxies etc.

I could have misunderstood some things, so feel free to correct me @02JanDal.

@02JanDal

Given that pygeoapi is the only existing server implementation for this I'm aware of, and also usually one of the first to implement new OGC API standards, I'd probably lean towards whatever works with pygeoapi. Annoyingly the documentation doesn't provide a lot of hints if one doesn't want to use MQTT, though. It should be relatively trivial to create a simple broker that accepts SSE clients and events as sent by pygeoapi's HTTP-broker option, at least for testing (it might also be an option to integrate that into origo-server).

Your option 3. definitely has the upside of following the existing (draft) specifications the closest. On the other hand, SSE would be the better technical choice here as it's a unidirectional data flow.

I think a decent way to tell the client that a hub speaks SSE would be to simply use the link type attribute set to text/event-stream:

"links": [
  ...
  {
    "rel": "hub",
    "href": "https://...",
    "type": "text/event-stream"
  }
]
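The suggested convention is easy to check on the client side. A minimal sketch, assuming the links array shape from the example above (the convention itself is a proposal in this thread, not something mandated by the OGC API Features spec):

```javascript
// Finds a hub link advertising SSE via its media type, per the convention
// proposed in this comment. Returns null if no SSE hub is present.
function findSseHub(links) {
  return links.find(
    (l) => l.rel === 'hub' && l.type === 'text/event-stream'
  ) || null;
}
```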

Alternatively, I don't think simply deciding that Origo only speaks SSE (for now) and documenting that would be terrible either.

@steff-o (Contributor, Author) commented Feb 19, 2026

Yes, the pygeoapi documentation is pretty vague on that point. After looking at the code changes in the PR where pub/sub was introduced, it looks to me like pygeoapi does not actually implement a broker of its own. It publishes changes to a broker, and in the AsyncAPI document you can point out where that broker is. The HTTP pub/sub configuration is implemented as a POST to the configured URL, and the MQTT configuration as a publish call to a broker. So my conclusion is that it would still be necessary to implement some sort of broker that accepts subscriptions from Origo in order to use pygeoapi.

Implementing such a broker is pretty trivial regardless of transport. If pygeoapi has a change discovery mechanism that publishes POST events to a configured broker, that would be a great start for a generic implementation, as writing a broker that accepts POSTed events and turns them into websocket/SSE events to send to subscribers (i.e. Origo) is a walk in the park.

For more complex scenarios a custom backend would still be useful. The following use cases would benefit from implementing both the OGC API and the broker in the same backend, or at least having the possibility to share sessions.

  • Authorization on the channel. Both websockets and SSE have very limited possibilities for authorization without implementing an authorization scheme on top of the transport, which would be non-standard.
    • The call to the OGC API could authorize the call and set a cookie that the broker could read and use as pre-authentication
    • or generate a "private" URL to the broker with a session id
  • Filtering in the stream based on the initial query
    • The connection to the OGC API could generate a "private" URL to the broker with a session id
    • The connection to the OGC API could set a session cookie that the broker could read
  • Receiving the current state and ensuring no events are missed
    • The OGC API request could return the current state of the data, with subsequent changes received on the stream. To ensure that no events are missed between the response and the connection to the broker, the API could return a "private" URL that contains the id of the last event sent in the response, or a session identifier

@steff-o closed this Feb 27, 2026
@steff-o (Contributor, Author) commented Feb 27, 2026

GitHub automatically closed this PR, as the new changes nullified all previous code. Opening it again with a brand new approach, replacing SSE with websockets.

Completely new implementation based on the OGC API Features pub/sub pattern and websockets

This implementation adds a new layer type that implements a very basic OGC API Features source. It only supports getting an items collection, without any filter, bbox or CRS support. The reason is that it only serves as a bootstrap for a real-time stream; however, it can easily be augmented to support more of the standard.

If the response contains a hub link to a stream, the layer connects to the stream and receives updates. To be future proof, the layer must be explicitly configured with real-time support, as the default is to NOT follow links. It only supports websockets as the hub endpoint. It does not support retrieving AsyncAPI documents to discover links to stream hubs.

Since data can be of different kinds with respect to current/initial state, and since missed events may be crucial, it is possible to configure the layer for different scenarios reflecting how the server handles initial state and reconnects.

  • For data that has a current state (a backing table) that should be sent as an initial state to each connecting client, the server could either:
    • Send the current content of the table as features in the items response together with a link to the stream carrying subsequent updates. To make sure no events are missed between the items response and connecting to the stream, the server could generate a link with a query parameter holding the last event id from the items response, or create a session and set the session id as a query param. Both can also be set as cookies instead.
    • Return an empty set in the items response and send all features as events on connection.
  • For data that must not miss any events on reconnect (seldom-updated events) there are two possibilities:
    • The client can perform a full reload of the layer by calling the features API endpoint to get a new hub link to try
    • The client can try to reconnect to the websocket only. To make sure no events are missed, the server must keep track of the last sent event id in a session, as the reconnect call does not contain any last received id: websockets have no support for that, and implementing such a mechanism in the client would be non-standard.

The layer implementation supports

  • Automatic update of changed features, attributes and geometries
  • Optional disconnect from events when not visible
  • Automatic reconnect when visible
  • Optional automatic reconnect when connection is lost
  • Toaster when connection is lost
  • Toaster when connection is restored
  • Feature info, but the popup does not move when the object moves, as featureInfo is a can of worms.
  • Named sources, just like WFS layers.

Considered standards

Testing

As there are no known servers that implement a websocket hub, it is pretty tough to test. I have updated the reference server at https://github.com/ornskoldsvikskommun/origo-realtime-reference-api to work with this layer type, but since I have developed both, there could be the same errors on both sides.

Configuration

First define a source:

"source": {
    "realtime-ref": {
      "url": "http://localhost:3004/featuresapi/",
      "projection": "EPSG:3006"
    },

Then a layer:

{
      "name": "realtime linjelager ",
      "id": "linjelager",
      "title": "RealTime Linje",
      "group": "root",
      "source": "realtime-ref",
      "type": "FEATURESAPI",
      "realtimeReconnect": "full", // one of 'none', 'full', 'stream', default 'full'
      "realtime": true, // Enable real time updates
      "realtimeDisconnectOnHide": true, // Disconnect when layer is hidden and reconnect in visible. Default true
      "editable": false,
      "attributes": [
        {
          "title": "Fritext",
          "name": "fritext",
          "type": "text"
        }
      ],
      "geometryName": "the_geom",
      "geometryType": "LineString",
      "visible": true
    },

The realtimeReconnect setting is used to control how reconnects are performed depending on the server's capabilities.

  • full is useful if the server cannot catch up on missed events, or if streams are temporary and a new stream has to be set up for each connection. In full mode a new request is made to the features API items endpoint to receive a new initial state and a new stream URL.
  • stream is useful if it does not matter whether events are missed, or if the server has a catch-up mechanism. In stream mode the layer only reconnects to the websocket using the URL from the original request.
  • none disables automatic reconnects.
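The reconnect policy above can be summarized in a small dispatch helper. The mode names come from the realtimeReconnect option in this PR; the function and the action names it returns are illustrative, not the actual layer code.

```javascript
// Maps a realtimeReconnect mode to the action taken on connection loss.
// Action strings are illustrative labels, not Origo API values.
function reconnectAction(mode) {
  switch (mode) {
    case 'full':
      // Re-request the items endpoint: fresh initial state plus a new hub link.
      return 'refetch-items';
    case 'stream':
      // Reconnect to the original websocket URL only; catching up on missed
      // events is the server's responsibility (or losses are acceptable).
      return 'reconnect-socket';
    case 'none':
      return 'do-nothing';
    default:
      // The PR states the default mode is 'full'.
      return 'refetch-items';
  }
}
```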

@steff-o reopened this Feb 27, 2026