Distributed tracing with AWS SQS message attributes

Keshav Bist
5 min read · Jul 3, 2021

Amazon Simple Queue Service (SQS) is a fully managed message queueing service in the AWS cloud. We can use SQS to send, store, and receive messages between different services or software components at high volume, without losing messages even if the receiving service is down. The maximum message size is limited to 256 KB; however, messages up to 2 GB are supported using the Amazon SQS Extended Client Library, which stores the payload in Amazon S3.
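As a rough illustration of that size limit, the sketch below estimates whether a message still fits into a standard SQS message. The `attribute` type and the function names are hypothetical and use only the standard library; the 256 KB constant is the figure quoted above, so check the current SQS quotas before relying on it. Attribute names, data types, and values are counted as well, because they count toward the message size.

```go
package main

// maxStandardMessageBytes is the 256 KB limit quoted in this article.
// It is an assumption of this sketch; verify it against current SQS quotas.
const maxStandardMessageBytes = 256 * 1024

// attribute is a hypothetical, simplified stand-in for an SQS message attribute.
type attribute struct {
	DataType string
	Value    string
}

// messageSize adds up the body and every attribute's name, data type, and value.
func messageSize(body string, attrs map[string]attribute) int {
	size := len(body)
	for name, a := range attrs {
		size += len(name) + len(a.DataType) + len(a.Value)
	}
	return size
}

// fitsStandardMessage reports whether the message can be sent without
// offloading the payload elsewhere (for example through the extended client).
func fitsStandardMessage(body string, attrs map[string]attribute) bool {
	return messageSize(body, attrs) <= maxStandardMessageBytes
}
```

A producer could call `fitsStandardMessage` before sending and fall back to a different delivery path when it returns false.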

Apart from the message body, SQS supports sending structured metadata such as message signatures, timestamps, geospatial information, and identifiers using message attributes. There are two sets of attributes: Message Attributes and Message System Attributes. Message Attributes are for general-purpose use, and each message can have up to ten of them, while Message System Attributes are reserved for AWS services such as AWS X-Ray.
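To make the attribute rules concrete, here is a minimal sketch that checks a set of attributes before sending. The `attribute` type and `validateAttributes` are hypothetical helpers written with the standard library only, not part of the AWS SDK. The sketch enforces the ten-attribute limit mentioned above and accepts the three base data types (String, Number, Binary), optionally followed by a custom label such as `Number.int`.

```go
package main

import (
	"fmt"
	"strings"
)

// maxAttributes is the per-message limit mentioned in the article.
const maxAttributes = 10

// attribute is a hypothetical, simplified stand-in for an SQS message attribute.
type attribute struct {
	DataType string // "String", "Number" or "Binary", optionally with a ".label" suffix
	Value    string
}

// validateAttributes rejects attribute sets that are too large, have empty
// names, or use a data type outside the three base types.
func validateAttributes(attrs map[string]attribute) error {
	if len(attrs) > maxAttributes {
		return fmt.Errorf("%d attributes exceed the limit of %d", len(attrs), maxAttributes)
	}
	for name, a := range attrs {
		if name == "" {
			return fmt.Errorf("attribute name must not be empty")
		}
		// Only the part before the first dot decides the base type.
		base := strings.SplitN(a.DataType, ".", 2)[0]
		switch base {
		case "String", "Number", "Binary":
			// supported base type
		default:
			return fmt.Errorf("attribute %q has unsupported data type %q", name, a.DataType)
		}
	}
	return nil
}
```

Running a check like this on the producer side surfaces mistakes before the request reaches SQS.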

The message body in SQS can be plain text, a JSON document, or an XML document. If a queue contains these different types of messages from different services, the consuming service needs to handle each type differently. In this scenario we can send a message attribute along with the message, so that the consuming service can look at the attribute and call the proper parser for the message.
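A minimal sketch of that dispatch could look like the following. The `Order` payload, the `content_type` attribute name, and its values (`json`, `xml`, `text`) are all assumptions made for illustration; the consumer would read the attribute value from the message and pass it in together with the body.

```go
package main

import (
	"encoding/json"
	"encoding/xml"
	"fmt"
)

// Order is a hypothetical payload used only for this illustration.
type Order struct {
	ID string `json:"id" xml:"id"`
}

// parseOrder picks a decoder based on the value of a hypothetical
// "content_type" message attribute set by the producing service.
func parseOrder(contentType, body string) (Order, error) {
	var o Order
	switch contentType {
	case "json":
		err := json.Unmarshal([]byte(body), &o)
		return o, err
	case "xml":
		err := xml.Unmarshal([]byte(body), &o)
		return o, err
	case "text":
		// Plain text: treat the whole body as the identifier.
		o.ID = body
		return o, nil
	default:
		return o, fmt.Errorf("unsupported content_type %q", contentType)
	}
}
```

Returning an error for unknown types keeps unexpected messages from being silently misparsed.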

One common use of SQS in microservices is as a communication channel between services. In this architecture it is tricky to trace requests, because many components communicate with each other through a huge volume of messages, yet tracing is essential for debugging and monitoring. In the following sections of this article we will look at how we can use message attributes for distributed tracing.

Prerequisites:

  • AWS CLI and SAM CLI installed and configured
  • Basic knowledge of AWS CloudFormation
  • Basics of Go
  • Basic knowledge of Jaeger

Architecture:

Fig 1.0 — Architecture Diagram

The gateway exposes an API endpoint for HTTP access and forwards each request to trigger the API lambda. This lambda generates the tracing ID and sends it both to the tracing service and to the SQS queue as a message attribute. The messages in the queue trigger the consumer lambda, which consumes the messages and sends the tracing information to the tracing service. Configuring and running the tracing service is outside the scope of this article.
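Stripped of the AWS and Jaeger SDKs, the flow above boils down to three steps: generate an ID, attach it to the outgoing message attributes, and read it back on the consumer side. The sketch below shows only that idea with plain maps. The helper names are hypothetical, and the ID comes from `crypto/rand` rather than from the UUID library used in the lambdas; only the attribute name `activity_id` is taken from the article's code.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
)

// tracingAttr is the attribute name the lambdas in this article use.
const tracingAttr = "activity_id"

// newTracingID returns eight random hex characters. This is a stand-in
// for the UUID-based ID generated in the API lambda.
func newTracingID() (string, error) {
	b := make([]byte, 4)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// withTracingID (producer side) returns a copy of attrs with the ID added.
func withTracingID(attrs map[string]string, id string) map[string]string {
	out := make(map[string]string, len(attrs)+1)
	for k, v := range attrs {
		out[k] = v
	}
	out[tracingAttr] = id
	return out
}

// tracingIDFrom (consumer side) reads the ID back and reports an error
// when the producer did not set it.
func tracingIDFrom(attrs map[string]string) (string, error) {
	id, ok := attrs[tracingAttr]
	if !ok || id == "" {
		return "", errors.New("message has no activity_id attribute")
	}
	return id, nil
}
```

Both services then tag their spans with the same value, which is what makes the two spans searchable as one activity in the tracing UI.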

Go code for the Lambda handling the HTTP request (API Lambda):

package main

import (
    "context"
    "fmt"
    "io"
    "os"
    "strings"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
    "github.com/aws/aws-sdk-go/service/sqs/sqsiface"
    "github.com/gofrs/uuid"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "github.com/uber/jaeger-client-go/log"
)

var (
    queueURL          = os.Getenv("SQS_URL")
    collectorEndPoint = os.Getenv("COLLECTOR_ENDPOINT") + "/api/traces"
)

type QueueListener struct {
    Client sqsiface.SQSAPI
}

// NewQueListener creates an SQS client for the region the lambda runs in.
func NewQueListener() QueueListener {
    sess := session.Must(session.NewSessionWithOptions(session.Options{
        Config: aws.Config{
            Region: aws.String(os.Getenv("AWS_REGION")),
        },
        SharedConfigState: session.SharedConfigEnable,
        Profile:           os.Getenv("AWS_PROFILE"),
    }))
    return QueueListener{Client: sqs.New(sess)}
}

// SendMessage sends a message to SQS and attaches the tracing ID
// as the "activity_id" message attribute.
func SendMessage(tracingId string) (string, error) {
    queueListener := NewQueListener()
    messageAttribute := map[string]*sqs.MessageAttributeValue{
        "activity_id": {
            DataType:    aws.String("String"),
            StringValue: aws.String(tracingId),
        },
    }
    msg, err := queueListener.Client.SendMessage(&sqs.SendMessageInput{
        MessageBody:       aws.String("Hello from API lambda"),
        QueueUrl:          &queueURL,
        MessageAttributes: messageAttribute,
    })
    if err != nil {
        fmt.Printf("Error sending message: %+v\n", err)
        return "", err
    }
    return *msg.MessageId, nil
}

// Init configures the Jaeger tracer and registers it as the global tracer.
func Init() (io.Closer, error) {
    cfg := config.Configuration{
        ServiceName: "API-LAMBDA",
        Sampler: &config.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            CollectorEndpoint: collectorEndPoint,
            LogSpans:          true,
        },
        Disabled: false,
    }
    tracer, closer, err := cfg.NewTracer(config.Logger(log.StdLogger))
    if err != nil {
        fmt.Printf("Error %+v\n", err)
        cfg.Disabled = true
        return nil, err
    }
    opentracing.SetGlobalTracer(tracer)
    return closer, nil
}

func HandleRequest(ctx context.Context, event events.APIGatewayProxyRequest) (*events.APIGatewayProxyResponse, error) {
    closer, err := Init()
    if err != nil {
        return &events.APIGatewayProxyResponse{
            StatusCode: 500,
            Body:       "Some Error Occurred",
        }, nil
    }
    defer closer.Close()

    // spanCtx avoids shadowing the imported context package.
    span, spanCtx := opentracing.StartSpanFromContext(ctx, "api_lambda_HandleRequest")
    defer span.Finish()
    fmt.Printf("context: %+v\n", spanCtx)

    // The first segment of a V1 UUID serves as a short tracing ID.
    activityUUID, err := uuid.NewV1()
    if err != nil {
        return &events.APIGatewayProxyResponse{
            StatusCode: 500,
            Body:       "Some Error Occurred",
        }, nil
    }
    activityId := strings.Split(activityUUID.String(), "-")[0]
    span.SetTag("component_type", "lambda")
    span.SetTag("activity_id", activityId)
    span.SetTag("tracingId", activityId)

    msgId, err := SendMessage(activityId)
    if err != nil {
        return &events.APIGatewayProxyResponse{
            StatusCode: 500,
            Body:       "Some Error Occurred",
        }, nil
    }
    return &events.APIGatewayProxyResponse{
        StatusCode: 200,
        Body:       "Published Tracing ID = " + activityId + ", Message Id = " + msgId,
    }, nil
}

func main() {
    lambda.Start(HandleRequest)
}

Go Code for Lambda Consuming SQS Messages (Consumer Lambda):

package main

import (
    "context"
    "fmt"
    "io"
    "os"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/opentracing/opentracing-go"
    "github.com/uber/jaeger-client-go"
    "github.com/uber/jaeger-client-go/config"
    "github.com/uber/jaeger-client-go/log"
)

var collectorEndPoint = os.Getenv("COLLECTOR_ENDPOINT") + "/api/traces"

// Init configures the Jaeger tracer and registers it as the global tracer.
func Init() (io.Closer, error) {
    cfg := config.Configuration{
        ServiceName: "CONSUMER-LAMBDA",
        Sampler: &config.SamplerConfig{
            Type:  jaeger.SamplerTypeConst,
            Param: 1,
        },
        Reporter: &config.ReporterConfig{
            CollectorEndpoint: collectorEndPoint,
            LogSpans:          true,
        },
        Disabled: false,
    }
    tracer, closer, err := cfg.NewTracer(config.Logger(log.StdLogger))
    if err != nil {
        fmt.Printf("Error %+v\n", err)
        cfg.Disabled = true
        return nil, err
    }
    opentracing.SetGlobalTracer(tracer)
    return closer, nil
}

func HandleRequest(ctx context.Context, event events.SQSEvent) error {
    closer, err := Init()
    if err != nil {
        return err
    }
    defer closer.Close()

    fmt.Printf("ctx: %v\n", ctx)
    for _, message := range event.Records {
        span, _ := opentracing.StartSpanFromContext(ctx, "consumer_lambda_HandleRequest")
        fmt.Printf("Span: %+v\n", span)
        fmt.Printf("Received message id: %s\n", message.MessageId)
        span.SetTag("component_type", "lambda")

        // StringValue is nil when the attribute is missing, so check before dereferencing.
        if activityId := message.MessageAttributes["activity_id"].StringValue; activityId != nil {
            span.SetTag("traceId", *activityId)
            // Same tag key the API lambda sets, so one tag search finds both spans.
            span.SetTag("activity_id", *activityId)
            fmt.Printf("traceid: %s\n", *activityId)
        }

        // Finish the span per message instead of deferring inside the loop.
        span.Finish()
    }
    return nil
}

func main() {
    lambda.Start(HandleRequest)
}

Creating Infrastructure:

Queue:

This SQS queue is used to send messages between the systems.

AWSTemplateFormatVersion: '2010-09-09'
Description: SQS Queue for demo purpose
Resources:
  TestMessageBusQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 300
      QueueName: 'test-message-bus-queue'
  TestMessageBusQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues: [!Ref TestMessageBusQueue]
      PolicyDocument:
        Statement:
          - Action:
              - "SQS:SendMessage"
              - "SQS:ReceiveMessage"
            Effect: "Allow"
            Resource: !GetAtt TestMessageBusQueue.Arn
            Principal:
              AWS: "*"
Outputs:
  TestMessageBusQueueArn:
    Value: !GetAtt TestMessageBusQueue.Arn
    Description: Demand pool listener queue Arn
    Export:
      Name: !Join [ ':', [ 'test', TestMessageBusQueueArn ] ]
  TestMessageBusQueueURL:
    Value: !Ref TestMessageBusQueue
    Description: Demand pool listener queue URL
    Export:
      Name: !Join [ ':', [ 'test', TestMessageBusQueueURL ] ]

Deploying Queue:

aws cloudformation create-stack --stack-name your-stack-name \
  --template-body file://sqs.yaml \
  --profile "$AWS_PROFILE" --region "$AWS_REGION"

Lambda:

Consumer Lambda: This lambda is triggered by the SQS queue created above, so its execution role needs the SQS-related permissions.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Globals:
  Function:
    Timeout: 300
Resources:
  ConsumerLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      Policies:
        - PolicyName: !Sub ${AWS::StackName}-lambda-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:GetQueueAttributes
                  - sqs:ChangeMessageVisibility
                Resource:
                  - Fn::ImportValue: 'test:TestMessageBusQueueArn'
              - Effect: Allow
                Action:
                  - logs:*
                Resource: arn:aws:logs:*:*:*
  TestConsumerLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: 'test-consumer-lambda'
      Handler: main
      CodeUri: ../src/
      # go1.x was the Go runtime when this was written; AWS has since deprecated it.
      Runtime: go1.x
      Role: !GetAtt ConsumerLambdaExecutionRole.Arn
      Environment:
        Variables:
          COLLECTOR_ENDPOINT: 'your-jaeger-url-here'
  EventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 10
      Enabled: true
      EventSourceArn:
        Fn::ImportValue: 'test:TestMessageBusQueueArn'
      FunctionName: !Ref TestConsumerLambda

API Lambda:

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Globals:
  Function:
    Timeout: 300
Resources:
  ApiLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
      Policies:
        - PolicyName: !Sub ${AWS::StackName}-lambda-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                Resource: "*"
              - Effect: Allow
                Action:
                  - logs:*
                Resource: arn:aws:logs:*:*:*
  TestAPILambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: 'test-api-lambda'
      Handler: main
      CodeUri: ../src/
      # go1.x was the Go runtime when this was written; AWS has since deprecated it.
      Runtime: go1.x
      Role: !GetAtt ApiLambdaExecutionRole.Arn
      Events:
        TestApi:
          Type: Api
          Properties:
            Path: /ping
            Method: GET
      Environment:
        Variables:
          SQS_URL:
            Fn::ImportValue: 'test:TestMessageBusQueueURL'
          # Placeholder: the name of a stack export that holds the Jaeger URL.
          # A literal URL string, as in the consumer template, works as well.
          COLLECTOR_ENDPOINT:
            Fn::ImportValue: '<your-jaeger-tracing-url-here>'

Deploying Lambda:

(cd src && go mod download && \
  GOOS=linux GOARCH=amd64 go build -o main *.go)
sam package --template-file build/saml.yaml \
  --output-template-file build/packaged.yaml \
  --s3-bucket <s3-bucket> \
  --profile "$AWS_PROFILE" --region "$AWS_REGION"
sam deploy --template-file build/packaged.yaml \
  --stack-name <your-stack-name> \
  --capabilities CAPABILITY_IAM \
  --profile "$AWS_PROFILE" --region "$AWS_REGION"

Deploying the API lambda will also create an API Gateway endpoint to expose it to users.

Keshav Bist

Professional software engineer since 2017 and tech enthusiast who loves to solve social problems using technology.