Docs

Docs

  • 文档
  • github

›操作指南

概述

  • 简介
  • 术语
  • 版本发布

架构特性

  • 整体架构
  • 共识机制
  • 账本存储
  • 智能合约
  • 数据归档
  • 数据可视化
  • 消息订阅

快速入门

  • 部署安装
  • 使用示例
  • 配置说明

操作指南

  • EVM
  • HVM
  • 消息订阅
  • 数据可视化
  • 交互式命令

SDK

  • LITESDK
  • JAVASDK

消息订阅

准备工作

下载安装

下载安装:rabbitmq-server官方下载地址: http://www.rabbitmq.com/download.html(注:如未安装Erlang,请首先安装Erlang)

通过命令行工具直接安装(brew或yum等)

启动服务

当前窗口启动:rabbitmq-server

如需开启插件,在启动前设置:rabbitmq-plugins enable rabbitmq_management

docker启动

启动带管理界面的MQ:

docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 docker.io/rabbitmq:3-management

启动不带管理界面的MQ:

docker run -d --name myrabbitmq -p 5672:5672 -p 15672:15672 docker.io/rabbitmq:latest

配置MQ服务

配置文件所在目录为: node/namespaces/your_namespace/config/namespace.toml

配置文件中涉及的配置项包括:

[[service]]
    service_name = "MQ"  # 如果不配置,表示不启用服务
[amqp]
  url             = "amqp://guest:guest@localhost:5672/"
  autoDeleted     = false
  mqdb            = "MQDB"
  mq_persist      = true

接口说明

jsonrpc接口

MQ服务对外提供的API有六种,分别用于注册queue,删除queue,以及查询指定节点注册的所有queue,通知平台rabbitmq-broker正常工作,查询当前节点最新的exchangerName,删除指定名称的exchanger;

注册queue

平台接口名称为mq_register,需要一个参数:RegisterMeta结构体;MQ服务将做验签以及创建与发送目标节点绑定的exchanger

curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_register","params":[{"addresses":[],"toBlock":"","signature":"010492bf5d92d97bfbf6ab67eb6b5ceadee91a73a2e33876a339b3d25e23c9cd2322104eef8a9191bab5728f401adcd95a00f7e3f802bb374598f47593fc82bb0bd23045022047204b8b763f7dab117e530a06c0d9ac6ef2a0b77a1179be46f75ca0efcc450c02210098e9836dc9bbf0d8a39206f7c9bdc3148e3805e79e58d98df3cd0cbe2e42aff6","isVerbose":true,"topics":[],"routingKeys":["MQBlock","MQLog","MQException"], "fromBlock":"", "queueName":"gw_node3queue", "from":"B2CC762775FC1AA7486AA2FCF0D7885D4EEE4DA2"}],"id":1}'

返回值示例:

{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{\"queueName\":\"node1queue1\",\"exchangerName\":\"global_fa34664e_1529920070165345052\"}"}

其中的result字段内容为:队列名称(queueName),exchanger名称(exchangerName);如果发生异常,这两个返回值为空字符串

删除queue

平台接口名称为mq_unRegister,需要四个参数,分别为:队列名称,exchanger名称,账户地址,签名(目前只用到前三个)

curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_unRegister","params":["gw_node4queue","global_34d29974_1533616771290750181","B2CC762775FC1AA7486AA2FCF0D7885D4EEE4DA2","010492bf5d92d97bfbf6ab67eb6b5ceadee91a73a2e33876a339b3d25e23c9cd2322104eef8a9191bab5728f401adcd95a00f7e3f802bb374598f47593fc82bb0bd2304502205c7a9e27263fb39ba5e3c6ec1d1fde6bff113577087579a7c23a9c167f93ceaf022100a197f0b514be1dd78ddc480af4372425a26f5b20832b83707e77ae58e1d4994e"],"id":1}'

返回值示例:

{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{"count":1, "success":true, "error":null}"}

其中result字段内容为:队列中还未消费掉的消息数目(count),是否删除成功(success),以及过程中产生的error信息(error,正常情况下为“null”),如果exchangerName与注册queue时的返回值不匹配,会删除失败

查询所有的queueName

平台接口名称为mq_getAllQueueNames,不需要参数

curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_getAllQueueNames","params":[],"id":1}'

返回值示例:

{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":["node1queue1"]}

通知节点rabbitmq-broker正常工作

平台接口为mq_informNormal,不需要参数

curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_informNormal","params":[""],"id":1}'

返回值示例:

{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{"success":true,"error":null}"}

查询节点的exchangerName

平台接口为mq_getExchangerName,不需要参数

curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_getExchangerName","params":[],"id":1}'

返回值示例:

{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"global_fa34664e_1530265355500005853"}

删除exchanger

平台接口为mq_deleteExchanger,参数为exchanger名称

curl localhost:8081 --data '{"jsonrpc":"2.0","namespace":"global","method":"mq_deleteExchanger","params":["global_34d29974_1532417825700513378"],"id":1}'

返回值示例:

{"jsonrpc":"2.0","namespace":"global","id":1,"code":0,"message":"SUCCESS","result":"{"success":true,"error":null}"}

API接口

订阅

方法名称:registerQueue

方法说明:注册queue队列,只有调用了registerQueue,服务才能够正常工作

入参:RegisterMeta结构体

返回值:exchanger名称 queue名称

调用示例:

  public String registerQueue(int id){
      ...
      ArrayList<String> array = new ArrayList<String>();
      array.add("MQBlock");
      array.add("MQLog");
      array.add("MQException");
      String qname = "node1queue";
      MQParam blockInfoParam = new MQParam.Builder().
                //队列订阅相关的参数
                regRoutingKeys(array).  //表示queue绑定的routingKey集合
                regQueueName(qname).    //表示队列名称,已经使用过的队列名称,没有手动删除前,不可以重复使用
                // MQBlock事件订阅需要的参数
                setVerbose(false).      //表示推送区块时是否需要推送交易列表,true表示推送交易列表
                // MQlog事件订阅需要的参数
                fromBlock("1").         //表示需要推送log事件的起始区块号
                toBlock("2").           //表示需要推送log事件的终止区块号
                addAddress("0x1258b70e2d620805a5351d8553ca76606a0c6fc5").   //表示log事件需要匹配的合约地址
                addAddress("0xea22068f8ef04f6c4b5b73e55b51c4c758c7276e").   //可多次调用添加多个合约地址
                addTopics(new String[]{"topicA", "topicB"}).    //表示log事件需要匹配的topic集合
                addTopics(new String[]{"topicC", "topicD"}).    //可多次调用添加多个topic数组
                //请求发起人自身信息
                setFrom(TEST_FROM).     //表示请求发起者的账户地址,用于权限控制
                build(TEST_ACCOUNT_JSON, TEST_PASSWD);  //表示请求者以RegisterMeta结构体为内容,使用自己的私钥做sm2签名,格式为16进制字符串
      String result = hyperchain.registerQueue(blockInfoParam, SPEC_ID);
      return result;
  }
  /** 输出结果为:
  {
      "queueName": "node1queue",
      "exchangerName": "global_fa34664e_1529591668938108811"
  } **/

MQ中事件Demo

以订阅了MQBlock为例,在rabbitmq-broker中收到的事件内容为:

{
    "timestamp": 1532443198896718675,
    "type": "MQBlock_384ea6bea",
    "body": {
        "type": "MQBlock_384ea6bea",
        "name": "BlockInfo",
        "version": "1.4",
        "number": "2",
        "hash": "0x6ec2dc5e863e7e2445ca9633b6eb46d707005f7f7398a8997f214e7f1f97015c",
        "parentHash": "0xb6a10ec6b99d41e182792736d16ec60e54becb39bf49f2fa649e196f6316d2d0",
        "writeTime": 1532443198895164511,
        "avgTime": 17,
        "txcounts": 1,
        "merkleRoot": "0xc74e74692af9ad23f320200dccd59c46ba78ac75d8262c02de7d6f94a008062d",
        "txs": [{
            "extra": "",
            "from": "0xb2cc762775fc1aa7486aa2fcf0d7885d4eee4da2",
            "to": "0x9298c40187386ee623f52f352eeb0a23c2816302",
            "amount": "0",
            "timestamp": "1532443198656489814",
            "nonce": "1028581882657355",
            "payload": "0xab6d1c093132330000000000000000000000000000000000000000000000000000000000",
            "signature": "010492bf5d92d97bfbf6ab67eb6b5ceadee91a73a2e33876a339b3d25e23c9cd2322104eef8a9191bab5728f401adcd95a00f7e3f802bb374598f47593fc82bb0bd230440220675a570abf520b2d8f8910f6fb9097f545c77290138c5075301ae1390267d49502201ea918b2e042baa4095a8357cdd29f04dc20fcf6df0f5275f5f1619e79d26d2e",
            "version": "1.4",
            "txHash": "0x911170e1fb16722180ae35ca132b1bb9dd9b67c72279a49b6f2a1288450dc163",
            "vmType": "EVM",
            "contractAddress": "0x0000000000000000000000000000000000000000",
            "gasUsed": 1323,
            "ret": "0x0",
            "log": [{
                "address": "0x9298c40187386ee623f52f352eeb0a23c2816302",
                "topics": ["0x8c5ef8f5670e096749c7d0bd79364744b28e871bc6c65c5541b0b30f727ad8be"],
                "data": "3132330000000000000000000000000000000000000000000000000000000000",
                "blockNumber": 2,
                "blockHash": "0x6ec2dc5e863e7e2445ca9633b6eb46d707005f7f7398a8997f214e7f1f97015c",
                "txHash": "0x911170e1fb16722180ae35ca132b1bb9dd9b67c72279a49b6f2a1288450dc163",
                "txIndex": 0,
                "index": 0
            }]
        }]
    }
}

解订阅

方法名称:unregisterQueue

方法简介:删除指定名字的queue,如果queue不存在或者给出了不匹配的excahngerName,都将删除失败

入参:queue名称(string) exchanger名称(string) 发起者地址(string) 签名(string) 节点ID(int)

返回值:queue删除时仍持有的消息数目 是否删除成功 过程中的错误信息

调用示例:

public String unRegisterQueue(int id) {
      String qname = "node1queue";
      String exchName = "global_34d29974_1533616771290750181";
      String signature = new UnRegParam(qname, exchName).signWithSM2(TEST_ACCOUNT_JSON2, TEST_PASSWD);
      String result = hyperchain.unregisterQueue(qname, exchName, TEST_FROM2, signature, 1);
      return result;
   }
   /**
    输出结果为:{"count":0, "success":true, "error":"null"}
   **/   

查询指定节点应注册的所有的queue名称

方法名称:getAllQueueNames

入参:节点ID(int)

返回值:queue名称列表

调用示例:

  public String testGetAllQName(int id) {
      String result = hyperchain.getAllQueueNames(id);
      return result;  
   }
   /**
    输出结果为:["node1queue"]
   **/   

通知平台rabbitmq-broker正常工作

方法名称:informNormal

入参:rabbitmq-broker的完整url 节点ID(int)

简介:

该方法主要用于在broker宕机后,可以通过调用该方法,通知节点,节点会尝试建立连接以及恢复曾经注册的queue

返回值:平台建立连接 队列恢复的结果

调用示例:

public void testInformNormal(String brokerUrl, int id) throws Exception {
        // 空字符串表示使用平台默认url
        String result = hyperchain.informNormal(brokerUrl, id);
        System.out.println(result);
   }
   /**
    输出结果为:{"success":true,"error":null}
   **/  

删除指定的exchanger

方法名称:deleteExch

入参:exchanger名称(String) 节点ID(int)

返回值:是否删除成功

调用示例:

public String testDeleteExchanger() throws Exception {
        ...
        String exchName = "global_34d29974_1532437541218895978";
        String result = hyperchain.deleteExch(exchName, SPEC_ID);
        return result
    }
   /**
    输出结果为:"{"success":true,"error":null}"
   **/  

Demo说明

消费者程序调用receiveMsg()接口,可以消费相应的消息,外部程序可以通过getMessages()接口,获得所有消费者消费掉的消息记录

public class HPCMQClient {
    private static String qname = "node1queue1"; // should read from properties
    private final ArrayList<String> messages = new ArrayList<String>();

    public HPCMQClient() {}

    public void receiveMsg() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);   // should set your own hostname and port
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(qname, true, false, false, null);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("[CLIENT] Received '" + message + "'");
                messages.add(message);
                System.out.println("[CLIENT] Done");
            }
        };
        channel.basicConsume(qname, true, consumer);
    }
    
    public ArrayList<String> getMessages() {
        return messages;
    }
}

操作流程

  1. 调用informNormal,通知节点与MQ建立连接

  2. 注册队列

  3. 正常发送交易,产生消息

  4. 消费者等待rabbitmq-broker推送的消息

  5. 当queue不再使用时,删除queue

  6. 当exchanger没有绑定的queue时,删除exchanger

可订阅事件路由键

MQBlock: 表示每一次提交区块时,主动推送区块内容

MQLog: 合约中的event关键字定义的接口,每次被调用时推送出来的数据

MQException: 每次平台进入非正常状态,主动推送通知消息

订阅规则

订阅规则依赖于一个RegisterMeta结构实现,该结构体表现为一个json格式的字符串作为参数传入RegisterQueue()方法,RegisterMeta中不同的字段应用于不同的事件订阅规则,具体如下:

// 排表方式为:变量名称 | 变量类型 | json格式key | 字段含义
type RegisterMeta struct {
    //队列订阅相关的参数
    RoutingKeys    List<string> // `json:"routingKeys"` 表示queue绑定的routingKey集合
    QueueName      string       // `json:"queueName"`   表示队列名称,已经使用过的队列名称,没有手动删除前,不可以重复使用
    
    //请求发起人自身信息
    From           string   // `json:"from"`      表示请求发起者的账户地址,用于权限控制
    Signature      string   // `json:"signature"` 表示请求者以RegisterMeta结构体为内容,使用自己的私钥做sm2签名,格式为16进制字符串

    // MQBlock事件订阅需要的参数
    IsVerbose      boolean   // `json:"isVerbose"` 表示推送区块时是否需要推送交易列表,true表示推送交易列表
    
    // MQlog事件订阅需要的参数
    FromBlock      string           // `json:"fromBlock"` 表示需要推送log事件的起始区块号
    ToBlock        string           // `json:"toBlock"`   表示需要推送log事件的终止区块号
    Addresses      List<String>     // `json:"addresses"` 表示log事件需要匹配的合约地址集合
    Topics         List<String[]>   // `json:"topics"`    表示log事件需要匹配的topic集合
    
    //目前Exception相关的操作不需要用户提供参数,订阅了Exception事件后,所有的异常都会主动推送
}

异常情况处理

broker未启动

broker未启动时,在不注册队列,不需要消息推送时,平台正常输出日志,在用户调用API发起请求注册队列时,平台会打印错误日志

broker宕机

rabbitmq-broker端单方面宕机,宕机之后,平台可以继续的提交区块,但会打印相关的推送消息失败的日志;

在rabbitmq-broker重启恢复后,需要用户再次注册队列,如果注册了与异常发生前同名的队列,则可以自动恢复broker宕机过程中的消息,平台会打印相关的日志提示消息已经恢复

节点宕机

节点宕机重启后,需要用户调用InformNormal接口,MQ服务会自动恢复队列,权限映射关系,并将缓存的数据推送到broker

注意事项

目前Log事件中的data字段编码方式比较特殊,为base64格式,直接将value值按照base64格式解码,即可得到原始的字节数组

← HVM数据可视化 →
  • 准备工作
    • 下载安装
    • 启动服务
    • docker启动
    • 配置MQ服务
  • 接口说明
    • jsonrpc接口
    • API接口
  • Demo说明
  • 操作流程
    • 可订阅事件路由键
    • 订阅规则
  • 异常情况处理
  • 注意事项
QTechGitHub
Copyright © 2025 Hyperchain Co., Ltd.