
RabbitMQ C client library - amqp_basic_get() versus amqp_basic_consume() - container for data

Tags: c, rabbitmq

In the RabbitMQ C client library there are two ways to get messages from a queue: amqp_basic_get() and amqp_basic_consume().

After starting a consumer with amqp_basic_consume(), you fetch each message with amqp_consume_message(), as in the following example. You pass in an envelope struct, which has a field called message to hold the data.

  amqp_envelope_t envelope;
  /* amqp_consume_message() returns an amqp_rpc_reply_t, not an int */
  amqp_rpc_reply_t res = amqp_consume_message(conn_image_queue, &envelope, NULL, 0);

The envelope struct is defined as follows:

typedef struct amqp_envelope_t_ {
  amqp_channel_t channel; /**< channel message was delivered on */
  amqp_bytes_t
      consumer_tag;      /**< the consumer tag the message was delivered to */
  uint64_t delivery_tag; /**< the messages delivery tag */
  amqp_boolean_t redelivered; /**< flag indicating whether this message is being
                                 redelivered */
  amqp_bytes_t exchange;      /**< exchange this message was published to */
  amqp_bytes_t
      routing_key; /**< the routing key this message was published with */
  amqp_message_t message; /**< the message */
} amqp_envelope_t;

However, the API for amqp_basic_get() does not take an envelope object. Its definition is included further down.

I call the function like this:

  res = amqp_basic_get(conn_image_queue, 1, amqp_cstring_bytes(image_queue), 1);

My question is: when you call amqp_basic_get(), how do you access the data, and where is it stored?

/**
 * Do a basic.get
 *
 * Synchronously polls the broker for a message in a queue, and
 * retrieves the message if a message is in the queue.
 *
 * \param [in] state the connection object
 * \param [in] channel the channel identifier to use
 * \param [in] queue the queue name to retrieve from
 * \param [in] no_ack if true the message is automatically ack'ed
 *              if false amqp_basic_ack should be called once the message
 *              retrieved has been processed
 * \return amqp_rpc_reply indicating success or failure
 *
 * \since v0.1
 */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t AMQP_CALL amqp_basic_get(amqp_connection_state_t state,
                                          amqp_channel_t channel,
                                          amqp_bytes_t queue,
                                          amqp_boolean_t no_ack);

/**
 * Wait for and consume a message
 *
 * Waits for a basic.deliver method on any channel, upon receipt of
 * basic.deliver it reads that message, and returns. If any other method is
 * received before basic.deliver, this function will return an amqp_rpc_reply_t
 * with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
 * ret.library_error == AMQP_STATUS_UNEXPECTED_STATE. The caller should then
 * call amqp_simple_wait_frame() to read this frame and take appropriate action.
 *
 * This function should be used after starting a consumer with the
 * amqp_basic_consume() function
 *
 * \param [in,out] state the connection object
 * \param [in,out] envelope a pointer to a amqp_envelope_t object. Caller
 *                 should call #amqp_destroy_envelope() when it is done using
 *                 the fields in the envelope object. The caller is responsible
 *                 for allocating/destroying the amqp_envelope_t object itself.
 * \param [in] timeout a timeout to wait for a message delivery. Passing in
 *             NULL will result in blocking behavior.
 * \param [in] flags pass in 0. Currently unused.
 * \returns a amqp_rpc_reply_t object.  ret.reply_type == AMQP_RESPONSE_NORMAL
 *          on success. If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION,
 *          and ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame other
 *          than AMQP_BASIC_DELIVER_METHOD was received, the caller should call
 *          amqp_simple_wait_frame() to read this frame and take appropriate
 *          action.
 *
 * \since v0.4.0
 */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t AMQP_CALL amqp_consume_message(amqp_connection_state_t state,
                                                amqp_envelope_t *envelope,
                                                struct timeval *timeout,
                                                int flags);
asked Sep 11 '25 10:09 by arcoxia tom



1 Answer

The documentation for amqp_read_message suggests it is intended to be used after amqp_basic_get, to fetch the message:

Reads a complete message (header + body) on a specified channel. This function is intended to be used with amqp_basic_get() or when an AMQP_BASIC_DELIVERY_METHOD method is received.

answered Sep 14 '25 00:09 by Raedwald
