2.6. NodeJS Application, Part 2: Sending Sensor Data to Cloud Storage

2.6.1. Introduction

This is the second part of a series of tutorials based upon an example Node project, written in JavaScript. This part of the tutorial details how to set up Amazon Kinesis Firehose to send sensor data to an Amazon S3 bucket.

This tutorial will build on the code developed in NodeJS Application, Part 1: Reading Device Sensor Data from a Gateway.

2.6.2. Setting up a strategy pattern

We will be using AWS as our example cloud provider, but other tutorials will cover using other providers. To begin, let’s set up a strategy pattern to make the cloud provider a modular piece of the application.

  1. Create a directory to hold our cloud providers:

    root@080030717-00055:~/nodejs-sensor-cloud# mkdir providers
    
  2. Create the file providers/providerStrategy.js using the following code. This will provide the interface for each concrete strategy to inherit.

    'use strict';
    
    const os = require('os');
    
    /**
     * Defines the public interface for all Cloud Provider strategies
     */
    class ProviderStrategy {
      constructor(config) {
        this.config = config;
        this.hostname = os.hostname();
        this.osName = os.type();
        this.NotYetImplementedError = new Error('Not yet implemented');
      }
    
      /**
       * Sends information about this gateway to the Cloud Provider
       * to get it connected. Should return the Gateway's authentication
       *  credentials.
       */
      connectGateway() {
        return Promise.reject(this.NotYetImplementedError);
      }
    
      /**
       * Inform the Cloud Provider that the Gateway is alive and well
       */
      sendHeartbeat() {
        return Promise.reject(this.NotYetImplementedError);
      }
    
      /**
       * Send sensor telemetry data to the Cloud Provider
       */
      sendTelemetry(json) {
        return Promise.reject(this.NotYetImplementedError);
      }
    
      /**
       * Inform the Cloud Provider about a new sensor device that
       * this Gateway is connected to
       */
      connectEndDevice(device) {
        return Promise.reject(this.NotYetImplementedError);
      }
    }
    
    module.exports = ProviderStrategy;
    
  3. Create the file providers/aws.js using the follwoing code. This will implement the Provider Strategy methods to work specifically with AWS. For now, it’s just a stub because it’s easier to test that the strategy pattern works if it’s not too complex.

    'use strict';
    
    const ProviderStrategy = require('./providerStrategy.js');
    
    class AWSProvider extends ProviderStrategy {
      constructor(config) {
        super(config);
        this.name = 'AWS';
      }
    
      connectGateway() {
        console.log('[AWS] connectGateway');
      }
    
      sendTelemetry(telemetry) {
        console.log(`[AWS] sendTelemetry ${JSON.stringify(telemetry)}`);
      }
    }
    
    module.exports = AWSProvider;
    
  4. Create the file providers/provider.js using the following code. This will provide the connection between the concrete Provider (aws.js) and the main script (index.js). The goal is for the main script to have no knowledge of how a given Provider accomplishes its tasks.

    'use strict';
    
    const AWSProvider = require('./aws.js');
    
    /**
     * A Cloud Provider object for a calling script to refer to.
     */
    class Provider {
      /**
       * Takes in a full config object (see config.json.sample).
       * Constructs a Provider based on the "cloudProvider" key in config.
       * Sets the Provider's concrete strategy if valid.
       */
      constructor(config) {
        if (typeof config.cloudProvider === 'undefined' ||
            Object.keys(config.cloudProvider).length === 0) {
          throw new Error('No cloudProvider in config');
        }
        const provider = Object.keys(config.cloudProvider)[0];
        const providerConfig = JSON.stringify(config.cloudProvider[provider], null, 4);
        console.log(`Provider: ${provider}`);
        console.log(`Config: ${providerConfig}`);
        switch (provider) {
          case 'aws':
            this.strategy = new AWSProvider(config.cloudProvider[provider]);
            break;
          default:
            throw new Error(`Unrecognized cloud provider '${provider}' in config.`);
        }
      }
    
      connectGateway() {
        return this.strategy.connectGateway();
      }
      sendHeartbeat() {
        return this.strategy.sendHeartbeat();
      }
      sendTelemetry(telemetry) {
        return this.strategy.sendTelemetry(telemetry);
      }
      connectEndDevice(device) {
        return this.strategy.connectEndDevice(device);
      }
    }
    
    module.exports = Provider;
    
  5. This application knows to use the AWS cloud provider because of a value specified in a configuration file. Create a new file config.json and give it the contents of config.json.sample:

    {
      "cloudProvider": {
        "aws": {
          "accessKeyId": "",
          "secretAccessKey": "",
          "region": "",
          "deliveryStream": ""
        }
      }
    }
    
  6. Modify index.js to use the config and provider. At the top of the script, define new constants:

    'use strict';
    
    const noble = require('noble');
    
    const Device = require('./device');
    const rigadoSensorBeacon = require('./rigado-sensor-beacon');
    const CloudProvider = require('./providers/provider');
    const config = require('./config.json');
    
    /**
     * Connected devices
     * @global
     */
    const devices = {};
    
    /**
     * Cloud Provider access
     * @global
     */
    const provider = new CloudProvider(config);
    
    function startScanning() {
    ...
    
  7. Finally, we need to update each device to send its telemetry through the cloud provider. In index.js, modify the onDiscover method to match the following:

    ...
    device.setUp()
      .then(() => {
        device.onLuxometerChange((d, lux) => {
          const telemetry = {
            deviceID: d.id,
            light: lux.toFixed(1),
          };
          provider.sendTelemetry(telemetry);
        });
    
        // Wait for as long as possible before scanning again to avoid race
        // conditions
        startScanning();
        ...
    
  8. Test the application on the Gateway. Upload the application files if necessary, install the new dependencies, and run the application:

    root@080030717-00055:~/nodejs-sensor-cloud# npm install
    ...
    root@080030717-00055:~/nodejs-sensor-cloud# node index.js
    Provider: aws
    Config: {
        "awsAccessKeyId": "",
        "awsSecretAccessKey": "",
        "region": ""
    }
    Scanning started
    Discovered: f8287ceb48b1
    Scanning stopped
    f8287ceb48b1: readFirmwareRevision
    f8287ceb48b1: firmware revision = 1.0.4 (5)
    f8287ceb48b1: enableLuxometer
    f8287ceb48b1: set up onluxchange
    f8287ceb48b1: notifyLuxometer
    Scanning started
    [AWS] sendTelemetry {"deviceID":"f8287ceb48b1","light":"151.0"}
    [AWS] sendTelemetry {"deviceID":"f8287ceb48b1","light":"150.0"}
    [AWS] sendTelemetry {"deviceID":"f8287ceb48b1","light":"150.0"}
    [AWS] sendTelemetry {"deviceID":"f8287ceb48b1","light":"150.0"}
    ...
    

2.6.3. Setting up an Amazon S3 bucket to receive data

The data will ultimately be sent to Amazon S3, so the next step is to set up the bucket, or location, for that data.

  1. Start by logging into your AWS console.

  2. Verify the region that you’re interacting with. Use the drop-down menu in the top right corner to select your region. The example presented here will use Oregon (us-west-2).

  3. Select the S3 service.

  4. Click Create Bucket to create a new bucket.

  5. Give the bucket an applicable name, such as sensorbeacon-luxometer.

    Note

    Bucket names need to be unique across all of Amazon S3, even between accounts. It may be useful to append your account ID to the names of any buckets you create. For more information on Amazon’s account identifiers, see their documentation.

  6. Verify the region the bucket should be in.

  7. Click Next.

  8. This tutorial will not utilize Versioning, Logging or Tags, so click Next again.

  9. In later steps we will further configure the permissions, so, for now, ignore the permissions here and click Next again.

  10. Click Create Bucket.

2.6.4. Setting up an Amazon Kinesis Firehose Delivery Stream

The Gateway will send its data to an Amazon Kinesis Firehose Delivery Stream, which will buffer it and pass it along to Amazon S3.

  1. Open the Services menu at the top left, and select Kinesis.
  2. Click Go to the Firehose console to enter Kinesis Firehose.
  3. Click Create Delivery Stream.
  4. Select Amazon S3 as the destination.
  5. Give the stream a name, such as sensorbeacon-luxometer.
  6. Select your new bucket. You may also add an applicable prefix to further classify inside the bucket. This would be useful, for instance, if you had multiple Delivery Streams directing different kinds of data into the same bucket.
  7. Click Next.
  8. On the Configuration page, you may want to configure the S3 buffer settings a little. Applications that require more “realtime” response to sensor data should reduce buffer size and interval to the minimum.
  9. At the bottom of the configuration form, select the IAM Role Firehose delivery IAM role. This will open a dialog to create or update a policy that has the proper permissions. Select Create a new Role Policy and click Allow.
  10. Back in the Delivery Stream configuration menu, click Next.
  11. Finally, click Create Delivery Stream.

2.6.5. Setting up write permissions for the Firehose

Our application could use the administrator’s credentials to interact with AWS, but this is discouraged. In this step we’ll configure a policy, group, and user the Gateway can use to write data to the Firehose.

  1. Open the Service menu at the top left, and select IAM.

  2. Select the Users page from the navigation menu on the left.

  3. Click Add User at the top.

  4. Give the user an applicable name. In this case, we are going to only operate this user account from one Gateway, so we will name the user based on that: gateway55.

  5. Enable Programmatic access.

  6. Click Next: Permissions.

  7. We are going to provide this user the permissions they need through a Group, so make sure Add user to group is selected, then click Create group.

  8. Give the group an applicable name, such as sensor-gateways.

  9. We are going to set a custom policy, so click Create Policy.

  10. Select the Policy Generator.

  11. Fill out the form with the following values:

    • AWS Service: Amazon Kinesis Firehose

    • Actions: DescribeDeliveryStream, PutRecord, and PutRecordBatch

    • Amazon Resource Name: enter the ARN for your new Delivery Stream. It will follow the format

      arn:aws:firehose:REGION:ACCOUNT-ID:deliverystream/DELIVERY-STREAM-NAME
      

      so, in the example case

      arn:aws:firehose:us-west-2:123456789012:deliverystream/sensorbeacon-luxometer
      

      Note

      See AWS Account Identifiers for help finding your Account ID.

  12. Click Add Statement

  13. The form will reset, and a list of the policy’s statements will appear below it. Fill out the form again to add a second statement:

    • AWS Service: Amazon Kinesis Firehose
    • Actions: ListDeliveryStreams
    • Amazon Resource Name: * (asterisk)
  14. Click Add Statement and see the new statement populate in the list.

  15. Click Next Step.

  16. Give the policy an applicable name, such as gateway-firehose-write.

  17. Click Create Policy.

  18. Go back to the page where you were creating a new group, and click Refresh.

  19. Filter the policy list by selecting Customer managed, or enter your policy name in the search box.

  20. Select the checkbox of your new policy.

  21. Click Create group.

  22. The new group should automatically show up in the groups list of the user permissions page. If it does not, click Refresh.

  23. Make sure the checkbox for your new group is checked, and click Next: Review

  24. Verify that the user has programmatic access and is a member of your new group.

  25. Click Create user.

  26. Click Show to show the new user’s secret access key.

  27. Copy the user’s secret access key and access key ID into config.json. You may also want to store them somewhere safe that isn’t part of this example application. You will not get another chance to see this secret access key from AWS.

  28. Update your config.json with your region and the delivery stream name.

    {
      "cloudProvider": {
        "aws": {
          "accessKeyId": "REDACTED",
          "secretAccessKey": "REDACTED",
          "region": "us-west-2",
          "deliveryStream": "sensorbeacon-luxometer"
        }
      }
    }
    
  29. Finally, check the AWS IAM settings.

    • Under IAM Policies, the newly created policy should be attached to the new group and the new role.
    • Under IAM Users, the new user should be a member of the new group.

2.6.6. Updating the application to send data

For our node application to communicate to Amazon Kinesis Firehose, we will use a third-party dependency.

  1. Install the npm packages firehoser:

    root@080030717-00055:~/nodejs-sensor-cloud# npm install --save firehoser
    
  2. Modify providers/aws.js to utilize firehoser:

    'use strict';
    
    const AWS = require('aws-sdk');
    const firehoser = require('firehoser');
    
    const ProviderStrategy = require('./providerStrategy.js');
    
    class AWSProvider extends ProviderStrategy {
      constructor(config) {
        super(config);
        this.name = 'AWS';
    
        AWS.config.update({
          accessKeyId: config.accessKeyId,
          secretAccessKey: config.secretAccessKey,
          region: config.region,
        });
    
        this.firehose = new firehoser.JSONDeliveryStream(config.deliveryStream)
      }
    
      connectGateway() {
        console.log('[AWS] connectGateway');
      }
    
      sendTelemetry(telemetry) {
        telemetry.date = Date.now();
        this.firehose.putRecord(telemetry)
          .then(() => {
             console.log('[AWS] sent telemetry');
          })
          .catch((err) => {
             console.log(`[AWS] error sending telemetry: ${JSON.stringify(err)}`);
          });
      }
    }
    
    module.exports = AWSProvider;
    
  3. To include timestamp information with each telemetry entry, another third-party package is required. Install dateformat:

    root@080030717-00055:~/nodejs-sensor-cloud# npm install --save dateformat
    
  4. In providers/aws.js, update the sendTelemetry method with the following:

    const firehoser = require('firehoser');
    const dateFormat = require('dateformat');
    
    const ProviderStrategy = require('./providerStrategy.js');
    
    sendTelemetry(telemetry) {
      const now = new Date();
      telemetry.timestamp = dateFormat(now, "yyyy-mm-dd hh:mm:ss.l");
      this.firehose.putRecord(telemetry)
      ...
    

    This will send a date in a format that Amazon is able to read.

  5. Copy the files over to the Gateway (if applicable) and run the application:

    root@080030717-00055:~/nodejs-sensor-cloud# node index.js
    Provider: aws
    Config: {
        "accessKeyId": "REDACTED",
        "secretAccessKey": "REDACTED",
        "region": "us-west-2",
        "deliveryStream": "sensorbeacon-luxometer"
    }
    Scanning started
    Discovered: f8287ceb48b1
    Scanning stopped
    f8287ceb48b1: readFirmwareRevision
    f8287ceb48b1: firmware revision = 1.0.4 (5)
    f8287ceb48b1: enableLuxometer
    f8287ceb48b1: set up onluxchange
    f8287ceb48b1: notifyLuxometer
    Scanning started
    [AWS] sent telemetry
    [AWS] sent telemetry
    [AWS] sent telemetry
    ...
    

A few minutes later, you should see the record show up in the S3 bucket!

Note

If you find that you hit an API limit on number of requests, you can try grouping the telemetry data. In providers/aws.js, replace the JSONDeliveryStream with QueuableJSONDeliveryStream, like so:

this.firehose = new firehoser.QueuableJSONDeliveryStream(config.deliveryStream, maxDelay, maxQueued)

where maxDelay is the maximum number of milliseconds to wait, and maxQueued is the maximum number of records to wait for. As soon as one of those limits has been reached the firehoser will send the data to Amazon Kinesis Firehose.

Note

The final code for this tutorial is hosted on GitLab.