Работа RabbitMQ с использованием AMQPStorm

Введение

Примеры

  • 3

    Как использовать сообщения от RabbitMQ

    Начните с импорта библиотеки.

     from amqpstorm import Connection
    
     

    При использовании сообщений сначала нужно определить функцию для обработки входящих сообщений. Это может быть любой вызываемой функции, и должен принимать объект сообщения, или кортеж сообщения ( в зависимости от to_tuple параметра , определенного в start_consuming ).

    Помимо обработки данных из входящего сообщения, мы также должны подтвердить или отклонить сообщение. Это важно, так как мы должны сообщить RabbitMQ, что мы правильно получили и обработали сообщение.

     def on_message(message):
        """This function is called on message received.
    
        :param message: Delivered message.
        :return:
        """
        print("Message:", message.body)
    
        # Acknowledge that we handled the message without any issues.
        message.ack()
    
        # Reject the message.
        # message.reject()
    
        # Reject the message, and put it back in the queue.
        # message.reject(requeue=True)
    
     

    Далее нам нужно настроить соединение с сервером RabbitMQ.

     connection = Connection('127.0.0.1', 'guest', 'guest')
    
     

    После этого нам нужно настроить канал. Каждое соединение может иметь несколько каналов, и, как правило, при выполнении многопоточных задач рекомендуется (но не обязательно) иметь один канал на поток.

     channel = connection.channel()
    
     

    После того, как мы настроили наш канал, нам нужно сообщить RabbitMQ, что мы хотим начать принимать сообщения. В этом случае мы будем использовать нашу ранее определенную on_message функцию , чтобы обрабатывать все наши потребляемые сообщения.

    Очередь мы будем слушать на сервере RabbitMQ будет simple_queue , и мы также говорят RabbitMQ , что мы будем признающее все входящие сообщения , как только мы сделали с ними.

     channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)
    
     

    Наконец, нам нужно запустить цикл ввода-вывода, чтобы начать обработку сообщений, доставляемых сервером RabbitMQ.

     channel.start_consuming(to_tuple=False) 
  • 2

    Как публиковать сообщения на RabbitMQ

    Начните с импорта библиотеки.

     from amqpstorm import Connection
    from amqpstorm import Message
    
     

    Далее нам нужно открыть соединение с сервером RabbitMQ.

     connection = Connection('127.0.0.1', 'guest', 'guest')
    
     

    После этого нам нужно настроить канал. Каждое соединение может иметь несколько каналов, и, как правило, при выполнении многопоточных задач рекомендуется (но не обязательно) иметь один канал на поток.

     channel = connection.channel()
    
     

    Как только мы настроим наш канал, мы можем начать готовить наше сообщение.

     # Message Properties.
    properties = {
        'content_type': 'text/plain',
        'headers': {'key': 'value'}
    }
    
    # Create the message.
    message = Message.create(channel=channel, body='Hello World!', properties=properties)
    
     

    Теперь мы можем опубликовать сообщение, просто вызывая publish и обеспечение routing_key . В этом случае мы будем посылать сообщение в очередь , называется simple_queue .

     message.publish(routing_key='simple_queue') 
  • 1

    Как создать отложенную очередь в RabbitMQ

    Сначала нам нужно настроить два основных канала, один для основной очереди и один для очереди задержки. В конце моего примера я включил пару дополнительных флагов, которые не обязательны, но делают код более надежным; такие , как confirm delivery , delivery_mode и durable . Вы можете найти более подробную информацию о них в RabbitMQ руководстве .

    После настройки каналов мы добавляем привязку к основному каналу, которую мы можем использовать для отправки сообщений из канала задержки в нашу основную очередь.

     channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
    
     

    Затем нам нужно настроить наш канал задержки для пересылки сообщений в основную очередь после истечения срока их действия.

     delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
        'x-message-ttl': 5000,
        'x-dead-letter-exchange': 'amq.direct',
        'x-dead-letter-routing-key': 'hello'
    })
    
     
    • х-сообщение-ТТЛ (Message - Time To Live)

      Обычно это используется для автоматического удаления старых сообщений в очереди через определенное время, но, добавив два необязательных аргумента, мы можем изменить это поведение, и вместо этого этот параметр определяет в миллисекундах, как долго сообщения будут оставаться в очереди с задержкой.

    • х анкерного письмо маршрутизации ключ

      Эта переменная позволяет нам передавать сообщения в другую очередь после истечения срока их действия вместо того, чтобы по умолчанию полностью удалять его.

    • х-буквальный обмен

      Эта переменная определяет, какой Exchange используется для передачи сообщения из hello_delay в очередь hello.

    Публикация в очереди задержки

    Когда мы закончим настройку всех основных параметров Pika, вы просто отправляете сообщение в очередь задержек с помощью базовой публикации.

     delay_channel.basic.publish(exchange='',
                                routing_key='hello_delay',
                                body='test',
                                properties={'delivery_mod': 2})
    
     

    После выполнения сценария вы должны увидеть следующие очереди, созданные в вашем модуле управления RabbitMQ. введите описание изображения здесь

    Пример.

    from amqpstorm import Connection
    
    connection = Connection('127.0.0.1', 'guest', 'guest')
    
    # Create normal 'Hello World' type channel.
    channel = connection.channel()
    channel.confirm_deliveries()
    channel.queue.declare(queue='hello', durable=True)
    
    # We need to bind this channel to an exchange, that will be used to transfer
    # messages from our delay queue.
    channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')
    
    # Create our delay channel.
    delay_channel = connection.channel()
    delay_channel.confirm_deliveries()
    
    # This is where we declare the delay, and routing for our delay channel.
    delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
        'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
        'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
        'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
    })
    
    delay_channel.basic.publish(exchange='',
                                routing_key='hello_delay',
                                body='test',
                                properties={'delivery_mode': 2})
    
    print("[x] Sent")
    
     

Синтаксис

Параметры

Примечания