반응형
저번 게시글에서는 context를 활용하여 불필요한 트래픽을 제어하고, 클라이언트에게 즉시 리턴을 내려주는 방법에 대해 정리했습니다.
이번 게시글에서는 컨텍스트, 고루틴, 채널을 활용하여 비동기 큐를 만드는 예제입니다.
1. 요구사항
(1). 비동기 큐에 각 자릿수의 합을 구해달라는 요청을 쌓을수 있어야한다.(Producer)
(2). 요청을 FIFO형식으로 처리할 수 있는 Consumer가 있어야한다.
(3). Consumer가 증설된다면 여러 Consumer가 큐에서 데이터를 꺼내 사용할 수 있어야한다.
(4). 컨텍스트가 종료되면 모든 요청이 중지되어야한다.
2. 예제
package main
import (
"fmt"
"strconv"
"time"
)
import "golang.org/x/net/context"
// 컨슈머와 프로듀서가 통신하는 채널용 메시지입니다.
// 프로듀서는 부모
type message struct {
responseChan chan<- int
request string
ctx context.Context
cancel context.CancelFunc
}
func Consumer(messageList <-chan message) {
for job := range messageList {
select {
case <-job.ctx.Done():
fmt.Println("컨슈머: 요청을 받는도중 컨텍스트가 종료 됐습니다")
continue
default:
fmt.Printf("컨슈머: [%s]에 대한 각 자릿수의 합을 계산합니다.\n", job.request)
}
res := 0
for i := 0; i < len(job.request); i++ {
<-time.After(time.Millisecond * 100)
value, err := strconv.Atoi(string(job.request[i]))
if err != nil {
// 컨슈머가 문자열에 숫자를 담지 않는경우 해당 ctx 강제 종료
job.cancel()
break
} else {
res += value
}
}
select {
case <-job.ctx.Done():
if job.ctx.Err().Error() == "context canceled" {
fmt.Printf("컨슈머: [%s]는 올바를 요청이 아닙니다. 컨텍스트를 종료합니다.\n", job.request)
} else {
fmt.Println("컨슈머: 계산후 응답을 내려주는 도중 컨텍스트가 종료 됐습니다")
}
default:
fmt.Printf("컨슈머: [%s]에 대한 계산 완료! 채널을 통해 응답을 내려줍니다.\n", job.request)
job.responseChan <- res
}
}
}
func Producer(parentCtx context.Context, input string, queue chan<- message) {
// 자식 컨텍스트와 종료함수를 만들어 채널을 통해 컨슈머에게 전달합니다.
ctx, cancel := context.WithCancel(parentCtx)
r := make(chan int)
select {
case <-ctx.Done():
// 부모가 타임아웃에 걸리면 자식은 모두 타임아웃에 걸립니다.
fmt.Println("프로듀서: 요청이전 컨텍스트가 종료 됐습니다")
return
default:
fmt.Printf("프로듀서: 채널을 통해 [%s] 각자릿수의 합을 구하는 요청을 보냅니다.\n", input)
}
// 컨슈머에게 채널을 통해 요청을 보냅니다
queue <- message{
responseChan: r,
request: input,
ctx: ctx,
cancel: cancel,
}
// 프로듀서는 receive 채널을 이용해 응답과 컨텍스트 종료를 대기합니다.
select {
case response := <-r:
fmt.Printf("프로듀서: 채널에서 결과가 왔습니다. [%s]에 대한 각 자릿수의 합은[%d] 입니다.\n", input, response)
case <-ctx.Done():
if ctx.Err().Error() == "context canceled" {
fmt.Printf("프로듀서: 컨슈머가 강제로 컨텍스트를 종료하였습니다. [%s]에 대한 결과를 얻지 못하였습니다.\n", input)
} else {
fmt.Printf("프로듀서: 컨텍스트가 종료되어 [%s]에 대한 응답을 받지 못하였습니다.\n", input)
}
}
}
func main() {
maxQueueSize := 3
q := make(chan message, maxQueueSize)
go Consumer(q)
// go Consumer(q) 컨슈머를 여러개 만들면 큐에 쌓인 내용을 동시에 꺼내갈 수 있습니다.
ctx, _ := context.WithTimeout(context.Background(), time.Second*5)
go Producer(ctx, "333a", q)
go Producer(ctx, "101", q)
go Producer(ctx, "515", q)
select {
case <-ctx.Done():
// 해당 로직은 parent context가 timeout 걸렸을때만 동작합니다.
// child context에 cancel이 호출되어도 parent context에는 영향 X
fmt.Println("컨텍스트 타임아웃")
fmt.Println(ctx.Err())
}
// Consumer가 종료된 후 Producer가 남은 응답을 수신하기 위한 대기시간
<-time.After(time.Second * 2)
close(q)
}
output
$ go run main.go
프로듀서: 채널을 통해 [515] 각자릿수의 합을 구하는 요청을 보냅니다.
컨슈머: [515]에 대한 각 자릿수의 합을 계산합니다.
프로듀서: 채널을 통해 [101] 각자릿수의 합을 구하는 요청을 보냅니다.
프로듀서: 채널을 통해 [333a] 각자릿수의 합을 구하는 요청을 보냅니다.
컨슈머: [515]에 대한 계산 완료! 채널을 통해 응답을 내려줍니다.
프로듀서: 채널에서 결과가 왔습니다. [515]에 대한 각 자릿수의 합은[11] 입니다.
컨슈머: [101]에 대한 각 자릿수의 합을 계산합니다.
컨슈머: [101]에 대한 계산 완료! 채널을 통해 응답을 내려줍니다.
컨슈머: [333a]에 대한 각 자릿수의 합을 계산합니다.
프로듀서: 채널에서 결과가 왔습니다. [101]에 대한 각 자릿수의 합은[2] 입니다.
컨슈머: [333a]는 올바를 요청이 아닙니다. 컨텍스트를 종료합니다.
프로듀서: 컨슈머가 강제로 컨텍스트를 종료하였습니다. [333a]에 대한 결과를 얻지 못하였습니다.
컨텍스트 타임아웃
context deadline exceeded
상세 로직
1. 프로듀서는 버퍼채널(queue)에 고루틴을 활용하여 request와 receiveChannel, context를 집어넣습니다.(버퍼채널은 버퍼가 가득 찬 상태가 아니라면 수신대기가 없어도 송신시점에 블락되지 않습니다)
2. 컨슈머는 버퍼채널에 수신을 대기하며 쌓인 요청(각 자릿수의 합을 구하는)을 하나씩 꺼내갑니다.
3. 각자릿수를 더할때 숫자가 아닌 요청이 있으면 cancel을 호출하여 컨텍스트를 종료합니다.
반응형
'golang' 카테고리의 다른 글
[golang] golang profile 적용하기(viper 예제) (0) | 2021.12.28 |
---|---|
[golang] go-redis, redis-mock 사용법 및 예제(suite 사용법) (0) | 2021.12.13 |
[golang] go context의 활용1(go context사용법 및 예제1) (0) | 2021.12.07 |
[golang] go context란 (0) | 2021.12.06 |
[golang] golang 채널(channel) 사용법, 사용예제 (0) | 2021.12.06 |
댓글