Identifying the message sender with RabbitMQ and Kombu

Yesterday I showed how to identify a user when using the Pika library. However, Oslo Messaging still relies on the Kombu library. Kombu, too, supports matching the user_id in the message to the username used to authenticate to the broker.

Again, the code is a modification of an example from the Kombu documentation.

The important modification is to add the user_id to the publish call.

producer.publish(
    {'name': '/tmp/lolcat1.avi', 'size': 1301013},
    exchange=media_exchange, routing_key='video',
    declare=[video_queue], user_id=rabbit_userid)

This Kombu-based code does not raise an exception if the message is rejected. However, the message count on the queue and the dump of the properties on the consumer show that only messages where the usernames match get through.

The sender sends two messages: one where the user_id matches, and one where it does not. First, count the messages in the video queue:
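The rule the broker applies can be modelled in a few lines. This is only a sketch of the behaviour described above, not RabbitMQ or Kombu code: the function name broker_accepts is made up for illustration, and it assumes a message that sets no user_id at all is let through unchecked.

```python
# Hypothetical model of the broker-side check; not RabbitMQ or Kombu code.
def broker_accepts(authenticated_user, properties):
    """Return True if a message with these properties would be kept."""
    claimed = properties.get('user_id')
    # Assumption: a message with no user_id makes no claim, so nothing is checked.
    if claimed is None:
        return True
    # A claimed user_id must equal the name used to log in to the broker.
    return claimed == authenticated_user


login = 'a5f56bdb395f53864a80b95f45dc395e94c546c7'
print(broker_accepts(login, {'user_id': login}))        # owned message
print(broker_accepts(login, {'user_id': 'fake_user'}))  # misowned message
print(broker_accepts(login, {}))                        # no user_id set
```

The first two calls correspond to the owned and misowned messages sent in the walkthrough that follows.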

$ sudo rabbitmqctl list_queues | grep video
video	0

The queue is empty.

Send two messages, one where the name matches, one where it does not.

$ python kombu-sender.py
owned message sent
misowned message sent

Now check there is only one message in the queue:

$ sudo rabbitmqctl list_queues | grep video
video	1

And receive the message:

$ python kombu-recv.py
recved
{u'name': u'/tmp/lolcat1.avi', u'size': 1301013}
sent by 
a5f56bdb395f53864a80b95f45dc395e94c546c7
$ sudo rabbitmqctl list_queues | grep video
video	0

Only the one where the name matches is passed through.

Here is the full code.
kombu-sender.py


from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    message.ack()


rabbit_host = '10.149.2.1'
rabbit_userid = 'a5f56bdb395f53864a80b95f45dc395e94c546c7'
rabbit_password = '06814091f31ad50b55a3509e9e3916082cce556d'


# Connect as the real user; the broker compares user_id against this login.
with Connection('amqp://%s:%s@%s//' % (rabbit_userid, rabbit_password, rabbit_host)) as conn:

    # produce
    producer = conn.Producer(serializer='json')
    try:
        # user_id matches the login, so the broker keeps this message.
        producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013},
                         exchange=media_exchange, routing_key='video',
                         declare=[video_queue], user_id=rabbit_userid)
        print("owned message sent")
    except Exception as e:
        print(e)
        raise
    try:
        # user_id does not match the login. No exception is raised here,
        # but the message never reaches the queue.
        producer.publish({'name': '/tmp/phish.avi', 'size': 1301013},
                         exchange=media_exchange, routing_key='video',
                         declare=[video_queue], user_id='fake_user')
        print("misowned message sent")
    except Exception as e:
        print(e)
        raise

kombu-recv.py


from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'direct', durable=True)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print('recved')
    print(body)
    print('sent by ')
    # user_id is only present if the sender set it on publish
    print(message.properties.get('user_id', 'no user id in message'))
    message.ack()


rabbit_host = '10.149.2.1'
rabbit_userid = 'a5f56bdb395f53864a80b95f45dc395e94c546c7'
rabbit_password = '06814091f31ad50b55a3509e9e3916082cce556d'


# connections
with Connection('amqp://%s:%s@%s//' % (rabbit_userid, rabbit_password, rabbit_host)) as conn:

    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

# Consume from several queues on the same channel:                                              
video_queue = Queue('video', exchange=media_exchange, key='video')
image_queue = Queue('image', exchange=media_exchange, key='image')
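Since the broker only lets a user_id through when it matches the sender's login, a consumer can use that property to decide whom to trust. Here is a minimal sketch of that idea. The names TRUSTED_SENDERS and require_sender are hypothetical, and FakeMessage is a stand-in so the example runs without a broker; it only mimics the properties, ack() and reject() parts of a Kombu message.

```python
# Hypothetical helper; TRUSTED_SENDERS and require_sender are illustrative names.
TRUSTED_SENDERS = {'a5f56bdb395f53864a80b95f45dc395e94c546c7'}


def require_sender(callback, trusted=TRUSTED_SENDERS):
    """Wrap a Kombu-style callback so it only runs for trusted senders."""
    def checked(body, message):
        sender = message.properties.get('user_id')
        if sender not in trusted:
            # Drop messages that carry no user_id or an unexpected one.
            message.reject()
            return
        callback(body, message)
    return checked


class FakeMessage(object):
    """Stand-in for a Kombu message, just enough to exercise the wrapper."""
    def __init__(self, user_id=None):
        self.properties = {'user_id': user_id} if user_id else {}
        self.state = 'received'

    def ack(self):
        self.state = 'acked'

    def reject(self):
        self.state = 'rejected'


def process_media(body, message):
    message.ack()


handler = require_sender(process_media)
owned = FakeMessage('a5f56bdb395f53864a80b95f45dc395e94c546c7')
anonymous = FakeMessage()
handler({'name': '/tmp/lolcat1.avi'}, owned)
handler({'name': '/tmp/phish.avi'}, anonymous)
print(owned.state)
print(anonymous.state)
```

In the receiver above, the wrapped callback would be passed as callbacks=[require_sender(process_media)] when creating the Consumer.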
