AWS Step Functions
- Step Functions are state machines.
- A graph of steps with start, intermediate, and terminating nodes.
- Each step is one of the following:
- Lambda function (Task state)
- Choice (branch)
- Wait state (sleep)
- “Pass” state (can modify the state without calling a function)
- Terminate (Succeed/Fail)
- Each step accepts and emits a state object (JSON).
- Looping is allowed.
- Plumbing included (inexpensive to operate).
- It lets us create workflows that follow a fixed or dynamic sequence of steps.
- In a traditional API-based application, we may want to write to a database and, depending on the result, either proceed to the next step or return.
- If the database call fails, we simply return.
- Step Functions lets us model these steps as independent units.
- These small, independent tasks can interact directly with AWS resources such as DynamoDB, SNS, and Lambda to perform some job and return the response to the state machine.
- Step Functions acts as the orchestration layer for an application.
- Built-in retry functionality: a step is retried until it succeeds or its retries run out, and the workflow does not progress until then.
- We can set the number of retries, or have no retry policy at all.
- We can define the attributes of the retry policy.
- The policy decides whether a task retries with exponential backoff or at a fixed interval (via BackoffRate).
- Native integration with AWS services such as Lambda, SNS, SQS, DynamoDB, etc.
- These integrations are defined in the Amazon States Language (ASL), a JSON-based language in which we define steps and chain them together.
- Any task in the chain may interact with an AWS service such as Lambda or SNS.
- A GUI for auditing the workflow process, inputs/outputs, etc.
- Helps to identify where in the application flow we are and keep track of it.
- Helps to identify inputs and outputs between steps.
- Identify failed steps and where the gaps are.
- Highly scalable and low cost ($0.025 per 1,000 state transitions).
- Example:
- A request hits our transaction processing service (EC2 or Lambda).
- We store the request in a history table (DynamoDB).
- Next, we authorize the credentials (Auth service).
- Next, we commit the transaction (Authority service).
- Finally, we broadcast the success to SNS.
- A definition using Task, Choice, Wait, Succeed, and Fail states can be written in YAML as follows:
definition:
  Comment: Hello world application for AWS Step Functions
  StartAt: Hello
  States:
    Hello:
      Type: Task
      Resource: 'arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:hello'
      Next: World
    World:
      Type: Task
      Resource: 'arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:world'
      Next: ChoiceState
    ChoiceState:
      Type: Choice
      Choices:
        - Variable: $.hello.name
          StringEquals: 'Hadrian'
          Next: WaitState
        - Variable: $.hello.success
          BooleanEquals: true
          Next: SucceedState
      Default: FailedState
    WaitState:
      Type: Wait
      Seconds: 5
      Next: Greeting
    Greeting:
      Type: Task
      Resource: 'arn:aws:lambda:#{AWS::Region}:#{AWS::AccountId}:function:fromUsername'
      End: true
    SucceedState:
      Type: Succeed
    FailedState:
      Type: Fail
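The transaction example in the list above (store history, authorize, commit, broadcast) can be sketched as an ASL definition. The Python below builds it as a dict so the retry policy is written once. It is a sketch under assumptions: the region, account ID, table name (TransactionHistory), function names (auth-service, authority-service) and topic name are hypothetical placeholders, the input is assumed to carry a TransactionId, and giving every task the same Retry block is just one possible choice. `retry_delays` shows how IntervalSeconds and BackoffRate space out the retries.

```python
import json

# Hypothetical placeholders: replace with your own region, account ID and names.
REGION = "us-east-1"
ACCOUNT_ID = "123456789012"


def lambda_arn(name: str) -> str:
    """Build a Lambda function ARN from a function name."""
    return f"arn:aws:lambda:{REGION}:{ACCOUNT_ID}:function:{name}"


def retry_policy(interval_s: int = 2, max_attempts: int = 3,
                 backoff_rate: float = 2.0) -> list:
    """ASL Retry block: wait interval_s before the first retry, then multiply
    the wait by backoff_rate each time (1.0 keeps the interval fixed)."""
    return [{
        "ErrorEquals": ["States.ALL"],
        "IntervalSeconds": interval_s,
        "MaxAttempts": max_attempts,
        "BackoffRate": backoff_rate,
    }]


def retry_delays(interval_s: int, max_attempts: int, backoff_rate: float) -> list:
    """Seconds waited before each retry attempt under retry_policy()."""
    return [interval_s * backoff_rate ** n for n in range(max_attempts)]


def task(resource: str, next_state: str = "", **fields) -> dict:
    """A Task state with the shared retry policy; no next_state means End."""
    state = {"Type": "Task", "Resource": resource, "Retry": retry_policy(), **fields}
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


definition = {
    "Comment": "Transaction processing example (sketch)",
    "StartAt": "StoreHistory",
    "States": {
        # Store the request in a (hypothetical) DynamoDB history table.
        "StoreHistory": task(
            "arn:aws:states:::dynamodb:putItem", "Authorize",
            Parameters={"TableName": "TransactionHistory",
                        "Item": {"TransactionId": {"S.$": "$.TransactionId"}}},
            ResultPath="$.history"),
        # ResultPath keeps the original input and adds each service's reply.
        "Authorize": task(lambda_arn("auth-service"), "Commit", ResultPath="$.auth"),
        "Commit": task(lambda_arn("authority-service"), "Broadcast", ResultPath="$.commit"),
        # Broadcast the successful transaction to a (hypothetical) SNS topic.
        "Broadcast": task(
            "arn:aws:states:::sns:publish",
            Parameters={"TopicArn": f"arn:aws:sns:{REGION}:{ACCOUNT_ID}:transactions",
                        "Message.$": "$"}),
    },
}

if __name__ == "__main__":
    print(json.dumps(definition, indent=2))
    print(retry_delays(2, 3, 2.0))  # [2.0, 4.0, 8.0]
```

The printed JSON can be pasted into the console's definition editor once the placeholders are replaced.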
AWS Step Functions using Lambdas
- Transaction processor state machine prototype.
- We have a JSON input event.
- We feed it as the input of the state machine.
- Next, a Choice state makes a decision based on an input value.
- Depending on that choice, we go to the matching processor, which processes the input.
- Both processors are backed by Lambda functions.
- The results are returned to the state machine.
- Finally, the state machine terminates.
- Setting up the AWS console:
- Create two Lambda functions, “process purchase” and “process refund”, which return response objects as in the following code:
import datetime


def lambda_handler(message, context):
    # The state machine passes its current state object in as the event ("message")
    print("received message from step fn")
    print(message)
    response = {}
    response['TransactionType'] = message['TransactionType']
    response['Timestamp'] = datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S")
    response['Message'] = "Hello from process purchase lambda"
    return response
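The two processors differ only in their message, so one way to avoid copy-pasting is a small factory. This is a hypothetical sketch (make_handler, process_purchase and process_refund are illustrative names, not from the original), and it also lets us call the handlers locally with the same dict the Choice state would pass on, before deploying anything.

```python
import datetime


def make_handler(label: str):
    """Return a Lambda-style handler whose message names the processor."""
    def lambda_handler(message, context):
        print(f"received message from step fn: {message}")
        return {
            "TransactionType": message["TransactionType"],
            "Timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H-%M-%S"),
            "Message": f"Hello from {label} lambda",
        }
    return lambda_handler


# Module-level handlers: one per deployed Lambda function.
process_purchase = make_handler("process purchase")
process_refund = make_handler("process refund")

if __name__ == "__main__":
    # Simulate the state object the Choice state forwards; context is unused.
    print(process_refund({"TransactionType": "REFUND"}, None))
```

Each deployed function would point its handler setting at one of the module-level names, for example `lambda_function.process_refund` if the file is `lambda_function.py`.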
- Copy the ARNs of both functions; the state machine's Task states need them to point to the Lambdas.
- Create a role in IAM that can invoke the Lambda functions.
- We can reuse an existing policy for Step Functions.
- Next, go to the Step Functions service in the AWS console and click Create state machine (the state machine acts as a business process model, similar to BPMN).
- Give the state machine a name.
- Next, define the state machine.
- Step Functions state machines are written in the Amazon States Language (ASL), a JSON-based language specific to AWS.
- For the given scenario, the definition looks like this:
{
  "Comment": "A transaction processor state machine that routes purchases and refunds to separate Lambda functions.",
  "StartAt": "ProcessTransaction",
  "States": {
    "ProcessTransaction": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.TransactionType",
          "StringEquals": "PURCHASE",
          "Next": "ProcessPurchase"
        },
        {
          "Variable": "$.TransactionType",
          "StringEquals": "REFUND",
          "Next": "ProcessRefund"
        }
      ]
    },
    "ProcessRefund": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:PROCESS_REFUND_FUNCTION_NAME",
      "End": true
    },
    "ProcessPurchase": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:PROCESS_PURCHASE_FUNCTION_NAME",
      "End": true
    }
  }
}
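The IAM role for this state machine needs two policy documents: a trust policy that lets the Step Functions service assume it, and a permission policy that allows invoking the Lambdas. A minimal sketch follows, building both as Python dicts. The function ARNs are placeholders for the ones copied earlier, and scoping the permission to exactly those two functions is an assumption rather than a requirement.

```python
import json

# Placeholders: use the two function ARNs copied from the Lambda console.
FUNCTION_ARNS = [
    "arn:aws:lambda:REGION:ACCOUNT_ID:function:PROCESS_PURCHASE_FUNCTION_NAME",
    "arn:aws:lambda:REGION:ACCOUNT_ID:function:PROCESS_REFUND_FUNCTION_NAME",
]

# Trust policy: lets the Step Functions service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "states.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

# Permission policy: allows invoking only the two processor functions.
invoke_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": "lambda:InvokeFunction",
        "Resource": FUNCTION_ARNS,
    }],
}

if __name__ == "__main__":
    print(json.dumps(trust_policy, indent=2))
    print(json.dumps(invoke_policy, indent=2))
```

These documents could be pasted into the console, or passed as JSON strings to boto3's IAM `create_role(RoleName=..., AssumeRolePolicyDocument=...)` and `put_role_policy(RoleName=..., PolicyName=..., PolicyDocument=...)`.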
- Assign the state machine the IAM role that can invoke the Lambda functions.
- Click Create state machine.
- To run the state machine, click Start execution.
- Provide the input below and click Start execution:
{ "TransactionType": "PURCHASE" }
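The input has to match the Choice rules exactly: the field must be named `TransactionType` and `StringEquals` is a case-sensitive comparison, so an input such as `{"Transaction Type": "Purchase"}` (a space in the key, mixed case) matches neither rule and the execution fails, since the Choice state has no Default. The sketch below is a simplified local model of that routing, not the service's exact error semantics; `get_path` only handles dotted paths like `$.TransactionType`.

```python
# Choice rules copied from the ProcessTransaction state of the definition above.
CHOICES = [
    {"Variable": "$.TransactionType", "StringEquals": "PURCHASE", "Next": "ProcessPurchase"},
    {"Variable": "$.TransactionType", "StringEquals": "REFUND", "Next": "ProcessRefund"},
]


def get_path(state: dict, path: str):
    """Resolve a simple JsonPath such as "$.TransactionType" (dotted keys only)."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = state
    for key in path[2:].split("."):
        value = value[key]  # KeyError when the field is missing
    return value


def choose(state: dict, choices: list, default=None) -> str:
    """Return the Next state of the first matching rule (simplified model)."""
    for rule in choices:
        # StringEquals is an exact, case-sensitive comparison.
        if get_path(state, rule["Variable"]) == rule["StringEquals"]:
            return rule["Next"]
    if default is None:
        raise LookupError("States.NoChoiceMatched")
    return default


if __name__ == "__main__":
    print(choose({"TransactionType": "PURCHASE"}, CHOICES))  # ProcessPurchase
    for bad in ({"TransactionType": "Purchase"}, {"Transaction Type": "Purchase"}):
        try:
            choose(bad, CHOICES)
        except LookupError as err:  # KeyError is a LookupError too
            print("execution would fail:", repr(err))
```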
- The console then shows what the state machine is doing, and every step that executed successfully is marked in green.
- This is how we run Lambda functions from Step Functions.
AWS Step Functions with SQS
- Create a queue in SQS.
- Give the queue a name.
- Select the type of queue:
- Standard queue
- FIFO queue (sendMessage to a FIFO queue also requires a MessageGroupId)
- Copy the URL of the queue.
- Create a new state machine in Step Functions with the following definition.
{
  "Comment": "Transaction Processor State Machine Using SQS",
  "StartAt": "ProcessTransaction",
  "States": {
    "ProcessTransaction": {
      "Type": "Pass",
      "Next": "BroadcastToSqs"
    },
    "BroadcastToSqs": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sqs:sendMessage",
      "Parameters": {
        "QueueUrl": "",
        "MessageBody": {
          "TransactionId.$": "$.TransactionId",
          "Type.$": "$.Type"
        }
      },
      "End": true
    }
  }
}
- Attach an IAM role to the state machine that can access SQS.
- Start the execution with the following input:
{ "TransactionId": "abc", "Type": "PURCHASE" }
- Check for the message in the queue.
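Besides the console's "Send and receive messages" view, the queue can be checked from code. A minimal sketch, assuming `sqs` is `boto3.client("sqs")` and that each message body arrives as the JSON text of the MessageBody object from the definition. Messages are only deleted when `delete=True`; otherwise they reappear after the queue's visibility timeout. The function name is illustrative.

```python
import json


def read_transactions(sqs, queue_url: str, wait_s: int = 5,
                      delete: bool = False) -> list:
    """Receive up to 10 messages and return their decoded JSON bodies."""
    resp = sqs.receive_message(QueueUrl=queue_url,
                               MaxNumberOfMessages=10,
                               WaitTimeSeconds=wait_s)  # long polling
    bodies = []
    for msg in resp.get("Messages", []):  # key is absent when the queue is empty
        bodies.append(json.loads(msg["Body"]))
        if delete:
            sqs.delete_message(QueueUrl=queue_url,
                               ReceiptHandle=msg["ReceiptHandle"])
    return bodies
```

After the execution above, `read_transactions(boto3.client("sqs"), "YOUR QUEUE URL")` would be expected to return a list containing the TransactionId/Type pair.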
Step Functions with SNS
- Create a topic in SNS.
- Copy the topic ARN.
- Create a state machine with the following definition.
{
  "Comment": "Transaction Processor State Machine Using SNS",
  "StartAt": "ProcessTransaction",
  "States": {
    "ProcessTransaction": {
      "Type": "Pass",
      "Next": "BroadcastToSns"
    },
    "BroadcastToSns": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "Replace Me!",
        "Message": {
          "TransactionId.$": "$.TransactionId",
          "Type.$": "$.Type",
          "Source": "Step Functions!"
        }
      },
      "End": true
    }
  }
}
- Add an IAM role to access SNS.
- Start an execution with the following input:
{ "TransactionId": "abc", "Type": "PURCHASE" }
- Check your email for the AWS notification (this requires a confirmed email subscription on the topic).
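An email only arrives if the topic has a confirmed email subscription. A minimal sketch using the SNS `subscribe` call, assuming `sns` is `boto3.client("sns")`; the function name and address are hypothetical. AWS sends a confirmation link that must be clicked before notifications are delivered, and until then the returned SubscriptionArn typically reads "pending confirmation".

```python
def subscribe_email(sns, topic_arn: str, email: str) -> str:
    """Subscribe an email address to the topic and return the SubscriptionArn."""
    resp = sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint=email)
    return resp["SubscriptionArn"]
```

For example, `subscribe_email(boto3.client("sns"), "YOUR TOPIC ARN", "me@example.com")`, then confirm from the inbox before starting the execution.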
Step Functions with DynamoDB
- Create a table in DynamoDB.
- Give it a name.
- Give it a primary key.
- Click Create.
- Go to Step Functions and create a state machine with the following definition.
To write an item to DynamoDB, we use a putItem task:
{
  "Comment": "PutItem into DynamoDB",
  "StartAt": "1st",
  "States": {
    "1st": {
      "Type": "Task",
      "Resource": "arn:aws:states:::dynamodb:putItem",
      "Parameters": {
        "TableName": "CLIENTS",
        "Item": {
          "CLIENT_ID": { "S.$": "$.CLIENT_ID" },
          "CLIENT_NAME": { "S.$": "$.CLIENT_NAME" },
          "SOURCE_IDENTIFIER": { "S.$": "$.SOURCE_IDENTIFIER" },
          "CREATED_TIMESTAMP": { "S.$": "$.CREATED_TIMESTAMP" },
          "ENTITIES": { "S.$": "$.ENTITIES.L" }
        }
      },
      "End": true,
      "ResultPath": "$.DynamoDB"
    }
  }
}
To read an item, we use a getItem task state such as the following (a single entry from a larger States object):
"Read Next Message from DynamoDB": {
  "Type": "Task",
  "Resource": "arn:aws:states:::dynamodb:getItem",
  "Parameters": {
    "TableName": "TransferDataRecords-DDBTable-3I41R5L5EAGT",
    "Key": {
      "MessageId": { "S.$": "$.List[0]" }
    }
  },
  "ResultPath": "$.DynamoDB",
  "Next": "Send Message to SQS"
}
- Create a role with a policy that provides access to DynamoDB.
- Create a state machine.
- Start an execution with the required input.
- Check for the new item in DynamoDB.
Start an AWS Step Functions workflow from Lambda
- Create a state machine.
- Copy the ARN of the state machine.
- Create a Lambda function:
- Author from scratch.
- Give it a name.
- Select a runtime.
- Select a role that grants the required Step Functions permissions (for example, states:StartExecution).
- Preferably scope the policy to what is needed (for example, only starting executions) rather than granting full access.
- Create the function.
- Add the code to the Lambda function; we are using Python here:
import json
import uuid

import boto3

client = boto3.client('stepfunctions')


def lambda_handler(event, context):
    # Example input -> { "TransactionId": "foo", "Type": "PURCHASE" }
    transactionId = str(uuid.uuid1())  # unique ID, reused as the execution name
    input = {'TransactionId': transactionId, 'Type': 'PURCHASE'}

    response = client.start_execution(
        stateMachineArn='YOUR ARN HERE!',
        name=transactionId,
        input=json.dumps(input)
    )
    # Return only the ARN: the full response contains a datetime (startDate),
    # which cannot be serialized as a Lambda result.
    return {'executionArn': response['executionArn']}
- The name of each execution (instance) of the state machine must be unique.
- We cannot start two executions of the same state machine with the same name.
- Here, the transaction ID serves as that unique name.
- Configure a test event and test the function.
- The test event's input can be ignored, because the handler builds its own input.
- Go to Step Functions and check the executions of the state machine.
- Click any execution to see how it ran.
AWS Step Functions Map State
- The Map state is a new feature in Step Functions.
- It defines a workflow that runs in parallel to process a list of items.
- For example, each item in the list is handed off to a Lambda function, which saves it to a database.
- Previously, we had to build the iteration ourselves, using a Lambda function as glue to loop over the items. Now the Map state iterates over the list automatically and processes the items one by one, and we can set the degree of parallelism as well.
- Create a state machine with the following definition.
- The code below is a YAML snippet:
stepFunctions:
  stateMachines:
    fetch-pdfs-stepfn:
      name: fetch-pdfs-stepfn
      events: # step function entry event
      definition:
        StartAt: GeneratePayload
        States:
          GeneratePayload:
            ... # Task step
          MergePDFs:
            ... # Task step
          FetchPDFs:
            Type: Map
            ItemsPath: $.payload # Array of data
            ResultPath: $.responses # Array of PDF files
            MaxConcurrency: 5 # Max number of parallel invocations
            Iterator:
              StartAt: FetchPDF
              States:
                FetchPDF:
                  Type: Task
                  Resource:
                    Fn::GetAtt:
                      - fetchPDF # Name of Lambda
                      - Arn
                  End: true
            Next: MergePDFs
- Run the state machine with a list of PDFs as input; it fetches them and merges them.
- The output of the Map state is also a list (one result per item), unless we change it.
EC2/ECS with Step Functions
- Why ECS?
- Segment's engineering team loves containers.
- Segment loves AWS.
- ECS integrates well with other AWS components:
- EC2
- ELB (Elastic Load Balancing)
- Auto Scaling (via CloudWatch alarms)
- ECR (Elastic Container Registry)
- CloudWatch Logs
- ECS fits best in the following scenarios:
- Stateless services
- Tasks that can be terminated at will
- Stateful services that can run in ECS:
- Kafka
- ZooKeeper
- Consul
- NSQD
- Event de-duplication (custom)
- Stateful services need attention before termination:
- All queues should drain.
- All tasks should sign off and preserve quorum.
- Any replacement task should be on standby on another instance.
- ECS instance draining using Step Functions:
- The Auto Scaling group prepares to terminate an instance.
- The instance goes into the Terminating:Wait state.
- Termination is deferred until we complete the lifecycle action with a CONTINUE result.
- A state machine for draining an EC2 instance on scale-in can work as follows:
- Auto Scaling emits a CloudWatch Events event or an SNS notification (SNS topics are now legacy; it is better to use CloudWatch Events).
- The event triggers a Lambda function called “start-ECS-drain-process”.
- The Lambda function starts a state machine called “ECS-drainer”.
- We can use the same pattern to trigger a state machine from any service.
Best practices with Step Functions
- Surface logic in the state machine (don't bury it in a function).
- Keep function steps simple.
- Put switching logic in Choice steps.
- Notify when execution time exceeds a reasonable threshold.
- Don't bury errors.
- Catch them when possible and notify.