SQS consumer in Spring Boot

In this tutorial, we will learn how to build an SQS consumer in Spring Boot.

Amazon SQS (Simple Queue Service) is a managed message queue from AWS: producers send messages to a queue and consumers read them from it.

Create a Spring Boot project using Spring Initializr:
https://start.spring.io/

Here I am using Java 8 with Maven.

Prerequisites

  • Queue URL
  • Access key ID
  • Secret access key
  • SQS region

Dependency

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-aws-messaging</artifactId>
   <version>2.2.4.RELEASE</version>
</dependency>

SQS Config

We need to configure our Spring Boot application for the SQS consumer.
In this class, you have to provide the access key ID, the secret key, and the region.
Create a new class SQSConfig.java and add the code below.


import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableSqs
public class SQSConfig {

    @Value("${cloud.aws.region:ap-south-1}")
    private String awsRegion;

    @Value("${cloud.aws.credentials.accessKeyId}")
    private String accessKey;

    @Value("${cloud.aws.credentials.secretKey}")
    private String secretKey;

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
                .withRegion(Regions.fromName(awsRegion))
                .build();
    }

    @Bean
    public AWSCredentialsProvider credentialsProvider() {
        return new DefaultAWSCredentialsProviderChain();
    }

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate() {
        return new QueueMessagingTemplate(amazonSQSAsync());
    }


    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSQSAsync) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory();
        factory.setAmazonSqs(amazonSQSAsync);
        factory.setAutoStartup(true);
        factory.setMaxNumberOfMessages(10);
        factory.setTaskExecutor(createDefaultTaskExecutor());
        return factory;
    }

    protected AsyncTaskExecutor createDefaultTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setThreadNamePrefix("SQSExecutor - ");
        threadPoolTaskExecutor.setCorePoolSize(100);
        threadPoolTaskExecutor.setMaxPoolSize(100);
        threadPoolTaskExecutor.setQueueCapacity(2);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

}
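
The executor settings above (core and max pool size of 100, queue capacity of 2) decide how many listener tasks can be in flight before new ones are rejected. ThreadPoolTaskExecutor delegates to java.util.concurrent.ThreadPoolExecutor, so the limit can be sketched with the plain JDK class. This is only an illustration: the class PoolCapacityDemo and the method countAccepted are hypothetical names, and how the listener container paces its polling against the pool is not modeled here.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original project.
final class PoolCapacityDemo {

    // Submits `submitted` tasks that all block until released and
    // returns how many of them the pool accepted.
    static int countAccepted(int poolSize, int queueCapacity, int submitted) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity)); // default handler aborts when full
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submitted; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until the loop is done
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // all threads are busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Same sizes as SQSConfig: 100 threads plus 2 queued tasks.
        System.out.println(countAccepted(100, 2, 105)); // prints 102
    }
}
```

With these numbers, at most 102 tasks (100 running plus 2 queued) are accepted at once; anything beyond that is rejected until a thread frees up.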

Message Consumer

With the above configuration, we can start the consumer using @SqsListener.

SQSListener

import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.stereotype.Component;

@Component
public class SQSListener {

    @SqsListener(value = "${cloud.queue.name}", deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void processMessage(String message) {
        try {
            System.out.println("RECEIVED MESSAGE --> " + message);
        } catch (Exception e) {
            throw new RuntimeException("Cannot process message from SQS", e);
        }
    }
}
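
The listener uses SqsMessageDeletionPolicy.ON_SUCCESS, which means a message is deleted only when the listener method returns without throwing; if it throws, the message stays on the queue and can be delivered again. The following is a plain-Java model of that rule, not Spring Cloud AWS code: OnSuccessModel and deliverOnce are hypothetical names, the queue is an in-memory Deque, and the SQS visibility timeout is not modeled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical in-memory model of "delete on success"; it does not call SQS.
final class OnSuccessModel {

    // Hands each message currently in the queue to the handler once.
    // Messages handled without an exception are removed ("deleted");
    // messages whose handler throws are put back for a later delivery.
    static int deliverOnce(Deque<String> queue, Consumer<String> handler) {
        int deleted = 0;
        int pending = queue.size();
        for (int i = 0; i < pending; i++) {
            String message = queue.pollFirst();
            try {
                handler.accept(message);
                deleted++;
            } catch (RuntimeException e) {
                queue.addLast(message); // not deleted, stays available
            }
        }
        return deleted;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<String>();
        queue.addLast("{\"message\":\"1\"}");
        queue.addLast("bad");

        int deleted = deliverOnce(queue, message -> {
            if ("bad".equals(message)) {
                throw new IllegalStateException("Cannot process message");
            }
        });

        System.out.println("deleted=" + deleted + " remaining=" + queue);
        // prints: deleted=1 remaining=[bad]
    }
}
```

This is why processMessage rethrows a RuntimeException on failure: swallowing the exception would count as success and the message would be deleted.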

application.properties

cloud.aws.credentials.accessKeyId=***
cloud.aws.credentials.secretKey=***
cloud.queue.name=weather

Output

RECEIVED MESSAGE --> {"message":"1"}

GitHub

https://github.com/rkumar9090/sqs-consumer

Related Articles

SQS producer in Spring Boot

In this tutorial, we will learn how to build an SQS producer in Spring Boot.

Amazon SQS (Simple Queue Service) is a managed message queue from AWS: producers send messages to a queue and consumers read them from it.

Create a Spring Boot project using Spring Initializr:
https://start.spring.io/

Here I am using Java 8 with Maven.

Prerequisites

  • Queue URL
  • Access key ID
  • Secret access key
  • SQS region

Maven dependency

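Add the dependency below to your pom.xml. No version is given here, so it must be supplied by the dependency management in your build (for example, a Spring Cloud BOM).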

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-aws-messaging</artifactId>
</dependency>

SQS Config

We need to configure our Spring Boot application for the SQS producer.
In this class, you have to provide the access key ID, the secret key, and the region.
Create a new class SQSConfig.java and add the code below.

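import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

@Configuration
public class SQSConfig {
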
    @Value("${cloud.aws.credentials.accessKeyId}")
    private String accessKey;

    @Value("${cloud.aws.credentials.secretKey}")
    private String secretKey;

    @Bean
    public QueueMessagingTemplate queueMessagingTemplate() {
        return new QueueMessagingTemplate(amazonSQSAsync());
    }

    @Bean
    @Primary
    public AmazonSQSAsync amazonSQSAsync() {
        return AmazonSQSAsyncClientBuilder.standard().withRegion(Regions.AP_SOUTH_1)
                .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
                .build();
    }
}

Message Producer

With the above configuration, we can publish messages to SQS using the convertAndSend method.

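import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.stereotype.Component;

@Component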
public class SQSProducer {

    @Autowired
    private QueueMessagingTemplate queueMessagingTemplate;

    @Value("${cloud.aws.credentials.end-point}")
    private String endpoint;

    // Pojo is the application's own message class; the property above holds the queue URL.
    public String sendMessage(Pojo message) {
        queueMessagingTemplate.convertAndSend(endpoint, message);
        return "Successfully sent message to SQS";
    }
}
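
The Pojo class passed to convertAndSend is not shown in this post. A minimal sketch follows, assuming a single String field named message so that, with the default Jackson-based conversion, the payload would look like the {"message":"1"} sample; the field name and the PojoDemo class are assumptions, not taken from the project.

```java
// Hypothetical message class; the real Pojo in the project may differ.
// Declare it public in its own Pojo.java if other packages need it.
class Pojo {

    private String message;

    // A no-argument constructor lets JSON libraries create the object.
    Pojo() {
    }

    Pojo(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

// Small usage demo for the sketch above.
final class PojoDemo {
    public static void main(String[] args) {
        Pojo pojo = new Pojo("1");
        System.out.println(pojo.getMessage()); // prints 1
    }
}
```

The getter and setter follow the JavaBean naming convention, which is what JSON mappers usually rely on to find the message property.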

Now you can create a controller and send the message from a REST endpoint.

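import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController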
@RequestMapping("/api/data/")
public class ProducerController {

    @Autowired
    private SQSProducer publisher;

    @PostMapping
    public String sendMessage(@RequestBody Pojo message) {
        return publisher.sendMessage(message);
    }
}

application.properties

cloud.aws.credentials.accessKeyId=****
cloud.aws.credentials.secretKey=*****
cloud.aws.credentials.end-point=https://sqs.ap-south-1.amazonaws.com/978751824592/producer
cloud.aws.region.auto=false
cloud.aws.region.static=ap-south-1
cloud.aws.stack.auto=false
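
The queue URL above follows the pattern https://sqs.<region>.amazonaws.com/<account-id>/<queue-name>. As a small sketch of how the prerequisites fit together, the helper below splits such a URL into its parts; QueueUrlParts is a hypothetical class that is not part of the project, and it assumes the standard URL shape shown here.

```java
import java.net.URI;

// Hypothetical helper, not part of the original project: splits a standard
// SQS queue URL into region, account ID, and queue name.
final class QueueUrlParts {

    final String region;
    final String accountId;
    final String queueName;

    private QueueUrlParts(String region, String accountId, String queueName) {
        this.region = region;
        this.accountId = accountId;
        this.queueName = queueName;
    }

    static QueueUrlParts parse(String queueUrl) {
        URI uri = URI.create(queueUrl);
        String host = uri.getHost() == null ? "" : uri.getHost();
        String path = uri.getPath() == null ? "" : uri.getPath();
        String[] hostParts = host.split("\\.");   // sqs, ap-south-1, amazonaws, com
        String[] pathParts = path.split("/");     // "", account ID, queue name
        if (hostParts.length < 4 || !"sqs".equals(hostParts[0]) || pathParts.length != 3) {
            throw new IllegalArgumentException("Not a standard SQS queue URL: " + queueUrl);
        }
        return new QueueUrlParts(hostParts[1], pathParts[1], pathParts[2]);
    }

    public static void main(String[] args) {
        QueueUrlParts parts =
                parse("https://sqs.ap-south-1.amazonaws.com/978751824592/producer");
        System.out.println(parts.region + " " + parts.accountId + " " + parts.queueName);
        // prints: ap-south-1 978751824592 producer
    }
}
```

The region in the URL should match the region set on the SQS client, which is Regions.AP_SOUTH_1 in SQSConfig.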

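If the application fails at startup with network errors (Spring Cloud AWS tries to read EC2 instance metadata and CloudFormation stack information, which are typically unavailable outside AWS), add the snippet below to the Spring Boot main class to exclude those auto-configurations: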

@SpringBootApplication(
		exclude = {
				org.springframework.cloud.aws.autoconfigure.context.ContextInstanceDataAutoConfiguration.class,
				org.springframework.cloud.aws.autoconfigure.context.ContextStackAutoConfiguration.class,
				org.springframework.cloud.aws.autoconfigure.context.ContextRegionProviderAutoConfiguration.class
		}
)

Now you can call the URL from Postman and see the message in the SQS console in AWS.

To create an access key ID and secret key, refer to this post:
https://beginnersbug.com/create-access-key-in-aws-console/

GitHub URL

https://github.com/rkumar9090/sqs-producer