Yesterday I showed how to identify a user when using the Pika library. However, Oslo Messaging still relies on the Kombu library. This, too, supports matching the user_id in the message to the username used to authenticate to the broker.
Again, a modification of an example from the documentation.
The important modification is to add the user_id to the publish call.
producer.publish( {'name': '/tmp/lolcat1.avi', 'size': 1301013}, exchange=media_exchange, routing_key='video', declare=[video_queue], user_id=rabbit_userid)
This Kombu based code does not raise an exception if the message is rejected. However, looking at the count of the number of messages and the dump of the properties on the consumer, only those where the usernames match.
The sender sends two message,on that matches, one that does not. Count the messages in the video queue
$ sudo rabbitmqctl list_queues | grep video video 0
Queue is empty.
Send two messages, one where the name matches, one where it does not.
$ python kombu-sender.py owned message sent misowned message sent
Now check there is only one message in the queue:
$ sudo rabbitmqctl list_queues | grep video video 1
And receive the message:
$ python kombu-recv.py recved {u'name': u'/tmp/lolcat1.avi', u'size': 1301013} sent by a5f56bdb395f53864a80b95f45dc395e94c546c7 $ sudo rabbitmqctl list_queues | grep video video 0
Only the one where the name matches is passed through.
Here is the full code.
kombu-sender.py
from kombu import Connection, Exchange, Queue media_exchange = Exchange('media', 'direct', durable=True) video_queue = Queue('video', exchange=media_exchange, routing_key='video') def process_media(body, message): print body message.ack() rabbit_host = '10.149.2.1' rabbit_userid = 'a5f56bdb395f53864a80b95f45dc395e94c546c7' rabbit_password = '06814091f31ad50b55a3509e9e3916082cce556d' # connections with Connection('amqp://%s:%s@%s//' % (rabbit_userid, rabbit_password, rabbit_host)) as conn: # produce producer = conn.Producer(serializer='json') try: producer.publish({'name': '/tmp/lolcat1.avi', 'size': 1301013}, exchange=media_exchange, routing_key='video', declare=[video_queue], user_id=rabbit_userid) print("owned message sent") except Exception as e: print(e) raise e try: producer.publish({'name': '/tmp/phish.avi', 'size': 1301013}, exchange=media_exchange, routing_key='video', declare=[video_queue], user_id='fake_user') print("misowned message sent") except Exception as e: print(e) raise e
kombu-recv.py
from kombu import Connection, Exchange, Queue media_exchange = Exchange('media', 'direct', durable=True) video_queue = Queue('video', exchange=media_exchange, routing_key='video') def process_media(body, message): print ('recved') print body print ('sent by ') print message.properties.get('user_id','no user id in message') message.ack() rabbit_host = '10.149.2.1' rabbit_userid = 'a5f56bdb395f53864a80b95f45dc395e94c546c7' rabbit_password = '06814091f31ad50b55a3509e9e3916082cce556d' # connections with Connection('amqp://%s:%s@%s//' % (rabbit_userid, rabbit_password, rabbit_host)) as conn: with conn.Consumer(video_queue, callbacks=[process_media]) as consumer: # Process messages and handle events on all channels while True: conn.drain_events() # Consume from several queues on the same channel: video_queue = Queue('video', exchange=media_exchange, key='video') image_queue = Queue('image', exchange=media_exchange, key='image')