一、导入依赖
开发人员可以从demo工程中将h2-client
,ddq-common
,xlink-utils
三个jar包移动到自己工程的lib/目录下。
开发人员在自己的maven工程,并通过本地引入依赖包的方式集成ddq-h2-client。在pom文件添加如下内容:
<properties>
<log4j2.version>2.17.0</log4j2.version>
<slf4j.version>1.7.25</slf4j.version>
<xlink.utils.version>1.4.5</xlink.utils.version>
<xlink.h2.client.version>3.5.8</xlink.h2.client.version>
<xlink.ddq.common.version>3.5.8</xlink.ddq.common.version>
</properties>
<!-- 本地jar包 -->
<dependency>
<groupId>cn.xlink</groupId>
<artifactId>h2-client</artifactId>
<version>${xlink.h2.client.version}</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/h2-client-${xlink.h2.client.version}.jar</systemPath>
</dependency>
<dependency>
<groupId>cn.xlink</groupId>
<artifactId>ddq-common</artifactId>
<version>${xlink.ddq.common.version}</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/ddq-common-${xlink.ddq.common.version}.jar</systemPath>
</dependency>
<dependency>
<groupId>cn.xlink</groupId>
<artifactId>xlink-utils</artifactId>
<version>${xlink.utils.version}</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/xlink-utils-${xlink.utils.version}.jar</systemPath>
</dependency>
<!-- ddq-h2-client 所使用的依赖库 -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.45.Final</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.2-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
二、创建客户端并开始连接
1、定义消息处理类
开发者需要实现DdqClientMessageHandler
接口,在这个回调接口中完成自己的处理逻辑。接口传入的参数分别是:
- topic:消息的主题。每一种消息都有固定的主题。
- messageId:消息ID,用于客户端向服务端进行消息确认。
- payload:具体的消息内容
因为客户端采用单线程拉取数据,并且回调messageRead方式传递数据到上层应用。开发者在实现DddClientMessageHandler的messageRead方法时,请将处理逻辑放到其他的业务线程执行。否则会影响到客户端拉取数据的速度。
/**
* 具体的处理函数
*/
public static class DataReceiverDisplay implements DdqClientMessageHandler {
@Override
public void messageRead(String topic, String messageId, JSONObject payload) {
/* 以物模型属性上报为例子:
* topic=/service/device-biz/attribute,
* messageId=消息的唯一ID
* payload内容是->
* {
"version": 1,
"time": "触发时间",
"type": "service",
"id": "设备唯一ID",
"corp_id": "企业ID",
"iot": {
"id": "IoT设备ID",
"mac": "IoT设备MAC",
"product": {
"id": "IoT产品ID"
},
"attribute": [{
"exception_info": {
"time": 1642*****0036,
"status": 2
},
"index": 1,
"field": "mode",
"value": "close"
}]
},
"project": {
"id": "项目ID"
}
}
*/
// 以下代码只是一个demo,用于展示怎么解析消息字段。
log.debug("dataPayload {}", payload);
String corpId = payload.getString("corp_id");
JSONObject iot = payload.getJSONObject("iot");
int deviceId = iot.getInteger("id");
String mac = iot.getString("mac");
JSONArray attributes = iot.getJSONArray("attribute");
for (int i = 0, size = attributes.size(); i < size; i++) {
JSONObject item = attributes.getJSONObject(i);
String fieldName = item.getString("field");
Object value = item.get("value");
}
}
}
2、创建一个DdqH2Client并启动连接
DdqH2ClientBuilder
具体参数参考DDQ配置说明
一般情况下,一个进程只需要启动一个DdqH2Client即可。
// 通过构造器DdqH2ClientBuilder进行创建
DdqH2Client h2Client = new DdqH2ClientBuilder().setHost(host).setPort(port).setHeartbeatInterval(100000) .setHeartbeatTimeoutThreshold(300000).setClientMode(ClientModeType.Pull)
.setPullLimit(pullLimit)
.setPullInterval(pullPeriod)
.setClientId(clientIdStr)
.setAppId(appId)
.setAppSecret(appSecret)
.setAuthType(authType)
.setAuthHost(authHost)
.setAutoAck(true)
.setClientMode(ClientModeType.Pull)
.setNettyIoThreadNum(Integer.valueOf(threadNum))
.setGroupId(groupId)
.build();
h2Client.addListener(new DataReceiverDisplay());
// 开始连接服务端并进行认证
h2Client.start();