Eclipse Sparkplug® Cloud Connector
The org.eclipse.kura.cloudconnection.sparkplug.mqtt.provider
package provides a Eclipse Kura Cloud Connection that implements the Eclipse Sparkplug® v3.0.0 specification.
Minimum requirements
This addon is compatible with Kura 5.2+.
Introduction to Eclipse Sparkplug
from Eclipse Sparkplug
Sparkplug is an open software specification that provides MQTT clients the framework to seamlessly integrate data from their applications, sensors, devices, and gateways within the MQTT Infrastructure. It is specifically designed for use in Industrial Internet of Things (IIoT) architectures to ensure a high level of reliability and interoperability.
The specification aims fulfill the following 3 goals:
- Define a common MQTT topic namespace.
- Define a common MQTT state management.
- Define a common MQTT payload.
To achieve that, the Eclipse Sparkplug specification defines an architecture (see picture below) and 4 main actors:
- Device: a collection of related data points, which may represent a physical device (like a PLC, a set of sensors, etc.) and that notify the value or quality change of their data points. In this cloud connection, a Device is represented by the attached Cloud Publishers.
- Edge Node: the gateway that is responsible of the interaction with the MQTT broker and that estabilished sessions with the data-consuming Host Applications. This Cloud Connection assumes the role of Edge Node.
- MQTT Server: a MQTT server that supports the v3.1.1 (or v5.0) version of the protocol.
- Host Application: the data-consuming application that subscribes to the MQTT messages generated by the Edge Nodes. A Primary Host Application is responsible of controlling and monitoring Edge Nodes, which can be configured to modify their behavior based on the state of the Primary Host Application (for example, the application goes offline). You can imagine the Primary Host Application for Eclipse Kura being Eclipse Kapua.
The main principles upon which the Specification is based on can be summarized as follows:
- PubSub Protocol: this is the main topology upon which the architecture is developed.
- Report by Exception (RBE): messages need to be sent by the Edge Node only when values at the edge change, and the message should contain only the value/metrics that changed. There is no need for continuos polling; although it is higly discouraged, it is not mandatory to have Edge Nodes apply the RBE.
- Continuos Session Awareness: Host Applications are aware of the state of the Edge Nodes, and Edge Nodes are aware of the state of the Host Applications. This continuos session awareness is achieved by the means of birth and death certificates and state messages (continue the reading to find out more) and is the key to allow reporting by exception.
- Birth and Death certificates: these messages represent the state of Edge Nodes and Devices (online/offline + data that will be reported). The birth messages are always the first ones that are sent from the Edge Node, and the delivery of death certificates is ensured through the MQTT Will message, even if the connection is lost ungracefully. The birth messages always contain all the metrics that the Device or Edge Node will ever report on. If a new metric is added or a metric gets removed, then a new session needs to be estabilished.
-
Connection Persistence: with the mechanisms above, the connection does not need to be persistent. For example, an Edge Node that disconnects gracefully with a MQTT DISCONNECT packet will not be seen as "dead" from the host application because no death certificate has been triggered (Will messages are sent only on failures). Hence, the Edge Node can implement a logic where it remains connected only during the timeframe needed for sending the new data.
Warning
This Cloud Connection maintains a persistent connection to the MQTT server.
Eclipse Sparkplug Topic Namespace
All clients using the specification must adhere to the following topic namespace:
where:
namespace
: defines the structure of the remaining elements and the encoding for the payload. With Sparkplug v3.0.0 the namespace to utilize isspBv1.0
.group_id
: some Edge Nodes can be related to each other identifying a group (for example, Edge Nodes in a given plant). This element is the identifier for the group.message_type
: defines how to interpret and handle the payload. It encapsulates the semantic of the message and can be one of the following elements:NBIRTH
/NDEATH
: Edge Node birth and death certificates.DBIRTH
/DDEATH
: Device birth and death certificates.NDATA
/DDATA
: message containing data reported by the Edge Node or Device.NCMD
/DMCD
: message containing commands for the Edge Node or Device.STATE
: message from the Primary Host Application indicating the state (offline/online) of the main consumer.
edge_node_id
: the identifier for the Edge Node.device_id
(optional): the identifier for the Device. Must be unique under the sameedge_node_id
and must be always present on messages belonging to Devices (D- type messages).
The combination of group_id
and edge_node_id
must be unique and is called Edge Node Descriptor.
Tip
It is advised to have group_id
, edge_node_id
, and device_id
as small but as descriptive as possible, for better efficiency.
Operational Behavior
This introduction will focus more on the Edge Node and Device, as they are of more interest for this Cloud Connection.
Session Management
The session estabilishment procedure ensures the Edge Node to be subscribed to command-type messages for receiveing commands from the Host Application.
An Edge Node can (optionally, but incouraged) specify to be aware of a primary host application state and, in such case, it needs to subscribe to the relative STATE messages. The Edge Node will send the birth certificate (and thus completing the session init procedure) only after receiving the STATE message denoting the Primary Host Application is online. After connection, if the Edge Node receives a STATE message denoting the Primary Host Application is offline, it must restart the session estabilishment procedure.
On connection, the Edge Node sets a MQTT Will message containing a NDEATH certificate. Doing so, if the MQTT broker does not receive any communication within the Keep Alive period (client lost connection), it will send the Edge Node NDEATH certificate on all subscribers. A birth/death sequence number bdSeq
is maintained in the Edge Node to match NBIRTH with NDEATH messages in the Host Application. Each bdSeq
in the NDEATH message is matched with the corresponding bdSeq
of the previous NBIRTH message. This allows the Host Application tracking the state of the Edge Nodes and mark not up-to-date metrics as STALE.
A Device can send a device DBIRTH message after a Edge Node NBIRTH has been sent. The DBIRTH message contains all the metrics that the device will ever report on. If a new metric is added or an existing one removed, then the Device session needs to be re-estabilished. If the Edge Node looses the connection to some of its Devices, then it needs to send a Device DDEATH certificate on his behalf. The data-consuming Host Application will then mark that particular Device as offline and mark its metrics as STALE. Once the session is estabilished, the Device can publish the changed metrics using the DDATA message type.
Multiple MQTT Server Topologies
A Primary Host Application must publish its STATE message every time it connects to the MQTT broker. This ensures the Edge Nodes to be aware of its status as long as they remain connected to the MQTT server.
At any point in time, an Edge Node can be connected to at most one MQTT server. If multiple MQTT servers are defined each time the Edge Node receives an offline STATE message from its Primary Host Application it needs to terminate the session and estabilish a new one with the next MQTT broker.
Further Resources
Cloud Connection Configuration
Cloud Endpoint Layer Configuration
The cloud endpoint layer allows to attach CloudPublisher
s and CloudSubscriber
s to publish/subscribe messages on Sparkplug topics.
Sparkplug Device
Each CloudPublisher
attached to this cloud connection acts as a Sparkplug Device. The corresponding configuration is shown in the picture below.
The parameter specified as device.id
will dictate the Sparkplug device identifier used to publish messages from this cloud publisher. A device DBIRTH
message is immediately sent from this publisher when the first publish occurs or when a the set of published metrics is changed. The subsequent messages will be published as Sparkplug device data (DDATA
message type).
The Sparkplug Device implemented by this publisher does not support the following features (optional in the Eclipse Sparkplug specification):
tck-id-operational-behavior-device-ddeath
: Device death messages (DDEATH
message type) since the usual way to publish data from the Wire Graph using a WireAsset attached to a CloudPublisher has no implementation for reporting error states (seeWireAsset.onWireReceive
)tck-id-payloads-alias-uniqueness
: Sparkplug aliases for metricstck-id-message-flow-device-dcmd-subscribe
: writing to outputs, hence it will not subscribe to device command messages (DCMD
message type)
Sparkplug Device Payload
The payload of DDATA
message will be encoded using the Sparkplug B Protobuf definition converting the KuraPayload
into Sparkplug payload as follows:
-
Metrics from
KuraPayload.metric()
become Sparkplug metrics. Only the name, timestamp, datatype and value components are added. The timestamp is set to the publishing instant. The datatype is inferred from the Java type as follows:Java Type Sparkplug DataType Boolean
DataType.Boolean
byte[]
DataType.Bytes
Double
DataType.Double
Float
DataType.Float
Byte
DataType.Int8
Short
DataType.Int16
Integer
DataType.Int32
Long
DataType.Int64
String
DataType.String
Date
DataType.DateTime
BigInteger
DataType.UInt64
All other Java types will cause the application to throw an Exception.
-
KuraPayload.getBody()
, if non null, will be copied into the body of the Sparkplug payload -
KuraPayload.getPosition()
, if not null, will be used to create the following metrics from theKuraPosition
object, if the value in there is not null (with the corresponding Sparkplug data types):- kura.position.altitude:
DataType.Double
- kura.position.heading:
DataType.Double
- kura.position.latitude:
DataType.Double
- kura.position.longitude:
DataType.Double
- kura.position.precision:
DataType.Double
- kura.position.satellites:
DataType.Int32
- kura.position.status
DataType.Int32
- kura.position.speed:
DataType.Double
- kura.position.timestamp:
DataType.DateTime
- kura.position.altitude:
-
KuraPayload.getTimestamp()
, if not null, it will be used as the timestamp metric of the Sparkplug payload
Sparkplug Subscriber
This cloud connections allows creating a Cloud Subscriber that will subscribe to a generic set of topics. The configuration is shown in the picture below.
It is assumed that the payloads received by this Cloud Connection are encoded using the Sparkplug B Protobuf definition. Users of this Cloud Subscriber should expect to receive KuraMessage
s containing a KuraPayload
such that:
- If non null, the Sparkplug body is used to set
KuraPayload.setBody()
- If non null, the Sparkplug seq Metric is converted into a Kura metric with name
seq
- If non null, the Sparkplug timestamp is used as Eclipse Kura's payload timestamp
-
All Sparkplug Metrics are converted into Kura metrics with the same name and the following conversion rules:
Sparkplug ValueCase Java Type Boolean
boolean
Bytes
byte[]
Dataset
byte[]
Double
double
Extension
byte[]
Float
float
Integer
int
Long
long
String
String
Template
byte[]
default null
The metric timestamp is not supported in Eclipse Kura, therefore this information is lost at conversion.
Data Service Layer Configuration
The DataService
layer used in this component is the org.eclipse.kura.data.DataService
implementation. Please refer to the Data Service Configuraion page for further details.
Data Transport Layer Configuration
The Sparkplug Data Transport layer bridges the incoming requests to the underlying Eclipse Paho MQTT v3.1.1 client following the Sparkplug specification. In particular, the Data Transport Layer ensures the following.
- Edge Node Sparkplug Session Estabilishment: the Edge Node (this Cloud Connection) estabilishes a new Sparkplug session upon connection. An optional Primary Host Application ID can be specified (see picture below) to make the Cloud Connection wait until the Primary Host Application is online before publishing NBIRTH and DBIRTH messages.
- Edge Node Sparkplug Session Termination: upon disconnection, the Edge Node follows the required specification statements. In particular, multiple space-separated Server URIs can be specified in the component's configuration (see picture below). When a Primary Host Application ID is defined and it receives a STATE message denoting that the configured primary application is offline, then reconnection attempts are made cycling through the Server URIs list. When the last server fails to connect, then the traversal is restarted from the start of the list.
- Edge Node NCMD handler: upon reception of a valid NCMD message, the Transport layer checks if it contains a Node Control/Rebirth metric, and, if set to
true
, restarts the session estabilishment procedure without sending an MQTT CONNECT packet (client connection is not closed, only BIRTH messages are re-sent).
The Sparkplug Data Transport layer is configured to wait for a random period of time between 0sec and 5sec before each connection attempt. This is to ensure that, on large deployments, the target MQTT servers and Host Applications will dilute session estabilishment requests by some margin. This behavior is not part of the Sparkplug specification.
This cloud connection supports SSL connections to the connecting broker. The option SslManagerService.target allows, as an OSGi target filter, to specify the pid of the SslManagerService
instance to use for creating SSL connections for broker URIs that use the ssl://
protocol. The default socket factory is used otherwise.
Sparkplug Implementation Details
Edge Node
- The NBIRTH message sent by this cloud connection will not contain any metrics, except for the mandatory ones required by the specification.
- This cloud connection does not send any NDATA messages in its default implementation.