Worldskills 2024
Menu
Deployment
EC2
ECR
EKS
Serverless
Other
Utility
Best Practices
Home
CLI
lambda
0 Clicks
comprehend
sagemaker
bedrock
dynamoDb Put Item
dynamoDb Get Item
S3 Put Object Event Trigger
Kinesis Read Records
comprehend
Copy
import os, boto3 ESCALATION_INTENT_MESSAGE="Seems that you are having troubles with our service. Would you like to be transferred to the associate?" FULFILMENT_CLOSURE_MESSAGE="Seems that you are having troubles with our service. Let me transfer you to the associate." escalation_intent_name = os.getenv('ESCALATION_INTENT_NAME', None) client = boto3.client('comprehend') def lambda_handler(event, context): sentiment=client.detect_sentiment(Text=event['inputTranscript'],LanguageCode='en')['Sentiment'] if sentiment=='NEGATIVE': if escalation_intent_name: result = { "sessionAttributes": { "sentiment": sentiment }, "dialogAction": { "type": "ConfirmIntent", "message": { "contentType": "PlainText", "content": ESCALATION_INTENT_MESSAGE }, "intentName": escalation_intent_name } } else: result = { "sessionAttributes": { "sentiment": sentiment }, "dialogAction": { "type": "Close", "fulfillmentState": "Failed", "message": { "contentType": "PlainText", "content": FULFILMENT_CLOSURE_MESSAGE } } } else: result ={ "sessionAttributes": { "sentiment": sentiment }, "dialogAction": { "type": "Delegate", "slots" : event["currentIntent"]["slots"] } } return result
sagemaker
Copy
import os import io import boto3 import json import csv # grab environment variables ENDPOINT_NAME = "demk-asdasd" runtime= boto3.client('runtime.sagemaker') def lambda_handler(event, context): print("Received event: " + json.dumps(event, indent=2)) data = json.loads(json.dumps(event)) payload = data['data'] test = "0,14,0.0,1.0,1.0,0.0,0.0,0.0,0.0,0,0,12,1.0" response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME, ContentType='text/csv', Body=test) print(response) result = json.loads(response['Body'].read().decode()) return result
bedrock
Copy
import json import boto3 # Bedrock client used to interact with APIs around models bedrock = boto3.client( service_name='bedrock', region_name='eu-central-1' ) # Bedrock Runtime client used to invoke and question the models bedrock_runtime = boto3.client( service_name='bedrock-runtime', region_name='eu-central-1' ) def lambda_handler(event, context): # Retrieve information about available models #foundation_models = bedrock.list_foundation_models() # Find the model named "Claude" #matching_model = next((model for model in foundation_models["modelSummaries"] if model.get("modelName") == "Claude"), None) modelId='anthropic.claude-v2:1' prompt = "Human: is touching grass healthy?Assistant:" # The payload to be provided to Bedrock body = json.dumps({ "prompt": prompt, "max_tokens_to_sample": 200, "temperature": 0.7, "top_p": 1, }) # The actual call to retrieve an answer from the model response = bedrock_runtime.invoke_model( body=body, modelId=modelId, accept='application/json', contentType='application/json' ) response_body = json.loads(response['body'].read()) return { 'statusCode': 200, 'body': json.dumps({"Answer": response_body}) }
dynamoDb Put Item
Copy
import json import boto3 import os def lambda_handler(event, context): # Initialize the DynamoDB client dynamodb = boto3.client('dynamodb') # Get the table name from environment variables table_name = "TABLE_NAME" # Construct the data with explicit DynamoDB data types data = { "id": {"S": "12345"}, # String type "name": {"S": "John Doe"}, # String type "age": {"N": "30"}, # Number type (stored as string) "is_active": {"BOOL": True}, # Boolean type "tags": {"SS": ["developer", "python", "aws"]}, # String Set type "preferences": { # Map type "M": { "newsletter": {"BOOL": False}, "notifications": {"S": "email"} } }, "scores": {"L": [ # List type {"N": "99"}, {"N": "85"}, {"N": "91"} ]} } try: # Insert the data into the DynamoDB table dynamodb.put_item(TableName=table_name, Item=data) return { 'statusCode': 200, 'body': json.dumps('Data inserted successfully') } except Exception as e: return { 'statusCode': 500, 'body': json.dumps(f'Error inserting data: {str(e)}') }
dynamoDb Get Item
Copy
import json import boto3 import os def lambda_handler(event, context): # Initialize the DynamoDB client dynamodb = boto3.client('dynamodb') # Get the table name from environment variables table_name = os.environ.get('TABLE_NAME') # Construct the key for the item you want to retrieve # The key should match the structure of your table's primary key key = { "id": {"S": "12345"} # Example: Assuming "id" is the primary key (String type) } try: # Retrieve the item from the DynamoDB table response = dynamodb.get_item(TableName=table_name, Key=key) # Check if the item exists in the response if 'Item' in response: item = response['Item'] # Convert the DynamoDB item format to a Python dictionary data = { "id": item["id"]["S"], "name": item["name"]["S"], "age": int(item["age"]["N"]), "is_active": item["is_active"]["BOOL"], "tags": item["tags"]["SS"], "preferences": { "newsletter": item["preferences"]["M"]["newsletter"]["BOOL"], "notifications": item["preferences"]["M"]["notifications"]["S"] }, "scores": [int(score["N"]) for score in item["scores"]["L"]] } return { 'statusCode': 200, 'body': json.dumps(data) } else: return { 'statusCode': 404, 'body': json.dumps('Item not found') } except Exception as e: return { 'statusCode': 500, 'body': json.dumps(f'Error retrieving item: {str(e)}') }
s3 put object event trigger
Copy
import json import boto3 # Create an S3 client s3_client = boto3.client('s3') def lambda_handler(event, context): # Extract the bucket name and object key from the S3 event bucket_name = event['Records'][0]['s3']['bucket']['name'] object_key = event['Records'][0]['s3']['object']['key'] # Print the bucket and key for logging/debugging purposes print(f"Bucket: {bucket_name}") print(f"Key: {object_key}") # Retrieve the object from S3 try: response = s3_client.get_object(Bucket=bucket_name, Key=object_key) # Read the content of the file file_content = response['Body'].read().decode('utf-8') # Print or process the content of the file print("File Content:") print(file_content) # You can also return the content if needed return { 'statusCode': 200, 'body': json.dumps({ 'message': 'File processed successfully', 'content': file_content }) } except Exception as e: print(f"Error getting object {object_key} from bucket {bucket_name}. Error: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'message': 'Error processing file', 'error': str(e) }) }
kinesis read stream
Copy
import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")