Step Functions
官方文档:https://docs.aws.amazon.com/step-functions/latest/dg/welcome.html
公司中的项目需要用到延时任务去处理业务上的问题,AWS自带的SQS消息延时队列最大的延时消息是15分钟,完全不能满足我们可以自定义或固定超过15分钟去处理延时任务
因此,领导安排我调研AWS Step Functions看看能不能解决这些问题
Step Functions 基于状态机和任务。在 Step Functions 中,工作流被称为状态机,它是一系列事件驱动的步骤。
工作流程中的每个步骤都称为状态。状态代表其他AWS服务(例如AWS Lambda)执行的工作单元。任务状态可以调用任何AWS服务或API任务
Step Functions 也提供了等待状态传入自定义时间来控制任务处理, 用来做延时任务确实符合我们的业务场景
(虽然最后没有选择使用此方案做延时任务, 因为最大的活动数量上限是10000 不能满足我们的业务量对应的需求)
使用 AWS Step Functions 做延时任务
流程步骤
- 创建lambda函数
- 创建状态机添加延时任务并且绑定lambda函数
- 执行状态机
配置本地开发环境
前置条件 安装 aws-cli, sam-cli
lambda 函数
- 创建lambda函数
sam init
- 编译打包
sam build
- 本地运行
sam local start-lambda --host 0.0.0.0
创建状态机环境
docker run --name stepfunctions -p 8083:8083 --env-file env.txt amazon/aws-stepfunctions-local
环境变量env.txt
1 2 3 4
| AWS_DEFAULT_REGION=us-east-1 AWS_ACCESS_KEY_ID=012345678901 AWS_SECRET_ACCESS_KEY=012345678901 LAMBDA_ENDPOINT=http://10.0.8.246:3001
|
创建状态机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import json import os
states = { "Comment": "example", "StartAt": "wait_until", "States": { "wait_until" : { "Type": "Wait", "SecondsPath": "$.expirydate", "Next": "LambdaState" }, "LambdaState": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:function:HelloWorldFunction:$LATEST" }, "Next": "NextState" }, "NextState": { "Type": "Pass", "End": True } } }
states_str = (json.dumps(json.dumps(states)))
cmd = "aws stepfunctions --endpoint http://localhost:8083 create-state-machine --definition " + states_str + " --name test --role-arn arn:aws:iam::123456789012:role/DummyRole"
os.system(cmd)
|
删除状态机
aws stepfunctions --endpoint-url http://localhost:8083 start-execution --state-machine-arn arn:aws:states:us-east-1:123456789012:stateMachine:HelloWorld --input “{"a":1}”
golang执行状态机方法并且传递参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package main
import ( "fmt" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sfn" "log" )
func main() { sess, err := session.NewSession(&aws.Config{ Region: aws.String("us-east-1"), Endpoint: aws.String("http://localhost:8083"), }) if err != nil { log.Fatal(err) }
sfnClient := sfn.New(sess)
input := `{"expirydate":11, "a":1233333}`
stateMachineARN := "arn:aws:states:us-east-1:123456789012:stateMachine:test"
result, err := sfnClient.StartExecution(&sfn.StartExecutionInput{ Input: &input, StateMachineArn: &stateMachineARN, }) if err != nil { log.Fatal(err) }
fmt.Println("Execution ARN:", *result.ExecutionArn) }
|