In the RabbitMQ C client library there are two functions for getting messages from queues: amqp_basic_get() and amqp_basic_consume().
An example of the consume path would be the following. After starting a consumer with amqp_basic_consume(), you call amqp_consume_message() and pass in an envelope struct, which has a field called message to hold the data.
amqp_envelope_t envelope;
/* amqp_consume_message() returns an amqp_rpc_reply_t, not an int */
amqp_rpc_reply_t res = amqp_consume_message(conn_image_queue, &envelope, NULL, 0);
--
typedef struct amqp_envelope_t_ {
  amqp_channel_t channel;     /**< channel message was delivered on */
  amqp_bytes_t consumer_tag;  /**< the consumer tag the message was delivered to */
  uint64_t delivery_tag;      /**< the messages delivery tag */
  amqp_boolean_t redelivered; /**< flag indicating whether this message is being redelivered */
  amqp_bytes_t exchange;      /**< exchange this message was published to */
  amqp_bytes_t routing_key;   /**< the routing key this message was published with */
  amqp_message_t message;     /**< the message */
} amqp_envelope_t;
However, the API for amqp_basic_get() does not take an envelope object. I call the function like this:
res = amqp_basic_get(conn_image_queue, 1, amqp_cstring_bytes(image_queue), 1);
My question is: when you call amqp_basic_get(), how do you access the data, and where is it stored? The definitions of both functions are below.
/**
 * Do a basic.get
 *
 * Synchronously polls the broker for a message in a queue, and
 * retrieves the message if a message is in the queue.
 *
 * \param [in] state the connection object
 * \param [in] channel the channel identifier to use
 * \param [in] queue the queue name to retrieve from
 * \param [in] no_ack if true the message is automatically ack'ed
 *              if false amqp_basic_ack should be called once the message
 *              retrieved has been processed
 * \return amqp_rpc_reply indicating success or failure
 *
 * \since v0.1
 */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t AMQP_CALL amqp_basic_get(amqp_connection_state_t state,
                                          amqp_channel_t channel,
                                          amqp_bytes_t queue,
                                          amqp_boolean_t no_ack);
/**
 * Wait for and consume a message
 *
 * Waits for a basic.deliver method on any channel, upon receipt of
 * basic.deliver it reads that message, and returns. If any other method is
 * received before basic.deliver, this function will return an amqp_rpc_reply_t
 * with ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION, and
 * ret.library_error == AMQP_STATUS_UNEXPECTED_STATE. The caller should then
 * call amqp_simple_wait_frame() to read this frame and take appropriate action.
 *
 * This function should be used after starting a consumer with the
 * amqp_basic_consume() function
 *
 * \param [in,out] state the connection object
 * \param [in,out] envelope a pointer to a amqp_envelope_t object. Caller
 *                 should call #amqp_destroy_envelope() when it is done using
 *                 the fields in the envelope object. The caller is responsible
 *                 for allocating/destroying the amqp_envelope_t object itself.
 * \param [in] timeout a timeout to wait for a message delivery. Passing in
 *             NULL will result in blocking behavior.
 * \param [in] flags pass in 0. Currently unused.
 * \returns a amqp_rpc_reply_t object. ret.reply_type == AMQP_RESPONSE_NORMAL
 *          on success. If ret.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION,
 *          and ret.library_error == AMQP_STATUS_UNEXPECTED_STATE, a frame other
 *          than AMQP_BASIC_DELIVER_METHOD was received, the caller should call
 *          amqp_simple_wait_frame() to read this frame and take appropriate
 *          action.
 *
 * \since v0.4.0
 */
AMQP_PUBLIC_FUNCTION
amqp_rpc_reply_t AMQP_CALL amqp_consume_message(amqp_connection_state_t state,
                                                amqp_envelope_t *envelope,
                                                struct timeval *timeout,
                                                int flags);
The documentation for amqp_read_message() suggests it is intended to be used after amqp_basic_get(), to fetch the message:
Reads a complete message (header + body) on a specified channel. This function is intended to be used with amqp_basic_get() or when an AMQP_BASIC_DELIVERY_METHOD method is received.