narutaro / note

0 stars 0 forks source link

AWS IoT Device SDK for Python v2 #15

Open narutaro opened 12 months ago

narutaro commented 12 months ago

MQTT3 Client - PubSub

from awscrt import mqtt
from awsiot import mqtt_connection_builder
import time
import json

target_ep = '<code>-ats.iot.ap-northeast-1.amazonaws.com'
thing_name = 'f451f939'
cert_filepath = 'f451f939/device.pem.crt'
private_key_filepath = 'f451f939/private.pem.key'
ca_filepath = 'f451f939/AmazonRootCA1.pem'
topic = 'device/{}/data'.format(thing_name)

# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))

mqtt_connection = mqtt_connection_builder.mtls_from_path(
    endpoint=target_ep,
    port=8883,
    cert_filepath=cert_filepath,
    pri_key_filepath=private_key_filepath,
    ca_filepath=ca_filepath,
    client_id=thing_name,
    clean_session=True,
    keep_alive_secs=30)

print("Connecting to {} with client ID '{}'...".format(
    target_ep, thing_name))

#Connect to the gateway
while True:
  try:
    connect_future = mqtt_connection.connect()
    # Future.result() waits until a result is available
    connect_future.result()
  except:
    print("Connection to IoT Core failed...  retrying in 5s.")
    time.sleep(5)
    continue
  else:
    print("Connected!")
    break

# Subscribe
print("Subscribing to topic " + topic)
subscribe_future, packet_id = mqtt_connection.subscribe(
    topic=topic,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    callback=on_message_received)

subscribe_result = subscribe_future.result()
print("Subscribed with {}".format(str(subscribe_result['qos'])))

while True:
    print ('Publishing message on topic {}'.format(topic))

    hello_world_message = {
        'message' : 'Hello from {}!'.format(thing_name)
    }

    message_json = json.dumps(hello_world_message)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE)
    time.sleep(5)
narutaro commented 12 months ago

MQTT5 Client - PubSub


from awsiot import mqtt5_client_builder
from awscrt import mqtt5
import time
import json

target_ep = '<code>-ats.iot.ap-northeast-1.amazonaws.com'
thing_name = 'f451f939'
cert_filepath = 'f451f939/device.pem.crt'
private_key_filepath = 'f451f939/private.pem.key'
ca_filepath = 'f451f939/AmazonRootCA1.pem'

topic = 'device/{}/data'.format(thing_name)

# Callback when any publish is received
def on_publish_received(publish_packet_data):
    publish_packet = publish_packet_data.publish_packet
    assert isinstance(publish_packet, mqtt5.PublishPacket)
    print("Received message from topic'{}':{}".format(publish_packet.topic, publish_packet.payload))

client = mqtt5_client_builder.mtls_from_path(
    endpoint=target_ep,
    port=8883,
    cert_filepath=cert_filepath,
    pri_key_filepath=private_key_filepath,
    ca_filepath=ca_filepath,
    client_id=thing_name,
    clean_session=True,
    keep_alive_secs=30,
    on_publish_received=on_publish_received,
    )

print("Connecting to {} with client ID '{}'...".format(target_ep, thing_name))

#Connect to the gateway
while True:
  try:
    client.start()
  except:
    print("Connection to IoT Core failed...  retrying in 5s.")
    time.sleep(5)
    continue
  else:
    print("Connected: {}".format(client))
    break

# Subscribe
print("Subscribing to topic '{}'...".format(topic))
subscribe_future = client.subscribe(subscribe_packet=mqtt5.SubscribePacket(
    subscriptions=[mqtt5.Subscription(
    topic_filter=topic,
    qos=mqtt5.QoS.AT_LEAST_ONCE)]
))

suback = subscribe_future.result()
print("Subscribed with {}".format(suback))

while True:
    print ('Publishing message on topic {}'.format(topic))

    hello_world_message = {
        'message' : 'Hello from {}!'.format(thing_name)
    }

    message_json = json.dumps(hello_world_message)
    publish_future = client.publish(mqtt5.PublishPacket(
        topic=topic,
        payload=message_json,
        qos=mqtt5.QoS.AT_LEAST_ONCE))
    puback = publish_future.result()
    print("PubAck: {}".format(puback))
    time.sleep(5)
narutaro commented 12 months ago

MQTT3 Client - Shadow

from awscrt import io, mqtt, auth, http
from awscrt import mqtt
from awsiot import mqtt_connection_builder
from awsiot import iotshadow

# Setup our MQTT client and security certificates
# Make sure your certificate names match what you downloaded from AWS IoT
target_ep = '<code>-ats.iot.ap-northeast-1.amazonaws.com'
thing_name = 'f451f939'
cert_filepath = 'f451f939/device.pem.crt'
private_key_filepath = 'f451f939/private.pem.key'
ca_filepath = 'f451f939/AmazonRootCA1.pem'

# Our motor is not currently running.
MOTOR_STATUS = "OK3"

mqtt_connection = mqtt_connection_builder.mtls_from_path(
    endpoint=target_ep,
    port=8883,
    cert_filepath=cert_filepath,
    pri_key_filepath=private_key_filepath,
    ca_filepath=ca_filepath,
    client_id=thing_name,
    clean_session=True,
    keep_alive_secs=30,
)

print("Connecting to {} with client ID '{}'...".format(target_ep, thing_name))

# Connect to the gateway
while True:
    try:
        connect_future = mqtt_connection.connect()
        connect_result = connect_future.result()
    except:
        print("Connection to IoT Core failed... retrying in 5s.")
        time.sleep(5)
        continue
    else:
        print("Connected: {}".format(mqtt_connection))
        break

# Set up the Classic Shadow handler
shadowClient = iotshadow.IotShadowClient(mqtt_connection)

print("Shadow Client: {}".format(shadowClient))

# Function to update the Classic Shadow
def updateDeviceShadow():
    global shadowClient
    global MOTOR_STATUS

    # Set the Classic Shadow with the current motor status and check if it was successful
    print("Updating shadow with reported motor status")
    payload = {"MOTOR": MOTOR_STATUS}
    shadowMessage = iotshadow.ShadowState(reported=payload)
    update_shadow_request = iotshadow.UpdateShadowRequest(state=shadowMessage, thing_name=thing_name)
    update_shadow_future = shadowClient.publish_update_shadow(request=update_shadow_request, qos=mqtt.QoS.AT_LEAST_ONCE)
    update_shadow_future.add_done_callback(on_classic_shadow_update_complete)

# Callback to update the Classic Shadow
def on_classic_shadow_update_complete(update_shadow_future):
    update_shadow_result = update_shadow_future.result()
    print("Device shadow reported properties updated")

    if update_shadow_result is not None:
        sys.exit("Server rejected Classic Shadow update request")

# Set the initial motor status in the device shadow
updateDeviceShadow()

# Make sure the classic shadow in the console.