본문 바로가기
golang

[golang] go-sdk-v2 athena 사용법(go-sdk-v2 athena example)

by devjh 2022. 9. 7.
반응형

이번 게시글에서는 go-sdk-v2를 사용한 athena query 질의방법에 대해 정리합니다.

소스코드는 아래 github에서 확인할 수 있습니다.

 

GitHub - jaeho310/go-sdk-v2-athena-example

Contribute to jaeho310/go-sdk-v2-athena-example development by creating an account on GitHub.

github.com

1. 패키지 구조

.
├── go.mod
├── go.sum
├── main.go
├── gateway
│   └── athena_gateway.go
└── service
    └── athena_service.go

 

2. main.go

package main

import "athena-example/service"

func main() {
   service.ExecuteQuery("SELECT * FROM your_database_name.your_table_name limit 5")
}

실행할 쿼리를 파라미터로 넘겨줍니다.

 

3. service/athena_service.go

package service

import (
	"athena-example/gateway"
	"fmt"
)

func ExecuteQuery(query string) {
	executionId, err := gateway.StartQuery(query)
	if err != nil {
		fmt.Println(err)
		return
	}
	err = gateway.WaitForFinish(executionId)
	if err != nil {
		fmt.Println(err)
		return
	}
	err = gateway.ResultProcessing(executionId)
	if err != nil {
		fmt.Println(err)
		return
	}
}

 

athena쿼리를 실행하면 executionId를 받아옵니다.
해당 id를 이용해 해당 쿼리가 끝났는지를 확인한 후 쿼리가 성공적으로 종료됐을때
결과를 받아와야 합니다.
WaitForFinish메서드에서 해당 쿼리가 끝날때까지 블락되어 기다립니다.

 

4. gateway/athena_gateway.go

package gateway

import (
   "context"
   "errors"
   "fmt"
   "github.com/aws/aws-sdk-go-v2/config"
   "github.com/aws/aws-sdk-go-v2/service/athena"
   "github.com/aws/aws-sdk-go-v2/service/athena/types"
   "time"
)

var (
   athenaClient *athena.Client
   output       string
)

func init() {
   cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile("your-profile"))
   if err != nil {
      fmt.Println(err)
   }
   athenaClient = athena.NewFromConfig(cfg)
   output = "s3://your-output-bucket-path"
}

func StartQuery(query string) (*string, error) {
   res, err := athenaClient.StartQueryExecution(context.TODO(), &athena.StartQueryExecutionInput{
      QueryString: &query,
      ResultConfiguration: &types.ResultConfiguration{
         OutputLocation: &output,
      },
   })
   if err != nil {
      return nil, err
   }
   return res.QueryExecutionId, nil
}

func WaitForFinish(executionId *string) error {
   isRunning := true
   for isRunning {
      res, err := athenaClient.GetQueryExecution(context.TODO(), &athena.GetQueryExecutionInput{
         QueryExecutionId: executionId,
      })
      if err != nil {
         return err
      }
      if res.QueryExecution.Status.State == types.QueryExecutionStateFailed {
         return errors.New(*res.QueryExecution.Status.AthenaError.ErrorMessage)
      } else if res.QueryExecution.Status.State == types.QueryExecutionStateCancelled {
         return errors.New("athena cancelled")
      } else if res.QueryExecution.Status.State == types.QueryExecutionStateSucceeded {
         isRunning = false
      } else {
         <-time.After(time.Second * 5)
      }
   }
   return nil
}

func ResultProcessing(executionId *string) error {
   res, err := athenaClient.GetQueryResults(context.TODO(), &athena.GetQueryResultsInput{
      QueryExecutionId: executionId,
   })
   if err != nil {
      return err
   }
   for _, row := range res.ResultSet.Rows {
      for _, datum := range row.Data {
         fmt.Println(*datum.VarCharValue)
      }
   }
   return nil
}

폴링 방식이 유쾌하진 않지만 aws에서 go를 사용했다고 따로 채널로 결과를 제공해주지 않습니다.
일정 주기로 쿼리실행이 끝났는지 폴링해줍니다.
스코프의 err는 상위 패키지에서 처리하였으므로 바로 return합니다.

반응형

댓글