0%

Python 操作 RocketMQ

安装pip

1
2
3
4
yum install python-pip -y

# 升级pip
pip install -upgrade pip

推送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from rocketmq.client import Producer, Message

producer = Producer('PID-XXX')
producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# producer.set_namesrv_addr('127.0.0.1:9887')
producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
producer.start()

msg = Message('YOUR-TOPIC')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body('XXXX')
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()

推送消息的时候,如果消息所占字节太长,需要手动设置size,代码中设置的是1M。

1
producer = Producer('PID-001',max_message_size=1024*1024)

消费方式PullConsumer(全部消费)(可重复消费)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from rocketmq.client import PullConsumer


consumer = PullConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.start()

for msg in consumer.pull('YOUR-TOPIC'):
print(msg.id, msg.body)

consumer.shutdown()

消费方式PushConsumer(即时消费)(不可重复消费)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time

from rocketmq.client import PushConsumer


def callback(msg):
print(msg.id, msg.body)


consumer = PushConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN') # No need to call this function if you don't use Aliyun.
consumer.subscribe('YOUR-TOPIC', callback)
consumer.start()

while True:
time.sleep(3600)

consumer.shutdown()