
AMQPQueue::consume

(PECL amqp >= Unknown)

AMQPQueue::consume - Consume messages from a queue

Description

public void AMQPQueue::consume ( callable $callback [, int $flags = AMQP_NOPARAM ] )

Blocking function that will retrieve the next message from the queue as it becomes available and will pass it off to the callback.

Parameters

callback

A callback function to which the consumed message will be passed. The function must accept at least one parameter, an AMQPEnvelope object, and may accept an optional second parameter, the AMQPQueue from which the message was consumed.

AMQPQueue::consume() will not return control to the PHP script until the callback function returns FALSE.
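The callback contract described above can be sketched as a small wrapper. This is only an illustration: makeLimitedCallback() is a hypothetical helper, not part of the amqp extension, and it assumes that any return value other than FALSE keeps the loop running, as in the manual's own example.

```php
<?php
// Hypothetical helper (not part of the amqp extension): wraps a message
// handler so that consumption stops after $limit messages.
function makeLimitedCallback($limit, $handler)
{
    $seen = 0;
    return function ($envelope, $queue = null) use (&$seen, $limit, $handler) {
        // Pass the AMQPEnvelope (and the optional AMQPQueue) on to the handler.
        $handler($envelope, $queue);
        $seen++;
        if ($seen >= $limit) {
            // Returning FALSE makes AMQPQueue::consume() return to the script.
            return false;
        }
        // Falling through returns NULL, so consumption continues.
    };
}

// Usage sketch, assuming $queue is an already declared AMQPQueue:
// $queue->consume(makeLimitedCallback(10, function ($envelope) {
//     echo $envelope->getBody(), "\n";
// }));
```

Keeping the counter inside the closure avoids the global variable used in the manual's example.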

flags

A bitmask of any of the flags: AMQP_NOACK.
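When the no-ack flag is not passed and amqp.auto_ack is 0, each message presumably has to be acknowledged by the callback itself. A minimal sketch under that assumption follows; ackAfterHandling() is a hypothetical name, and the calls used are AMQPEnvelope::getBody(), AMQPEnvelope::getDeliveryTag() and AMQPQueue::ack().

```php
<?php
// Callback intended for use with the default flags (AMQP_NOPARAM):
// the message is acknowledged explicitly once it has been handled.
function ackAfterHandling($envelope, $queue)
{
    echo 'Received: ', $envelope->getBody(), "\n";
    // AMQPQueue::ack() takes the delivery tag of the message to acknowledge.
    $queue->ack($envelope->getDeliveryTag());
    // No return value, so AMQPQueue::consume() keeps waiting for messages.
}

// Usage sketch, assuming $queue is an already declared AMQPQueue:
// $queue->consume('ackAfterHandling');
```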

Errors

Throws AMQPChannelException if the channel is not open.

Throws AMQPConnectionException if the connection to the broker was lost.
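Because both exceptions are thrown from inside the blocking call, a consumer script may want to catch them around consume(). The following is a sketch only; consumeSafely() is a hypothetical wrapper and the log messages are illustrative.

```php
<?php
// Hypothetical wrapper: runs the blocking consume loop and reports
// channel or connection failures instead of letting them become fatal errors.
function consumeSafely($queue, $callback)
{
    try {
        $queue->consume($callback);
        return true; // the callback returned FALSE and the loop ended normally
    } catch (AMQPChannelException $e) {
        error_log('Channel is not open: ' . $e->getMessage());
    } catch (AMQPConnectionException $e) {
        error_log('Connection to the broker was lost: ' . $e->getMessage());
    }
    return false;
}

// Usage sketch, assuming $queue is an already declared AMQPQueue:
// if (!consumeSafely($queue, 'processMessage')) {
//     // reconnect or exit here
// }
```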

Return Values

No value is returned.

Examples

Example #1 AMQPQueue::consume() example

<?php

/* Create a connection using all default credentials: */
$connection = new AMQPConnection();
$connection->connect();

$channel = new AMQPChannel($connection);

/* Create a queue object */
$queue = new AMQPQueue($channel);

// Declare the queue
$queue->declare('myqueue');

$i = 0;
function processMessage($envelope, $queue) {
    global $i;
    echo "Message $i: " . $envelope->getBody() . "\n";
    $i++;
    if ($i >= 10) {
        // Bail after 10 messages
        return false;
    }
}

// Consume messages on queue
$queue->consume("processMessage");

?>


User Contributed Notes 5 notes

pinepain at gmail dot com
1 year ago
Be careful using this function with a non-zero amqp.timeout (you can check it with AMQPConnection::getTimeout), because the timeout value appears to set how long to wait for a new message from the broker before dying with an error like this:

Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Resource temporarily unavailable' in /path/to/your/file.php:12
Stack trace:
#0  /path/to/your/file.php(12): AMQPQueue->consume(Object(Closure), 128)
#1 {main}
  thrown in /path/to/your/file.php on line 12

As for the notes about blocking, greedy use of system resources and so on, you can investigate how it works by looking in amqp_queue.c at the read_message_from_channel C function and the PHP_METHOD(amqp_queue_class, consume) method. For me it works perfectly, without any unusual resource usage or I/O performance degradation, under a load of 10k 64b messages per second with a delivery time of less than 0.001 sec.

OS: FreeBSD *** 8.2-RELEASE FreeBSD 8.2-RELEASE #0: Sat Mar ****** 2011     root@*****:****  amd64
PHP: PHP Version => 5.3.10, Suhosin Patch 0.9.10, Zend Engine v2.3.0

PHP AMQP extension:

amqp

Version => 1.0.9
Revision => $Revision: 327551 $
Compiled => Dec 2* 2012 @ *****
AMQP protocol version => 0-9-1
librabbitmq version => 0.2.0

Directive => Local Value => Master Value
amqp.auto_ack => 0 => 0
amqp.host => localhost => localhost
amqp.login => guest => guest
amqp.password => guest => guest
amqp.port => 5672 => 5672
amqp.prefetch_count => 3 => 3
amqp.timeout => 0 => 0
amqp.vhost => / => /

AMQP broker: RabbitMQ 3.0.1, Erlang R14B04

Such a loop will definitely block the main thread, but given PHP's single-threaded nature that is completely normal behavior. To exit the consumption loop, your callback function or method (I prefer to use closures, btw) should return FALSE.

The benefit of this function is that you don't have to iterate over all messages manually and, more importantly, if there are no unprocessed messages in the queue it will wait for them for you.

So you just have to run your consumer (one or many) and, optionally, check from time to time whether it is still alive, in case you are not sure about your callbacks or about memory-limit-critical code.
liuxiangchao at ometworks dot com
1 year ago
To consume ALL messages stored in DURABLE exchanges, you will need to set the channel's prefetch count parameter to 0:

<?php $channel->setPrefetchCount(0); ?>
Laurent
1 year ago
Be careful using the consume() function on AMQP. It will catch every Exception and fall into an infinite loop (the message will not be marked as read and will be put back in the queue).
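One way to keep a failing handler from causing such a loop is to catch exceptions inside the callback and stop consuming. This is a sketch under the assumption that returning FALSE ends the loop as documented above; makeGuardedCallback() is a hypothetical helper, not part of the extension.

```php
<?php
// Hypothetical helper: wraps a handler so that an exception thrown while
// processing a message ends the consume loop instead of escaping the callback.
function makeGuardedCallback($handler)
{
    return function ($envelope, $queue) use ($handler) {
        try {
            $handler($envelope);
            // Acknowledge only after the handler finished without an exception.
            $queue->ack($envelope->getDeliveryTag());
        } catch (Exception $e) {
            error_log('Handler failed: ' . $e->getMessage());
            // Leave the message unacknowledged and stop consuming.
            return false;
        }
    };
}

// Usage sketch, assuming $queue is an already declared AMQPQueue:
// $queue->consume(makeGuardedCallback(function ($envelope) {
//     // process $envelope->getBody() here
// }));
```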
peter dot colclough at toolstation dot com
2 years ago
Using AMQP_consume against a RabbitMQ server actually stuffs memory. It will work in a loop, or on a constant recall, so long as your exchange/queue and messages are set to durable. However, it will also make the system unusable within a couple of minutes.

Using get(), all is fine. I think this may be a bugette in the PHP access code... off to take a look.
hlopetz at gmail dot com
3 years ago
You shouldn't use AMQPQueue::consume() if you have to get _all_ incoming messages. You'll get only the "max" number of messages, and the queue will then be destroyed.

This weird behavior holds for my amqp.so v0.2.2.

Use AMQPQueue::get() with the "count" param instead.
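A polling loop built on get() might look like the sketch below. It assumes a 1.x version of the extension, where AMQPQueue::get() returns a single AMQPEnvelope or FALSE when no message is waiting, rather than the "count" parameter of v0.2.2 mentioned above; drainQueue() is a hypothetical helper.

```php
<?php
// Hypothetical helper: handles up to $max messages with AMQPQueue::get()
// instead of the blocking consume() loop, and returns how many were handled.
function drainQueue($queue, $max, $handler)
{
    $handled = 0;
    while ($handled < $max) {
        $envelope = $queue->get();
        if (!$envelope) {
            break; // no message is currently waiting in the queue
        }
        $handler($envelope);
        // Acknowledge the message once the handler has returned.
        $queue->ack($envelope->getDeliveryTag());
        $handled++;
    }
    return $handled;
}

// Usage sketch, assuming $queue is an already declared AMQPQueue:
// $n = drainQueue($queue, 100, function ($envelope) {
//     echo $envelope->getBody(), "\n";
// });
```

Unlike consume(), this loop returns as soon as the queue is empty, so the caller decides whether to sleep and poll again.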