Skip to main content

SNS Message Processor

Publish/subscribe (pub/sub) is one of the two fundamental integration patterns in messaging (the other being point-to-point). In a pub/sub integration a producer publishes a message onto a channel, and a subscriber receives that message. The channel in the middle could be an event bus, a topic or a stream. This pattern focuses on topic based publish subscribe.

A topic is a message channel typically focused on a specific type of message. You may have order-created and order-updated channels. This differs from an event bus where you typically have a single bus with different types of events flowing through the same channel.

Amazon Simple Notification Service (SNS) is an example of how you can implement topic based publish/subscribe using AWS services. If you're looking to introduce pub/sub to your system, using SNS, Rust and AWS Lambda then this is the article for you.

How It Works

The SNS to Lambda integration is an example of an async invoke. The SNS service calls the InvokeAsync API on the Lambda service passing the message payload. Internally, the Lambda service queues up these messages and invokes your function.

It's important to remember that SNS itself is ephemeral and provides no durability guarantees.

Project Structure

A SNS to Lambda template is found under the ./templates directory in the GitHub repo. You can use template to get started building with SNS and Lambda.

The project separates the SNS/Lambda handling code from your business logic. This allows you to share domain code between multiple Lambda functions that are contained within the same service.


lambdas
- new-message-processor
- shared

This tutorial will mostly focus on the code under lambdas/new-message-processor. Although shared code will be referenced to discuss how you can take this template and 'plug-in' your own implementation.

Lambda Code

Whenever you are working with SNS and Lambda your main function will look the same. This example doesn't focus on initializing AWS SDK's or reusable code. However, inside the main method is where you would normally initialize anything that is re-used between invokes.


#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.without_time()
.init();
run(service_fn(function_handler)).await
}

One thing to note is the tokio macro macro. Macros in Rust are signals to the compiler to generate some code based upon the macros' definition. The tokio macro allows the main function to run asynchronous, which is what the Lambda handler function requires.

It's worth noting, that this main function example would work for almost all Lambda Event Sources. The difference coming when moving on to the function_handler itself.

The main bulk of an SNS sourced Lambda function is implemented in the function_handler function. The first piece to note in this handler is that the event argument is typed to an event. This event uses the Lambda events Crate which defines the struct definition for the record definition specified by AWS. As you are sourcing your function with SNS, this uses the SnsEvent type.


async fn function_handler(event: LambdaEvent<SnsEvent>) -> Result<(), String> {
for message in &event.payload.records {
let new_message: Result<OrderCreatedMessage, MessageParseError>
= InternalSnsMessage::new(message.clone()).try_into();
if new_message.is_err(){
return Err("Failure deserializing message body".to_string());
}
let _ = OrderCreatedMessageHandler::handle(&new_message.unwrap()).await?;
}
Ok(())
}

As you learned earlier, SNS invokes your Lambda function using the InvokeAsync API call. This means SNS can continue doing other work and the Lambda service can invoke your function asynchronously. The SnsEvent struct, contains a vector of SnsRecords. However, this vector will only ever contain a single message. For re-usability, a custom InternalSnsMessage struct is used as a wrapper around the SnsRecord type that comes from the Lambda events Crate. This allows the try_into() function to be used to handle the conversion from the custom SNS type into the OrderCreatedMessage type used by the application.

You'll notice that if a failure occurs either in the initial message parsing or the actual handling of the message an error is returned. This ensures an error is passed back up to the Lambda service and retries can happen.


async fn function_handler(event: LambdaEvent<SnsEvent>) -> Result<(), String> {
for message in &event.payload.records {
let new_message: Result<OrderCreatedMessage, MessageParseError>
= InternalSnsMessage::new(message.clone()).try_into();
if new_message.is_err(){
return Err("Failure deserializing message body".to_string());
}
let _ = OrderCreatedMessageHandler::handle(&new_message.unwrap()).await?;
}
Ok(())
}

The main bulk of an SNS sourced Lambda function is implemented in the function_handler function. The first piece to note in this handler is that the event argument is typed to an event. This event uses the Lambda events Crate which defines the struct definition for the record definition specified by AWS. As you are sourcing your function with SNS, this uses the SnsEvent type.

As you learned earlier, SNS invokes your Lambda function using the InvokeAsync API call. This means SNS can continue doing other work and the Lambda service can invoke your function asynchronously. The SnsEvent struct, contains a vector of SnsRecords. However, this vector will only ever contain a single message. For re-usability, a custom InternalSnsMessage struct is used as a wrapper around the SnsRecord type that comes from the Lambda events Crate. This allows the try_into() function to be used to handle the conversion from the custom SNS type into the OrderCreatedMessage type used by the application.

You'll notice that if a failure occurs either in the initial message parsing or the actual handling of the message an error is returned. This ensures an error is passed back up to the Lambda service and retries can happen.


async fn function_handler(event: LambdaEvent<SnsEvent>) -> Result<(), String> {
for message in &event.payload.records {
let new_message: Result<OrderCreatedMessage, MessageParseError>
= InternalSnsMessage::new(message.clone()).try_into();
if new_message.is_err(){
return Err("Failure deserializing message body".to_string());
}
let _ = OrderCreatedMessageHandler::handle(&new_message.unwrap()).await?;
}
Ok(())
}

Shared Code & Reusability

The shared code in this example contains a custom OrderCreatedMessage struct representing the actual message payload that was published. The shared code also contains a OrderCreatedMessageHandler that contains a handle function, taking the OrderCreatedMessage struct as a input parameter.

If you want to use this template in your own applications, replace the OrderCreatedMessage struct with your own custom struct and update the handle function with your custom business logic.

lib.rs

#[derive(Debug)]
pub enum OrderCreatedMessageHandleError{
UnexpectedError
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct OrderCreatedMessage {
pub order_id: String
}
pub struct OrderCreatedMessageHandler {}
impl OrderCreatedMessageHandler {
pub async fn handle(message: &OrderCreatedMessage) -> Result<(), OrderCreatedMessageHandleError> {
tracing::info!("New message is for {}", message.order_id);
if message.order_id == "error" {
return Err(OrderCreatedMessageHandleError::UnexpectedError);
}
Ok(())
}
}

The shared library also contains code to convert an SnsRecord into the custom OrderCreatedMessage struct. It does this using the TryFrom trait. Because the SnsRecord struct is defined in an external crate, the InternalSnsMessage struct is used as a wrapper. Traits cannot be implemented for structs outside of the current crate.

You'll notice the try_from function returns a custom MessageParseError type depending if the message body is empty or the message fails to deserialize correctly.

lib.rs

pub struct InternalSnsMessage{
message: SnsRecord
}
impl InternalSnsMessage{
pub fn new(message: SnsRecord) -> Self {
Self {
message
}
}
}
impl TryFrom<InternalSnsMessage> for OrderCreatedMessage {
type Error = MessageParseError;
fn try_from(value: InternalSnsMessage) -> Result<Self, Self::Error> {
let parsed_body: OrderCreatedMessage = serde_json::from_str(value.message.sns.message.as_str())?;
Ok(parsed_body)
}
}

Congratulations, you now know how to implement an SNS sourced Lambda function in Rust and do that in a way that separates your Lambda handling code from your business logic.

Deploy Your Own

If you want to deploy this exact example, clone the GitHub repo and run the below commands:

deploy.sh

cd templates/patterns/messaging-patterns/sns-message-processor
sam build
sam deploy

You can then invoke the function using the below CLI command, replacing the <TOPIC_ARN> with the ARN that was output as part of the sam deploy step. The sam logs command will grab the latest logs.

test.sh

aws sns publish --message '{"orderId":"1234"}' --region eu-west-1 --profile dev --topic-arn <TOPIC_ARN>
sam logs

If you run the below command, you can test failure scenarios. In this example, the Lambda function uses a OnFailure Destination to route failed invokes to an SQS queue.

failure-test.sh

aws sns publish --message '{"orderId":"error"}' --region eu-west-1 --profile dev --topic-arn <TOPIC_ARN>
sam logs