Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Recieving NULL Message Body Content from Azure Service-Bus Queue

I am trying to implement a Java application that sends and receives messages to and from Azure service-bus queue. The connection to the portal and the sending of the messages are going quiet well, but when receiving the messages and getting the message body content, some of the variables are missing their values "null". How can I fix this problem?

Here is the code I have used to send and receive the messages:

CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {
    List<HashMap<String, String>> data =
            GSON.fromJson(
                    "[" +
                            "{'Device ID' = 'FieldPanel_L1'},"+
                            "{'Sensor1name' = 'FieldPanel_SL1', 'value1' = '0', 'Location of Sensor1 X' = '0.0', 'Location of sensor1 Y' = '0.0'}," +
                            "{'Sensor2name' = 'FieldPanel_SL2', 'value2' = '0', 'Location of Sensor2 X' = '20.0', 'Location of sensor2 Y' = '0.0'},"+
                            "{'Sensor3name' = 'FieldPanel_SL3', 'value3' = '0', 'Location of Sensor3 X' = '40.0', 'Location of sensor3 Y' = '0.0'}"+
                        "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType());

    List<CompletableFuture> tasks = new ArrayList<>();
    for (int i = 0; i < data.size(); i++) {
        final String messageId = Integer.toString(i);
        Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
        message.setContentType("application/json");
        message.setLabel("FieldPanel");
        message.setMessageId(messageId);
        message.setTimeToLive(Duration.ofMinutes(2));
        System.out.printf("\nSending Message: Id = %s", message.getMessageId());
        tasks.add(
                sendClient.sendAsync(message).thenRunAsync(() -> {
                    System.out.printf("\n\tAcknowledged Message: Id = %s", message.getMessageId());
                }));
    }
    return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
} // Method sendMessageAsync()

void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {

    // register the RegisterMessageHandler callback with executor service
    queueClient.registerMessageHandler(new IMessageHandler() {

    // callback invoked when the message handler loop has obtained a message
    public CompletableFuture<Void> onMessageAsync(IMessage message) {
        // received message is passed to callback
        if (message.getLabel() != null &&
                message.getContentType() != null &&
                message.getLabel().contentEquals("FieldPanel") &&
                message.getContentType().contentEquals("application/json")) {

                byte[] body = message.getBody();
                Map fieldPanel = GSON.fromJson(new String(body, UTF_8), Map.class);

                System.out.printf("\n\t\t\tMessage received: \n\t\t\t\t\tMessageId = %s, \n\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\tEnqueuedTimeUtc = %s," +
                "\n\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\tContentType = \"%s\",  \n\t\t\t\t\tContent: [ Sensor 1 Name = %s, Value = %s, Location of Sensor1 X = %s, Location of sensor1 Y = %s, Sensor 2 Name = %s, Value = %s, Location of Sensor2 X = %s, Location of sensor2 Y = %s, Sensor 3 Name = %s, Value = %s, Location of Sensor3 X = %s, Location of sensor3 Y = %s ]\n",
                message.getMessageId(),
                message.getSequenceNumber(),
                message.getEnqueuedTimeUtc(),
                message.getExpiresAtUtc(),
                message.getContentType(),

fieldPanel != null ? fieldPanel.get("Sensor1name") : "ERROR" ,
fieldPanel != null ? fieldPanel.get("value1") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of Sensor1 X") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of sensor1 Y") : "ERROR",
fieldPanel != null ? fieldPanel.get("Sensor2name") : "ERROR",
fieldPanel != null ? fieldPanel.get("value2") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of Sensor2 X") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of sensor2 Y") : "ERROR",
fieldPanel != null ? fieldPanel.get("Sensor3name") : "ERROR",
fieldPanel != null ? fieldPanel.get("value3") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of Sensor3 X") : "ERROR",
fieldPanel != null ? fieldPanel.get("Location of sensor3 Y") : "ERROR");
                } //Message Body
                return CompletableFuture.completedFuture(null);
            }

 // callback invoked when the message handler has an exception to report
 public void notifyException(Throwable throwable, ExceptionPhase   exceptionPhase) {
                  System.out.printf(exceptionPhase + "-" + throwable.getMessage());}
}, new MessageHandlerOptions(1, true,Duration.ofMinutes(1)),executorService);

         } // Method registerReceiver()

This is the message output I receive on my console :

Message received: 
    MessageId = 1, 
    SequenceNumber = 15, 
    EnqueuedTimeUtc = 2019-07-22T08:40:29.161Z,
    ExpiresAtUtc = 2019-07-22T08:42:29.161Z, 
    ContentType = "application/json",  
Content: [ Sensor 1 Name = FieldPanel_SL1, Value = 0, Location of Sensor1 X = 0.0, Location of sensor1 Y = 0.0, Sensor 2 Name = null, Value = null, Location of Sensor2 X = null, Location of sensor2 Y = null, Sensor 3 Name = null, Value = null, Location of Sensor3 X = null, Location of sensor3 Y = null ]
like image 808
hessa Z Avatar asked Nov 07 '22 15:11

hessa Z


1 Answers

I reviewed your codes, and found that the outputs you got is right.

In your code, you create a list which contains 4 HashMaps. 4 message will be sent. Each message is part information of the device.

So, in the message you received, you can only get one part each time.

I guess that all the information in the json should be for one device. You actually want to send them in one message.

Here is the sample which may meet your requirement:

public class ServicebusBasicTest {

    static Gson GSON = new Gson();
    static String connectionString = "Endpoint=sb://ja**78.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=73**U=";

    static CompletableFuture<Void> sendMessagesAsync(QueueClient sendClient) {

        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                                "{'Device ID' = 'FieldPanel_L1'}," +
                                "{'Sensor1name' = 'FieldPanel_SL1', 'value1' = '0', 'Location of Sensor1 X' = '0.0', 'Location of sensor1 Y' = '0.0'}," +
                                "{'Sensor2name' = 'FieldPanel_SL2', 'value2' = '0', 'Location of Sensor2 X' = '20.0', 'Location of sensor2 Y' = '0.0'}," +
                                "{'Sensor3name' = 'FieldPanel_SL3', 'value3' = '0', 'Location of Sensor3 X' = '40.0', 'Location of sensor3 Y' = '0.0'}" +
                                "]",
                        new TypeToken<List<HashMap<String, String>>>() {}.getType()
                );

        List<CompletableFuture> tasks = new ArrayList<>();

        final String messageId = Integer.toString(1);
        Message message = new Message(GSON.toJson(data, List.class).getBytes(Charset.forName("UTF-8")));
        message.setContentType("application/json");
        message.setLabel("FieldPanel");
        message.setMessageId(messageId);
        message.setTimeToLive(Duration.ofMinutes(2));
        System.out.printf("\nSending Message: Id = %s\n", message.getMessageId());
        tasks.add(
                sendClient.sendAsync(message).thenRunAsync(() -> {
                    System.out.printf("\nMessage Sent: Id = %s\n", message.getMessageId());
                }));

        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[tasks.size()]));
    }

    static void registerReceiver(QueueClient queueClient, ExecutorService executorService) throws Exception {
        queueClient.registerMessageHandler(new IMessageHandler() {
                                               public CompletableFuture<Void> onMessageAsync(IMessage message) {
                                                   if (message.getLabel() != null &&
                                                           message.getContentType() != null &&
                                                           message.getLabel().contentEquals("FieldPanel") &&
                                                           message.getContentType().contentEquals("application/json")) {

                                                       List<HashMap<String, String>> fieldPanel = GSON.fromJson(new String(message.getBody(), Charset.forName("UTF-8")), new TypeToken<List<HashMap<String, String>>>() {}.getType());

                                                       System.out.printf(
                                                               "\nMessage received: \n -->MessageId = %s\n -->ContentType = %s",
                                                               message.getMessageId(),
                                                               message.getContentType()
                                                       );

                                                       System.out.print("\n -->Device info");
                                                       for ( HashMap<String,String> map: fieldPanel) {
                                                           for (Object object : map.entrySet()) {
                                                               Map.Entry entry = (Map.Entry) object;
                                                               System.out.printf("\n -----> %s : %s", entry.getKey(),entry.getValue());
                                                           }
                                                       }
                                                       System.out.println();
                                                   }
                                                   return queueClient.completeAsync(message.getLockToken());
                                               }

                                               public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                                                   System.out.printf(exceptionPhase + "-" + throwable.getMessage());
                                               }
                                           },
                new MessageHandlerOptions(1, false, Duration.ofSeconds(5)),
                executorService
        );
    }

    public static void main(String[] args) throws Exception{
        QueueClient receiveClient = new QueueClient(new ConnectionStringBuilder(connectionString, "testqueue"), ReceiveMode.PEEKLOCK);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        registerReceiver(receiveClient, executorService);

        QueueClient client = new QueueClient(new ConnectionStringBuilder(connectionString, "testqueue"), ReceiveMode.PEEKLOCK);
        sendMessagesAsync(client).join();

        Thread.sleep(5000);

        client.close();
        receiveClient.close();
        executorService.shutdown();
    }
}

Console outputs:

Sending Message: Id = 1

Message Sent: Id = 1

Message received: 
 -->MessageId = 1
 -->ContentType = application/json
 -->Device info
 -----> Device ID : FieldPanel_L1
 -----> Sensor1name : FieldPanel_SL1
 -----> Location of sensor1 Y : 0.0
 -----> Location of Sensor1 X : 0.0
 -----> value1 : 0
 -----> Location of Sensor2 X : 20.0
 -----> Location of sensor2 Y : 0.0
 -----> value2 : 0
 -----> Sensor2name : FieldPanel_SL2
 -----> Location of Sensor3 X : 40.0
 -----> Location of sensor3 Y : 0.0
 -----> value3 : 0
 -----> Sensor3name : FieldPanel_SL3
like image 130
Jack Jia Avatar answered Nov 15 '22 05:11

Jack Jia