반응형
이번 게시글에서는 go-sdk-v2를 사용한 athena query 질의방법에 대해 정리합니다.
소스코드는 아래 github에서 확인할 수 있습니다.
1. 패키지 구조
.
├── go.mod
├── go.sum
├── main.go
├── gateway
│ └── athena_gateway.go
└── service
└── athena_service.go
2. main.go
package main
import "athena-example/service"
func main() {
service.ExecuteQuery("SELECT * FROM your_database_name.your_table_name limit 5")
}
실행할 쿼리를 파라미터로 넘겨줍니다.
3. service/athena_service.go
package service
import (
"athena-example/gateway"
"fmt"
)
func ExecuteQuery(query string) {
executionId, err := gateway.StartQuery(query)
if err != nil {
fmt.Println(err)
return
}
err = gateway.WaitForFinish(executionId)
if err != nil {
fmt.Println(err)
return
}
err = gateway.ResultProcessing(executionId)
if err != nil {
fmt.Println(err)
return
}
}
athena쿼리를 실행하면 executionId를 받아옵니다.
해당 id를 이용해 해당 쿼리가 끝났는지를 확인한 후 쿼리가 성공적으로 종료됐을때
결과를 받아와야 합니다.
WaitForFinish메서드에서 해당 쿼리가 끝날때까지 블락되어 기다립니다.
4. gateway/athena_gateway.go
package gateway
import (
"context"
"errors"
"fmt"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/athena/types"
"time"
)
var (
athenaClient *athena.Client
output string
)
func init() {
cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile("your-profile"))
if err != nil {
fmt.Println(err)
}
athenaClient = athena.NewFromConfig(cfg)
output = "s3://your-output-bucket-path"
}
func StartQuery(query string) (*string, error) {
res, err := athenaClient.StartQueryExecution(context.TODO(), &athena.StartQueryExecutionInput{
QueryString: &query,
ResultConfiguration: &types.ResultConfiguration{
OutputLocation: &output,
},
})
if err != nil {
return nil, err
}
return res.QueryExecutionId, nil
}
func WaitForFinish(executionId *string) error {
isRunning := true
for isRunning {
res, err := athenaClient.GetQueryExecution(context.TODO(), &athena.GetQueryExecutionInput{
QueryExecutionId: executionId,
})
if err != nil {
return err
}
if res.QueryExecution.Status.State == types.QueryExecutionStateFailed {
return errors.New(*res.QueryExecution.Status.AthenaError.ErrorMessage)
} else if res.QueryExecution.Status.State == types.QueryExecutionStateCancelled {
return errors.New("athena cancelled")
} else if res.QueryExecution.Status.State == types.QueryExecutionStateSucceeded {
isRunning = false
} else {
<-time.After(time.Second * 5)
}
}
return nil
}
func ResultProcessing(executionId *string) error {
res, err := athenaClient.GetQueryResults(context.TODO(), &athena.GetQueryResultsInput{
QueryExecutionId: executionId,
})
if err != nil {
return err
}
for _, row := range res.ResultSet.Rows {
for _, datum := range row.Data {
fmt.Println(*datum.VarCharValue)
}
}
return nil
}
폴링 방식이 유쾌하진 않지만 aws에서 go를 사용했다고 따로 채널로 결과를 제공해주지 않습니다.
일정 주기로 쿼리실행이 끝났는지 폴링해줍니다.
스코프의 err는 상위 패키지에서 처리하였으므로 바로 return합니다.
반응형
'golang' 카테고리의 다른 글
[golang] H4sIAAAAA??? (0) | 2022.09.19 |
---|---|
[golang] 익명함수를 고루틴으로 실행시킬때의 주의점(순서와 스택) (0) | 2022.09.08 |
[golang] aws-sdk-go-v2 aws resource 정보를 max value 이상 가져오기 (0) | 2022.04.28 |
[golang] aws s3 deletemarker 삭제하기(s3 object 복구하기) (0) | 2022.04.28 |
[golang] aws-go-sdk-v2를 사용하여 s3 bucket lifecycle 적용하기(PutBucketLifecycleConfiguration 사용법) (0) | 2022.04.15 |
댓글