Data Pipelines

Introduction

The IoT Gateways tab provides an overview of all the incoming data streams to the 4PointX IAoT Platform. Each data stream is called a Pipeline, and it can ingest various Types of Data (Condition, Process, etc.) from a variety of Data Sources for a Plant.

The tab provides the following overview information.

  1. Total count of IoT Gateways and how many have Heartbeat

  2. Total count of Tags being ingested

  3. Total Samples collected and their trend

  4. A tabular list of all IoT Gateways along with their details. The following actions can be performed on an IoT Gateway.
    1. Edit

    2. Start/Stop

    3. Delete

    4. Reboot

    5. Factory Reset

Reboot and Factory Reset are available only if Data Source is Edge IoT Gateway.

Note

A Sample is a reading for one Tag, collected at the interval set by the Sampling Rate.

Use the Create New Gateway button to create a new IoT Gateway.

The following Types of Data, Data Sources and Source Sub Types are currently supported. The details of onboarding data from each of them are discussed in the next section.

Type of Data

  1. Condition

  2. Process

  3. Energy

  4. Production

  5. Other

Data Sources and Source Sub Types

  1. Edge IoT Gateway
    1. Modbus RTU

    2. Modbus TCP

    3. OPC-UA

    4. Analog

    5. File

  2. Third-Party Apps
    1. Infinite Uptime

    2. CimCon Digital

  3. Cloud
    1. AWS S3

    2. Google Cloud Storage

    3. Azure Data Lake

Data Onboarding

Edge IoT Gateway

Data Source = Edge IoT Gateway

Please see this section for the initial configuration steps to be performed on Edge IoT Gateway before using the device for data onboarding.

Modbus RTU

Source Sub Type = Modbus RTU

OPC-UA

Source Sub Type = OPC-UA

The typical data flow is:

OPC Server > Python (Producer) > Kafka > Python (Consumer) > Elasticsearch

The following systems/ software can be onboarded using OPC-UA:

  1. Historians (provided OPC-UA Server license is installed)

  2. KepServer (to be confirmed)
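
The producer leg of this flow can be illustrated with a short Python sketch. This is not the platform's actual producer; the OPC-UA endpoint, node id, tag name, pipeline_id and broker address are placeholders, and it assumes the python-opcua and kafka-python libraries.

  import json
  import time
  from opcua import Client         # python-opcua
  from kafka import KafkaProducer  # kafka-python

  # Placeholders: replace with the real endpoint, node id, pipeline_id and broker IP
  OPC_ENDPOINT = "opc.tcp://<opc-server-ip>:4840"
  NODE_ID = "ns=2;s=<tag_node_id>"
  PIPELINE_ID = "<pipeline_id>"
  BROKER = "<kafka-broker-ip>:9092"

  client = Client(OPC_ENDPOINT)
  client.connect()
  producer = KafkaProducer(bootstrap_servers=BROKER,
                           value_serializer=lambda v: json.dumps(v).encode("utf-8"))
  try:
      while True:
          # One Sample: a reading for one Tag, taken at the configured Sampling Rate
          sample = {"tag__name": "<tag_name>",
                    "value": client.get_node(NODE_ID).get_value(),
                    "event_timestamp": time.strftime("%d/%m/%Y %H:%M")}
          producer.send(PIPELINE_ID, sample)  # Kafka topic = pipeline_id
          time.sleep(60)                      # sampling interval in seconds
  finally:
      producer.flush()
      client.disconnect()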

Modbus TCP

File

Source Sub Type = File

This section describes how to onboard data from CSVs and other flat files into the 4PointX IAoT Platform.

The typical data flow is:

File > Logstash (Producer) > Kafka > Python (Consumer) > Elasticsearch

The flat files are present in the customer’s environment, usually copied to a machine we call the Edge Server. We run Logstash on the Edge Server to tail these files and continuously ingest them into the 4PointX IAoT Workbench.
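
For context, the consumer leg of this flow runs on the platform side rather than on the Edge Server. A minimal Python sketch of that leg is shown below; the topic, broker address, Elasticsearch URL and index name are placeholders, and it assumes the rows arrive as JSON and that the kafka-python and elasticsearch libraries are available.

  import json
  from kafka import KafkaConsumer          # kafka-python
  from elasticsearch import Elasticsearch  # elasticsearch-py (7.x)

  PIPELINE_ID = "<pipeline_id>"            # Kafka topic = pipeline_id
  consumer = KafkaConsumer(PIPELINE_ID,
                           bootstrap_servers="<kafka-broker-ip>:9092",
                           value_deserializer=lambda m: json.loads(m.decode("utf-8")))
  es = Elasticsearch("http://<elasticsearch-ip>:9200")

  for message in consumer:
      # Each message is one row ingested by Logstash; index it into Elasticsearch
      es.index(index="<target-index>", body=message.value)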

Steps to onboard data from File
  1. Prepare your data as per the appropriate template, and use the steps in the Testing Transformed Data section below to check the transformed data for errors. Do not change the order or names of the columns in the template. If your data does not have information for a particular column, leave the rows blank but keep the column header.
    1. For Process, Condition or Energy data: Use csv_template_process_condition_energy.csv

    2. For Production data: Use csv_template_production.csv

  2. Create a new Pipeline in Configuration > Data Pipelines tab
    1. Select the Type of Data, Site, Plant and Function

    2. Select Data Source as ‘Edge IoT Gateway’ and Source Sub Type as ‘File’

    3. Make note of the pipeline_id; we need it in step 3.iii below. We will be adding pipeline_id to the All Pipelines table soon; for now, get it from Discover or Dev Tools.

  3. Start the Logstash producer in the Edge Server
    1. Install Logstash if it is not installed already

    2. Use the appropriate .conf template and change the following settings inside the template (a minimal sketch of such a file is shown after this list)
      1. path: absolute path (glob) to the flat files prepared in step 1 (e.g., /home/4px/plant/function/condition_data/*.csv)

      2. sincedb_path: absolute path to the sincedb file in which Logstash records how far it has read in each input file (e.g., /home/4px/plant/function/condition_data/pipeline.log)

      3. topic_id: pipeline_id

      4. bootstrap_servers: the IP address (and port) of the kafka-broker

    3. Start the Logstash producer using the following shell command (-f points to the .conf file prepared above; -r reloads the configuration automatically when it changes):
      sudo /usr/share/logstash/bin/logstash -r -f <path_to_.conf_file>
      
  4. Come back to Configuration > Data Pipelines tab and start the Pipeline that was created in step 2

  5. You can confirm whether the data is flowing by checking the Total Events and Last Event columns in the All Pipelines table. You can also confirm in Discover by filtering for the required pipeline_id.
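
For reference, a minimal .conf sketch covering the four settings in step 3.ii might look like the following. This is not the official template; the paths mirror the examples above, and <pipeline_id> and <kafka-broker-ip> are placeholders.

  input {
    file {
      # path: the flat files prepared in step 1
      path => "/home/4px/plant/function/condition_data/*.csv"
      # sincedb_path: where Logstash records how far it has read in each file
      sincedb_path => "/home/4px/plant/function/condition_data/pipeline.log"
    }
  }

  output {
    kafka {
      # topic_id: the pipeline_id noted in step 2.iii
      topic_id => "<pipeline_id>"
      # bootstrap_servers: IP and port of the kafka-broker
      bootstrap_servers => "<kafka-broker-ip>:9092"
    }
  }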

Installing Logstash
CentOS
sudo yum install java
sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
sudo tee /etc/yum.repos.d/elasticsearch.repo <<EOF
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
EOF
sudo yum install --assumeyes logstash
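
Optionally, verify the installation by printing the Logstash version (the path below assumes the default RPM install location used elsewhere on this page):

sudo /usr/share/logstash/bin/logstash --version
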
Testing Transformed Data

Once you prepare the data in the required template format, use the following steps to make sure it doesn’t contain any gaps or errors.

  1. The columns should be arranged according to the data_type template attached below.

  2. The event_timestamp column should be in this format:
    %d/%m/%Y %H:%M
    
    d: day
    m: month
    Y: year
    H: hour
    M: minute
    
    Example: 20/05/2022 14:21
    
  3. Ensure that there are no NaN (null) values in event_timestamp, tag__name and device_identifier columns.
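
The three checks above can also be scripted. The following is a minimal sketch using pandas; the file name is a placeholder, and EXPECTED_COLUMNS must be filled in with the exact column list from the template used in step 1.

  import pandas as pd

  # Placeholder: copy the exact column names, in order, from the template
  EXPECTED_COLUMNS = ["event_timestamp", "tag__name", "device_identifier", "..."]
  REQUIRED_NON_NULL = ["event_timestamp", "tag__name", "device_identifier"]

  df = pd.read_csv("condition_data.csv", dtype=str)

  # 1. Column names and order must match the template exactly
  assert list(df.columns) == EXPECTED_COLUMNS, "columns differ from the template"

  # 2. event_timestamp must be in %d/%m/%Y %H:%M format (raises on the first bad row)
  pd.to_datetime(df["event_timestamp"], format="%d/%m/%Y %H:%M")

  # 3. No NaN (null) values in the key columns
  for col in REQUIRED_NON_NULL:
      assert df[col].notna().all(), f"null values found in {col}"

  print("Template checks passed")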

(Attachments: the data templates referenced in this section)

Third-Party Apps

Infinite Uptime

CimCon Digital

 

Cloud

AWS S3

 

Google Cloud Storage

Azure Data Lake