Với các ứng dụng hiện nay, việc giao tiếp giữa client và server phổ biến đang sử dụng Rest API. Trong một số trường hợp việc phải để client đợi xử lý là điều không thể chấp nhận được. Do đó để giải quyết vấn đề này thì phần lớn cách giải quyết là sử dụng batch process, nghĩa là tại thời điểm đó chúng ta sẽ trả lại dữ liệu cho client ở trạng thái đang xử lý(tránh xảy ra tình trạng timeout). Tuy nhiên ngay tại thời điểm đó một batch process sẽ được khởi động để thực thi tiếp các công việc còn lại. Trong bài viết này tôi sẽ hướng dẫn các bạn viết batch process bằng AWS Lambda bằng cách sử dụng DynamoDB Streams
DynamoDB Streams
DynamoDB Streams là một tính năng trong DynamoDB cho phép bạn lắng nghe thay đổi trên một bảng dữ liệu nào đó và thực hiện các tác vụ đáp ứng yêu cầu nghiệp vụ trong ứng dụng của bạn. Mỗi khi có sự thay đổi DynamoDB sẽ ghi các bản ghi gần như ngay lập tức là dòng dữ liệu mà các ứng dụng đang lắng nghe.
Với DynamoDB Streams để giải quyết vấn đề timeout của API chúng ta chỉ gần ghi dữ liệu vào bảng trong DynamoDB sau đó dữ liệu được ghi lên dòng dữ liệu mà batch process của chúng ta đang lắng nghe rồi tiếp tục thực hiện nhiệm vụ còn lại.
Các bạn tham khảo link sau để cài đặt DynamoDB ở local nhé.
Định nghĩa bảng trong DynamoDB
Các bạn có thẻ sử dụng NoSQL Workbench for Amazon DynamoDB để tạo bảng hoặc viết code để chia sẻ với các member khác như sau:
# -*- coding: utf-8 -*-
import os
from datetime import datetime
import boto3
dynamodb = boto3.client(
"dynamodb",
endpoint_url="http://localhost:8000",
region_name="us-east-1",
aws_access_key_id="test",
aws_secret_access_key="test",
)
def create_orders():
try:
dynamodb.delete_table(TableName="dev_orders")
except Exception as exp:
print(exp)
response = dynamodb.create_table(
TableName="dev_orders",
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
{"AttributeName": "status", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={"ReadCapacityUnits": 1, "WriteCapacityUnits": 1},
GlobalSecondaryIndexes=[
{
"IndexName": "statusGSIndex",
"KeySchema": [{"AttributeName": "status", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
"ProvisionedThroughput": {
"ReadCapacityUnits": 1,
"WriteCapacityUnits": 1,
},
},
],
# bắt buộc phải có khai báo này để sử dụng DynamoDB Streams cho bảng này
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
)
print(response)
create_orders()
Kiểm tra bảng được tạo bằng NoSQL Workbench for Amazon DynamoDB
Các bạn chú ý giá trị bôi vàng nhé. Đây là dòng dữ liệu sẽ được DynamoDB ghi lên đó. Khi ứng dụng của bạn lắng nghe dòng dữ liệu này thì bất kỳ hành động nào xảy ra trên bảng sẽ được ghi lên dòng dữ liệu và ứng dụng của chúng ta sẽ phát hiện được điều đó.
Định nghĩa Lambda lắng nghe dòng dữ liệu từ DynamoDB
Để lắng nghe dòng dữ liệu từ DynamoDB Streams bạn cần thêm serverless-offline-dynamodb-streams
và cấu hìn serverless.yml
như sau:
custom:
# ...
serverless-offline-dynamodb-streams:
endpoint: http://dynamodb:8000
accessKeyId: root
secretAccessKey: root
# ...
plugins:
- serverless-offline
- serverless-python-requirements
- serverless-offline-dynamodb-streams
Các bạn tham khảo bài viết Mô phỏng AWS Lambda & API Gateway bằng Serverless Offline để biết các viết API bằng Lambda nhé.
Trong bài viết này, dể thực hiện lắng nghe dòng dữ liệu, bạn định nghĩa hàm Lambda trong Serverless như sau:
jobs_order:
handler: src.jobs.order.lambda_handler
events:
- stream:
enabled: true
type: dynamodb
# đây là giá trị màu vàng tôi có đề cập ở trên
arn: arn:aws:dynamodb:ddblocal:000000000000:table/dev_orders/stream/2020-03-15T07:59:46.532
batchSize: 1
Thử viết Rest API ghi dữ liệu vào bảng và kiểm tra DynamoDB Streams
Các bạn định nghĩa một API như sau:
functions:
post_orders:
handler: src.api.post_orders.lambda_handler
events:
- http:
method: post
path: api/orders
cors: true
src/api/post_orders.py
import json
import logging
from datetime import datetime
from uuid import uuid4
import boto3
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
def lambda_handler(event, context):
headers = {"Access-Control-Allow-Origin": "*", "Accept": "application/json"}
body = json.loads(event["body"])
dynamodb = boto3.client(
"dynamodb",
endpoint_url="http://localhost:8000",
region_name="us-east-1",
aws_access_key_id="test",
aws_secret_access_key="test",
)
now = int(datetime.utcnow().timestamp())
body = dynamodb.put_item(
TableName="dev_orders",
Item={
"id": {"S": str(uuid4())},
"name": {"S": body["name"]},
"status": {"S": " "},
"created_at": {"N": str(now)},
"updated_at": {"N": str(now)},
},
)
return {
"statusCode": 200,
"headers": headers,
"body": json.dumps(body),
}
Các bạn thử post dữ liệu bằng Postman nhé
Các bạn để ý Terminal sau khi post dữ liệu nhé
Cám ơn các bạn đã theo dõi bài viết. Hy vọng bài viết đã giúp các bạn có thể sử dùng Lambda và DynamoDB tốt hơn.