I need an AWS EventBridge CDK deployment example in Python.
Does anyone have some code?
The following example is working Python CDK code.
The example demonstrates how to deploy an AWS EventBridge rule that triggers both a Step Functions state machine and a Lambda function, using the AWS CDK (Cloud Development Kit).
by yl.
Pre-requisite:
Python 3.7
AWS CDK
Steps:
1) Deploys a Lambda function
2) Prepares resources: EventBus, EventPattern (an EventBridge filter, which will be explained later on)
3) Creates a Step Functions state machine
4) Creates a rule using the resources: EventBus, EventPattern, and the targets: Lambda and state machine
from aws_cdk import aws_events as events
from aws_cdk import aws_events_targets as targets
from aws_cdk import (
aws_lambda as _lambda,
aws_stepfunctions as sfn,
aws_stepfunctions_tasks as sfn_tasks
)
from aws_cdk import core
import typing
class CdkEventBridgeStack(core.Stack):
    """Stack that routes events from a custom EventBridge bus to two targets.

    Construction runs four steps in order:

    1. deploy a Lambda function,
    2. create the event bus, the event pattern and the Lambda rule target,
    3. create a Step Functions state machine and wrap it as a rule target,
    4. create one rule on the bus that sends matching events to both targets.
    """

    def __init__(self, scope: core.Construct, id: str, **kwargs) -> None:
        """Build every resource of the stack inside ``scope`` under ``id``."""
        super().__init__(scope, id, **kwargs)

        # 1) deploy lambda function
        base_lambda = CdkEventBridgeStack.lambdaDeploy(self)

        # 2) prepare resources EventBus, EventPattern, Rule-Target
        event_bus, event_pattern, lambda_target = (
            CdkEventBridgeStack.creEventBusAndEventPatternAndLambdaTarget(
                self, base_lambda
            )
        )

        # 3) create step functions StateMachine
        state_machine_target = CdkEventBridgeStack.createStateMachine(
            self, base_lambda
        )

        # 4) create rule using resources: EventBus, EventPattern,
        #    Rule-Targets: Lambda and State-Machine
        CdkEventBridgeStack.createRule(
            self,
            [lambda_target, state_machine_target],
            event_bus,
            event_pattern,
        )

    # -------------------------------------------------------------------------
    @staticmethod
    def lambdaDeploy(this) -> _lambda.Function:
        """Create the Lambda function ``MyLambda1`` in ``this`` and return it.

        The code is packaged from the ``cdk_event_bridge/functions`` directory
        and the entry point is ``lambda_handler`` in ``testTargtetLambda.py``.
        """
        return _lambda.Function(
            this,
            'MyLambda',
            function_name='MyLambda1',
            handler='testTargtetLambda.lambda_handler',
            runtime=_lambda.Runtime.PYTHON_3_7,
            # Code.asset() is the deprecated spelling of Code.from_asset().
            code=_lambda.Code.from_asset('cdk_event_bridge/functions'),
        )

    # -------------------------------------------------------------------------
    @staticmethod
    def creEventBusAndEventPatternAndLambdaTarget(
        this, base_lambda: _lambda.IFunction
    ) -> typing.Tuple[
        events.EventBus, events.EventPattern, targets.LambdaFunction
    ]:
        """Create the event bus, the event filter and the Lambda rule target.

        Returns:
            A ``(event_bus, event_pattern, lambda_target)`` tuple. The bus is
            named ``orders2``; the pattern matches events whose ``source`` is
            ``'XXX'``; the target invokes ``base_lambda``.
        """
        event_bus = events.EventBus(
            scope=this, id="orders1", event_bus_name="orders2"
        )
        event_pattern = events.EventPattern(source=['XXX'])
        lambda_target = targets.LambdaFunction(handler=base_lambda)
        return event_bus, event_pattern, lambda_target

    # -------------------------------------------------------------------------
    @staticmethod
    def createRule(
        this,
        targetsList: typing.Optional[typing.List[events.IRuleTarget]] = None,
        eventBus: typing.Optional[events.IEventBus] = None,
        eventPattern: typing.Optional[events.EventPattern] = None,
    ) -> None:
        """Create the rule ``routeToLambda`` in ``this``.

        Args:
            targetsList: Rule targets that receive every matching event.
            eventBus: Bus the rule is attached to.
            eventPattern: Filter deciding which events on the bus match.
        """
        events.Rule(
            scope=this,
            id="x123",
            rule_name="routeToLambda",
            targets=targetsList,
            description="cdk Test Rule navigates to Lambda",
            event_bus=eventBus,
            event_pattern=eventPattern,
        )

    # -------------------------------------------------------------------------
    @staticmethod
    def createStateMachine(
        this, lambda_function: _lambda.IFunction
    ) -> targets.SfnStateMachine:
        """Create the state machine ``CdkTestStateMachine`` as a rule target.

        The machine runs two tasks in sequence, both invoking
        ``lambda_function``: "Submit Job" stores its result at ``$.guid``;
        "Get Job Status" reads ``$.guid`` as input and stores its result at
        ``$.status``. The whole execution times out after 30 seconds.

        Returns:
            An EventBridge target wrapping the new state machine.
        """
        # These activities and the Succeed state below are not referenced by
        # any state in the definition. They are still constructed so that the
        # set of constructs created in this stack stays unchanged.
        sfn.Activity(this, "SubmitJob")
        sfn.Activity(this, "CheckJob")

        # NOTE(review): sfn.Task / sfn_tasks.InvokeFunction look like the
        # older CDK v1 task API - confirm whether LambdaInvoke should be used.
        submit_job = sfn.Task(
            this,
            "Submit Job",
            task=sfn_tasks.InvokeFunction(lambda_function),
            result_path="$.guid",
        )
        get_status = sfn.Task(
            this,
            "Get Job Status",
            task=sfn_tasks.InvokeFunction(lambda_function),
            input_path="$.guid",
            result_path="$.status",
        )
        sfn.Succeed(this, 'Final Job Status')

        definition = submit_job.next(get_status)

        state_machine = sfn.StateMachine(
            this,
            "StateMachine222",
            definition=definition,
            state_machine_name="CdkTestStateMachine",
            timeout=core.Duration.seconds(30),
        )
        return targets.SfnStateMachine(machine=state_machine)
# -----------------------------------------------------------------------------------
Note:
code=_lambda.Code.asset('cdk_event_bridge/functions'),
is the argument to pay attention to in the Lambda function deployment:
base_lambda = _lambda.Function(this, 'MyLambda',
function_name='MyLambda1',
handler='testTargtetLambda.lambda_handler',
runtime=_lambda.Runtime.PYTHON_3_7,
code=_lambda.Code.asset('cdk_event_bridge/functions'),
)
The path passed to _lambda.Code.asset points to the 'functions' folder that holds the Lambda code, given the following folder hierarchy:
/cdk_event_bridge+
+-cdk_event_bridge_stack.py <= this cdk deployment script
+-/functions/+ <= a folder
+-testTargtetLambda.py <= Lambda code
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With