Data Pipelines¶
Introduction¶
The IoT Gateways tab provides an overview of all the incoming data streams to the 4PointX IAoT Platform. Each data stream is called a Pipeline, and it can ingest various Types of Data (Condition, Process, etc.) from a variety of Data Sources for a Plant.

The tab provides the following overview information:

- Total count of IoT Gateways and how many have Heartbeat
- Total count of Tags being ingested
- Total Samples collected and their trend
- A tabular list of all IoT Gateways along with their details. The following actions can be performed on an IoT Gateway:
  - Edit
  - Start/Stop
  - Delete
  - Reboot
  - Factory Reset
Reboot and Factory Reset are available only if Data Source is Edge IoT Gateway.
Note
A Sample is a single reading for one Tag, collected at the interval set by the Sampling Rate.
Use the Create New Gateway button to create a new IoT Gateway.

The following Types of Data, Data Sources and Source Sub Types are currently supported. The details of onboarding data from each source are discussed in the next section.
Type of Data

- Condition
- Process
- Energy
- Production
- Other

Data Sources and Source Sub Types

- Edge IoT Gateway
  - Modbus RTU
  - Modbus TCP
  - OPC-UA
  - Analog
  - File
- Third-Party Apps
  - Infinite Uptime
  - CimCon Digital
- Cloud
  - AWS S3
  - Google Cloud Storage
  - Azure Data Lake
Data Onboarding¶
Edge IoT Gateway¶
Data Source = Edge IoT Gateway
Please see this section for the initial configuration steps to be performed on the Edge IoT Gateway before using the device for data onboarding.
Modbus RTU¶
Source Sub Type = Modbus RTU
OPC-UA¶
Source Sub Type = OPC-UA
OPC Server > Python (Producer) > Kafka > Python (Consumer) > Elasticsearch¶
The following systems/software can be onboarded using OPC-UA:

- Historians (provided an OPC-UA Server license is installed)
- KepServer (?)
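
As a rough illustration of the Python (Producer) leg of this flow, the sketch below polls an OPC-UA server and publishes each reading to Kafka. It assumes the python-opcua and kafka-python libraries; the endpoint URL, node ids, broker address, topic name and document fields are placeholders, not the platform's actual producer.

import json
import time

from kafka import KafkaProducer    # kafka-python
from opcua import Client           # python-opcua

# Placeholders / assumptions - replace with values from your environment.
OPC_ENDPOINT = "opc.tcp://OPC_SERVER_IP:4840"
NODE_IDS = ["ns=2;s=Pump01.Vibration"]          # OPC-UA nodes to poll
PIPELINE_ID = "example_pipeline_id"             # Kafka topic = pipeline_id
SAMPLING_RATE_SECONDS = 60

client = Client(OPC_ENDPOINT)
client.connect()
producer = KafkaProducer(
    bootstrap_servers="KAFKA_BROKER_IP:9092",
    value_serializer=lambda doc: json.dumps(doc).encode("utf-8"),
)

try:
    while True:
        for node_id in NODE_IDS:
            node = client.get_node(node_id)
            sample = {
                "event_timestamp": time.strftime("%d/%m/%Y %H:%M"),
                "tag__name": node_id,
                "value": node.get_value(),
            }
            producer.send(PIPELINE_ID, value=sample)   # one Sample per Tag per poll
        producer.flush()
        time.sleep(SAMPLING_RATE_SECONDS)
finally:
    client.disconnect()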
Modbus TCP¶
…
File¶
Source Sub Type = File
This section describes how to onboard data from CSVs and other flat files into the 4PointX IAoT Platform.
The typical data flow is:
File > Logstash (Producer) > Kafka > Python (Consumer) > Elasticsearch
The flat files are usually present in the customer’s environment, copied to a machine we call the Edge Server. We run Logstash on the Edge Server to tail these files and continuously ingest them into the 4PointX IAoT Workbench.
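
For illustration only, a minimal sketch of the Kafka > Python (Consumer) > Elasticsearch leg is shown below. It is not something you need to run during onboarding; it assumes the kafka-python and elasticsearch Python clients, and the broker address, Elasticsearch URL and index name are placeholders.

import json

from kafka import KafkaConsumer          # kafka-python
from elasticsearch import Elasticsearch  # elasticsearch-py (7.x-style calls)

PIPELINE_ID = "example_pipeline_id"      # placeholder: topic named after the pipeline_id

consumer = KafkaConsumer(
    PIPELINE_ID,
    bootstrap_servers="KAFKA_BROKER_IP:9092",           # placeholder broker address
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    auto_offset_reset="earliest",
)
es = Elasticsearch("http://ELASTICSEARCH_IP:9200")       # placeholder Elasticsearch URL

for message in consumer:
    # Each message is one Sample emitted by the Logstash producer.
    es.index(index=PIPELINE_ID, body=message.value)       # placeholder index name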
Steps to onboard data from File¶
1. Prepare your data as per the template. Use the steps in Testing Transformed Data below to check the transformed data for errors. Use the appropriate template, and do not change the order or names of the columns. If your data does not have information for a particular column, leave its rows blank but keep the column header.
   - For Process, Condition or Energy data: use csv_template_process_condition_energy.csv
   - For Production data: use csv_template_production.csv
2. Create a new Pipeline in the Configuration > Data Pipelines tab.
   - Select the Type of Data, Site, Plant and Function.
   - Select Data Source as ‘Edge IoT Gateway’ and Source Sub Type as ‘File’.
   - Make a note of the pipeline_id; we need it for the topic_id setting in step 3 below. We will be adding pipeline_id to the All Pipelines table soon. For now, get it from Discover or Dev Tools.
3. Start the Logstash producer on the Edge Server.
   i. Install Logstash if it is not installed already (see Installing Logstash below).
   ii. Use the appropriate .conf template and change the following settings inside the template (a minimal filled-in sketch is shown after these steps):
      - path: absolute path to the folder where the flat files prepared in step 1 are stored (e.g., /home/4px/plant/function/condition_data/*.csv)
      - sincedb_path: absolute path to the file Logstash uses to track how much of each input file it has already read (e.g., /home/4px/plant/function/condition_data/pipeline.log)
      - topic_id: the pipeline_id from step 2
      - bootstrap_servers: the IP of the kafka-broker
   iii. Start the Logstash producer using the following shell command:
      sudo /usr/share/logstash/bin/logstash -r -f <path_to_.conf_file>
4. Come back to the Configuration > Data Pipelines tab and start the Pipeline that was created in step 2.
5. Confirm that data is flowing by checking the Total Events and Last Event columns in the All Pipelines table. You can also confirm in Discover by filtering for the required pipeline_id.
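
As a rough sketch of what the filled-in .conf template from step 3.ii might look like (the paths and broker IP below are placeholders, and your actual template may add filter or codec settings):

input {
  file {
    path => "/home/4px/plant/function/condition_data/*.csv"
    sincedb_path => "/home/4px/plant/function/condition_data/pipeline.log"
    start_position => "beginning"
  }
}

output {
  kafka {
    topic_id => "<pipeline_id>"
    bootstrap_servers => "KAFKA_BROKER_IP:9092"
    codec => json
  }
}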
Installing Logstash¶
CentOS¶
sudo yum install java
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
sudo tee /etc/yum.repos.d/elasticsearch.repo <<EOF
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
sudo yum install --assumeyes logstash
Testing Transformed Data¶
Once you prepare the data in the required template format, use the following steps to make sure it doesn’t contain any gaps or errors.

- The columns should be arranged according to the data_type template attached below.
- The event_timestamp column should be in this format: %d/%m/%Y %H:%M (d: day, m: month, Y: year, H: hour, M: minute). Example: 20/05/2022 14:21
- Ensure that there are no NaN (null) values in the event_timestamp, tag__name and device_identifier columns.
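
A small Python sketch of these checks, assuming pandas is available; the input file name is a placeholder, and the commented-out column-order check assumes you have the template CSV locally.

import pandas as pd

CSV_PATH = "transformed_data.csv"   # placeholder: the file you prepared in step 1
REQUIRED_NON_NULL = ["event_timestamp", "tag__name", "device_identifier"]

df = pd.read_csv(CSV_PATH)

# event_timestamp must parse as %d/%m/%Y %H:%M (e.g., 20/05/2022 14:21);
# errors="raise" makes pandas fail loudly on any row with a different format.
pd.to_datetime(df["event_timestamp"], format="%d/%m/%Y %H:%M", errors="raise")

# No NaN (null) values allowed in the required columns.
for column in REQUIRED_NON_NULL:
    if df[column].isna().any():
        raise ValueError(f"Column '{column}' contains empty values")

# Optional: compare column names/order against the template (placeholder path).
# template_cols = list(pd.read_csv("csv_template_process_condition_energy.csv", nrows=0).columns)
# assert list(df.columns) == template_cols, "Columns differ from the template"

print("Basic checks passed:", len(df), "rows")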
Third-Party Apps¶
Infinite Uptime¶
…
CimCon Digital¶
…
Cloud¶
AWS S3¶
…
Google Cloud Storage¶
…
Azure Data Lake¶
…