본문 바로가기
EFK

[EFK] kinesis firehose를 사용하여 fluent-bit 로그를 es로 보내기(datatransfer with lambda)

by devjh 2022. 6. 16.
반응형

이번 게시글에서는 kinesis firehose를 사용하여 fluent bit 로그를 es로 보내는 방법에 대해 정리합니다.

fluent bit 의 output 설정은 이전게시글을 참고해주세요

 

[devops] fluent bit을 사용해 로그를 elastic search로 보내는 방법들

이번 게시글에서는 fluent bit에서 나온 데이터를 elastic search로 보내는 방법에 대해 정리합니다. fluent-bit 사용법은 이전 게시글을 참고해주세요 [devops] fluent bit 으로 k8s(eks)로그 관리하기(bluent-bi..

frozenpond.tistory.com

 

1 .kinesis firehose란

대용량 스트리밍 데이터 처리 기능을 제공하는 aws 서비스로 kinesis delivery stream 을 생성해서 사용합니다.

kinesis data stream은 데이터를 분석, 전달 하는데, kinesis delivery stream은 데이터를 전달, 적재하는데 사용된다고 알려져 있습니다. 로그의 경우 firehose만 사용해도 충분합니다.

 

2. kinesis firehose 생성

(1). Source, Destination, 스트림 이름을 설정합니다.

direct PUT을 선택하고, 목적지로 opensearch를 선택한후 stream 이름을 지정합니다.

 

(2). transform records

불필요한 로그를 lambda에서 걸러야하거나, 추가 로직이 필요하다면

labmda function을 생성하고 해당 function을 선택해줍니다.

 

(3). Destination setting

browse버튼을 눌러 생성되어있는 elasticsearch를 선택하고

index를 지정해줍니다.

elastic search는 샤드의 크기가 너무 비대해지는걸 막기위해 보통 하루단위로 인덱스를 rotation 시킵니다.

rotation을 지정하게 되면 index-abcde-2022-08-01 등으로 인덱스가 생성됩니다. 

 

(4). backup settings

backup 정책을 설정합니다.

Failed data only를 선택하면 에러로그만 s3에 쌓이게 되고

All data를 선택하면 stream으로 들어온 데이터도 s3에 쌓이게 됩니다.

 

3. lambda 생성(필요하다면)

package main

import (
   "context"
   "github.com/aws/aws-lambda-go/events"
   "github.com/aws/aws-lambda-go/lambda"
   "log"
)

func HandleRequest(ctx context.Context, event events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) {
   var responseRecord []events.KinesisFirehoseResponseRecord
   for _, record := range event.Records {
      result := events.KinesisFirehoseTransformedStateOk
      log.Println(string(record.Data))
      responseRecord = append(responseRecord, events.KinesisFirehoseResponseRecord{
         RecordID: record.RecordID,
         Result:   result,
         Data:     record.Data,
         Metadata: events.KinesisFirehoseResponseRecordMetadata{
         },
      })
   }
   return events.KinesisFirehoseResponse{Records: responseRecord}, nil
}

func main() {
   lambda.Start(HandleRequest)
}

data transfer이 필요한경우 lambda를 생성해서 구독시켜야 합니다.

lambda는 메모리 사용량에 따라서 요금이 부과되니 가장 메모리를 덜 사용하고 빠른 golang을 선택했습니다.

aws-lambda-go/events에서 제공하는 형식으로 request를 받아 동일한 recordId를 지정해 스트림을 만들어 내려주면 정상 동작합니다.

소스코드는 기존에 들어왔던 record를 변경없이 그대로 내려주는 소스코드이며

데이터의 수정이 필요한경우record.Data를 언마샬링한후 수정해서 내려주도록 합니다.

 

4. elastic search 검증

데이터가 정상적으로 적재되는지는 Kibana의 DevTools에 가서 확인합니다.

dsl을 보낼수 있는 창에 query를 보내봅니다.

[ index의 전체 데이터 조회 쿼리 ]

GET your_index_name/_search
{
    "query": {
        "match_all": {}
    }
}

 

[ index의 전체 데이터 삭제 쿼리 ]

DELETE your_index_name

elastic search에서 dsl을 통해 지정한 index에 원하는 데이터가 쌓이는걸 확인했다면 시각화해줍니다.

5. 시각화

(1). 인덱스 패턴 생성

Management -> Index Patterns -> Create index pattern을 클릭합니다.

 

(2). 인덱스 패턴이름 설정

인덱스 패턴 이름을 설정하면 Discovery에서 해당 패턴이름이 뜨고 해당 index로 들어온 data를 확인할 수 있습니다.

(3) time filed 설정

time field를 생성해주면 추후 시각화했을때 로그트래픽을 막대형태로 확인할 수 있으니 셋팅해줍니다.(time field를 key로 가지지 않는 json은 해당 패턴에서 아에 확인이 불가능하니 주의)

여기까지 완료되었다면 사이드바에 있는 Discover를 눌러 시각화된 로그를 확인할 수 있습니다.

 

6. 테라폼

 

GitHub - cxcloud/terraform-kinesis-firehose-elasticsearch

Contribute to cxcloud/terraform-kinesis-firehose-elasticsearch development by creating an account on GitHub.

github.com

테라폼 모듈을 제공하는 오픈소스입니다.
해당 내용에서 필요한 부분을 참고해서 개발합니다.

반응형

댓글