消息订阅
准备工作
下载安装
下载安装:rabbitmq-server官方下载地址: http://www.rabbitmq.com/download.html(注:如未安装Erlang,请首先安装Erlang)
通过命令行工具直接安装(brew或yum等)
启动服务
当前窗口启动:rabbitmq-server
如需开启插件,在启动前设置:rabbitmq-plugins enable rabbitmq_management
docker启动
启动带管理界面的MQ:
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 docker.io/rabbitmq:3-management
启动不带管理界面的MQ:
docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 docker.io/rabbitmq:latest
配置MQ服务
配置文件所在目录为: node/namespaces/your_namespace/config/namespace.toml
配置文件中涉及的配置项包括:
[[service]]
service_name = "MQ" # 如果不配置,表示不启用服务
[amqp]
url = "amqp://guest:guest@localhost:5672/"
autoDeleted = false
mqdb = "MQDB"
mq_persist = true
接口说明
jsonrpc接口
MQ服务对外提供的API有六种,分别用于注册queue,删除queue,以及查询指定节点注册的所有queue,通知平台rabbitmq-broker正常工作,查询当前节点最新的exchangerName,删除指定名称的exchanger;
注册queue
平台接口名称为mq_register,需要一个参数:RegisterMeta结构体;MQ服务将做验签以及创建与发送目标节点绑定的exchanger
curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_register","params":[{"addresses":[],"toBlock":"","signature":"010492bf5d92d97bfbf6ab67eb6b5ceadee91a73a2e33876a339b3d25e23c9cd2322104eef8a9191bab5728f401adcd95a00f7e3f802bb374598f47593fc82bb0bd23045022047204b8b763f7dab117e530a06c0d9ac6ef2a0b77a1179be46f75ca0efcc450c02210098e9836dc9bbf0d8a39206f7c9bdc3148e3805e79e58d98df3cd0cbe2e42aff6","isVerbose":true,"topics":[],"routingKeys":["MQBlock","MQLog","MQException"], "fromBlock":"", "queueName":"gw_node3queue", "from":"B2CC762775FC1AA7486AA2FCF0D7885D4EEE4DA2"}],"id":1}'
返回值示例:
{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{\"queueName\":\"node1queue1\",\"exchangerName\":\"global_fa34664e_1529920070165345052\"}"}
其中的result字段内容为:队列名称(queueName),exchanger名称(exchangerName);如果发生异常,这两个返回值为空字符串
删除queue
平台接口名称为mq_unRegister,需要四个参数,分别为:队列名称,exchanger名称,账户地址,签名(目前只用到前三个)
curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_unRegister","params":["gw_node4queue","global_34d29974_1533616771290750181","B2CC762775FC1AA7486AA2FCF0D7885D4EEE4DA2","010492bf5d92d97bfbf6ab67eb6b5ceadee91a73a2e33876a339b3d25e23c9cd2322104eef8a9191bab5728f401adcd95a00f7e3f802bb374598f47593fc82bb0bd2304502205c7a9e27263fb39ba5e3c6ec1d1fde6bff113577087579a7c23a9c167f93ceaf022100a197f0b514be1dd78ddc480af4372425a26f5b20832b83707e77ae58e1d4994e"],"id":1}'
返回值示例:
{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{"count":1, "success":true, "error":null}"}
其中result字段内容为:队列中还未消费掉的消息数目(count),是否删除成功(success),以及过程中产生的error信息(error,正常情况下为“null”),如果exchangerName与注册queue时的返回值不匹配,会删除失败
查询所有的queueName
平台接口名称为mq_getAllQueueNames,不需要参数
curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_getAllQueueNames","params":[],"id":1}'
返回值示例:
{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":["node1queue1"]}
通知节点rabbitmq-broker正常工作
平台接口为mq_informNormal,不需要参数
curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_informNormal","params":[""],"id":1}'
返回值示例:
{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{"success":true,"error":null}"}
查询节点的exchangerName
平台接口为mq_getExchangerName,不需要参数
curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_getExchangerName","params":[],"id":1}'
返回值示例:
{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"global_fa34664e_1530265355500005853"}
删除exchanger
平台接口为mq_deleteExchanger,参数为exchanger名称
curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_deleteExchanger","params":["global_34d29974_1532417825700513378"],"id":1}'
返回值示例:
{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{"success":true,"error":null}"}
API接口
订阅
方法名称:registerQueue
方法说明:注册queue队列,只有调用了registerQueue,服务才能够正常工作
入参:RegisterMeta结构体
返回值:exchanger名称 queue名称
调用示例:
public String registerQueue(int id){
...
ArrayList<String> array = new ArrayList<String>();
array.add("MQBlock");
array.add("MQLog");
array.add("MQException");
String qname = "node1queue";
MQParam blockInfoParam = new MQParam.Builder().
//队列订阅相关的参数
regRoutingKeys(array). //表示queue绑定的routingKey集合
regQueueName(qname). //表示队列名称,已经使用过的队列名称,没有手动删除前,不可以重复使用
// MQBlock事件订阅需要的参数
setVerbose(false). //表示推送区块时是否需要推送交易列表,true表示推送交易列表
// MQlog事件订阅需要的参数
fromBlock("1"). //表示需要推送log事件的起始区块号
toBlock("2"). //表示需要推送log事件的终止区块号
addAddress("0x1258b70e2d620805a5351d8553ca76606a0c6fc5"). //表示log事件需要匹配的合约地址
addAddress("0xea22068f8ef04f6c4b5b73e55b51c4c758c7276e"). //可多次调用添加多个合约地址
addTopics(new String[]{"topicA", "topicB"}). //表示log事件需要匹配的topic集合
addTopics(new String[]{"topicC", "topicD"}). //可多次调用添加多个topic数组
//请求发起人自身信息
setFrom(TEST_FROM). //表示请求发起者的账户地址,用于权限控制
build(TEST_ACCOUNT_JSON, TEST_PASSWD); //表示请求者以RegisterMeta结构体为内容,使用自己的私钥做sm2签名,格式为16进制字符串
String result = hyperchain.registerQueue(blockInfoParam, SPEC_ID);
return result;
}
/** 输出结果为:
{
"queueName": "node1queue",
"exchangerName": "global_fa34664e_1529591668938108811"
} **/
MQ中事件Demo
以订阅了MQBlock为例,在rabbitmq-broker中收到的事件内容为:
{
"timestamp": 1532443198896718675,
"type": "MQBlock_384ea6bea",
"body": {
"type": "MQBlock_384ea6bea",
"name": "BlockInfo",
"version": "1.4",
"number": "2",
"hash": "0x6ec2dc5e863e7e2445ca9633b6eb46d707005f7f7398a8997f214e7f1f97015c",
"parentHash": "0xb6a10ec6b99d41e182792736d16ec60e54becb39bf49f2fa649e196f6316d2d0",
"writeTime": 1532443198895164511,
"avgTime": 17,
"txcounts": 1,
"merkleRoot": "0xc74e74692af9ad23f320200dccd59c46ba78ac75d8262c02de7d6f94a008062d",
"txs": [{
"extra": "",
"from": "0xb2cc762775fc1aa7486aa2fcf0d7885d4eee4da2",
"to": "0x9298c40187386ee623f52f352eeb0a23c2816302",
"amount": "0",
"timestamp": "1532443198656489814",
"nonce": "1028581882657355",
"payload": "0xab6d1c093132330000000000000000000000000000000000000000000000000000000000",
"signature": "010492bf5d92d97bfbf6ab67eb6b5ceadee91a73a2e33876a339b3d25e23c9cd2322104eef8a9191bab5728f401adcd95a00f7e3f802bb374598f47593fc82bb0bd230440220675a570abf520b2d8f8910f6fb9097f545c77290138c5075301ae1390267d49502201ea918b2e042baa4095a8357cdd29f04dc20fcf6df0f5275f5f1619e79d26d2e",
"version": "1.4",
"txHash": "0x911170e1fb16722180ae35ca132b1bb9dd9b67c72279a49b6f2a1288450dc163",
"vmType": "EVM",
"contractAddress": "0x0000000000000000000000000000000000000000",
"gasUsed": 1323,
"ret": "0x0",
"log": [{
"address": "0x9298c40187386ee623f52f352eeb0a23c2816302",
"topics": ["0x8c5ef8f5670e096749c7d0bd79364744b28e871bc6c65c5541b0b30f727ad8be"],
"data": "3132330000000000000000000000000000000000000000000000000000000000",
"blockNumber": 2,
"blockHash": "0x6ec2dc5e863e7e2445ca9633b6eb46d707005f7f7398a8997f214e7f1f97015c",
"txHash": "0x911170e1fb16722180ae35ca132b1bb9dd9b67c72279a49b6f2a1288450dc163",
"txIndex": 0,
"index": 0
}]
}]
}
}
解订阅
方法名称:unregisterQueue
方法简介:删除指定名字的queue,如果queue不存在或者给出了不匹配的excahngerName,都将删除失败
入参:queue名称(string) exchanger名称(string) 发起者地址(string) 签名(string) 节点ID(int)
返回值:queue删除时仍持有的消息数目 是否删除成功 过程中的错误信息
调用示例:
public String unRegisterQueue(int id) {
String qname = "node1queue";
String exchName = "global_34d29974_1533616771290750181";
String signature = new UnRegParam(qname, exchName).signWithSM2(TEST_ACCOUNT_JSON2, TEST_PASSWD);
String result = hyperchain.unregisterQueue(qname, exchName, TEST_FROM2, signature, 1);
return result;
}
/**
输出结果为:{"count":0, "success":true, "error":"null"}
**/
查询指定节点应注册的所有的queue名称
方法名称:getAllQueueNames
入参:节点ID(int)
返回值:queue名称列表
调用示例:
public String testGetAllQName(int id) {
String result = hyperchain.getAllQueueNames(id);
return result;
}
/**
输出结果为:["node1queue"]
**/
通知平台rabbitmq-broker正常工作
方法名称:informNormal
入参:rabbitmq-broker的完整url 节点ID(int)
简介:
该方法主要用于在broker宕机后,可以通过调用该方法,通知节点,节点会尝试建立连接以及恢复曾经注册的queue
返回值:平台建立连接 队列恢复的结果
调用示例:
public void testInformNormal(String brokerUrl, int id) throws Exception {
// 空字符串表示使用平台默认url
String result = hyperchain.informNormal(brokerUrl, id);
System.out.println(result);
}
/**
输出结果为:{"success":true,"error":null}
**/
删除指定的exchanger
方法名称:deleteExch
入参:exchanger名称(String) 节点ID(int)
返回值:是否删除成功
调用示例:
public String testDeleteExchanger() throws Exception {
...
String exchName = "global_34d29974_1532437541218895978";
String result = hyperchain.deleteExch(exchName, SPEC_ID);
return result
}
/**
输出结果为:"{"success":true,"error":null}"
**/
Demo说明
消费者程序调用receiveMsg()接口,可以消费相应的消息,外部程序可以通过getMessages()接口,获得所有消费者消费掉的消息记录
public class HPCMQClient {
private static String qname = "node1queue1"; // should read from properties
private final ArrayList<String> messages = new ArrayList<String>();
public HPCMQClient() {}
public void receiveMsg() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setPort(5672); // should set your own hostname and port
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(qname, true, false, false, null);
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[CLIENT] Received '" + message + "'");
messages.add(message);
System.out.println("[CLIENT] Done");
}
};
channel.basicConsume(qname, true, consumer);
}
public ArrayList<String> getMessages() {
return messages;
}
}
操作流程
调用informNormal,通知节点与MQ建立连接
注册队列
正常发送交易,产生消息
消费者等待rabbitmq-broker推送的消息
当queue不再使用时,删除queue
当exchanger没有绑定的queue时,删除exchanger
可订阅事件路由键
MQBlock: 表示每一次提交区块时,主动推送区块内容
MQLog: 合约中的event关键字定义的接口,每次被调用时推送出来的数据
MQException: 每次平台进入非正常状态,主动推送通知消息
订阅规则
订阅规则依赖于一个RegisterMeta结构实现,该结构体表现为一个json格式的字符串作为参数传入RegisterQueue()方法,RegisterMeta中不同的字段应用于不同的事件订阅规则,具体如下:
// 排表方式为:变量名称 | 变量类型 | json格式key | 字段含义
type RegisterMeta struct {
//队列订阅相关的参数
RoutingKeys List<string> // `json:"routingKeys"` 表示queue绑定的routingKey集合
QueueName string // `json:"queueName"` 表示队列名称,已经使用过的队列名称,没有手动删除前,不可以重复使用
//请求发起人自身信息
From string // `json:"from"` 表示请求发起者的账户地址,用于权限控制
Signature string // `json:"signature"` 表示请求者以RegisterMeta结构体为内容,使用自己的私钥做sm2签名,格式为16进制字符串
// MQBlock事件订阅需要的参数
IsVerbose boolean // `json:"isVerbose"` 表示推送区块时是否需要推送交易列表,true表示推送交易列表
// MQlog事件订阅需要的参数
FromBlock string // `json:"fromBlock"` 表示需要推送log事件的起始区块号
ToBlock string // `json:"toBlock"` 表示需要推送log事件的终止区块号
Addresses List<String> // `json:"addresses"` 表示log事件需要匹配的合约地址集合
Topics List<String[]> // `json:"topics"` 表示log事件需要匹配的topic集合
//目前Exception相关的操作不需要用户提供参数,订阅了Exception事件后,所有的异常都会主动推送
}
异常情况处理
broker未启动
broker未启动时,在不注册队列,不需要消息推送时,平台正常输出日志,在用户调用API发起请求注册队列时,平台会打印错误日志
broker宕机
rabbitmq-broker端单方面宕机,宕机之后,平台可以继续的提交区块,但会打印相关的推送消息失败的日志;
在rabbitmq-broker重启恢复后,需要用户再次注册队列,如果注册了与异常发生前同名的队列,则可以自动恢复broker宕机过程中的消息,平台会打印相关的日志提示消息已经恢复
节点宕机
节点宕机重启后,需要用户调用InformNormal接口,MQ服务会自动恢复队列,权限映射关系,并将缓存的数据推送到broker
注意事项
目前Log事件中的data字段编码方式比较特殊,为base64格式,直接将value值按照base64格式解码,即可得到原始的字节数组