+ + +import; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.naming.Context; +import javax.naming.InitialContext; + +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.tomcat.util.codec.binary.Base64; + + +@Slf4j +public class AmqpClient { + private static String accessKey = "LTAI5t9DYVYpzxbmYBx3t4LU"; + private static String accessSecret = "tM4EmXqeohWTxJ5R1AmsItai6zjiaQ"; + private static String consumerGroupId = "DEFAULT_GROUP"; + + //iotInstanceId:实例ID。若是2021年07月30日之前(不含当日)开通的公共实例,请填空字符串。 + private static String iotInstanceId = "iot-06z00hk8iq70r0s"; + + //控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。 + //建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。 + private static String clientId = "boom"; + + //${YourHost}为接入域名,请参见AMQP客户端接入说明文档。 + private static String host = ""; + + // 指定单个进程启动的连接数 + // 单个连接消费速率有限,请参考使用限制,最大64个连接 + // 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接 + private static int connectionCount = 4; + + //业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。 + private final static ExecutorService executorService = new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, + new LinkedBlockingQueue(50000)); + + public static void main(String[] args) throws Exception { + List connections = new ArrayList<>(); + + //参数说明,请参见AMQP客户端接入说明文档。 + for (int i = 0; i < connectionCount; i++) { + long timeStamp = System.currentTimeMillis(); + //签名方法:支持hmacmd5、hmacsha1和hmacsha256。 + String signMethod = "hmacsha1"; + + //userName组装方法,请参见AMQP客户端接入说明文档。 + String userName = clientId + "-" + i + "|authMode=aksign" + + ",signMethod=" + signMethod + + ",timestamp=" + timeStamp + + ",authId=" + accessKey + + ",iotInstanceId=" + iotInstanceId + + ",consumerGroupId=" + consumerGroupId + + "|"; + //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 + String signContent = "authId=" + accessKey + "×tamp=" + timeStamp; + String password = doSign(signContent, accessSecret, signMethod); + String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)" + + "?failover.reconnectDelay=30"; + + Hashtable hashtable = new Hashtable<>(); + hashtable.put("connectionfactory.SBCF", connectionUrl); + hashtable.put("queue.QUEUE", "default"); + hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); + Context context = new InitialContext(hashtable); + ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF"); + Destination queue = (Destination)context.lookup("QUEUE"); + // 创建连接。 + Connection connection = cf.createConnection(userName, password); + connections.add(connection); + + ((JmsConnection)connection).addConnectionListener(myJmsConnectionListener); + // 创建会话。 + // Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。 + // Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。 + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + connection.start(); + // 创建Receiver连接。 + MessageConsumer consumer = session.createConsumer(queue); + consumer.setMessageListener(messageListener); + } + +"amqp demo is started successfully, and will exit after 60s "); + + // 结束程序运行 + Thread.sleep(60 * 1000); +"run shutdown"); + + connections.forEach(c-> { + try { + c.close(); + } catch (JMSException e) { + log.error("failed to close connection", e); + } + }); + + executorService.shutdown(); + if (executorService.awaitTermination(10, TimeUnit.SECONDS)) { +"shutdown success"); + } else { +"failed to handle messages"); + } + } + + private static MessageListener messageListener = new MessageListener() { + @Override + public void onMessage(final Message message) { + try { + //1.收到消息之后一定要ACK。 + // 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。 + // 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。 + // message.acknowledge(); + //2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。 + // 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。 + executorService.submit(new Runnable() { + @Override + public void run() { + processMessage(message); + } + }); + } catch (Exception e) { + log.error("submit task occurs exception ", e); + } + } + }; + + /** + * 在这里处理您收到消息后的具体业务逻辑。 + */ + private static void processMessage(Message message) { + try { + byte[] body = message.getBody(byte[].class); + String content = new String(body); + String topic = message.getStringProperty("topic"); + String messageId = message.getStringProperty("messageId"); +"receive message" + + ",\n topic = " + topic + + ",\n messageId = " + messageId + + ",\n content = " + content); + } catch (Exception e) { + log.error("processMessage occurs error ", e); + } + } + + private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() { + /** + * 连接成功建立。 + */ + @Override + public void onConnectionEstablished(URI remoteURI) { +"onConnectionEstablished, remoteUri:{}", remoteURI); + } + + /** + * 尝试过最大重试次数之后,最终连接失败。 + */ + @Override + public void onConnectionFailure(Throwable error) { + log.error("onConnectionFailure, {}", error.getMessage()); + } + + /** + * 连接中断。 + */ + @Override + public void onConnectionInterrupted(URI remoteURI) { +"onConnectionInterrupted, remoteUri:{}", remoteURI); + } + + /** + * 连接中断后又自动重连上。 + */ + @Override + public void onConnectionRestored(URI remoteURI) { +"onConnectionRestored, remoteUri:{}", remoteURI); + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch envelope) {} + + @Override + public void onSessionClosed(Session session, Throwable cause) {} + + @Override + public void onConsumerClosed(MessageConsumer consumer, Throwable cause) {} + + @Override + public void onProducerClosed(MessageProducer producer, Throwable cause) {} + }; + + /** + * 计算签名,password组装方法,请参见AMQP客户端接入说明文档。 + */ + private static String doSign(String toSignString, String secret, String signMethod) throws Exception { + SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); + Mac mac = Mac.getInstance(signMethod); + mac.init(signingKey); + byte[] rawHmac = mac.doFinal(toSignString.getBytes()); + return Base64.encodeBase64String(rawHmac); + } +} diff --git a/src/main/java/com/example/demo/ b/src/main/java/com/example/demo/ new file mode 100644 index 0000000..6100d37 --- /dev/null +++ b/src/main/java/com/example/demo/ @@ -0,0 +1,13 @@ +package com.example.demo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Demo1Application { + + public static void main(String[] args) { +, args); + } + +} diff --git a/src/main/java/com/example/demo/config/ b/src/main/java/com/example/demo/config/ new file mode 100644 index 0000000..4e6ec63 --- /dev/null +++ b/src/main/java/com/example/demo/config/ @@ -0,0 +1,26 @@ +package com.example.demo.config; + + +import com.aliyun.iot20180120.Client; +import com.aliyun.teaopenapi.models.Config; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Configuration +@Slf4j +public class AliMqttConfig { + + private String accessKey = "LTAI5t9DYVYpzxbmYBx3t4LU"; + + private String accessSecret = "tM4EmXqeohWTxJ5R1AmsItai6zjiaQ"; + + @Bean + public Client config() throws Exception { + Config config = new Config(); + config.setAccessKeyId(accessKey); + config.setAccessKeySecret(accessSecret); + config.setRegionId("cn-shanghai"); + return new Client(config); + } +} diff --git a/src/main/java/com/example/demo/config/ b/src/main/java/com/example/demo/config/ new file mode 100644 index 0000000..9370e65 --- /dev/null +++ b/src/main/java/com/example/demo/config/ @@ -0,0 +1,106 @@ +package com.example.demo.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.tomcat.util.codec.binary.Base64; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import; +import java.util.Hashtable; + +//@Configuration +@Slf4j +public class AmqpConfig { + + private String accessKey = "LTAI5t9DYVYpzxbmYBx3t4LU"; + + private String accessSecret = "tM4EmXqeohWTxJ5R1AmsItai6zjiaQ"; + + private String groupId = "DEFAULT_GROUP"; + + private String clientId = "zyh"; + private String iotInstanceId = "iot-06z00hk8iq70r0s"; + private String host = ""; + + @Bean + public Session consumer() throws Exception { + +"start..."); + + String signMethod = "hmacsha1"; + + long timeStamp = System.currentTimeMillis(); + //userName组装方法,请参见AMQP客户端接入说明文档。 + String userName = clientId + "|authMode=aksign" + + ",signMethod=" + signMethod + + ",timestamp=" + timeStamp + + ",authId=" + accessKey + + ",iotInstanceId=" + iotInstanceId + + ",consumerGroupId=" + groupId + + "|"; +"username:{}",userName); + //计算签名,password组装方法,请参见AMQP客户端接入说明文档。 + String signContent = "authId=" + accessKey + "×tamp=" + timeStamp; + String password = doSign(signContent, accessSecret, signMethod); + String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)" + + "?failover.reconnectDelay=30"; + + + Hashtable hashtable = new Hashtable<>(); + hashtable.put("connectionfactory.URL", connectionUrl); + hashtable.put("queue.QUEUE", "default"); + hashtable.put("queue.ZYH", "default"); + hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); + + Context context = new InitialContext(hashtable); + + ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("URL"); + Destination queue = (Destination) context.lookup("QUEUE"); + + Connection connection = connectionFactory.createConnection(userName,password); + + ((JmsConnection) connection).addConnectionListener(new ConnectListener()); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(message -> { + try { + byte[] body = message.getBody(byte[].class); + String content = new String(body); + String topic = message.getStringProperty("topic"); + String messageId = message.getStringProperty("messageId"); +"receive message" + + ",\n topic = " + topic + + ",\n messageId = " + messageId + + ",\n content = " + content); + } catch (Exception e) { + log.error("processMessage occurs error ", e); + } + }); + + return session; + + } + + private static String doSign(String toSignString, String secret, String signMethod) throws Exception { + SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod); + Mac mac = Mac.getInstance(signMethod); + mac.init(signingKey); + byte[] rawHmac = mac.doFinal(toSignString.getBytes()); + return Base64.encodeBase64String(rawHmac); + } + + +} \ No newline at end of file diff --git a/src/main/java/com/example/demo/config/ b/src/main/java/com/example/demo/config/ new file mode 100644 index 0000000..202f101 --- /dev/null +++ b/src/main/java/com/example/demo/config/ @@ -0,0 +1,58 @@ +package com.example.demo.config; + +import lombok.extern.slf4j.Slf4j; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; + +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import; + +@Slf4j +public class ConnectListener implements JmsConnectionListener { + + public ConnectListener() { +"ConnectListener init..."); + } + + @Override + public void onConnectionEstablished(URI uri) { + log.error("uri:{}",uri); + } + + @Override + public void onConnectionFailure(Throwable throwable) { + throwable.printStackTrace(); + } + + @Override + public void onConnectionInterrupted(URI uri) { + log.error("uri:{}",uri); + } + + @Override + public void onConnectionRestored(URI uri) { + log.error("uri:{}",uri); + } + + @Override + public void onInboundMessage(JmsInboundMessageDispatch jmsInboundMessageDispatch) { + log.error("jmsInboundMessageDispatch:{}",jmsInboundMessageDispatch); + } + + @Override + public void onSessionClosed(Session session, Throwable throwable) { + log.error("session:{}",throwable); + } + + @Override + public void onConsumerClosed(MessageConsumer messageConsumer, Throwable throwable) { + log.error("messageConsumer:{}",throwable); + } + + @Override + public void onProducerClosed(MessageProducer messageProducer, Throwable throwable) { + log.error("messageProducer:{}",throwable); + } +} diff --git a/src/main/java/com/example/demo/controller/ b/src/main/java/com/example/demo/controller/ new file mode 100644 index 0000000..f443059 --- /dev/null +++ b/src/main/java/com/example/demo/controller/ @@ -0,0 +1,72 @@ +package com.example.demo.controller; + +import cn.hutool.json.JSONUtil; +import com.aliyun.iot20180120.Client; +import com.aliyun.iot20180120.models.PubRequest; +import com.aliyun.iot20180120.models.PubResponse; +import com.example.demo.dto.VoiceData; +import io.netty.util.internal.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.tomcat.util.codec.binary.Base64; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import javax.jms.*; +import javax.naming.Context; +import javax.naming.InitialContext; +import javax.naming.NamingException; +import java.util.Hashtable; + +@RestController +@Slf4j +public class IndexController { + + @Autowired + private Client client; + private Session session; + + private String iotInstanceId = "iot-06z00hk8iq70r0s"; + + @GetMapping("/1") + public void index() throws NamingException, JMSException { + log.error("gogo"); + Hashtable hashtable = new Hashtable<>(); + hashtable.put("queue.QUEUE", "default"); + hashtable.put("queue.ZYH", "default"); + hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory"); + + Context context = new InitialContext(hashtable); + Destination prod = (Destination) context.lookup("ZYH"); + + MessageProducer prodocer = session.createProducer(prod); + + prodocer.send(session.createTextMessage("sssss")); + log.error("bye."); + } + + // http://localhost:8082/msg/zsw38/真的食物/get + @GetMapping("/msg/{device}/{content}/{volume}") + public String index2(@PathVariable("device") String device,@PathVariable("content") String context,@PathVariable("volume") Integer volume) throws Exception { + VoiceData voiceData = new VoiceData(); + voiceData.setContext(context); + voiceData.setVolume(volume); + + String json = JSONUtil.toJsonStr(voiceData); + + String topic = "/hhlz49DDHhl/"+device+"/user/get"; + PubRequest pubRequest = new PubRequest(); + pubRequest.setIotInstanceId(iotInstanceId); + pubRequest.setDeviceName(device); + pubRequest.setMessageContent(Base64.encodeBase64String(json.getBytes())); + pubRequest.setQos(0); + pubRequest.setProductKey("hhlz49DDHhl"); + pubRequest.setTopicFullName(topic); + PubResponse res =; +"device:{},context:{}",device,json); +//"body:{}",res.getBody()); +//"status:{}",res.getStatusCode()); + return res.getBody().getMessageId(); + } +} diff --git a/src/main/java/com/example/demo/dto/ b/src/main/java/com/example/demo/dto/ new file mode 100644 index 0000000..05f01b0 --- /dev/null +++ b/src/main/java/com/example/demo/dto/ @@ -0,0 +1,12 @@ +package com.example.demo.dto; + +import lombok.Data; + +@Data +public class VoiceData { + + private String context; + + private Integer volume; + +} diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml new file mode 100644 index 0000000..0884131 --- /dev/null +++ b/src/main/resources/application.yaml @@ -0,0 +1,2 @@ +server: + port: 8082 \ No newline at end of file diff --git a/src/test/java/com/example/demo/ b/src/test/java/com/example/demo/ new file mode 100644 index 0000000..dd22115 --- /dev/null +++ b/src/test/java/com/example/demo/ @@ -0,0 +1,13 @@ +package com.example.demo; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class Demo1ApplicationTests { + + @Test + void contextLoads() { + } + +}