Документация по Python

Работа RabbitMQ с использованием AMQPStorm

В: Документация по Python

Введение

Примеры

Как использовать сообщения от RabbitMQ

Начните с импорта библиотеки.

 from amqpstorm import Connection

 

При использовании сообщений сначала нужно определить функцию для обработки входящих сообщений. Это может быть любой вызываемой функции, и должен принимать объект сообщения, или кортеж сообщения ( в зависимости от to_tuple параметра , определенного в start_consuming ).

Помимо обработки данных из входящего сообщения, мы также должны подтвердить или отклонить сообщение. Это важно, так как мы должны сообщить RabbitMQ, что мы правильно получили и обработали сообщение.

 def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

 

Далее нам нужно настроить соединение с сервером RabbitMQ.

 connection = Connection('127.0.0.1', 'guest', 'guest')

 

После этого нам нужно настроить канал. Каждое соединение может иметь несколько каналов, и, как правило, при выполнении многопоточных задач рекомендуется (но не обязательно) иметь один канал на поток.

 channel = connection.channel()

 

После того, как мы настроили наш канал, нам нужно сообщить RabbitMQ, что мы хотим начать принимать сообщения. В этом случае мы будем использовать нашу ранее определенную on_message функцию , чтобы обрабатывать все наши потребляемые сообщения.

Очередь мы будем слушать на сервере RabbitMQ будет simple_queue , и мы также говорят RabbitMQ , что мы будем признающее все входящие сообщения , как только мы сделали с ними.

 channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

 

Наконец, нам нужно запустить цикл ввода-вывода, чтобы начать обработку сообщений, доставляемых сервером RabbitMQ.

 channel.start_consuming(to_tuple=False) 

Как публиковать сообщения на RabbitMQ

Начните с импорта библиотеки.

 from amqpstorm import Connection
from amqpstorm import Message

 

Далее нам нужно открыть соединение с сервером RabbitMQ.

 connection = Connection('127.0.0.1', 'guest', 'guest')

 

После этого нам нужно настроить канал. Каждое соединение может иметь несколько каналов, и, как правило, при выполнении многопоточных задач рекомендуется (но не обязательно) иметь один канал на поток.

 channel = connection.channel()

 

Как только мы настроим наш канал, мы можем начать готовить наше сообщение.

 # Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

 

Теперь мы можем опубликовать сообщение, просто вызывая publish и обеспечение routing_key.В этом случае мы будем посылать сообщение в очередь , называется simple_queue .

 message.publish(routing_key='simple_queue') 

Как создать отложенную очередь в RabbitMQ

Сначала нам нужно настроить два основных канала, один для основной очереди и один для очереди задержки. В конце моего примера я включил пару дополнительных флагов, которые не обязательны, но делают код более надежным; такие , как confirm delivery , delivery_mode и durable.Вы можете найти более подробную информацию о них в RabbitMQ руководстве .

После настройки каналов мы добавляем привязку к основному каналу, которую мы можем использовать для отправки сообщений из канала задержки в нашу основную очередь.

 channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

 

Затем нам нужно настроить наш канал задержки для пересылки сообщений в основную очередь после истечения срока их действия.

 delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})

 

х-сообщение-ТТЛ (Message - Time To Live)

Обычно это используется для автоматического удаления старых сообщений в очереди через определенное время, но, добавив два необязательных аргумента, мы можем изменить это поведение, и вместо этого этот параметр определяет в миллисекундах, как долго сообщения будут оставаться в очереди с задержкой.

х анкерного письмо маршрутизации ключ

Эта переменная позволяет нам передавать сообщения в другую очередь после истечения срока их действия вместо того, чтобы по умолчанию полностью удалять его.

х-буквальный обмен

Эта переменная определяет, какой Exchange используется для передачи сообщения из hello_delay в очередь hello.

Публикация в очереди задержки

Когда мы закончим настройку всех основных параметров Pika, вы просто отправляете сообщение в очередь задержек с помощью базовой публикации.

 delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

 

После выполнения сценария вы должны увидеть следующие очереди, созданные в вашем модуле управления RabbitMQ.

Пример.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")

 

Синтаксис

Параметры

Примечания

Еще от кодкамп
Замечательно! Вы успешно подписались.
Добро пожаловать обратно! Вы успешно вошли
Вы успешно подписались на кодкамп.
Срок действия вашей ссылки истек.
Ура! Проверьте свою электронную почту на наличие волшебной ссылки для входа.
Успех! Ваша платежная информация обновлена.
Ваша платежная информация не была обновлена.