import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;
import java.util.*;
/**
 * Static helpers around a single shared Amazon SQS client: look up, create and
 * delete queues, configure a dead-letter queue, and send / receive messages.
 *
 * <p>The client is built once for the {@code CN_NORTH_1} region when the class
 * is loaded. Progress is reported on standard output.
 */
public class SqsUtil {
    private static final String ARN_ATTRIBUTE_NAME = "QueueArn";

    /**
     * Long-poll wait configured on newly created queues, in seconds.
     * SQS only accepts 0-20 for ReceiveMessageWaitTimeSeconds, so 20 is the
     * longest wait that can be requested.
     */
    private static final String RECEIVE_WAIT_TIME_SECONDS = "20";

    private static final AmazonSQS sqs =
            AmazonSQSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();

    private SqsUtil() {
    }

    /**
     * Looks up the URL of a queue by its name.
     *
     * @param queueName name of the queue
     * @return the queue URL reported by SQS
     */
    public static String getQueueUrl(String queueName) {
        return sqs.getQueueUrl(queueName).getQueueUrl();
    }

    /**
     * Creates a queue with long polling enabled.
     *
     * @param queueName name of the queue to create
     * @return the URL of the created queue
     */
    public static String createQueue(String queueName) {
        System.out.println("Creating a new SQS queue called " + queueName);
        CreateQueueRequest createQueueRequest = new CreateQueueRequest(queueName);
        Map<String, String> attributes = new HashMap<>();
        // How long a receive call waits for a message to arrive.
        attributes.put("ReceiveMessageWaitTimeSeconds", RECEIVE_WAIT_TIME_SECONDS);
        createQueueRequest.withAttributes(attributes);
        return sqs.createQueue(createQueueRequest).getQueueUrl();
    }

    /**
     * Creates a queue intended to be used as a dead-letter queue.
     *
     * @param queueName name of the dead-letter queue to create
     * @return the ARN of the created queue (the redrive policy refers to the
     *         dead-letter queue by ARN, not by URL)
     */
    public static String createDeadLetterQueue(String queueName) {
        String queueUrl = createQueue(queueName);
        return getQueueArn(queueUrl);
    }

    /**
     * Attaches a dead-letter queue to an existing queue by setting its redrive policy.
     *
     * @param queueUrl           URL of the source queue
     * @param deadLetterQueueArn ARN of the dead-letter queue
     */
    public static void configDeadLetterQueue(String queueUrl, String deadLetterQueueArn) {
        System.out.println("Config dead letter queue for " + queueUrl);
        Map<String, String> attributes = new HashMap<>();
        // maxReceiveCount is 5: a message that has been received more than five
        // times without being processed and deleted is moved to the dead-letter queue.
        attributes.put("RedrivePolicy", "{\"maxReceiveCount\":\"5\", \"deadLetterTargetArn\":\"" + deadLetterQueueArn + "\"}");
        sqs.setQueueAttributes(queueUrl, attributes);
    }

    /**
     * Sends a message carrying a sample string attribute named "Hello".
     *
     * @param queueUrl URL of the target queue
     * @param message  message body
     */
    public static void sendMessage(String queueUrl, String message) {
        System.out.println("Sending a message to " + queueUrl);
        SendMessageRequest request = new SendMessageRequest();
        request.withQueueUrl(queueUrl);
        request.withMessageBody(message);
        Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
        // A message attribute must specify both a DataType and a value.
        messageAttributes.put("Hello", new MessageAttributeValue().withDataType("String").withStringValue("COCO"));
        request.withMessageAttributes(messageAttributes);
        sqs.sendMessage(request);
    }

    /**
     * Receives up to five messages, prints each body and its attributes, then
     * deletes each message from the queue.
     *
     * @param queueUrl URL of the queue to read from
     */
    public static void receiveMessages(String queueUrl) {
        System.out.println("Receiving messages from " + queueUrl);
        ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
        receiveMessageRequest.setMaxNumberOfMessages(5);
        receiveMessageRequest.withWaitTimeSeconds(10);
        // Attribute names must be requested explicitly, otherwise the
        // attributes are not included in the received messages.
        receiveMessageRequest.setMessageAttributeNames(Collections.singletonList("Hello"));
        List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
        for (Message message : messages) {
            System.out.println("Message: " + message.getBody());
            for (Map.Entry<String, MessageAttributeValue> entry : message.getMessageAttributes().entrySet()) {
                System.out.println(" Attribute");
                System.out.println(" Name: " + entry.getKey());
                System.out.println(" Value: " + entry.getValue().getStringValue());
            }
            // Delete the message once it has been handled.
            System.out.println("Deleting a message.");
            sqs.deleteMessage(queueUrl, message.getReceiptHandle());
        }
    }

    /**
     * Deletes a queue.
     *
     * @param queueUrl URL of the queue to delete
     */
    public static void deleteQueue(String queueUrl) {
        System.out.println("Deleting the queue " + queueUrl);
        sqs.deleteQueue(queueUrl);
    }

    /**
     * Looks up the ARN of a queue.
     *
     * @param queueUrl URL of the queue
     * @return the value of the queue's {@code QueueArn} attribute
     */
    public static String getQueueArn(String queueUrl) {
        List<String> attributes = new ArrayList<>();
        attributes.add(ARN_ATTRIBUTE_NAME);
        GetQueueAttributesResult queueAttributes = sqs.getQueueAttributes(queueUrl, attributes);
        return queueAttributes.getAttributes().get(ARN_ATTRIBUTE_NAME);
    }
}
测试一下:
// Create the dead-letter queue and keep its ARN
String deadLetterQueueArn = createDeadLetterQueue("DeadLetterQueue");
// Create the task queue
String queueUrl = createQueue("TaskQueue");
// Attach the dead-letter queue to the task queue
configDeadLetterQueue(queueUrl, deadLetterQueueArn);
// Send six messages
for (int i = 0; i < 6; i++) {
sendMessage(queueUrl, "Hello COCO " + i);
}
// Receive (and delete) messages from the task queue
receiveMessages(queueUrl);
// Delete the task queue
deleteQueue(queueUrl);

构造与解析消息
SQS消息体是字符串,可以使用jackson-databind在对象与JSON字符串之间相互转换,以便发送和接收消息。

JsonUtil
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
 * Static helpers for converting between objects and JSON strings using a
 * single shared Jackson {@link ObjectMapper}.
 */
public final class JsonUtil {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private JsonUtil() {
    }

    /**
     * Serializes an object to its JSON string form.
     *
     * @param object the object to serialize
     * @return the JSON representation of {@code object}
     * @throws JsonProcessingException if the mapper cannot serialize the object
     */
    public static String generate(Object object) throws JsonProcessingException {
        return MAPPER.writeValueAsString(object);
    }

    /**
     * Parses a JSON string into an instance of the given type.
     *
     * @param content   the JSON text
     * @param valueType the class to bind the JSON to
     * @param <T>       the result type, inferred from {@code valueType}
     * @return the parsed object
     * @throws IOException if the mapper fails to read or bind the content
     */
    public static <T> T parse(String content, Class<T> valueType) throws IOException {
        return MAPPER.readValue(content, valueType);
    }
}

SNS
Amazon Simple Notification Service (Amazon SNS)是AWS的消息通知服务。发布者创建消息并将其发送至主题,以此与订阅者进行异步通信。
订阅者可以是Web服务器、电子邮件地址、SQS队列或Lambda函数。一个主题可以有多个、多种类型的订阅者。
POM依赖
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-sns</artifactId>
</dependency>
示例代码
下面列出了常用的方法:创建、删除主题,创建、删除订阅,确认订阅,发布消息。
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sns.model.*;
/**
 * Static convenience wrappers over a shared Amazon SNS client built for the
 * {@code CN_NORTH_1} region: topic and subscription management plus publishing.
 */
public class SnsUtil {
    private static final AmazonSNS sns =
            AmazonSNSClientBuilder.standard().withRegion(Regions.CN_NORTH_1).build();

    private SnsUtil() {
    }

    /**
     * Creates a topic that notifications can be published to.
     *
     * @param name name of the topic
     * @return the SNS response for the create call
     */
    public static CreateTopicResult createTopic(String name) {
        CreateTopicRequest request = new CreateTopicRequest(name);
        return sns.createTopic(request);
    }

    /**
     * Deletes a topic together with all of its subscriptions.
     *
     * @param topicArn ARN of the topic to delete
     * @return the SNS response for the delete call
     */
    public static DeleteTopicResult deleteTopic(String topicArn) {
        DeleteTopicRequest request = new DeleteTopicRequest(topicArn);
        return sns.deleteTopic(request);
    }

    /**
     * Starts subscribing an endpoint; SNS sends the endpoint a confirmation message.
     *
     * @param topicArn ARN of the topic to subscribe to
     * @param protocol delivery protocol of the endpoint
     * @param endpoint the endpoint that should receive notifications
     * @return the SNS response for the subscribe call
     */
    public static SubscribeResult subscribe(String topicArn, String protocol, String endpoint) {
        SubscribeRequest request = new SubscribeRequest(topicArn, protocol, endpoint);
        return sns.subscribe(request);
    }

    /**
     * Removes a subscription.
     *
     * @param subscriptionArn ARN of the subscription to delete
     * @return the SNS response for the unsubscribe call
     */
    public static UnsubscribeResult unsubscribe(String subscriptionArn) {
        UnsubscribeRequest request = new UnsubscribeRequest(subscriptionArn);
        return sns.unsubscribe(request);
    }

    /**
     * Confirms a pending subscription by validating the token that an earlier
     * Subscribe action delivered to the endpoint.
     *
     * @param topicArn ARN of the topic being subscribed to
     * @param token    token received by the endpoint
     * @return the SNS response for the confirmation call
     */
    public static ConfirmSubscriptionResult confirmSubscription(String topicArn, String token) {
        ConfirmSubscriptionRequest request = new ConfirmSubscriptionRequest(topicArn, token);
        return sns.confirmSubscription(request);
    }

    /**
     * Publishes a message to a topic.
     *
     * @param topicArn ARN of the target topic
     * @param message  message text
     * @param subject  subject line for the message
     * @return the SNS response for the publish call
     */
    public static PublishResult publish(String topicArn, String message, String subject) {
        PublishRequest request = new PublishRequest(topicArn, message, subject);
        return sns.publish(request);
    }
}
以Email为例,创建主题、订阅方法如下: