DynamoDB (DDB) 는 Non-sql에 Key-Value 의 형태로 Column을 형태에 상관없이 늘려가는 자유롭고 가벼운 저장과 RDMS에 비해 Serverless 형태로 access수에 비례하여 저렴한(하지만 예측하기 어려울 수 있음) 유지비용이 장점이라고 할 수 있는데요

문제는 Non-sql이다보니 검색이 자유롭지 않다는 문제가 있습니다. 좀 검색 기능에 하자가 많아요..
여기서 이번 product에 대한 AWS의 기술 조언 담당가 분하고 여러 이야기를 나눈 결과 dynamoDB + ElasticSearch 를 사용하는 것이 타당하다는 조언을 받았습니다.

dynamoDB는 stream형태로 변화를 관리할 수 있어서 여기에 추가, 삭제, 수정이 있으면 그 정보를 실시간으로 외부로 전달하여 처리하는 것이 가능합니다.

이번에는 이 DynamoDB Stream 기능을 활용하여 golang을 활용한 lambda로 Elastic Search ( AWS상에서의 서비스로는 Open Search )에 실시간 동기화 하는 과정을 적겠습니다.

import (
	"bytes"
	"context"
	"encoding/json"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"net/http"
	"os"
)

먼저 이번에 사용할 모듈들입니다.

bytes는 json으로 marshal하여 request의 body에 추가하기 위해서
context와 events와 lambda는 lambda 핸들러를 위해서
os는 제가 elastic search endpoint를 환경변수로 관리하여서 그 환경변수의 값을 가져오기 위해서
http는 http통신을 위해 사용합니다.

실제로는 상기의 패키지 이외에도

“github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute”
“io”

를 사용합니다. dynamodbattribute는 dynamodb특유의 데이터 타입 식별자 ( “S”, “Bool” “N” 등등 … ) 을 제거하기 위해서 unmarshal과정에서 사용합니다.
“io” 는 http의 통신 결과나 에러를 출력하기 위해서 입니다.

func HandleRequest(ctx context.Context, request events.DynamoDBEvent) {
}
func main() {
	lambda.Start(HandleRequest)
}

먼저 기본적인 lambda handler의 틀 입니다.
DynamoDB가 업데이트 되면 그 내용이 DynamoDBEvent 의 형식으로 들어오게 됩니다. 이 DynamoDBEvent는 DynamoDBEventRecord의 슬라이스(리스트) 입니다.

type DynamoDBEvent struct {
	Records []DynamoDBEventRecord `json:"Records"`
}

DynamoDBEventRecord 에는 Event가 발생한 instant에 대한 자세한 정보와 해당 Event가 INSERT인지 MODIFY인지 REMOVE인지 알려주는 EventName이 들어있습니다. 또한 변화내용을 담은 DynamoDBStreamRecord가 있습니다.

type DynamoDBEventRecord struct {
	// The region in which the GetRecords request was received.
	AWSRegion string `json:"awsRegion"`

	// The main body of the stream record, containing all of the DynamoDB-specific
	// fields.
	Change DynamoDBStreamRecord `json:"dynamodb"`

	// A globally unique identifier for the event that was recorded in this stream
	// record.
	EventID string `json:"eventID"`

	// The type of data modification that was performed on the DynamoDB table:
	//
	//    * INSERT - a new item was added to the table.
	//
	//    * MODIFY - one or more of an existing item's attributes were modified.
	//
	//    * REMOVE - the item was deleted from the table
	EventName string `json:"eventName"`

	// The AWS service from which the stream record originated. For DynamoDB Streams,
	// this is aws:dynamodb.
	EventSource string `json:"eventSource"`

	// The version number of the stream record format. This number is updated whenever
	// the structure of Record is modified.
	//
	// Client applications must not assume that eventVersion will remain at a particular
	// value, as this number is subject to change at any time. In general, eventVersion
	// will only increase as the low-level DynamoDB Streams API evolves.
	EventVersion string `json:"eventVersion"`

	// The event source ARN of DynamoDB
	EventSourceArn string `json:"eventSourceARN"` //nolint: stylecheck

	// Items that are deleted by the Time to Live process after expiration have
	// the following fields:
	//
	//    * Records[].userIdentity.type
	//
	// "Service"
	//
	//    * Records[].userIdentity.principalId
	//
	// "dynamodb.amazonaws.com"
	UserIdentity *DynamoDBUserIdentity `json:"userIdentity,omitempty"`
}

위에서 이번에 중요한 것은 EventName과 Change입니다. 먼저 Change의 타입인 DynamoDBStreamRecord 를 보면 이벤트의 발생시간, 변화한 Row의 Keys, 그리고 변경 전 후의 데이터와 크기 등등이 들어 있습니다.

type DynamoDBStreamRecord struct {

	// The approximate date and time when the stream record was created, in UNIX
	// epoch time (http://www.epochconverter.com/) format.
	ApproximateCreationDateTime SecondsEpochTime `json:"ApproximateCreationDateTime,omitempty"`

	// The primary key attribute(s) for the DynamoDB item that was modified.
	Keys map[string]DynamoDBAttributeValue `json:"Keys,omitempty"`

	// The item in the DynamoDB table as it appeared after it was modified.
	NewImage map[string]DynamoDBAttributeValue `json:"NewImage,omitempty"`

	// The item in the DynamoDB table as it appeared before it was modified.
	OldImage map[string]DynamoDBAttributeValue `json:"OldImage,omitempty"`

	// The sequence number of the stream record.
	SequenceNumber string `json:"SequenceNumber"`

	// The size of the stream record, in bytes.
	SizeBytes int64 `json:"SizeBytes"`

	// The type of data from the modified DynamoDB item that was captured in this
	// stream record.
	StreamViewType string `json:"StreamViewType"`
}

덮어쓰기 전에 이전 내용을 기록 해 두고 싶어! 하면 OldImage도 중요하지만 우선은 등록과 삭제만을 고려하기에 Key와 NewImage가 중요하게 됩니다.

DynamoDBAttributeValue는 “S” : “무언가의 문자열”, “N” : “무언가의 숫자” 등등으로 구성한 구조체입니다.

이러면 필요한 정보는 정리 했습니다. 저희는 request 라는 events.DynamoDBEvent안의 각각의 Records라는 DynamoDBEventRecord안에 있는 EventName과 change라는 DynamoDBStreamRecord안에 있는 Keys와 NewImage를 처리하게 됩니다.

우선 ES의 endpoint와 index 이름은 자유롭게 지정해주세요. ES안에 저장될 key는 DynamoDB에서의 Partition Key를 사용하도록 하겠습니다.

만약 index이름을 dev2로 정적으로 정한다면 osEndpoint := "https://" + os.Getenv("ELASTICSEARCH_URL") + "/dev2/_doc/" 와 같은 느낌이겠네요. 그리고 http 통신을 위한 client를 선언합니다. client := &http.Client{}

이 뒤에 partition key를 key로 붙여서 PUT이나 REMOVE http 통신을 하면 됩니다.

for _, record := range request.Records { 라는 for 문 안에서

p_key := record.Change.Keys["파티션키의 Column명"].String() 이걸 파티션 키라고 할 때

삭제에 대한 코드는 다음과 같아집니다.

for _, record := range request.Records {
		key := record.Change.Keys["여러분의 key의 이름"].String()
		// REMOVE
		if record.EventName == "REMOVE" {
			req, err := http.NewRequest(http.MethodDelete, osEndpoint+key, nil)
			if err != nil {
				log.Fatal(err)
			}
			resp, err := client.Do(req)
			if err != nil {
				log.Fatal(err)
			}
			log.Print(resp.Status)

		} else

REMOVE는 해당 키에 대해서 Delete Method로 http통신을 보내면 됩니다.

이번 경우에 대해 MODIFY와 INSERT는 같이 처리합니다. ( MODIFY 이전의 정보를 보관하지 않기 때문에 같은 PUT 통신만 사용 함)

 else { // INSERT OR MODIFY
			data, err := json.Marshal(record.Change.NewImage)
			req, err := http.NewRequest(http.MethodPut, osEndpoint+key, bytes.NewBuffer(data))
			if err != nil {
				log.Fatal(err)
			}
			req.Header.Set("Content-Type", "application/json")
			resp, err := client.Do(req)
			if err != nil {
				log.Fatal(err)
			}
			log.Print(resp.Status)
			bodyBytes, err := io.ReadAll(resp.Body)
			if err != nil {
				log.Fatal(err)
			}
			bodyString := string(bodyBytes)
			log.Print(bodyString)
		}
	}

NewImage의 내용을 json으로 Marshal하여 bytes.NewBuffer로 io 타입을 맞춰서 request를 작성. 이후 헤더에 json임을 알리는 헤더를 추가합니다.

혹시라도 400이나 404등의 에러에 대비하여 그 원인을 알 수 있게 io로 response의 body를 읽어서 출력하는 부분까지 작성합니다.

이후 lambda에 deploy하여 trigger에 추적을 원하는 DynamoDB를 설정해주시면 됩니다. DynamoDB에 무언가를 추가하면 Cloudwatch에서 201 Created를 확인할 수 있습니다.