Go 동시성 프로그래밍(concurrency in go) 책의 5장 비정상 고루틴의 치료 부분을 참고하여 스터디 발표용으로 작성한 자료입니다.
코드는 여기서 볼 수 있습니다.
⚠️ 주의!
함수 반환, 클로저 등의 설명이 발표자의 언어로 충분히 번역되지 않았습니다.
비정상 고루틴의 치료
치료
- 고루틴이 건강한 상태인지 확인하고 건강한 상태가 아니라면 재시작하는 메커니즘
- 치료하기 위해 생존여부를 확인하려는 목적으로
하트비트
패턴을 사용.- 패턴사용시 작업을 잘 수행하고 있다는 추가 정보를 포함시켜야 함.
- 이 절에서는 단순화를 위해 고루틴의 작동여부만을 고려
- 나쁜 상태에서 스스로 회복하는 방법을 알아내기 위한 작업을 수행하는것이 고루틴의 관심사가 되어서는 안된다.
- 하트비트
- 동시 프로세스가 외부로 생존 신호를 보내는 방법. 잘 진행중이고, 침묵이 예상된 것이라는 사실을 리스너에게 알리는 방법.
- 종류는 두가지가 있다.
- 일정 시간마다 발생하는 하트비트. → 여기선 1번을 사용한다.
- 일정 단계마다 발생하는 하트비트
스튜어드와 와드
- 스튜어드(관리인, steward)
- 와드의 하트비트를 받아 비정상 상태가 되었을 때 재시작하는 책임을 갖는다.
- 이를 위해 해당 고루틴을 시작시킬 수 있는 함수에 대한 참조가 필요하다.
- 치료하는 역할
- 와드의 하트비트를 받아 비정상 상태가 되었을 때 재시작하는 책임을 갖는다.
- 와드(피후견인, ward)
- 모니터링 대상
- 작업을 수행하는 역할
Code
- or
- 선착순
- 여러 개의 채널을 한 개의 채널로 결합해, 여러 채널 중에 하나라도 닫히거나 채널에 데이터가 쓰여지면 모든 채널이 닫힌다.
```go
var or func(channels ...<-chan interface{}) <-chan interface{}
or = func(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], orDone)...):
}
}
}()
return orDone
}
```
func mRecover() {
... or ...
// 모니터링 대상이 될 고루틴의 시그니처를 파라미터로 받는다.
type startGoroutineFn func(done <-chan interface{}, pulseInterval time.Duration) (heartbeat <-chan interface{})
newSteward := func(timeout time.Duration, ward startGoroutineFn) startGoroutineFn {
return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
var wardDone chan interface{}
var wardHeartbeat <-chan interface{}
startWard := func() {
wardDone = make(chan interface{})
wardHeartbeat = ward(or(wardDone, done), timeout/2)
}
startWard()
pulse := time.Tick(pulseInterval)
monitorLoop:
for {
timeoutSignal := time.After(timeout)
for {
select {
case <-pulse:
select {
case heartbeat <- struct{}{}:
default:
}
case <-wardHeartbeat:
continue monitorLoop
case <-timeoutSignal:
log.Println("[steward] : ward unhealthy; restarting (와드가 비정상, 재시작)")
// done채널을 사용해서 ward를 종료하고 새로 시작한다.
close(wardDone)
startWard()
continue monitorLoop
case <-done:
return
}
}
}
}()
return heartbeat
}
}
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
// 아무것도 안하고 취소만 기다린다. (펄스를 보내지 않음)
ward := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
log.Println("[ward] : Hello, I'm irresponsible! (응답할 수 없습니다)")
go func() {
<-done
log.Println("[ward] : I am halting.(중단합니다)")
}()
return nil
}
wardWithSteward := newSteward(4*time.Second, ward)
// 흐름상 프로그램을 종료시키기 위해 포함되어있는 코드이다.
done := make(chan interface{})
time.AfterFunc(9*time.Second, func() {
log.Println("[main] : halting steward and ward.(스튜어드와 와드 중단합니다.)")
close(done)
})
// 예제가 멈추는 것을 막기 위해 스튜어드를 시작시키고 펄스들을 순회한다.
for range wardWithSteward(done, 4*time.Second) {
}
log.Println("Done")
}
- 와드로부터 하트비트가 오지 않으면 스튜어드가 와드를 종료, 재시작한다.
코드의 개선 포인트
매번 와드에 맞게 스튜어드를 재작성하거나 생성하는 일은 번거롭고 불필요하다. 대신 클로저를 사용한다.
- 클로저
- 생성된 곳과 동일한 주소 공간을 사용
스튜어드는 동일하다. 와드를 감싸는 함수를 하나 더 만들어서 이를 해결한다.
or-done
어떤 채널이 닫힌건가에 따른 동작
- 예) done 채널이 닫혔을때, 값을 읽는 채널이 닫혔을때를 구분
orDone := func(done, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream }
Bridge
- 연속된 채널의 값을 읽을때 단순한 채널로 변경
c ←chan ←chan interface{}
```go
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
valStream := make(chan interface{})
go func() {
defer close(valStream)
for {
stream := make(<-chan interface{})
select {
case <-done:
return
case maybeStream, ok := <-chanStream:
if ok == false {
return
}
stream = maybeStream
}
for val := range orDone(done, stream) {
select {
case <-done:
return
case valStream <- val:
}
}
}
}()
return valStream
}
```
- Take
- 채널에서 n개만큼의 값을 읽고 종료
```go
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
```
func withClosure() {
... 여러 함수들 ...
doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {
intChanStream := make(chan (<-chan interface{}))
intStream := bridge(done, intChanStream)
doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} { // 스튜어드가 시작시키고 감시할 클로저
intStream := make(chan interface{}) // 와드의 고루틴 내에서 통신할 채널
heartbeat := make(chan interface{})
go func() {
defer close(intStream)
select {
case intChanStream <- intStream:
case <-done:
return
}
pulse := time.Tick(pulseInterval)
for {
valueLoop:
for _, intVal := range intList {
if intVal < 0 {
log.Printf("negative value: %v\n", intVal) // [비정상상태]를 정의한다. 음수를 받았을때
return
}
for {
select {
case <-pulse:
select {
case heartbeat <- struct{}{}:
default:
}
case intStream <- intVal:
continue valueLoop
case <-done:
return
}
}
}
}
}()
return heartbeat
}
return doWork, intStream
}
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
done := make(chan interface{})
defer close(done)
ward, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5)
wardWithSteward := newSteward(1*time.Millisecond, ward)
wardWithSteward(done, 1*time.Hour)
for intVal := range take(done, intStream, 6) {
fmt.Printf("Received: %v\n", intVal)
}
log.Println("Done")
}
생성기가 상태를 유지하게 만들기
valueLoop:
//for _, intVal := range intList {
for {
intVal := intList[0]
intList = intList[1:]
이 패턴을 사용하면 장기간 실행되는 고루틴의 상태를 정상적으로 유지하면서 지속적으로 실행되도록 할 수 있다.