CymaticLabs / Unity3D.Amqp

AMQP client library for Unity 3D supporting RabbitMQ
MIT License
97 stars 21 forks source link

How would i send messages to an exchange? #21

Open danammeansbear opened 1 year ago

danammeansbear commented 1 year ago

I basically want to be able to send messages to the queue or exchange that drives the movement of the cubes.

I have the following code written for a C# console app, but how would I convert this to your Unity code?

  // For every greenhouse: declare its queue, then publish one batch of cube
  // rotation updates per (sensor, cube) pair to the "amq.topic" exchange.
  foreach (string greenhouselist in Greenhouse.GreenhouseList)
  {
      // Durable, non-exclusive, non-auto-deleted queue named "<greenhouse>Queue".
      // NOTE(review): nothing visible here binds this queue to "amq.topic", so the
      // messages published below presumably do not land in it — confirm.
      channel.QueueDeclare($"{greenhouselist}Queue",
          durable: true,
          exclusive: false,
          autoDelete: false,
          arguments: null);

      // Loop variables are named differently from the Sensor/cube types they iterate
      // so that the type names are not shadowed inside the loop bodies.
      foreach (string sensorName in Sensor.SensorList)
      {
          foreach (string cubeId in cube.cubeList)
          {
              // Accepted payload shapes:
              //   single object: { "id":"cube2", "rotY":45 }
              //   array:         [{ "id":"cube1", "rotX":45 }, { "id":"cube2", "rotY":45 }]
              var message = @"[{ ""id"":""cube2"", ""rotY"":45 }, { ""id"":""cube1"", ""rotX"":90, ""rotY"":23 }, { ""id"":""cube3"", ""rotZ"":29 }]";

              // The verbatim string above already is valid JSON (the doubled quotes are
              // only C# escaping), so it is sent as-is. Stripping characters such as '.'
              // from it would corrupt decimal values like 45.5.
              var body = Encoding.UTF8.GetBytes(message);

              channel.BasicPublish(exchange: "amq.topic",
                                   routingKey: "amqpdemo.objects",
                                   basicProperties: null,
                                   body: body);

              // Pace the publishing at one message per second
              Thread.Sleep(1000);
          }

          // Extra one second pause between sensors
          Thread.Sleep(1000);
      }
    }
            }
danammeansbear commented 1 year ago

`using System.Collections.Generic; using UnityEngine; using UnityEngine.UI; using CymaticLabs.Unity3D.Amqp.SimpleJSON;

namespace CymaticLabs.Unity3D.Amqp.UI { ///

/// Performs UI logic for the demo AMQP connection form. /// public class SensorScript : MonoBehaviour // public class AmqpConnectionForm : MonoBehaviour {

    [Tooltip("The name of the exchange to subscribe to.")]
    public string ExchangeName;

    [Tooltip("The exchange type for the exchange being subscribed to. It is important to get this value correct as the RabbitMQ client will close a connection if you pass the wrong type for an already declared exchange.")]
    public AmqpExchangeTypes ExchangeType = AmqpExchangeTypes.Topic;

    [Tooltip("The optional routing key to use when subscribing to the exchange. This mostly applies to 'topic' exchanges.")]
    public string RoutingKey;

    [Tooltip("When enabled received messages will be logged to the debug console.")]
    public bool DebugLogMessages = false;

    // The four flags below are read by UpdateObject() but were never declared in this class.
    [Tooltip("When enabled position and rotation updates are applied in world space, otherwise in local space.")]
    public bool UpdateInWorldSpace = true;

    [Tooltip("When enabled 'posX', 'posY' and 'posZ' message values are applied to the object's position.")]
    public bool UpdatePosition = true;

    [Tooltip("When enabled 'rotX', 'rotY' and 'rotZ' message values are applied to the object's rotation.")]
    public bool UpdateRotation = true;

    [Tooltip("When enabled 'sclX', 'sclY' and 'sclZ' message values are applied to the object's local scale.")]
    public bool UpdateScale = true;

    #region Fields

    // Internal look-up table of object references given their AMQP ID
    private Dictionary<string, AmqpObjectControlReference> objectsById;

    // Exchange list received by FinishConnected(); stays null until that callback has run
    private AmqpExchange[] exchanges;

    // Subscriptions tracked by HandleExchangeSubscribed()/HandleExchangeUnsubscribed()
    private List<AmqpExchangeSubscription> exSubscriptions = new List<AmqpExchangeSubscription>();

    #endregion Fields

    #region Properties

    /// <summary>
    /// Gets the static, singleton instance of the behaviour.
    /// </summary>
    public static SensorScript Instance { get; private set; }

    #endregion Properties
    #region Methods
    private void Awake()
    {
        // Start with an empty AMQP ID => object reference look-up table
        objectsById = new Dictionary<string, AmqpObjectControlReference>();

        // Publish this behaviour as the static singleton instance
        Instance = this;
    }

    // *Note*: Only interact with the AMQP library in Start(), not Awake() 
    // since the AmqpClient initializes itself in Awake() and won't be ready yet.
    private void Start()
    {
        /*
         * Build a subscription from the inspector values (exchange name, exchange type and
         * routing key) with HandleExchangeMessageReceived as its message handler, and hand
         * it to the static client API.
         *
         * The static methods won't work when multiple AmqpClient instances are in use. In
         * that case add an inspector property of type 'AmqpClient', assign the connection
         * to work with, and call its non-static 'SubscribeToExchange()' method instead.
         */
        AmqpClient.Subscribe(
            new AmqpExchangeSubscription(ExchangeName, ExchangeType, RoutingKey, HandleExchangeMessageReceived));
    }
    [Tooltip("Minimum number of seconds between automatic publishes. Zero or less publishes on every frame.")]
    public float PublishInterval = 1f;

    // Time.time value from which Update() is allowed to publish again
    private float nextPublishTime;

    // Publishes at most once per PublishInterval seconds instead of once per rendered
    // frame, which would send a message to the broker on every single frame.
    private void Update()
    {
        if (Time.time < nextPublishTime) return;

        nextPublishTime = Time.time + PublishInterval;
        Publish();
    }
    /// <summary>
    /// Publishes a message to the "amq.topic" exchange using the "amqpdemo.objects" routing key.
    /// Validation failures are written to the AMQP console in red and nothing is published.
    /// </summary>
    public void Publish()
    {
        // Validate args
        var isValid = true;

        //var exchangeName = PublishExchange.options[PublishExchange.value].text;
        var exchangeName = "amq.topic";

        if (string.IsNullOrEmpty(exchangeName))
        {
            isValid = false;
            AmqpConsole.Color = Color.red;
            AmqpConsole.WriteLine("* Exchange Name cannot be blank");
            AmqpConsole.Color = null;
        }

        var message = "insert some sort of message here probably json or xml";

        if (string.IsNullOrEmpty(message))
        {
            isValid = false;
            AmqpConsole.Color = Color.red;
            AmqpConsole.WriteLine("* Message cannot be blank");
            AmqpConsole.Color = null;
        }

        // Don't continue if values are invalid
        if (!isValid) return;

        // The exchange type is not looked up here: AmqpClient.Publish() below only takes
        // the exchange name, routing key and message, so the lookup result was never used.

        var routingKey = "amqpdemo.objects";

        // Publish the message
        AmqpClient.Publish(exchangeName, routingKey, message);
    }
    /// <summary>
    /// Registers a new AMQP object control reference with the controller.
    /// </summary>
    /// <param name="objRef">The object control reference to register.</param>
    public void RegisterObject(AmqpObjectControlReference objRef)
    {
        if (objRef == null)
        {
            throw new System.ArgumentNullException("objRef");
        }

        var amqpId = objRef.AmqpId;

        // A reference without an ID could never be found by a message, so skip it
        if (string.IsNullOrEmpty(amqpId))
        {
            Debug.LogWarningFormat("AMQP Control Object Reference is missing its ID: {0}", objRef.name);
            return;
        }

        // A duplicate ID replaces the earlier registration, but gets a warning first
        if (objectsById.ContainsKey(amqpId))
        {
            Debug.LogWarningFormat("AMQP Control Object Reference with ID has already been registered: {0}", amqpId);
        }

        // The indexer adds a new entry or overwrites the existing one
        objectsById[amqpId] = objRef;

        if (DebugLogMessages)
        {
            Debug.LogFormat("AMQP Control Object registered with ID {0} => {1}", amqpId, objRef.name);
        }
    }

    /// <summary>
    /// Unregisters an existing AMQP object control reference from the controller.
    /// Does nothing when no reference is registered under the object's AMQP ID.
    /// </summary>
    /// <param name="objRef">The object control reference to unregister.</param>
    /// <exception cref="System.ArgumentNullException">Thrown when <paramref name="objRef"/> is null.</exception>
    public void UnregisterObject(AmqpObjectControlReference objRef)
    {
        if (objRef == null) throw new System.ArgumentNullException("objRef");

        // Ensure this reference has been filled out properly
        if (string.IsNullOrEmpty(objRef.AmqpId))
        {
            Debug.LogWarningFormat("AMQP Control Object Reference is missing its ID: {0}", objRef.name);
            return;
        }

        // Remove() reports whether an entry existed, so no separate ContainsKey() look-up is needed
        if (objectsById.Remove(objRef.AmqpId) && DebugLogMessages)
        {
            Debug.LogFormat("AMQP Control Object Reference unregistered: {0}", objRef.AmqpId);
        }
    }

    /**
     * Handles messages receieved from this object's subscription based on the exchange name,
     * exchange type, and routing key used. You could also write an anonymous delegate in line
     * when creating the subscription like: (received) => { Debug.Log(received.Message.Body.Length); }
     */
    void HandleExchangeMessageReceived(AmqpExchangeReceivedMessage received)
    {
        // Nothing to do for a delivery without a payload
        var body = received.Message.Body;
        if (body == null || body.Length == 0) return;

        // Convert the message's body, which is a byte array, into a string for parsing the JSON.
        // Surrounding whitespace is trimmed so the array check below sees the first real character.
        var receivedJson = System.Text.Encoding.UTF8.GetString(body).Trim();
        if (receivedJson.Length == 0) return;

        // Only log received messages when asked to, as the DebugLogMessages tooltip promises
        if (DebugLogMessages) Debug.Log(receivedJson);

        /**
         *  Parse the JSON message
         *  This example uses the SimpleJSON parser which is included in the AMQP library.
         *  You can find out more about this parser here: http://wiki.unity3d.com/index.php/SimpleJSON
        */
        JSONNode parsed;

        try
        {
            parsed = JSON.Parse(receivedJson);
        }
        catch (System.Exception ex)
        {
            // The payload comes from the network, so a malformed message must not
            // escape this handler as an exception; report it and drop the message.
            Debug.LogWarningFormat("AMQP message could not be parsed as JSON: {0}", ex.Message);
            return;
        }

        if (parsed == null)
        {
            if (DebugLogMessages) Debug.LogWarning("AMQP message received that is not valid JSON.");
            return;
        }

        // If this starts with a bracket, it's an array of messages, so apply each entry separately
        if (receivedJson[0] == '[')
        {
            var msgList = parsed.AsArray;
            if (msgList == null) return;

            for (var i = 0; i < msgList.Count; i++)
            {
                UpdateObject(msgList[i]);
            }
        }
        // Otherwise it's an individual message
        else
        {
            UpdateObject(parsed);
        }
    }

    // Updates an object in the list with the given update message
    void UpdateObject(JSONNode msg)
    {
        // Read the target object's ID from the message, if there is a message at all
        string id = null;

        if (msg != null)
        {
            id = msg["id"].Value;
        }

        if (string.IsNullOrEmpty(id))
        {
            if (DebugLogMessages) Debug.LogWarning("AMQP message received without 'id' property.");
            return;
        }

        // Resolve the registered object reference for this ID
        AmqpObjectControlReference objRef;

        if (!objectsById.TryGetValue(id, out objRef))
        {
            if (DebugLogMessages) Debug.LogWarningFormat("No AMQP Object Control Reference found for ID: {0}.", id);
            return;
        }

        if (UpdatePosition)
        {
            var tf = objRef.transform;

            // Axes missing from the message keep the object's current value
            var current = UpdateInWorldSpace ? tf.position : tf.localPosition;
            var next = new Vector3(
                ReadFloatOr(msg, "posX", current.x),
                ReadFloatOr(msg, "posY", current.y),
                ReadFloatOr(msg, "posZ", current.z));

            if (UpdateInWorldSpace) tf.position = next;
            else tf.localPosition = next;
        }

        if (UpdateRotation)
        {
            var tf = objRef.transform;

            // Axes missing from the message keep the object's current euler angle
            var current = UpdateInWorldSpace ? tf.eulerAngles : tf.localEulerAngles;
            var next = new Vector3(
                ReadFloatOr(msg, "rotX", current.x),
                ReadFloatOr(msg, "rotY", current.y),
                ReadFloatOr(msg, "rotZ", current.z));

            if (UpdateInWorldSpace) tf.eulerAngles = next;
            else tf.localEulerAngles = next;
        }

        if (UpdateScale)
        {
            var tf = objRef.transform;

            // Scale is always applied in local space
            var current = tf.localScale;

            tf.localScale = new Vector3(
                ReadFloatOr(msg, "sclX", current.x),
                ReadFloatOr(msg, "sclY", current.y),
                ReadFloatOr(msg, "sclZ", current.z));
        }
    }

    // Returns the message's value for the key as a float, or the fallback when the key is absent
    private static float ReadFloatOr(JSONNode msg, string key, float fallback)
    {
        var node = msg[key];
        return node != null ? node.AsFloat : fallback;
    }

    /// <summary>
    /// Publishes a message to the current exchange using the form's input values.
    /// </summary>

    #region Event Handlers

    // Handles a connection event
    // Asks the client for its exchange list, passing FinishConnected as the callback.
    // The 'client' argument is not used.
    // NOTE(review): nothing visible in this class registers this handler with the client — confirm it is wired up.
    void HandleConnected(AmqpClient client)
    {
        // Disabled UI updates that refer to form controls not declared in this class:
        //Connection.interactable = false;
        //ConnectButton.interactable = false;
        //DisconnectButton.interactable = true;

        // Query exchange list
        AmqpClient.GetExchangesAsync(FinishConnected);
    }

    // Finishes the connection event by receiving the async result of the exchange query and display the results in the drop down
    // Callback passed to AmqpClient.GetExchangesAsync() by HandleConnected().
    // Stores the received exchange list in the 'exchanges' field; everything else is disabled UI code.
    void FinishConnected(AmqpExchange[] exchangeList)
    {
        // Keep a reference to the received list (the array itself is not copied)
        exchanges = exchangeList;

        // Disabled: filling the exchange drop-downs with the exchange names
        //foreach (var exchange in exchanges)
        //{
        //    if (exchange.Name == null || exchange.Name == "/") continue;
        //    var option = new Dropdown.OptionData(exchange.Name);
        //    ExchangeName.options.Add(option);
        //    PublishExchange.options.Add(option);
        //}

        //if (exchanges.Length > 0)
        //{
        //    ExchangeName.RefreshShownValue();
        //    PublishExchange.RefreshShownValue();
        //}

        // Disabled: enabling the subscribe/publish UI controls
        //ExchangeName.interactable = true;
        //RoutingKey.interactable = true;
        //SubscribeButton.interactable = true;
        //UnsubscribeButton.interactable = true;

        //PublishButton.interactable = true;
        //PublishExchange.interactable = true;
        //PublishMessage.interactable = true;
        //PublishRoutingKey.interactable = true;
    }

    // Handles a disconnection event
    // Currently does nothing: every statement in the body is commented-out UI code
    // that refers to form controls not declared in this class.
    void HandleDisconnected(AmqpClient client)
    {
        //Connection.interactable = true;
        //ConnectButton.interactable = true;
        //DisconnectButton.interactable = false;

        //ExchangeName.interactable = false;
        //RoutingKey.interactable = false;
        //SubscribeButton.interactable = false;
        //UnsubscribeButton.interactable = false;

        //PublishButton.interactable = false;
        //PublishExchange.interactable = false;
        //PublishMessage.interactable = false;
        //PublishRoutingKey.interactable = false;
    }

    // Handles a reconnecting event
    // Intentionally empty placeholder: no action is taken for a reconnecting event.
    void HandleReconnecting(AmqpClient client)
    {

    }

    // Handles a blocked event
    // Intentionally empty placeholder: no action is taken for a blocked event.
    void HandleBlocked(AmqpClient client)
    {

    }

    // Handles exchange subscribes
    // Records a subscription in the local 'exSubscriptions' list.
    void HandleExchangeSubscribed(AmqpExchangeSubscription subscription)
    {
        // Add it to the local list
        exSubscriptions.Add(subscription);
    }

    // Handles exchange unsubscribes
    // Drops a subscription from the local 'exSubscriptions' list.
    void HandleExchangeUnsubscribed(AmqpExchangeSubscription subscription)
    {
        // Remove it from the local list
        exSubscriptions.Remove(subscription);
    }

    #endregion Event Handlers

    #endregion Methods
}

} `

danammeansbear commented 1 year ago

So I imagine you just use the AMQP publish method, but I feel like I am missing something. I am an agriculture major, so coding isn't exactly my forte, but I try.