In this tutorial, you will create a sandbox environment in which you can further explore the architectural components of a specific Azure IoT architecture aimed at teaching AI through digital twin simulation. You will be completing the following tasks:
Before you begin, you may want to clone this entire repository to your local machine to make some of the steps below more straightforward. This is a collaborative tutorial and sample. Please use the Issues feature of GitHub to notify others and get help if you run into problems with the instructions or any of the code samples. Issues can be addressed by anyone on the team. Please feel free to submit improvements to the repository, but please document any changes.
To create your own sandbox environment, you will execute various commands in the Azure Cloud Shell. To access the Azure Cloud Shell, follow these steps:
To create a resource group to hold all of your sandbox resources, follow these steps:
industryx-sandbox-<<your-initials-or-name>> #this is your sandbox name. Please copy it as you will be using it frequently
az group create --name <<your-sandbox-name>> --location japaneast
To create the IoT Hub instance, follow these steps:
az deployment group create --resource-group <<your-sandbox-name>> --template-uri https://raw.githubusercontent.com/lowndesc/industryx/main/sandbox/IoTHub/azuredeploy.json
For our sandbox, we will first create three code-based simulated devices, each transmitting the same range of telemetry on a schedule, using randomisation to vary both the values transmitted and the schedule.
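To illustrate what one of these devices does, here is a minimal Python sketch of a single simulated device, assuming the azure-iot-device package and a placeholder device connection string; the actual simulators are created for you by the SimulatorCloudRunner script in the steps below.

```python
# Illustrative only: one simulated device sending randomised telemetry to
# IoT Hub on a randomised schedule. The connection string is a placeholder.
import json
import random
import time

from azure.iot.device import IoTHubDeviceClient, Message

CONNECTION_STRING = "<<your-device-connection-string>>"

client = IoTHubDeviceClient.create_from_connection_string(CONNECTION_STRING)
client.connect()

counter = 0
while True:
    counter += 1
    telemetry = {
        "temperature": random.uniform(1800, 2500),  # raw values; scaled downstream
        "humidity": random.uniform(3000, 6000),
        "power": random.uniform(100, 500),
        "vibration": random.uniform(0, 100),
        "Counter": counter,
    }
    client.send_message(Message(json.dumps(telemetry)))
    time.sleep(random.uniform(5, 15))               # randomised schedule
```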
To create the simulated devices, follow these steps:
https://raw.githubusercontent.com/lowndesc/industryx/main/sandbox/SimulatedDevices/SimulatorCloudRunner.ps1
./SimulatorCloudRunner.ps1 -ResourceGroup <<your-sandbox-name>> -IotHubConnectionString <<your-iothub-connection-string>>
To connect the simulated devices to the IoT Hub, follow these steps:
To run the simulated devices, follow these steps:
For our sandbox environment, we will be implementing digital twins using Microsoft's Azure Digital Twins (ADT). ADT is based on a spatial graph model rather than traditional relational database technology. In relational databases, analysing relationships across different table entities requires the time-expensive 'JOIN' operation to combine related data; it is expensive because it requires index lookups and matching against related columns in other tables. Graphs, by contrast, store entities and their relationships directly as nodes and edges, which may be augmented with additional attributes, so retrieving the relationship between two entities does not involve expensive join operations. This is a major advantage of graph data models.
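To illustrate, the following hedged sketch (using the Python SDK that appears later in this tutorial, and the Station/Process twins and isStepOf relationship from our ontology) retrieves related twins with a single graph traversal. Note that although the ADT query language borrows the JOIN keyword, the RELATED clause expresses a traversal over stored relationships rather than a relational join.

```python
# Illustrative sketch: retrieve all Station twins related to a Process twin
# with a single graph traversal (no relational join over tables).
import os

from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient

client = DigitalTwinsClient(os.environ["AZURE_ADT_URL"], DefaultAzureCredential())

query = (
    "SELECT Station FROM DIGITALTWINS Station "
    "JOIN Process RELATED Station.isStepOf "
    "WHERE Process.$dtId = 'pp_BodyPanelProduction_01'"
)
for twin in client.query_twins(query):
    print(twin)
```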
Within our sandbox, we therefore require a single Azure Digital Twins (ADT) instance. Each instance can support many different digital twins based on many different models; for simplicity, it is best practice to use a separate ADT instance for each solution domain. ADT hosts the definitions of digital twins, based on underlying models, as well as the data describing the digital twins, including twin instances, components, relationships and properties. It does not, however, store historical property and telemetry values, which are accessed from underlying storage. This makes ADT a highly performant abstraction of the digital twins it hosts.
ADT uses models to define digital twin types. Modelling is done in DTDL (Digital Twins Definition Language), a JSON-LD-based descriptive language. DTDL models are interchangeable between device twins used in IoT Plug and Play scenarios and digital twins of those devices within Azure Digital Twins. Each DTDL model is an interface which defines the permissible structure of a digital twin. DTDL supports inheritance, such that one or more interfaces can be a base for other derived interfaces. A model is generally a definition which can be instantiated as a digital twin. However, a model can also be a component, used to compose other models but not intended for instantiation by itself. An example would be a smartphone device, defined as containing components describing a front camera and a rear camera.
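As a purely illustrative example, the sketch below defines a minimal DTDL interface for a hypothetical conveyor belt and registers it with ADT using the Python SDK's create_models call. The model ID, properties and relationship are invented for illustration; the models actually used in this tutorial come from the manufacturing ontology described next.

```python
# Illustrative sketch: a minimal DTDL v2 interface and its registration in ADT.
# The model ID, display name and contents are hypothetical.
import os

from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient

conveyor_model = {
    "@id": "dtmi:example:sandbox:ConveyorBelt;1",   # hypothetical model ID
    "@type": "Interface",
    "@context": "dtmi:dtdl:context;2",
    "displayName": "Conveyor Belt",
    "contents": [
        {"@type": "Property", "name": "Speed", "schema": "double"},
        {"@type": "Telemetry", "name": "Vibration", "schema": "double"},
        {"@type": "Relationship", "name": "feeds"},
    ],
}

client = DigitalTwinsClient(os.environ["AZURE_ADT_URL"], DefaultAzureCredential())
client.create_models([conveyor_model])              # register the interface
```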
For manufacturing scenarios, we need a set of base models which can form the foundation for describing manufacturing assets and processes. These base models need to be extensible to problem-specific scenarios, such as production monitoring, optimization and simulation, as well as wider scenarios such as materials handling and supply chain modelling. They need to follow industry-specific standards, such as ISA-95 and ISA-88, and be gathered into an overall model known as a manufacturing ontology.
The starting point for our ontology is this basic data model as defined by the Industrial Automation standard ISA-95:
From this, we have derived a basic manufacturing ontology, as captured in this diagram. We have included a few example Azure IoT Plug and Play devices in our ontology to illustrate how these interact with the manufacturing assets and processes. We will use this ontology within our sandbox environments:
Take a moment to examine the manufacturing ontology in its raw JSON DTDL form at this location in this repository:
./sandbox/AzureDigitalTwins/models/manufacturing-ontology
This entire folder/file structure is included in your local clone of the repository for later use.
Creating The ADT Instance
To create your ADT instance within your sandbox, execute the following steps:
- Execute the following command in the Azure Cloud Shell:
az deployment group create --resource-group <<your-sandbox-name>> --template-uri https://raw.githubusercontent.com/lowndesc/industryx/main/sandbox/AzureDigitalTwins/azuredeploy.json
- When execution has finished, navigate to your sandbox ADT instance and, on the Overview pane, copy the host name URL. You will need this later. To grant yourself permission to access your ADT instance, you need to add yourself as an owner of this instance.
- On the left-hand menu, select 'Access Control (IAM)'
- On the 'Access Control (IAM)' pane, click +Add, and then 'Add role assignment'
- In the 'Add role assignment' panel, in the Role dropdown, select the 'Azure Digital Twins Data Owner' role
- In the 'Select' field, type the start of your name, then select your user account from the list to add your user account to the 'Selected members' list
- Click Save to create the role assignment
- To verify that you can now access your ADT instance, navigate to the online ADT explorer.
- In the pop-up that asks for an Azure Digital Twins URL, type 'https://<<your host name URL from step 2 above>>' and click Save.
- On the top bar, click the cog icon to go to Settings. Set the Console and Output toggles to on.
- Now, when you click 'Run Query' in the top right, you should see a message in the centre pane that states 'No Results Found'.
Uploading Your Models
Execute the following steps to upload the manufacturing ontology into the ADT model library:
- At the top of the left-hand Models panel, click the 'Upload a directory of Models' button:
- Select the folder at the head of your local copy of the manufacturing ontology 'manufacturing-ontology'
- Click Upload, and then OK when the system reminds you that you are uploading 40 models.
- When these have uploaded, on the centre pane, click the 'MODEL GRAPH' tab.
- You should see the entire manufacturing ontology with relationships and inheritance depicted.
Creating An Asset Twin
We will now create sample digital twins of an entire manufacturing operation.
Execute the following steps to create the sample digital twins:
./sandbox/AzureDigitalTwins/logistics-twin.xlsx
Now that we have created our digital twins of assets and spaces, we can add digital twins of the processes we wish to model. For our sandbox, we will model an end-to-end supply chain process, incorporating the manufacturing assets and spaces we have previously modelled. To create digital twins of an end-to-end supply chain, execute the following steps:
./sandbox/AzureDigitalTwins/logistics-supply-chain-twin.xlsx
SELECT * FROM DIGITALTWINS WHERE IS_OF_MODEL('dtmi:isa95:core:SupplyChain;1')
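The spreadsheet above drives the bulk creation of twins. For reference, the hedged sketch below shows how an individual twin and relationship could also be created programmatically with the Python SDK; the twin IDs and the relationship name are placeholders, while the model IDs match the queries used in this tutorial.

```python
# Illustrative sketch: create two twins and link them with a relationship via
# the ADT API. Twin IDs and the relationship name 'contains' are placeholders;
# the relationship must exist on the source model for the call to succeed.
import os

from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient

client = DigitalTwinsClient(os.environ["AZURE_ADT_URL"], DefaultAzureCredential())

client.upsert_digital_twin(
    "sc_Example_01", {"$metadata": {"$model": "dtmi:isa95:core:SupplyChain;1"}}
)
client.upsert_digital_twin(
    "asset_Example_01", {"$metadata": {"$model": "dtmi:isa95:core:Asset;1"}}
)

relationship = {
    "$relationshipId": "sc_contains_asset_01",
    "$sourceId": "sc_Example_01",
    "$relationshipName": "contains",
    "$targetId": "asset_Example_01",
}
client.upsert_relationship("sc_Example_01", "sc_contains_asset_01", relationship)
```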
To update a twin once it is created, you can either use the tools within Azure Digital Twins Explorer or, preferably, refer to this article about using the ADT API. Here, we will explain how to use the tools in Azure Digital Twins Explorer to make a simple change to an existing twin. Execute the following steps:
SELECT * FROM DIGITALTWINS WHERE IS_OF_MODEL('dtmi:isa95:core:Asset;1')
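If you prefer the API route mentioned above, the following hedged sketch applies a JSON Patch document to an existing twin using the Python SDK; the twin ID and property name are placeholders.

```python
# Illustrative sketch: update a twin property with a JSON Patch document.
# The twin ID and property path are placeholders; use "add" instead of
# "replace" if the property has never been set on the twin.
import os

from azure.identity import DefaultAzureCredential
from azure.digitaltwins.core import DigitalTwinsClient

client = DigitalTwinsClient(os.environ["AZURE_ADT_URL"], DefaultAzureCredential())

patch = [{"op": "replace", "path": "/Description", "value": "Updated via the API"}]
client.update_digital_twin("asset_Example_01", patch)
```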
In order to synchronize data between features within our sandbox, we require some lightweight Azure Functions. We have grouped these under a concept called TwinSync: a general principle of using Azure Functions where possible to execute data transformation code outside of any user context. These functions are discrete, low-cost, highly scalable and reliable execution actions. The trigger is normally a standard Azure event, such as a message arriving on an Event Grid topic or Event Hub.
We will use TwinSync functions to achieve the following:
For example, we have the following Function code available in a TwinSync Function named IoTHubtoTwins:
using System;
using Azure;
using System.Net.Http;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Avanade.Japan.Device.Simulation.Sync
{
public class IoTHubtoTwins
{
private static readonly string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL");
private static readonly HttpClient httpClient = new HttpClient();
[FunctionName("IoTHubtoTwins")]
public async void Run([EventGridTrigger] EventGridEvent eventGridEvent, ILogger log)
{
if (adtInstanceUrl == null) log.LogError("Application setting \"ADT_SERVICE_URL\" not set");
try
{
// Authenticate with Digital Twins
var cred = new ManagedIdentityCredential("https://digitaltwins.azure.net");
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions { Transport = new HttpClientTransport(httpClient) });
log.LogInformation($"ADT service client connection created.");
if (eventGridEvent != null && eventGridEvent.Data != null)
{
log.LogInformation(eventGridEvent.Data.ToString());
// <Find_device_ID_and_values>
JObject deviceMessage = (JObject)JsonConvert.DeserializeObject(eventGridEvent.Data.ToString());
string deviceId = (string)deviceMessage["systemProperties"]["iothub-connection-device-id"];
var temperature = deviceMessage["body"]["temperature"];
var humidity = deviceMessage["body"]["humidity"];
var power = deviceMessage["body"]["power"];
var vibration = deviceMessage["body"]["vibration"];
var uptime = deviceMessage["body"]["uptime"];
var capacity = deviceMessage["body"]["capacity"];
var wait = deviceMessage["body"]["wait"];
var delay = deviceMessage["body"]["delay"];
var arrivalRate = deviceMessage["body"]["arrivalRate"];
var counter = deviceMessage["body"]["Counter"].Value<int>(); // counter must be int
// </Find_device_ID_and_values>
// Correct integer telemetry values by dividing by 100
Random rnd = new Random();
int randomFactor = rnd.Next(3, 18); // a random integer from 3 to 17
var seed = (counter % 99) % randomFactor; // creates a seed value using the random factor, more frequent occurences at lower end
var t = temperature.Value<double>() / 100;
var h = humidity.Value<double>() / 100;
var p = power.Value<double>() / 100;
var v = vibration.Value<double>() / 100;
var u = uptime.Value<double>() / 100;
var c = (int)Math.Round(capacity.Value<double>() / 100); // round capacity to the nearest integer, even integer if midpoint
var w = wait.Value<double>() / 100;
var d = delay.Value<double>() / 100;
var ar = (17*Math.Sin(arrivalRate.Value<double>() / 100) + seed) / 15; // creates a more realistic arrival rate curve
log.LogInformation($"Device:{deviceId} Counter: [{counter}]");
log.LogInformation($"Device:{deviceId} Temperature: {t}");
log.LogInformation($"Device:{deviceId} Humidity: {h}");
log.LogInformation($"Device:{deviceId} Power: {p}");
log.LogInformation($"Device:{deviceId} Vibration: {v}");
log.LogInformation($"Device:{deviceId} Uptime: {u}");
log.LogInformation($"Device:{deviceId} Capacity: {c}");
log.LogInformation($"Device:{deviceId} Wait: {w}");
log.LogInformation($"Device:{deviceId} Delay: {d}");
log.LogInformation($"Device:{deviceId} ArrivalRate: {ar}");
// <Update_twin_with_device_values>
var updateTwinData = new JsonPatchDocument();
updateTwinData.AppendReplace("/Temperature", t);
updateTwinData.AppendReplace("/Humidity", h);
updateTwinData.AppendReplace("/Power", p);
updateTwinData.AppendReplace("/Vibration", v);
updateTwinData.AppendReplace("/Uptime", u);
updateTwinData.AppendReplace("/Capacity", c);
updateTwinData.AppendReplace("/Wait", w);
updateTwinData.AppendReplace("/Delay", d);
updateTwinData.AppendReplace("/ArrivalRate", ar);
await client.UpdateDigitalTwinAsync(deviceId, updateTwinData);
await client.PublishTelemetryAsync(deviceId,null, new JObject
{
{ "Temperature", t },
{ "Humidity", h },
{ "Power", p },
{ "Vibration", v },
{ "Uptime", u },
{ "Capacity", c },
{ "Wait", w },
{ "Delay", d },
{ "ArrivalRate", ar }
}.ToString());
// </Update_twin_with_device_values>
}
}
catch (Exception ex)
{
log.LogError($"Error in ingest function: {ex.Message} | {ex.InnerException} | {ex.StackTrace}");
}
}
}
}
Once events are flowing to our function app, the corresponding updates should be flowing into our ADT twins. To verify the end-to-end flow, execute the following steps:
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Azure;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.AspNetCore.Mvc;
using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.Devices.Shared;
using Microsoft.Azure.Devices.Provisioning.Service;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
namespace Samples.AdtIothub
{
    public static class DpsAdtAllocationFunc
    {
        private const string adtAppId = "https://digitaltwins.azure.net";
        private static string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL");
        private static readonly HttpClient singletonHttpClientInstance = new HttpClient();
[FunctionName("DpsAdtAllocationFunc")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req, ILogger log)
{
// Get request body
string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
log.LogDebug($"Request.Body: {requestBody}");
dynamic data = JsonConvert.DeserializeObject(requestBody);
// Get registration ID of the device
string regId = data?.deviceRuntimeContext?.registrationId;
bool fail = false;
string message = "Uncaught error";
var response = new ResponseObj();
// Must have unique registration ID on DPS request
if (regId == null)
{
message = "Registration ID not provided for the device.";
log.LogInformation("Registration ID: NULL");
fail = true;
}
else
{
string[] hubs = data?.linkedHubs.ToObject<string[]>();
// Must have hubs selected on the enrollment
if (hubs == null
|| hubs.Length < 1)
{
message = "No hub group defined for the enrollment.";
log.LogInformation("linkedHubs: NULL");
fail = true;
}
else
{
// Find or create twin based on the provided registration ID and model ID
dynamic payloadContext = data?.deviceRuntimeContext?.payload;
string dtmi = payloadContext.modelId;
log.LogDebug($"payload.modelId: {dtmi}");
string dtId = await FindOrCreateTwinAsync(dtmi, regId, log);
// Get first linked hub (TODO: select one of the linked hubs based on policy)
response.iotHubHostName = hubs[0];
// Specify the initial tags for the device.
var tags = new TwinCollection();
tags["dtmi"] = dtmi;
tags["dtId"] = dtId;
// Specify the initial desired properties for the device.
var properties = new TwinCollection();
// Add the initial twin state to the response.
var twinState = new TwinState(tags, properties);
response.initialTwin = twinState;
}
}
log.LogDebug("Response: " + ((response.iotHubHostName != null)? JsonConvert.SerializeObject(response) : message));
return fail
? new BadRequestObjectResult(message)
: (ActionResult)new OkObjectResult(response);
}
public static async Task<string> FindOrCreateTwinAsync(string dtmi, string regId, ILogger log)
{
// Create Digital Twins client
var cred = new ManagedIdentityCredential(adtAppId);
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions
{
Transport = new HttpClientTransport(singletonHttpClientInstance)
});
// Find existing DigitalTwin with registration ID
try
{
// Get DigitalTwin with Id 'regId'
BasicDigitalTwin existingDt = await client.GetDigitalTwinAsync<BasicDigitalTwin>(regId).ConfigureAwait(false);
// Check to make sure it is of the correct model type
if (StringComparer.OrdinalIgnoreCase.Equals(dtmi, existingDt.Metadata.ModelId))
{
log.LogInformation($"DigitalTwin {existingDt.Id} already exists");
return existingDt.Id;
}
// Found DigitalTwin but it is not of the correct model type
log.LogInformation($"Found DigitalTwin {existingDt.Id} but it is not of model {dtmi}");
}
catch(RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
{
log.LogDebug($"Did not find DigitalTwin {regId}");
}
// Either the DigitalTwin was not found, or we found it but it is of a different model type
// Create or replace it with what it needs to be, meaning if it was not found a brand new DigitalTwin will be created
// and if it was of a different model, it will replace that existing DigitalTwin
// If it was intended to only create the DigitalTwin if there is no matching DigitalTwin with the same Id,
// ETag.All could have been used as the ifNonMatch parameter to the CreateOrReplaceDigitalTwinAsync method call.
// Read more in the CreateOrReplaceDigitalTwinAsync documentation here:
// https://docs.microsoft.com/en-us/dotnet/api/azure.digitaltwins.core.digitaltwinsclient.createorreplacedigitaltwinasync?view=azure-dotnet
BasicDigitalTwin dt = await client.CreateOrReplaceDigitalTwinAsync(
regId,
new BasicDigitalTwin
{
Metadata = { ModelId = dtmi },
Contents =
{
{ "Temperature", 0.0 },
{
"deviceInformation",
new BasicDigitalTwinComponent
{
Metadata = {},
Contents =
{
{ "manufacturer", "MXCHIP" }
}
}
}
}
}
).ConfigureAwait(false);
log.LogInformation($"Digital Twin {dt.Id} created.");
return dt.Id;
}
}
/// <summary>
/// Expected function result format
/// </summary>
public class ResponseObj
{
public string iotHubHostName { get; set; }
public TwinState initialTwin { get; set; }
}
}
##### PnP Telemetry to Twin Function #####
```cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
using Azure;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace Samples.AdtIothub
{
public static class DeviceTelemetryToTwinFunc
{
private static string adtAppId = "https://digitaltwins.azure.net";
private static readonly string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL", EnvironmentVariableTarget.Process);
private static readonly HttpClient singletonHttpClientInstance = new HttpClient();
[FunctionName("DeviceTelemetryToTwinFunc")]
public static async Task Run(
[EventHubTrigger("deviceevents", Connection = "EVENTHUB_CONNECTIONSTRING")] EventData eventData, ILogger log)
{
log.LogInformation($"C# function triggered to process a message: {eventData}");
log.LogInformation($"C# function triggered to process a message: {Encoding.UTF8.GetString(eventData.Body)}");
// Metadata accessed by binding to EventData
log.LogInformation($"EnqueuedTimeUtc={eventData.SystemProperties.EnqueuedTimeUtc}");
log.LogInformation($"SequenceNumber={eventData.SystemProperties.SequenceNumber}");
log.LogInformation($"Offset={eventData.SystemProperties.Offset}");
//var exceptions = new List<Exception>(events.Length);
// Create Digital Twin client
var cred = new ManagedIdentityCredential(adtAppId);
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions
{
Transport = new HttpClientTransport(singletonHttpClientInstance)
});
try
{
log.LogInformation($"EventData: {System.Text.Json.JsonSerializer.Serialize(eventData)}");
// Get message body
string messageBody = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
log.LogInformation($"MessageBody: {messageBody}");
// Reading Device ID from message headers
JObject jbody = (JObject)JsonConvert.DeserializeObject(messageBody);
string deviceId = eventData.SystemProperties["iothub-connection-device-id"].ToString();
log.LogInformation($"DeviceId: {deviceId}");
// Extracting temperature from device telemetry
double temperature = Convert.ToDouble(jbody["temperature"].ToString());
// Update device Temperature property
var updateTwinData = new JsonPatchDocument();
updateTwinData.AppendReplace("/Temperature", temperature);
log.LogInformation($"ADT Patch Document: {updateTwinData.ToString()}");
await client.UpdateDigitalTwinAsync(deviceId, updateTwinData);
log.LogInformation($"Updated Temperature of device Twin {deviceId} to: {temperature}");
}
catch (Exception e)
{
// We need to keep processing the rest of the batch - capture this exception and continue.
log.LogError(e, "Function error");
}
}
}
}
```
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Azure;
using Azure.Core.Pipeline;
using Azure.DigitalTwins.Core;
using Azure.Identity;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
namespace Samples.AdtIothub
{
public static class DeleteDeviceInTwinFunc
{
private static string adtAppId = "https://digitaltwins.azure.net";
private static readonly string adtInstanceUrl = Environment.GetEnvironmentVariable("ADT_SERVICE_URL", EnvironmentVariableTarget.Process);
private static readonly HttpClient singletonHttpClientInstance = new HttpClient();
[FunctionName("DeleteDeviceInTwinFunc")]
public static async Task Run(
[EventHubTrigger("lifecycleevents", Connection = "EVENTHUB_CONNECTIONSTRING")] EventData[] events, ILogger log)
{
var exceptions = new List<Exception>(events.Length);
// Create Digital Twin client
var cred = new ManagedIdentityCredential(adtAppId);
var client = new DigitalTwinsClient(
new Uri(adtInstanceUrl),
cred,
new DigitalTwinsClientOptions
{
Transport = new HttpClientTransport(singletonHttpClientInstance)
});
foreach (EventData eventData in events)
{
try
{
//log.LogDebug($"EventData: {System.Text.Json.JsonSerializer.Serialize(eventData)}");
string opType = eventData.Properties["opType"] as string;
if (opType == "deleteDeviceIdentity")
{
string deviceId = eventData.Properties["deviceId"] as string;
try
{
// Find twin based on the original Registration ID
BasicDigitalTwin digitalTwin = await client.GetDigitalTwinAsync<BasicDigitalTwin>(deviceId);
// In order to delete the twin, all relationships must first be removed
await DeleteAllRelationshipsAsync(client, digitalTwin.Id, log);
// Delete the twin
await client.DeleteDigitalTwinAsync(digitalTwin.Id, digitalTwin.ETag);
log.LogInformation($"Twin {digitalTwin.Id} deleted in DT");
}
catch (RequestFailedException e) when (e.Status == (int)HttpStatusCode.NotFound)
{
log.LogWarning($"Twin {deviceId} not found in DT");
}
}
}
catch (Exception e)
{
// We need to keep processing the rest of the batch - capture this exception and continue.
exceptions.Add(e);
}
}
if (exceptions.Count > 1)
throw new AggregateException(exceptions);
if (exceptions.Count == 1)
throw exceptions.Single();
}
/// <summary>
/// Deletes all outgoing and incoming relationships from a specified digital twin
/// </summary>
public static async Task DeleteAllRelationshipsAsync(DigitalTwinsClient client, string dtId, ILogger log)
{
AsyncPageable<BasicRelationship> relationships = client.GetRelationshipsAsync<BasicRelationship>(dtId);
await foreach (BasicRelationship relationship in relationships)
{
await client.DeleteRelationshipAsync(dtId, relationship.Id, relationship.ETag);
log.LogInformation($"Twin {dtId} relationship {relationship.Id} deleted in DT");
}
AsyncPageable<IncomingRelationship> incomingRelationships = client.GetIncomingRelationshipsAsync(dtId);
await foreach (IncomingRelationship incomingRelationship in incomingRelationships)
{
await client.DeleteRelationshipAsync(incomingRelationship.SourceId, incomingRelationship.RelationshipId);
log.LogInformation($"Twin {dtId} incoming relationship {incomingRelationship.RelationshipId} from {incomingRelationship.SourceId} deleted in DT");
}
}
}
}
The Azure Device Provisioning Service (DPS) for IoT Hub automatically provisions registered devices so that they will communicate with the correct IoT Hub. The DPS can also be customized to do additional tasks. In our case, we will customize it to also create a Twin in Azure Digital Twins for every device provisioned.
The device is given back connection details for the appropriate IoTHub where it will then start sending telemetry data.
The DPS acts like the front desk of a hotel: it identifies the 'guest' (device) and, if the guest has a 'reservation' (enrollment), directs it to the appropriate 'room' (IoTHub).
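From the device's point of view, this check-in is a single register call. The hedged Python sketch below (using the azure-iot-device package, with placeholder ID scope, registration ID and key) shows a device registering with DPS and then connecting to the IoT Hub it was assigned.

```python
# Illustrative sketch: device-side provisioning through DPS. The ID scope,
# registration ID and symmetric key are placeholders for your own enrollment.
from azure.iot.device import IoTHubDeviceClient, ProvisioningDeviceClient

provisioning_client = ProvisioningDeviceClient.create_from_symmetric_key(
    provisioning_host="global.azure-devices-provisioning.net",
    registration_id="<<your-registration-id>>",
    id_scope="<<your-dps-id-scope>>",
    symmetric_key="<<your-enrollment-key>>",
)

result = provisioning_client.register()             # check in at the 'front desk'
if result.status == "assigned":
    # DPS hands back the assigned hub ('room') and device ID
    device_client = IoTHubDeviceClient.create_from_symmetric_key(
        symmetric_key="<<your-enrollment-key>>",
        hostname=result.registration_state.assigned_hub,
        device_id=result.registration_state.device_id,
    )
    device_client.connect()
```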
Setting up a DPS
Next, we will set up a simulated device that represents a freshly manufactured device. The simulator can be run on your local machine.
Setting up a simulated freshly manufactured device
./sandbox/PnPDevices/azure-iot-rpisimulator
npm install
Testing the DPS
Open Azure Digital Twins and run this query.
SELECT * FROM DIGITALTWINS WHERE IS_OF_MODEL('dtmi:azurertos:devkit:gsgmxchip;2')
Run the simulator using the following command:
npm start
Run the query in step 1 again in Azure Digital Twins. The simulated device should now appear in the twin graph.
Azure EventHub is a service for ingesting streams of data from various sources including IoT telemetry. This data can be used by various consumers for further processing and analysis. In our case, we will be using EventHub to:
In the following steps, we'll show how to set up an EventHub for use case 1, i.e., for the DeviceTelemetryToTwinFunc function. You can use the same steps to set up use case 2 (DeleteDeviceInTwinFunc).
Creating an EventHub Namespace
Creating an EventHub
Setting up Azure Functions
Setting up IoTHub
In IoTHub, we need to route messages towards the appropriate EventHub. To do this, we'll use IoTHub's built-in message routing.
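Once the route is in place, you can optionally verify that telemetry is arriving on the EventHub with a quick local consumer. The hedged sketch below assumes the azure-eventhub package and a placeholder namespace connection string; the hub name 'deviceevents' matches the EventHubTrigger used by DeviceTelemetryToTwinFunc above.

```python
# Illustrative sketch: read events directly from the EventHub to confirm that
# IoT Hub routing is working. The connection string is a placeholder; the hub
# name 'deviceevents' matches the EventHubTrigger used above.
from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    print(f"Partition {partition_context.partition_id}: {event.body_as_str()}")

consumer = EventHubConsumerClient.from_connection_string(
    conn_str="<<your-eventhub-namespace-connection-string>>",
    consumer_group="$Default",
    eventhub_name="deviceevents",
)

with consumer:
    # Read from the start of each partition; stop with Ctrl+C
    consumer.receive(on_event=on_event, starting_position="-1")
```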
We can verify that we are updating our twin using the telemetry data from the simulated device. In ADT, we can see that our provisioned device initially has 0 for the value of temperature.
After running the simulated device, we should get the following logs for 'DeviceTelemetryToTwinFunc' (enable 'Application Insights' to see function logs). We should see that the temperature is being set to the value 2195.
Upon reloading the query in ADT, we can see that the value for temperature of the twin is also updated.
Given that we've already added an EventHub for 'lifecycleevents', we can now perform auto-retiring on the simulated device, i.e., when a device is deleted in IoTHub, its twin in ADT should also be deleted. To test this, we'll simply delete our provisioned simulated device from IoTHub.
We can verify in the DeleteDeviceInTwinFunc function logs that deletion on ADT was performed.
We can also verify in ADT that our simulated device's twin no longer exists.
%pip install azure-digitaltwins-core azure-identity
# Establish a client connection to ADT
import os
import azure.identity
import azure.digitaltwins.core
url = os.getenv("AZURE_ADT_URL")
credential = azure.identity.DefaultAzureCredential()
service_client = azure.digitaltwins.core.DigitalTwinsClient(url, credential)

query_expression = 'SELECT Station FROM DIGITALTWINS Station JOIN Process RELATED Station.isStepOf Relationship WHERE Process.$dtId = \'pp_BodyPanelProduction_01\''
query_result = service_client.query_twins(query_expression)
print('DigitalTwins:')
for twin in query_result:
    print(twin)
5. Add the following code block in a new cell below the previous one. This code block uses the ADT client to query the telemetry of one sensor, obtaining a value every 5 seconds.
```python
# Query ADT for the Process Delay telemetry for the Coating Station
import time
query_expression = 'SELECT Device.Delay FROM DigitalTwins Device WHERE $dtId=\'sim000001\''
txt = '{}:{}'
for x in range(49):
    query_result = service_client.query_twins(query_expression)
    for value in query_result:
        print(txt.format(x,value))
    time.sleep(5)
```
AZURE_ADT_URL=<The URL for your ADT in Azure, including 'https://'>
AZURE_TENANT_ID=<The tenant ID of your Azure Active Directory>
AZURE_CLIENT_ID=<The application (client) ID registered in the AAD tenant>
AZURE_CLIENT_SECRET=<The client secret for the registered application>
SIM_WORKSPACE=<the workspace ID from your Bonsai workspace>
SIM_ACCESS_KEY=<the access key from your Bonsai workspace>
%pip install git+https://github.com/microsoft/bonsai-common
interface_file_path = "/FileStore/tables/cartpole-py/cartpole_interface.json"
display(dbutils.fs.ls("dbfs:" + interface_file_path))
"""
Classic cart-pole system implemented by Rich Sutton et al.
Derived from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""
__copyright__ = "Copyright 2020, Microsoft Corp."
import math
import os
import random
import sys
import json

from bonsai_common import SimulatorSession, Schema
from microsoft_bonsai_api.simulator.client import BonsaiClientConfig
from microsoft_bonsai_api.simulator.generated.models import SimulatorInterface

GRAVITY = 9.8  # a classic...
CART_MASS = 0.31  # kg
POLE_MASS = 0.055  # kg
TOTAL_MASS = CART_MASS + POLE_MASS
POLE_HALF_LENGTH = 0.4 / 2  # half the pole's length in m
POLE_MASS_LENGTH = POLE_MASS * POLE_HALF_LENGTH
FORCE_MAG = 1.0
STEP_DURATION = 0.02  # seconds between state updates (20ms)
TRACK_WIDTH = 1.0  # m
FORCE_NOISE = 0.02  # % of FORCE_MAG

class CartPoleModel(SimulatorSession):
    def reset(
        self,
        initial_cart_position: float = 0,
        initial_pole_angle: float = 0,
        target_pole_position: float = 0,
    ):
self._cart_position = initial_cart_position
# cart velocity (m/s)
self._cart_velocity = 0
# cart angle (rad)
self._pole_angle = initial_pole_angle
# pole angular velocity (rad/s)
self._pole_angular_velocity = 0
# pole position (m)
self._pole_center_position = 0
# pole velocity (m/s)
self._pole_center_velocity = 0
# target pole position (m)
self._target_pole_position = target_pole_position
def step(self, command: float):
# We are expecting the input command to be -1 or 1,
# but we'll support a continuous action space.
# Add a small amount of random noise to the force so
# the policy can't succeed by simply applying zero
# force each time.
force = FORCE_MAG * (command + random.uniform(-0.02, 0.02))
cosTheta = math.cos(self._pole_angle)
sinTheta = math.sin(self._pole_angle)
temp = (
force + POLE_MASS_LENGTH * self._pole_angular_velocity ** 2 * sinTheta
) / TOTAL_MASS
angularAccel = (GRAVITY * sinTheta - cosTheta * temp) / (
POLE_HALF_LENGTH * (4.0 / 3.0 - (POLE_MASS * cosTheta ** 2) / TOTAL_MASS)
)
linearAccel = temp - (POLE_MASS_LENGTH * angularAccel * cosTheta) / TOTAL_MASS
self._cart_position = self._cart_position + STEP_DURATION * self._cart_velocity
self._cart_velocity = self._cart_velocity + STEP_DURATION * linearAccel
self._pole_angle = (
self._pole_angle + STEP_DURATION * self._pole_angular_velocity
)
self._pole_angular_velocity = (
self._pole_angular_velocity + STEP_DURATION * angularAccel
)
# Use the pole center, not the cart center, for tracking
# pole center velocity.
self._pole_center_position = (
self._cart_position + math.sin(self._pole_angle) * POLE_HALF_LENGTH
)
self._pole_center_velocity = (
self._cart_velocity
+ math.sin(self._pole_angular_velocity) * POLE_HALF_LENGTH
)
def halted(self):
# If the pole has fallen past 45 degrees, there's no use in continuing.
return abs(self._pole_angle) >= math.pi / 4
def state(self):
return {
"cart_position": self._cart_position,
"cart_velocity": self._cart_velocity,
"pole_angle": self._pole_angle,
"pole_angular_velocity": self._pole_angular_velocity,
"pole_center_position": self._pole_center_position,
"pole_center_velocity": self._pole_center_velocity,
"target_pole_position": self._target_pole_position,
}
# Callbacks
def get_state(self):
return self.state()
def get_interface(self) -> SimulatorInterface:
with open("/dbfs" + interface_file_path, "r") as file:
json_interface = file.read()
interface = json.loads(json_interface)
return SimulatorInterface(
name=interface["name"],
timeout=interface["timeout"],
simulator_context=self.get_simulator_context(),
description=interface["description"],
)
def episode_start(self, config: Schema):
self.reset(
config.get("initial_cart_position") or 0,
config.get("initial_pole_angle") or 0,
config.get("target_pole_position") or 0,
)
def episode_step(self, action: Schema):
self.step(action.get("command") or 0)
if __name__ == "__main__":
    config = BonsaiClientConfig(argv=sys.argv)
    cartpole = CartPoleModel(config)
    cartpole.reset()
    while cartpole.run():
        continue
### Microsoft Bonsai Teaching ###
#### Testing The Simulation ####
#### Importing The Simulation ####
#### Creating The Brain ####
#### Teaching The Brain ####
#### Exporting The Brain ####
#### Running A Simulation Using The Brain ####
#### Other Scenarios For Using The Trained Brain ####