Sending MQTT Messages Using The Message Pump

This post explains how to send and receive messages from a System Message Pump. There is a previous post showing how to create a System Message Pump. This example uses the additional applications the previous example did and should be included when the project for this application is created. That post can be accessed here. Please look over that post first as this one uses code from and references that post. 

package mqttmessagepump;

import com.integpg.system.JANOS;
import com.integpg.system.MessagePump;
import com.integpg.system.SystemMsg;
import java.util.Json;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;

public class MQTTMessagePump implements Runnable {

    private static final MQTTMessagePump THIS = new MQTTMessagePump();
    private static final MessagePump MESSAGE_PUMP = new MessagePump();
    private static final Vector LISTENERS = new Vector<>();

    private static Thread _thread;

    /**
     * Singleton constructor
     */
    private MQTTMessagePump() {
    }

    /**
     * adds a listener that will be alerted of all received messages. The
     * listener will be responsible for determining if the message has meaning
     * to them
     *
     * @param listener
     */
    public static void addListener(MessagePumpListener listener) {
        synchronized (LISTENERS) {
            LISTENERS.addElement(listener);
        }
    }

    /**
     * starts our message pump engine.
     */
    static void start() {
        if (null == _thread) {
            _thread = new Thread(THIS);
            _thread.setName("message-pump-engine");
            _thread.setDaemon(true);
            _thread.start();
        }
    }
    
    //completes the message 1600 after being sent from the MQTT application
    
    public static void completeMessage1600 (String recievedJsonInfo) {
        
        Json holdinginfo = new Json(recievedJsonInfo);
        String[] arrayofkeys = holdinginfo.keyarray();
        String subscribecheck = holdinginfo.getString(arrayofkeys[0]);
        String topicscheck = holdinginfo.getString(arrayofkeys[1]);
        String infocheck = holdinginfo.getString(arrayofkeys[2]);
        System.out.println(arrayofkeys[0] + ": " + subscribecheck + ", " + arrayofkeys[1] + ": " + topicscheck + ", " + arrayofkeys[2] + ": " + infocheck);
 
    }
    
    //creates the message that will be sent to the other program to complete
    
    public static SystemMsg createMessage (int messageType, byte [] messageInfo) {
        
        SystemMsg returnMsg = new SystemMsg();
        returnMsg.type = messageType;
        returnMsg.msg = messageInfo;
        
        System.out.println(returnMsg);
        return returnMsg;
        
    }
    
    //adds a value to the Json object that we be sent as part of the message to the other program
    
    public static Json addToJson(Json Jsonadd, String valuename, Object jsonvalue) {
    
        Jsonadd.put(valuename, jsonvalue);
        return Jsonadd;
        
    }
    
    @Override
    public void run() {
        System.out.println("open the message pump and start monitoring.\r\n");
        MESSAGE_PUMP.open();

        for (;;) {
            // read all messages from the message pump
            SystemMsg systemMsg = MESSAGE_PUMP.getMessage();

            // we must repost as fast as we can
            MESSAGE_PUMP.postMessage(systemMsg);

            // notify all of our listeners

            if (systemMsg.type == 1600) {

                System.out.println();
                System.out.println("Recieved command 1600.\n");
                String jsoninfo = new String(systemMsg.msg);
                completeMessage1600(jsoninfo);           

            }
            
             Json thirdjson = new Json();
             addToJson(thirdjson, "Message", "publish"); 
             addToJson(thirdjson, "Topic", "jnior/(Your JNIOR's Serial Number)/set/digital/outputs/1/state");
             addToJson(thirdjson, "Payload", "true");
             SystemMsg returnMsg = createMessage(1600, thirdjson.toString().getBytes());
             MESSAGE_PUMP.postMessage(returnMsg);    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(MQTTMessagePump.class.getName()).log(Level.SEVERE, null, ex);
            }

        }

    }

    /**
     * Exposes the postMesssage method of the System MessagePump
     *
     * @param msg the message the will be sent to the system message pump
     */
    public static void postMessage(SystemMsg msg) {
        MESSAGE_PUMP.postMessage(msg);
    }

}

To start, before we can create an application to talk with the MQTT application, we need to configure the MQTT application to hook up with a broker. Here is a download for the MQTT application update project. An online broker is needed in order to use the MQTT protocol. Free ones can be found online, but for this example the free hivemq broker is being used. This broker is public and if you want to use MQTT protocols outside this example, a secure one should be used. The only information you need to have in the MQTT application is the host URL and the port number. Then click connect and make sure the MQTT application is running on the JNIOR.

MQT connection in MQTT Application

Once the MQTT application is properly configured, the MQTT coding can be made. After creating a Message Pump program that could send messages to and from another application, the next step is to create and receive messages between the message pump and the MQTT application using the MQTT protocol. To start, we can create a new program that will interact with the MQTT application. We’ll call this program our MQTTMessage program. This sample program will only send a message to toggle an output, but could be edited to do more.

For this example, only the application containing the message pump from the previous example is needed. In the previous example, the message being sent between the applications are Json Objects. The message that needs to be created for this example is formatted as this:

{
"Message" : "publish",
"Topic" : STRING,
"Payload" : STRING
}

The MQTT application also uses Json Objects to communicate through the messages. Looking at the for loop when creating the message to send, the message has to be “published”, because when you are sending a message to the MQTT application, you are publishing information on a specified topic. The topic is the keyword that other devices are subscribed to. If a device is subscribed to that topic, then the device will receive the payload of the message. The payload is the actual data of the message that any device subscribed to the related topic will receive.

The run function in the previous example’s MessagePumpEngine application is replaced with a new for loop. When creating the message, it needs to be reformatted to match the Json message the MQTT application needs to receive. The message ID number has to be 1600, since that is the message ID the MQTT application is listening to. In this example, the Json object is created as:

{
  "Message" : "publish",
  "Topic" :jnior/"Your JNIOR's IP"/set/digital/outputs/1/state,
  "Payload" : true
}

The topic is to specify the JNIOR, what channel you want to toggle, and if its opened or closed. The payload sets that channels state to true, closing that output.

for (;;) {
            // read all messages from the message pump
            SystemMsg systemMsg = MESSAGE_PUMP.getMessage();

            // we must repost as fast as we can
            MESSAGE_PUMP.postMessage(systemMsg);

            // notify all of our listeners

            if (systemMsg.type == 1600) {

                System.out.println();
                System.out.println("Recieved command 1600.\n");
                String jsoninfo = new String(systemMsg.msg);
                completeMessage1600(jsoninfo);           

            }
            
             Json thirdjson = new Json();
             addToJson(thirdjson, "Message", "publish"); 
             addToJson(thirdjson, "Topic", "jnior/(Your JNIOR's Serial Number)/set/digital/outputs/1/state");
             addToJson(thirdjson, "Payload", "true");
             SystemMsg returnMsg = createMessage(1600, thirdjson.toString().getBytes());
             MESSAGE_PUMP.postMessage(returnMsg);    
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ex) {
                Logger.getLogger(MQTTMessagePump.class.getName()).log(Level.SEVERE, null, ex);
            }

        }

The if statement checking the (systemMsg.type == 1600) is there because the JNIOR being used to publish the message is also listening the for messages on the message pump too, and those messages will have the same message ID 1600.

if (systemMsg.type == 1600) {

                System.out.println();
                System.out.println("Recieved command 1600.\n");
                String jsoninfo = new String(systemMsg.msg);
                completeMessage1600(jsoninfo);           

            }

Inside that if statement is a call to the completeMessage1600() function. That function will grab the data out of the Json object from the message and print it out for you to see.

public static void completeMessage1600 (String recievedJsonInfo) {
        
        Json holdinginfo = new Json(recievedJsonInfo);
        String[] arrayofkeys = holdinginfo.keyarray();
        String subscribecheck = holdinginfo.getString(arrayofkeys[0]);
        String topicscheck = holdinginfo.getString(arrayofkeys[1]);
        String infocheck = holdinginfo.getString(arrayofkeys[2]);
        System.out.println(arrayofkeys[0] + ": " + subscribecheck + ", " + arrayofkeys[1] + ": " + topicscheck + ", " + arrayofkeys[2] + ": " + infocheck);
 
    }

With these changes to the previous message pump application and MQTT application configuration, after running this example program you should be able to create and send messages through the MQTT application using the MQTT protocol.

By | Updated On November 3, 2023 2:28 pm | No Comments | Categories: , | Tags: ,


INTEG Process Group, inc. © 2023

Real-Time
Mon - Fri, 8am - 4pm EST
P: 724-933-9350
PureChat
Always Available
Contact Form
sales@integpg.com
support@integpg.com

@integpg
@jniordev
4529