TestContainers,LocalStack SES并发布给SNS

发布于 2025-01-19 13:32:10 字数 16366 浏览 0 评论 0原文

我正在使用 Spring Boot 2.6.3、testcontainers 版本 1.16.3 和 aws-java-sdk 版本 1.12.178。我正在尝试使用 testcontainers 和 testcontainer localstack 模块创建集成测试。测试是向 SES 发送电子邮件,并让 SES 将消息发布到 SNS 主题,其中 SQS 队列订阅该 SNS 主题。

我已经成功构建了一个测试,将消息直接发布到 SNS 并从 SQS 队列中读取消息,但是通过 SES,消息永远不会发送到 SQS,或者至少不会发布。

我必须认为我没有正确设置 ACL,或者可能通过 Localstack 的 SES 不允许发布到其他 AWS 服务。

我哪里错了?

build.gradle

plugins {
    id 'java'
    id 'jacoco'
    id 'idea'
    id 'maven-publish'
    id 'signing'
    id 'com.palantir.docker' version '0.32.0'
    id 'net.researchgate.release' version '2.8.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'org.springframework.boot' version '2.6.3'
    id 'com.diffplug.spotless' version '6.2.0'
}

dependencyManagement {
    imports {
        mavenBom 'io.awspring.cloud:spring-cloud-aws-dependencies:2.3.3'
        mavenBom 'com.amazonaws:aws-java-sdk-bom:1.12.178'
    }
}

dependencies {
    // implementation
    implementation 'org.springframework.boot:spring-boot-starter-log4j2'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.projectlombok:lombok'
    implementation 'net.devh:grpc-server-spring-boot-starter:2.13.1.RELEASE'
    implementation 'org.springframework.data:spring-data-envers'

    

    // These dependencies (spring-boot-starter-data-jpa, liquibase-core, postgresql) are needed to
    // have liquibase startup and execute the changelog if you don't have jpa nothing gets picked
    // up, if you don't have postgresql we bomb out stating that it can't find a driver
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.liquibase:liquibase-core'
    implementation 'org.postgresql:postgresql'

    implementation 'org.springframework:spring-jms'
    implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.0.8'
    implementation 'com.amazonaws:aws-java-sdk-sns'
    implementation 'com.amazonaws:aws-java-sdk-sts'
    implementation 'com.amazonaws:aws-java-sdk-core'
    implementation 'com.amazonaws:aws-java-sdk-ses'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws-secrets-manager-config'

    annotationProcessor 'org.projectlombok:lombok'

    // runtime
    runtimeOnly 'org.apache.logging.log4j:log4j-layout-template-json'

    // testing implementations
    testImplementation 'org.mockito:mockito-inline'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.junit.jupiter:junit-jupiter-api'
    testImplementation 'org.junit.jupiter:junit-jupiter-params'
    testImplementation 'org.junit.jupiter:junit-jupiter-engine'
    testImplementation 'org.testcontainers:testcontainers:1.16.3'
    testImplementation 'org.testcontainers:junit-jupiter:1.16.3'
    testImplementation 'org.testcontainers:postgresql:1.16.3'
    testImplementation "org.testcontainers:localstack:1.16.3"
    integrationTestImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

AbstractIT.java

@ActiveProfiles("it")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@ContextConfiguration(initializers = AbstractIT.DockerPostgreDataSourceInitializer.class)
@Transactional
@Testcontainers
public class AbstractIT {

  public static final PostgreSQLContainer<?> postgresDBContainer =
      new PostgreSQLContainer<>("postgres:12.1");

  static {
    postgresDBContainer.start();
  }

  public static class DockerPostgreDataSourceInitializer
      implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    @DynamicPropertySource
    public void initialize(ConfigurableApplicationContext applicationContext) {
      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
          applicationContext,
          "spring.datasource.web.hikari.jdbc-url=" + postgresDBContainer.getJdbcUrl(),
          "spring.datasource.web.hikari.username=" + postgresDBContainer.getUsername(),
          "spring.datasource.password=" + postgresDBContainer.getPassword());
    }
  }
}

LocalStackAbstractIT.java

@Testcontainers
public class LocalStackAbstractIT extends AbstractIT {

  @ClassRule
  public static final LocalStackContainer localStackContainer =
      new LocalStackContainer(DockerImageName.parse("localstack/localstack").withTag("0.14.2"))
          .withServices(Service.S3, Service.SES, Service.SNS, Service.SQS)
          .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
          .withEnv("AWS_SECRET_ACCESS_KEY", "secretkey");

  static {
    localStackContainer.start();
    try {
      verifyEmail();
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }

  private static void verifyEmail() throws IOException, InterruptedException {
    String command =
        "aws ses verify-email-identity --email-address [email protected] --region us-east-1 --endpoint-url http://localhost:4566";
    ExecResult result = localStackContainer.execInContainer(command.split(" "));
    if (result.getExitCode() != 0) {
      throw new RuntimeException(result.getStderr());
    }
  }

  @DynamicPropertySource
  static void overrideConfiguration(DynamicPropertyRegistry registry) {
    registry.add(
        "cloud.aws.sqs.endpoint", () -> localStackContainer.getEndpointOverride(Service.SQS));
    registry.add(
        "cloud.aws.s3.endpoint", () -> localStackContainer.getEndpointOverride(Service.S3));
    registry.add(
        "cloud.aws.sns.endpoint", () -> localStackContainer.getEndpointOverride(Service.SNS));
    registry.add(
        "cloud.aws.ses.endpoint", () -> localStackContainer.getEndpointOverride(Service.SES));
    registry.add("cloud.aws.credentials.access-key", localStackContainer::getAccessKey);
    registry.add("cloud.aws.credentials.secret-key", localStackContainer::getSecretKey);
  }

}

EmailNotificationRequestHandlerITTest.java

@AutoConfigureTestEntityManager
public class EmailNotificationRequestHandlerITTest extends LocalStackAbstractIT {
  @Autowired private ApplicationConfig config;
  @Autowired private NotificationRepository notificationRepository;
  @Autowired private TemplateRepository templateRepository;
  @Autowired private EmailNotificationRepository emailNotificationRepository;
  @Autowired private AmazonSNS snsClient;
  @Autowired private AmazonSQS sqsClient;
  @Autowired private AmazonSimpleEmailService sesClient;
  @Autowired private EmailNotificationRequestHandler handler;
  @Autowired private ObjectMapper objectMapper;

  @Test
  public void testHandleEmailNotificationCreate() throws JsonProcessingException {
    ConfigurationSet configurationSet = new ConfigurationSet();
    configurationSet.withName("click-send-local");

    CreateConfigurationSetRequest createConfigurationSetRequest =
            new CreateConfigurationSetRequest();
    createConfigurationSetRequest.withConfigurationSet(configurationSet);
    sesClient.createConfigurationSet(createConfigurationSetRequest);

    CreateTopicResult emailRequestActivityTopic =
        snsClient.createTopic("EmailRequestActivityTopicITTest");
    Condition accountCondition = new StringCondition(StringComparisonType.StringEqualsIgnoreCase, "AWS:SourceAccount", "000000000000");
    Condition sourceArnCondition = new StringCondition(StringComparisonType.StringEqualsIgnoreCase, "AWS:SourceArn", "arn:aws:ses:us-east-1:000000000000:configuration-set/click-send-local");
    Policy policy =
        new Policy()
            .withStatements(
                new Statement(Effect.Allow)
                    .withPrincipals(Principal.All)
                    .withActions(SNSActions.Publish)
                    .withResources(new Resource(emailRequestActivityTopic.getTopicArn()))
                    .withConditions(accountCondition, sourceArnCondition));
    SetTopicAttributesRequest setTopicAttributesRequest = new SetTopicAttributesRequest();
    setTopicAttributesRequest
        .withTopicArn(emailRequestActivityTopic.getTopicArn())
        .withAttributeName("Policy")
        .withAttributeValue(policy.toJson());
    snsClient.setTopicAttributes(setTopicAttributesRequest);

    CreateQueueResult emailRequestActivityQueueResult =
        sqsClient.createQueue(config.notificationEmailRequestActivityCreateQueue);
    SubscribeRequest subscribeRequest = new SubscribeRequest();
    subscribeRequest
        .withTopicArn(emailRequestActivityTopic.getTopicArn())
        .withEndpoint(emailRequestActivityQueueResult.getQueueUrl())
        .withProtocol("sqs");
    snsClient.subscribe(subscribeRequest);

    SNSDestination snsDestination = new SNSDestination();
    snsDestination.withTopicARN(emailRequestActivityTopic.getTopicArn());
    EventDestination eventDestination = new EventDestination();
    eventDestination
        .withSNSDestination(snsDestination)
        .withName("EmailRequestActivityTopicITTest")
        .withEnabled(true)
        .withMatchingEventTypes(
            "Send",
            "Reject",
            "Bounce",
            "Complaint",
            "Delivery",
            "Open",
            "Click",
            "RenderingFailure");
    CreateConfigurationSetEventDestinationRequest configurationSetEventDestinationRequest =
        new CreateConfigurationSetEventDestinationRequest();
    configurationSetEventDestinationRequest
        .withConfigurationSetName(configurationSet.getName())
        .withEventDestination(eventDestination);
    sesClient.createConfigurationSetEventDestination(configurationSetEventDestinationRequest);

    NotificationRequest notificationRequest = createNotificationRequest();
    EmailNotification emailNotification =
        EmailNotification.builder()
            .notificationRequest(notificationRequest)
            .toAddress("[email protected]")
            .title("title")
            .message("message")
            .actionText("actionText")
            .actionUrl("actionUrl")
            .status(EmailNotificationStatus.PENDING)
            .id(UUID.randomUUID())
            .build();
    EmailNotification savedEmailNotification = emailNotificationRepository.save(emailNotification);
    EmailNotificationMessage emailNotificationMessage = new EmailNotificationMessage();
    emailNotificationMessage.setId(savedEmailNotification.getId().toString());
    handler.handleEmailNotificationCreate(emailNotificationMessage);

    EmailNotification finalEmailNotification =
        emailNotificationRepository.getById(savedEmailNotification.getId());
    assertNotNull(finalEmailNotification.getMessageId());

    ReceiveMessageResult receiveMessageResult =
        sqsClient.receiveMessage(emailRequestActivityQueueResult.getQueueUrl());
    assertNotNull(receiveMessageResult);
    List<Message> messageList = receiveMessageResult.getMessages();
    assertNotNull(messageList);
    assertFalse(messageList.isEmpty());
    assertEquals(1, messageList.size());
    Message message = messageList.get(0);
    Map<String, String> payload = objectMapper.readValue(message.getBody(), Map.class);
    String messageStr = payload.get("Message");
    System.out.println("MS STR: " + messageStr);
  }

  private NotificationRequest createNotificationRequest() {
    return createNotificationRequest(UUID.randomUUID());
  }

  private NotificationRequest createNotificationRequest(UUID id) {
    UUID templateId = createTemplate();
    NotificationRequest request =
        NotificationRequest.builder()
            .id(id)
            .templateId(templateId)
            .languageTag("language-tag")
            .relatedUserId("recipientId")
            .createdAt(null)
            .build();

    return notificationRepository.saveAndFlush(request);
  }

  private UUID createTemplate() {
    return createTemplate(UUID.randomUUID());
  }

  private UUID createTemplate(UUID id) {
    Template template =
        Template.builder()
            .id(id)
            .translationGroupId("translationGroupId-NotificationRepository")
            .titleKey("title-NotificationRepository")
            .messageKey("message-NotificationRepository")
            .application("application-NotificationRepository")
            .owner("ownerIntegrationTest-NotificationRepository")
            .recipient(TemplateRecipient.SABBY_USER)
            .status(TemplateStatus.DRAFT)
            .urgent(false)
            .library(false)
            .build();
    templateRepository.saveAndFlush(template);
    return id;
  }
}

EmailNotificationRequestHandler.java

@Component
@RequiredArgsConstructor
@Slf4j
@Validated
public class EmailNotificationRequestHandler {
  private final EmailNotificationRepository emailNotificationRepository;
  private final TemplateRepository templateRepository;
  private final AmazonSimpleEmailService sesClient;
  private final ApplicationConfig applicationConfig;

  @Transactional
  public void handleEmailNotificationCreate(@Valid @NotNull EmailNotificationMessage message) {
    String id = message.getId();
    log.debug("sending email notification for {}", id);
    Optional<EmailNotification> optionalEmailNotification =
        emailNotificationRepository.findById(UUID.fromString(id));
    if (optionalEmailNotification.isEmpty()) {
      String errorMsg = String.format("Email Notification %s does not exist, unable to send.", id);
      log.warn(errorMsg);
      throw new IllegalArgumentException(errorMsg);
    }
    EmailNotification emailNotification = optionalEmailNotification.get();
    Optional<Template> optionalTemplate =
        templateRepository.findById(emailNotification.getNotificationRequest().getTemplateId());
    if (optionalTemplate.isEmpty()) {
      String errorMsg =
          String.format(
              "Template with id %s, does not exist for Email Notification %s",
              emailNotification.getNotificationRequest().getTemplateId(), id);
      log.warn(errorMsg);
      throw new IllegalArgumentException(errorMsg);
    }
    Template template = optionalTemplate.get();
    log.debug("Found Template: {} for EmailNotification: {}", template, emailNotification);
    SendEmailRequest emailRequest = buildSendEmailRequest(emailNotification, template);
    log.debug("Built Send Email Request: {}", emailRequest);
    SendEmailResult result = sesClient.sendEmail(emailRequest);
    log.debug("Sent Email Result: {}", result.toString());
    EmailNotification updateEmailNotification =
        emailNotification.toBuilder().messageId(result.getMessageId()).build();
    EmailNotification savedUpdateEmailNotification =
        emailNotificationRepository.save(updateEmailNotification);
    log.debug("Saved Updated Email Notification: {}", savedUpdateEmailNotification);
  }

  @VisibleForTesting
  SendEmailRequest buildSendEmailRequest(
      @NonNull EmailNotification emailNotification, @NonNull Template template) {
    return new SendEmailRequest()
        .withDestination(new Destination().withToAddresses(emailNotification.getToAddress()))
        .withMessage(
            new Message(
                new Content(emailNotification.getTitle()),
                new Body(new Content(emailNotification.getMessage()))))
        .withSource(applicationConfig.emailSourceAddress)
        .withConfigurationSetName(applicationConfig.sesConfigSet)
        .withTags(getMessageTags(template.getApplication() + "-" + template.getTitleKey()));
  }

  @VisibleForTesting
  MessageTag getMessageTags(@NonNull String name) {
    MessageTag tags = new MessageTag().withName(applicationConfig.cloudWatchMetric).withValue(name);
    return tags;
  }
}

I'm using Spring Boot 2.6.3, with testcontainers version 1.16.3 and aws-java-sdk version 1.12.178. I'm trying to create an Integration test using testcontainers and the testcontainer localstack module. The test is to send an email to SES and have SES publish the message to an SNS topic, where an SQS queue is subscribed to the SNS topic.

I've successfully built a test where I publish a message directly to SNS and read the message from the SQS queue, however going through SES the message never makes it to SQS or at the very least is not getting published.

I have to think that I'm not setting the ACL correctly or that maybe SES via Localstack does not allow for publishing to other AWS services.

Where am I going wrong?

build.gradle

plugins {
    id 'java'
    id 'jacoco'
    id 'idea'
    id 'maven-publish'
    id 'signing'
    id 'com.palantir.docker' version '0.32.0'
    id 'net.researchgate.release' version '2.8.1'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'org.springframework.boot' version '2.6.3'
    id 'com.diffplug.spotless' version '6.2.0'
}

dependencyManagement {
    imports {
        mavenBom 'io.awspring.cloud:spring-cloud-aws-dependencies:2.3.3'
        mavenBom 'com.amazonaws:aws-java-sdk-bom:1.12.178'
    }
}

dependencies {
    // implementation
    implementation 'org.springframework.boot:spring-boot-starter-log4j2'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.projectlombok:lombok'
    implementation 'net.devh:grpc-server-spring-boot-starter:2.13.1.RELEASE'
    implementation 'org.springframework.data:spring-data-envers'

    

    // These dependencies (spring-boot-starter-data-jpa, liquibase-core, postgresql) are needed to
    // have liquibase startup and execute the changelog if you don't have jpa nothing gets picked
    // up, if you don't have postgresql we bomb out stating that it can't find a driver
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.liquibase:liquibase-core'
    implementation 'org.postgresql:postgresql'

    implementation 'org.springframework:spring-jms'
    implementation 'com.amazonaws:amazon-sqs-java-messaging-lib:1.0.8'
    implementation 'com.amazonaws:aws-java-sdk-sns'
    implementation 'com.amazonaws:aws-java-sdk-sts'
    implementation 'com.amazonaws:aws-java-sdk-core'
    implementation 'com.amazonaws:aws-java-sdk-ses'
    implementation 'io.awspring.cloud:spring-cloud-starter-aws-secrets-manager-config'

    annotationProcessor 'org.projectlombok:lombok'

    // runtime
    runtimeOnly 'org.apache.logging.log4j:log4j-layout-template-json'

    // testing implementations
    testImplementation 'org.mockito:mockito-inline'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    testImplementation 'org.junit.jupiter:junit-jupiter-api'
    testImplementation 'org.junit.jupiter:junit-jupiter-params'
    testImplementation 'org.junit.jupiter:junit-jupiter-engine'
    testImplementation 'org.testcontainers:testcontainers:1.16.3'
    testImplementation 'org.testcontainers:junit-jupiter:1.16.3'
    testImplementation 'org.testcontainers:postgresql:1.16.3'
    testImplementation "org.testcontainers:localstack:1.16.3"
    integrationTestImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

AbstractIT.java

@ActiveProfiles("it")
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureTestDatabase(replace = AutoConfigureTestDatabase.Replace.NONE)
@ContextConfiguration(initializers = AbstractIT.DockerPostgreDataSourceInitializer.class)
@Transactional
@Testcontainers
public class AbstractIT {

  public static final PostgreSQLContainer<?> postgresDBContainer =
      new PostgreSQLContainer<>("postgres:12.1");

  static {
    postgresDBContainer.start();
  }

  public static class DockerPostgreDataSourceInitializer
      implements ApplicationContextInitializer<ConfigurableApplicationContext> {

    @Override
    @DynamicPropertySource
    public void initialize(ConfigurableApplicationContext applicationContext) {
      TestPropertySourceUtils.addInlinedPropertiesToEnvironment(
          applicationContext,
          "spring.datasource.web.hikari.jdbc-url=" + postgresDBContainer.getJdbcUrl(),
          "spring.datasource.web.hikari.username=" + postgresDBContainer.getUsername(),
          "spring.datasource.password=" + postgresDBContainer.getPassword());
    }
  }
}

LocalStackAbstractIT.java

@Testcontainers
public class LocalStackAbstractIT extends AbstractIT {

  @ClassRule
  public static final LocalStackContainer localStackContainer =
      new LocalStackContainer(DockerImageName.parse("localstack/localstack").withTag("0.14.2"))
          .withServices(Service.S3, Service.SES, Service.SNS, Service.SQS)
          .withEnv("AWS_ACCESS_KEY_ID", "accesskey")
          .withEnv("AWS_SECRET_ACCESS_KEY", "secretkey");

  static {
    localStackContainer.start();
    try {
      verifyEmail();
    } catch (IOException | InterruptedException e) {
      e.printStackTrace();
    }
  }

  private static void verifyEmail() throws IOException, InterruptedException {
    String command =
        "aws ses verify-email-identity --email-address [email protected] --region us-east-1 --endpoint-url http://localhost:4566";
    ExecResult result = localStackContainer.execInContainer(command.split(" "));
    if (result.getExitCode() != 0) {
      throw new RuntimeException(result.getStderr());
    }
  }

  @DynamicPropertySource
  static void overrideConfiguration(DynamicPropertyRegistry registry) {
    registry.add(
        "cloud.aws.sqs.endpoint", () -> localStackContainer.getEndpointOverride(Service.SQS));
    registry.add(
        "cloud.aws.s3.endpoint", () -> localStackContainer.getEndpointOverride(Service.S3));
    registry.add(
        "cloud.aws.sns.endpoint", () -> localStackContainer.getEndpointOverride(Service.SNS));
    registry.add(
        "cloud.aws.ses.endpoint", () -> localStackContainer.getEndpointOverride(Service.SES));
    registry.add("cloud.aws.credentials.access-key", localStackContainer::getAccessKey);
    registry.add("cloud.aws.credentials.secret-key", localStackContainer::getSecretKey);
  }

}

EmailNotificationRequestHandlerITTest.java

@AutoConfigureTestEntityManager
public class EmailNotificationRequestHandlerITTest extends LocalStackAbstractIT {
  @Autowired private ApplicationConfig config;
  @Autowired private NotificationRepository notificationRepository;
  @Autowired private TemplateRepository templateRepository;
  @Autowired private EmailNotificationRepository emailNotificationRepository;
  @Autowired private AmazonSNS snsClient;
  @Autowired private AmazonSQS sqsClient;
  @Autowired private AmazonSimpleEmailService sesClient;
  @Autowired private EmailNotificationRequestHandler handler;
  @Autowired private ObjectMapper objectMapper;

  @Test
  public void testHandleEmailNotificationCreate() throws JsonProcessingException {
    ConfigurationSet configurationSet = new ConfigurationSet();
    configurationSet.withName("click-send-local");

    CreateConfigurationSetRequest createConfigurationSetRequest =
            new CreateConfigurationSetRequest();
    createConfigurationSetRequest.withConfigurationSet(configurationSet);
    sesClient.createConfigurationSet(createConfigurationSetRequest);

    CreateTopicResult emailRequestActivityTopic =
        snsClient.createTopic("EmailRequestActivityTopicITTest");
    Condition accountCondition = new StringCondition(StringComparisonType.StringEqualsIgnoreCase, "AWS:SourceAccount", "000000000000");
    Condition sourceArnCondition = new StringCondition(StringComparisonType.StringEqualsIgnoreCase, "AWS:SourceArn", "arn:aws:ses:us-east-1:000000000000:configuration-set/click-send-local");
    Policy policy =
        new Policy()
            .withStatements(
                new Statement(Effect.Allow)
                    .withPrincipals(Principal.All)
                    .withActions(SNSActions.Publish)
                    .withResources(new Resource(emailRequestActivityTopic.getTopicArn()))
                    .withConditions(accountCondition, sourceArnCondition));
    SetTopicAttributesRequest setTopicAttributesRequest = new SetTopicAttributesRequest();
    setTopicAttributesRequest
        .withTopicArn(emailRequestActivityTopic.getTopicArn())
        .withAttributeName("Policy")
        .withAttributeValue(policy.toJson());
    snsClient.setTopicAttributes(setTopicAttributesRequest);

    CreateQueueResult emailRequestActivityQueueResult =
        sqsClient.createQueue(config.notificationEmailRequestActivityCreateQueue);
    SubscribeRequest subscribeRequest = new SubscribeRequest();
    subscribeRequest
        .withTopicArn(emailRequestActivityTopic.getTopicArn())
        .withEndpoint(emailRequestActivityQueueResult.getQueueUrl())
        .withProtocol("sqs");
    snsClient.subscribe(subscribeRequest);

    SNSDestination snsDestination = new SNSDestination();
    snsDestination.withTopicARN(emailRequestActivityTopic.getTopicArn());
    EventDestination eventDestination = new EventDestination();
    eventDestination
        .withSNSDestination(snsDestination)
        .withName("EmailRequestActivityTopicITTest")
        .withEnabled(true)
        .withMatchingEventTypes(
            "Send",
            "Reject",
            "Bounce",
            "Complaint",
            "Delivery",
            "Open",
            "Click",
            "RenderingFailure");
    CreateConfigurationSetEventDestinationRequest configurationSetEventDestinationRequest =
        new CreateConfigurationSetEventDestinationRequest();
    configurationSetEventDestinationRequest
        .withConfigurationSetName(configurationSet.getName())
        .withEventDestination(eventDestination);
    sesClient.createConfigurationSetEventDestination(configurationSetEventDestinationRequest);

    NotificationRequest notificationRequest = createNotificationRequest();
    EmailNotification emailNotification =
        EmailNotification.builder()
            .notificationRequest(notificationRequest)
            .toAddress("[email protected]")
            .title("title")
            .message("message")
            .actionText("actionText")
            .actionUrl("actionUrl")
            .status(EmailNotificationStatus.PENDING)
            .id(UUID.randomUUID())
            .build();
    EmailNotification savedEmailNotification = emailNotificationRepository.save(emailNotification);
    EmailNotificationMessage emailNotificationMessage = new EmailNotificationMessage();
    emailNotificationMessage.setId(savedEmailNotification.getId().toString());
    handler.handleEmailNotificationCreate(emailNotificationMessage);

    EmailNotification finalEmailNotification =
        emailNotificationRepository.getById(savedEmailNotification.getId());
    assertNotNull(finalEmailNotification.getMessageId());

    ReceiveMessageResult receiveMessageResult =
        sqsClient.receiveMessage(emailRequestActivityQueueResult.getQueueUrl());
    assertNotNull(receiveMessageResult);
    List<Message> messageList = receiveMessageResult.getMessages();
    assertNotNull(messageList);
    assertFalse(messageList.isEmpty());
    assertEquals(1, messageList.size());
    Message message = messageList.get(0);
    Map<String, String> payload = objectMapper.readValue(message.getBody(), Map.class);
    String messageStr = payload.get("Message");
    System.out.println("MS STR: " + messageStr);
  }

  private NotificationRequest createNotificationRequest() {
    return createNotificationRequest(UUID.randomUUID());
  }

  private NotificationRequest createNotificationRequest(UUID id) {
    UUID templateId = createTemplate();
    NotificationRequest request =
        NotificationRequest.builder()
            .id(id)
            .templateId(templateId)
            .languageTag("language-tag")
            .relatedUserId("recipientId")
            .createdAt(null)
            .build();

    return notificationRepository.saveAndFlush(request);
  }

  private UUID createTemplate() {
    return createTemplate(UUID.randomUUID());
  }

  private UUID createTemplate(UUID id) {
    Template template =
        Template.builder()
            .id(id)
            .translationGroupId("translationGroupId-NotificationRepository")
            .titleKey("title-NotificationRepository")
            .messageKey("message-NotificationRepository")
            .application("application-NotificationRepository")
            .owner("ownerIntegrationTest-NotificationRepository")
            .recipient(TemplateRecipient.SABBY_USER)
            .status(TemplateStatus.DRAFT)
            .urgent(false)
            .library(false)
            .build();
    templateRepository.saveAndFlush(template);
    return id;
  }
}

EmailNotificationRequestHandler.java

@Component
@RequiredArgsConstructor
@Slf4j
@Validated
public class EmailNotificationRequestHandler {
  private final EmailNotificationRepository emailNotificationRepository;
  private final TemplateRepository templateRepository;
  private final AmazonSimpleEmailService sesClient;
  private final ApplicationConfig applicationConfig;

  @Transactional
  public void handleEmailNotificationCreate(@Valid @NotNull EmailNotificationMessage message) {
    String id = message.getId();
    log.debug("sending email notification for {}", id);
    Optional<EmailNotification> optionalEmailNotification =
        emailNotificationRepository.findById(UUID.fromString(id));
    if (optionalEmailNotification.isEmpty()) {
      String errorMsg = String.format("Email Notification %s does not exist, unable to send.", id);
      log.warn(errorMsg);
      throw new IllegalArgumentException(errorMsg);
    }
    EmailNotification emailNotification = optionalEmailNotification.get();
    Optional<Template> optionalTemplate =
        templateRepository.findById(emailNotification.getNotificationRequest().getTemplateId());
    if (optionalTemplate.isEmpty()) {
      String errorMsg =
          String.format(
              "Template with id %s, does not exist for Email Notification %s",
              emailNotification.getNotificationRequest().getTemplateId(), id);
      log.warn(errorMsg);
      throw new IllegalArgumentException(errorMsg);
    }
    Template template = optionalTemplate.get();
    log.debug("Found Template: {} for EmailNotification: {}", template, emailNotification);
    SendEmailRequest emailRequest = buildSendEmailRequest(emailNotification, template);
    log.debug("Built Send Email Request: {}", emailRequest);
    SendEmailResult result = sesClient.sendEmail(emailRequest);
    log.debug("Sent Email Result: {}", result.toString());
    EmailNotification updateEmailNotification =
        emailNotification.toBuilder().messageId(result.getMessageId()).build();
    EmailNotification savedUpdateEmailNotification =
        emailNotificationRepository.save(updateEmailNotification);
    log.debug("Saved Updated Email Notification: {}", savedUpdateEmailNotification);
  }

  @VisibleForTesting
  SendEmailRequest buildSendEmailRequest(
      @NonNull EmailNotification emailNotification, @NonNull Template template) {
    return new SendEmailRequest()
        .withDestination(new Destination().withToAddresses(emailNotification.getToAddress()))
        .withMessage(
            new Message(
                new Content(emailNotification.getTitle()),
                new Body(new Content(emailNotification.getMessage()))))
        .withSource(applicationConfig.emailSourceAddress)
        .withConfigurationSetName(applicationConfig.sesConfigSet)
        .withTags(getMessageTags(template.getApplication() + "-" + template.getTitleKey()));
  }

  @VisibleForTesting
  MessageTag getMessageTags(@NonNull String name) {
    MessageTag tags = new MessageTag().withName(applicationConfig.cloudWatchMetric).withValue(name);
    return tags;
  }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文