In this example, we will use a Data Connector to integrate with Google Cloud by forwarding events to a Cloud Function.
When the data has been received by a Cloud Function, it can, in turn, feed the data into any of Google’s vast selection of databases, event buses, stream processing, or machine-learning tools, just to name a few.
Before you begin
You need to have completed the Google Cloud integration guide.
Setup
First, you need to create a project, a database, some tables in the database, and a Cloud Function.
Create a Project
- Log in to the Google Cloud Platform Console
- Open the dropdown menu in the top left corner and click Getting started
- Click the tile which says Create an empty project
- Give it the name “Disruptive Integration”
- Select the Billing Account (“My Billing Account” for free trials)
- Leave everything else as the default and click Create
Make sure you have the new Project selected by clicking on the drop-down to the left of the top search field. Locate and select the “Disruptive Integration” project.
Create a Database
- Open the dropdown menu in the top left corner and click SQL
- Click Create instance
- Choose PostgreSQL
- Fill in
- Instance ID: “event-database”
- Default user password: pick one and remember it for later
- Click Create
- Wait for the database to deploy. After creation, you have a default user called “postgres”, with the password you chose. You also have an empty database called “postgres” inside the new PostgreSQL instance.
- When the database has been deployed, click on the name of the database to open the overview.
- On the overview, locate the Instance connection name (something like disruptive-integration:us-central1:test) and note this down as you will need it later.
Create tables
Next, we need to create the database tables where the events will be stored.
Data Connectors have an at-least-once delivery guarantee, so the same event can arrive more than once. To prevent duplicate rows, we make the event_id column unique.
- Start Cloud Shell by clicking the Cloud Shell icon to the right of the search bar.
If prompted, then click “START CLOUD SHELL”.
- At the Cloud Shell prompt, connect to your SQL instance using the following command:
gcloud sql connect event-database --user=postgres
- After a little while, you should see the message Connecting to database with SQL user [postgres]. followed by the prompt Password for user postgres:. Enter the default PostgreSQL password you configured earlier.
- You should now see the prompt postgres=>. For each statement (starting with CREATE TABLE) in the “SQL queries” below, copy it, paste it into the prompt, and press Enter, one statement at a time.
- You should get a line back saying CREATE TABLE each time
SQL queries
CREATE TABLE Touch_events (
event_id char(20) NOT NULL UNIQUE,
device_id char(23) NOT NULL,
timestamp char(30) NOT NULL
);
CREATE TABLE Temperature_events (
event_id char(20) NOT NULL UNIQUE,
device_id char(23) NOT NULL,
timestamp char(30) NOT NULL,
temperature float NOT NULL
);
CREATE TABLE Prox_events (
event_id char(20) NOT NULL UNIQUE,
device_id char(23) NOT NULL,
timestamp char(30) NOT NULL,
state varchar(15) NOT NULL
);
You can now close the shell by clicking the X in the top right corner of the shell window.
Change the Cloud Function code
Finally, we will modify the Cloud Function created earlier (as part of “Before you begin”) to listen for sensor events and store them in the database.
Receiving the sensor data and saving it to the database takes four steps:
- Add a library for connecting to the database
- Get the event data from the request body, and make the query based on event type
- Connect to the database and execute a SQL command
- Send a response to the Data Connector to let it know the message has been safely received. If an error occurs, send an error code so that the Data Connector retries the message later.
Before continuing, navigate to the Cloud Function’s code editor:
- Open the dropdown menu in the top left corner and go to Cloud Functions.
- Click on the name of the Cloud Function to open it.
- Press the Edit button.
1. Add library
We need to add a library called pg.
Open package.json. Edit it so it looks something like this.
{
"name": "sample-http",
"version": "0.0.1",
"dependencies": {
"pg": "7.4.3"
}
}
2. Get event data and make SQL query
Go back to index.js, remove the existing code and replace it with the following empty function:
/**
* Receive sensor events and save to PostgreSQL database
*
* @param {!express:Request} req HTTP request context.
* @param {!express:Response} res HTTP response context.
*/
exports.eventReceiver = (req, res) => {
};
Change the Function to execute, below the editor, to be eventReceiver.
Next, add this at the start of the eventReceiver function:
// Get event data
var eventId = req.body.event.eventId;
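// targetName has the form projects/<project_id>/devices/<device_id>;
// substring(38) skips the prefix, assuming a 20-character project ID
var deviceId = req.body.event.targetName.substring(38);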
var timestamp = req.body.event.timestamp;
var eventType = req.body.event.eventType;
// Create the SQL query based on event type
switch (eventType) {
case "temperature":
var text = "INSERT INTO Temperature_events (event_id, device_id, timestamp, temperature) " +
"VALUES ($1, $2, $3, $4);";
var temperature = req.body.event.data.temperature.value;
var parameters = [eventId, deviceId, timestamp, temperature];
break;
case "touch":
var text = "INSERT INTO Touch_events (event_id, device_id, timestamp) " +
"VALUES ($1, $2, $3);";
var parameters = [eventId, deviceId, timestamp];
break;
case "objectPresent":
var text = "INSERT INTO Prox_events (event_id, device_id, timestamp, state) " +
"VALUES ($1, $2, $3, $4);";
var state = req.body.event.data.objectPresent.state;
var parameters = [eventId, deviceId, timestamp, state];
break;
default:
console.warn("Unsupported event type received, check Data Connector. EventType = " + eventType);
res.status(200).end(); // Return OK to prevent data connector from resending
return;
}
// Log for debugging and information
console.log("Update: " + deviceId + ", " + eventType + ", " + timestamp);
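The switch statement above can also be written as a lookup table wrapped in a small function, which makes the mapping from event type to SQL easier to test in isolation. The following is a minimal sketch under that assumption, not part of the original guide: the names INSERTS and buildInsert are hypothetical, and the device ID is taken as the last segment of targetName rather than from a fixed offset. It returns null both for a malformed body and for an unsupported event type, so the caller still decides which status to send.

```javascript
// Sketch: map a Data Connector event to a parameterized INSERT.
// INSERTS and buildInsert are illustrative names, not part of the original guide.
const INSERTS = {
  temperature: {
    text: "INSERT INTO Temperature_events (event_id, device_id, timestamp, temperature) " +
          "VALUES ($1, $2, $3, $4);",
    value: (data) => data.temperature.value,
  },
  touch: {
    text: "INSERT INTO Touch_events (event_id, device_id, timestamp) " +
          "VALUES ($1, $2, $3);",
    value: null, // touch events have no extra column
  },
  objectPresent: {
    text: "INSERT INTO Prox_events (event_id, device_id, timestamp, state) " +
          "VALUES ($1, $2, $3, $4);",
    value: (data) => data.objectPresent.state,
  },
};

function buildInsert(body) {
  const event = body && body.event;
  if (!event || typeof event.targetName !== "string") {
    return null; // malformed request body
  }
  const known = Object.prototype.hasOwnProperty.call(INSERTS, event.eventType);
  if (!known) {
    return null; // unsupported event type
  }
  const insert = INSERTS[event.eventType];
  // Use the last path segment instead of relying on a fixed prefix length
  const deviceId = event.targetName.split("/").pop();
  const parameters = [event.eventId, deviceId, event.timestamp];
  if (insert.value) {
    parameters.push(insert.value(event.data));
  }
  return { text: insert.text, parameters };
}
```

Inside eventReceiver, the result of buildInsert(req.body) would then supply the text and parameters values used by the query in the next step.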
3. Connect to database and execute command
Add this code at the top (before the function)
const pg = require('pg');
// Connect to database, update with your values
const pool = new pg.Pool({
max: 1,
host: '/cloudsql/INSTANCE_CONNECTION_NAME',
user: 'postgres',
password: 'YOUR_PASSWORD',
database: 'postgres'
});
Replace INSTANCE_CONNECTION_NAME with the instance connection name of the PostgreSQL instance that you noted down earlier.
Replace YOUR_PASSWORD with the default database password you set earlier.
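If you would rather not keep the password in the source file, the same options can be assembled from environment variables set on the Cloud Function. This is only a sketch: poolConfigFromEnv is a hypothetical helper, and the variable names INSTANCE_CONNECTION_NAME, DB_USER, DB_PASSWORD and DB_NAME are assumptions made for this example, not names the platform defines for you.

```javascript
// Sketch: build the pg.Pool options from environment variables.
// The variable names below are hypothetical and must be configured by you.
function poolConfigFromEnv(env) {
  if (!env.INSTANCE_CONNECTION_NAME || !env.DB_PASSWORD) {
    throw new Error("INSTANCE_CONNECTION_NAME and DB_PASSWORD must be set");
  }
  return {
    max: 1,
    host: "/cloudsql/" + env.INSTANCE_CONNECTION_NAME,
    user: env.DB_USER || "postgres",     // default user from the setup above
    password: env.DB_PASSWORD,
    database: env.DB_NAME || "postgres", // default database from the setup above
  };
}
```

The pool would then be created with new pg.Pool(poolConfigFromEnv(process.env)) instead of the literal object.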
Then add this code at the end of the function
// Execute query with parameters
pool.query(text, parameters, (err, result) => {
if (err) {
console.error(err);
res.status(500).end(); // Query failed, send empty error status
} else {
res.status(200).end(); // Query succeeded, send empty 200 OK status
}
});
4. Send response to Data Connector
pool.query() is an asynchronous function, so we must send back a response at the end of the callback function. In the code above we call res.status(500).end() to send an error response, and res.status(200).end() to send an OK response.
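One case deserves extra care. Because delivery is at-least-once and event_id is UNIQUE, a redelivered event makes the INSERT fail with a unique-constraint violation. With the callback above, that failure is answered with 500, so the Data Connector would retry an event that is already stored. A possible refinement, sketched here with the hypothetical helper name statusForQueryResult, is to treat that one error as success. It assumes the error object passed to the callback carries the PostgreSQL SQLSTATE code in its code property, where 23505 means unique_violation.

```javascript
// SQLSTATE code PostgreSQL reports for a unique-constraint violation
const UNIQUE_VIOLATION = "23505";

// Sketch: choose the HTTP status to return once pool.query() has finished.
// statusForQueryResult is a hypothetical helper name.
function statusForQueryResult(err) {
  if (!err) {
    return 200; // row stored
  }
  if (err.code === UNIQUE_VIOLATION) {
    return 200; // event_id already stored: a redelivery, nothing to retry
  }
  return 500; // any other failure: ask the Data Connector to retry
}
```

In the callback, the two branches would then collapse into res.status(statusForQueryResult(err)).end(), with console.error(err) kept for the 500 case.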
Confirming that the integration works
Everything is now set up for the integration to work.
We will check that the integration is working in three ways:
- Check the metrics of the Data Connector
- Check the log of the Cloud Function
- Check the content of the SQL database
1. Data Connector metrics
Navigate to Studio again and locate the Data Connector created earlier.
If you open up the Data Connector itself, you will be able to see the Activity last 24h.
Here you can see that the Data Connector has successfully sent sensor events to the Cloud Function, and that the Cloud Function has replied 200 OK 12 times in the last 24 hours. If the Cloud Function does not reply at all, or replies with anything other than the HTTP status code 200 OK, the Error count increases.
2. Check Cloud Function logs
Google Cloud Platform has a log viewer that shows the log output and return status from the Cloud Function.
Open it by going to the Cloud Function view, opening the Cloud Function, and clicking View logs on the overview page.
Here you should be able to see entries like Function execution took 25 ms, finished with status code: 200, indicating that the Cloud Function ran successfully, without any errors.
3. Check the SQL database content
To check that the database is storing values:
- Open the Google Cloud Shell again (like you did to create the tables)
- Run the following query to see all values stored in the temperature table
SELECT * FROM Temperature_events;
Provided that you have generated some temperature events (e.g. by waiting 15 minutes, pressing a temperature sensor, or using a virtual sensor), the query should return something like the following:
postgres=> SELECT * FROM Temperature_events;
event_id | device_id | timestamp | temperature
----------------------+-------------------------+--------------------------------+-------------
beimle6gse4g00838qlg | be8odsegse4g0082e4vg | 2018-09-21T22:03:36.354987851Z | 0
beimle6gse4g00838qog | be8odsegse4g0082e4vg | 2018-09-21T22:03:36.917462155Z | 42
beimleegse4g00838qpg | be8odsegse4g0082e4vg | 2018-09-21T22:03:37.242580212Z | 42
beimh66gse4g00838q4g | be8odsegse4g0082e4vg | 2018-09-21T21:54:32.508543861Z | 34
beimpqugse4g00838rag | be8odsegse4g0082e4vg | 2018-09-21T22:12:59.580665268Z | 41
beimpr6gse4g00838rcg | be8odsegse4g0082e4vg | 2018-09-21T22:13:00.704640379Z | 10
beimpqugse4g00838r7g | be8odsegse4g0082e4vg | 2018-09-21T22:12:59.111525991Z | 22
beimprmgse4g00838rig | be8odsegse4g0082e4vg | 2018-09-21T22:13:02.834087266Z | 47
beimprugse4g00838rl0 | be8odsegse4g0082e4vg | 2018-09-21T22:13:03.875003723Z | 13
(9 rows)
The same can be done for Touch_events or Prox_events.
(optional) Add verification of Secret
Verifying the events sent by the Data Connector in the Cloud Function prevents other people from sending false sensor readings into your database. Using the Secret signature is not required, and the Data Connector works fine without it, but it is recommended.
For a full description of how verifying the secret works, see the Verify Secret part of the Data Connector article.
For step-by-step instructions on adding this to this example integration, see the steps below.
1. Add the secret to Data Connector
On the Data Connector configuration page, write a signature under Secret, for example, “Secret-signature”. Remember to click Update configuration.
2. Add library to Cloud Function
We must add the library jsonwebtoken to the dependencies of the Cloud Function.
Navigate to the Cloud Function, go to Edit and update the package.json to be:
{
"name": "sample-http",
"version": "0.0.1",
"dependencies": {
"pg": "7.4.3",
"jsonwebtoken": "8.3.0"
}
}
3. Add secret verification code to Cloud Function
Switch to the index.js file and first add requirements at the top:
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
Next, add the following code to the very start of the function:
// Verify signature
try {
var token = req.get('x-dt-signature');
var jwtPayload = jwt.verify(token, "Secret-signature"); // Update to your secret
} catch(err) {
console.error(err);
res.sendStatus(401);
return;
}
// Verify checksum
var hash = crypto.createHash('sha1');
hash.update(req.rawBody);
if (jwtPayload.checksum !== hash.digest('hex')) {
console.error('Invalid content');
res.sendStatus(401);
return;
}
The function will now reject all invocations with an invalid token or checksum.
To test that the Cloud Function rejects requests with the wrong signature, change or remove the secret in the Data Connector and watch the Error metric increase as the Cloud Function rejects the events.
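The checksum step can also be pulled out into a small function that is easy to try locally without a Data Connector. The sketch below uses only Node's built-in crypto module; checksumMatches is a hypothetical helper name, and the constant-time comparison is an optional hardening choice rather than something the original code does.

```javascript
const crypto = require("crypto");

// Sketch: compare the SHA-1 of the raw request body with the checksum
// claimed in the JWT payload. checksumMatches is a hypothetical helper name.
function checksumMatches(rawBody, expectedHex) {
  if (typeof expectedHex !== "string") {
    return false; // no checksum claim in the token
  }
  const actual = crypto.createHash("sha1").update(rawBody).digest();
  const expected = Buffer.from(expectedHex, "hex");
  // timingSafeEqual throws when lengths differ, so check the lengths first
  return expected.length === actual.length &&
    crypto.timingSafeEqual(actual, expected);
}
```

In the function, the check would read if (!checksumMatches(req.rawBody, jwtPayload.checksum)) followed by the same 401 response as above.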