Processors Configuration

Draco contains a set of processors for storing context data like NGSI events. This document is intended to show, how to configure the NGSI processor listed above:

  • Listen_HTTP (configured as source for receiving notifications from Orion Context Broker)
  • NGSIToMySQL (for storing NGSI events into MySQL Database)
  • NGSIToPostgreSQL (for storing NGSI events into PostgreSQL Database)
  • NGSIToMongo (for storing NGSI events into Mongo Database)

Listen_HTTP Processor

Fist, we need to have Draco up and running. Then you need to follow the steps to add a new processor showed in the Draco GUI section.

Once you are in the add processor window, write inside of the filter box "Http" in order to filter the processors by that keyword and select the "ListenHTTP" processor.

The Listen HTTP processor starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. The default URI of the Service will be http://{hostname}:{port}/v2/notify. Only HEAD and POST requests are supported. GET, PUT, and DELETE will result in an error and the HTTP response status code 405. We use this NiFi native processor for receiving the HTTP notifications coming from Orion Context Broker.

The configuration needed for this processor is showed in the figure above.

listen-processor

Where:

Name Default Value Allowable Values Description
Base Path contentListener Base path for incoming connections. This property has to match with the notify attribute of the subscription made in ORION. Expression Language: true (will be evaluated using variable registry only)
Listening Port no The port to listen on for incoming connections. This value needs to be included in the subscription. Expression Language: true (will be evaluated using variable registry only)
Max Data to Receive per Second no The maximum amount of data to receive per second; this allows the bandwidth to be throttled to a specified data rate; if not specified, the data rate is not throttled
SSL Context Service no Controller Service API, RestrictedSSLContextServiceImplementation, StandardRestrictedSSLContextService
Authorized DN Pattern .* A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.
Max Unconfirmed Flowfile Time 60 secs The maximum amount of time to wait for a FlowFile to be confirmed before it is removed from the cache
HTTP Headers to receive as Attributes (Regex) no Specifies the Regular Expression that determines the names of HTTP Headers that should be passed along as FlowFile attributes. You can capture all the headers by setting to .* otherwise you have to include at least Fiware-service, Fiware-ServicePath and Optionally X-Auth-Token
Return Code 200 The HTTP return code returned after every HTTP call.

NGSIToMySQL Processor

The NGSIToMySQL processor takes the FLowFile generated by the ListenHTTP processor and transforms it into an NGSI event. After, this processor is going to communicate with the DBCPConnectionPoll controller service (previously set in the processor's properties). Finally, the processor, using the controller features, creates the database, the tables and insert the data into the MySQL database using the structure defined by the NGSI standard.

mysql-processor

Name Default Value Allowable Values Description
JDBC Connection Pool no Controller service for connecting to a specific database engine
NGSI version v2 List of supported versions of NGSI (v2 and ld), currently only support v2
Data Model db-by-entity The data model for creating the tables when an event have been received you can choose between: db-by-service-path or db-by-entity, default value is db-by-service-path
Attribute persistence row row, column The mode of storing the data inside of the table
Default Service test In case you dont set the Fiware-Service header in the context broker, this value will be used as Fiware-Service
Default Service path /path In case you dont set the Fiware-ServicePath header in the context broker, this value will be used as Fiware-ServicePath
Enable encoding true true, false True applies the new encoding, false applies the old encoding.
Enable lowercase true true, false True for creating the Schema and Tables name with lowercase.
Batch size 10 The preferred number of FlowFiles to put to the database in a single transaction
Rollback on failure false true, false Specify how to handle error. By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile. Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately. In that case, you can do so by enabling this 'Rollback On Failure' property. If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly until it gets processed successfully or removed by other means. It is important to set adequate 'Yield Duration' to avoid retrying too frequently.

NGSIToPostgreSQL Processor

The NGSIToPostgreSQL processor takes the FLowFile generated by the ListenHTTP processor and transforms it into an NGSI event. After, this processor is going to communicate with the DBCPConnectionPoll controller service (previously set in the processor's properties). Finally, the processor, using the controller features, creates the database, the tables and insert the data into the PostgreSQL database using the structure defined by the NGSI standard.

postgresql-processor

Name Default Value Allowable Values Description
JDBC Connection Pool no Controller service for connecting to a specific database engine
NGSI version v2 List of supported versions of NGSI (v2 and ld), currently only support v2
Data Model db-by-entity The data model for creating the tables when an event have been received you can choose between: db-by-service-path or db-by-entity, default value is db-by-service-path
Attribute persistence row row, column The mode of storing the data inside of the table
Default Service test In case you dont set the Fiware-Service header in the context broker, this value will be used as Fiware-Service
Default Service path /path In case you dont set the Fiware-ServicePath header in the context broker, this value will be used as Fiware-ServicePath
Enable encoding true true, false True applies the new encoding, false applies the old encoding.
Enable lowercase true true, false True for creating the Schema and Tables name with lowercase.
Batch size 10 The preferred number of FlowFiles to put to the database in a single transaction
Rollback on failure false true, false Specify how to handle error. By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile. Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately. In that case, you can do so by enabling this 'Rollback On Failure' property. If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly until it gets processed successfully or removed by other means. It is important to set adequate 'Yield Duration' to avoid retrying too frequently.

NGSIToMongo Processor

The NGSIToMongo processor takes the FLowFile generated by the ListenHTTP processor and transforms it into an NGSI event. Finally, the processor, creates the schema, documents and collections with the data into the Mongo database using the structure defined by the NGSI standard.

mongo-processor

Name Default Value Allowable Values Description
Mongo URI no "MongoURI, typically of the form: mongodb://host1[:port1],host2[:port2],...]". Also you can add user and pass, example: mongodb://user:password@host1:port
NGSI version v2 List of supported version of NGSI (v2 and ld), currently only support v2
Data Model db-by-entity The data model for creating the tables when an event have been received you can choose between: db-by-service-path or db-by-entity, default value is db-by-service-path
Attribute persistence row row, column The mode of storing the data inside of the table
Default Service test In case you dont set the Fiware-Service header in the context broker, this value will be used as Fiware-Service
Default Service path /path In case you dont set the Fiware-ServicePath header in the context broker, this value will be used as Fiware-ServicePath
Enable encoding true true, false True applies the new encoding, false applies the old encoding.
Enable lowercase true true, false True for creating the Schema and Tables name with lowercase.
Database prefix sth_
Collection prefix sth_ system. is not accepted.
Data Expiration 0 Collections will be removed if older than the value specified in seconds. The reference of time is the one stored in the recvTime property. Set to 0 if not wanting this policy.
Collections Size 0 The oldest data (according to insertion time) will be removed if the size of the data collection gets bigger than the value specified in bytes. Notice that the size-based truncation policy takes precedence over the time-based one. Set to 0 if not wanting this policy. Minimum value (different than 0) is 4096 bytes.
Max Documents 0 The oldest data (according to insertion time) will be removed if the number of documents in the data collections goes beyond the specified value. Set to 0 if not wanting this policy.

Once you have your processors configured, you need to connect the source (Listen HTTP processor) and the sinks (NGSIToMySQL, NGSIToPostgreSQL, NGSIToMongo ). In the next section we will explain how to establish and configure this connection.