본문 바로가기
개발/Golang

[Concurrency in go] 비정상 고루틴의 치료

by 상5c 2021. 11. 7.

Go 동시성 프로그래밍(concurrency in go) 책의 5장 비정상 고루틴의 치료 부분을 참고하여 스터디 발표용으로 작성한 자료입니다.
코드는 여기서 볼 수 있습니다.


⚠️ 주의!
함수 반환, 클로저 등의 설명이 발표자의 언어로 충분히 번역되지 않았습니다.

비정상 고루틴의 치료

치료

  • 고루틴이 건강한 상태인지 확인하고 건강한 상태가 아니라면 재시작하는 메커니즘
  • 치료하기 위해 생존여부를 확인하려는 목적으로 하트비트 패턴을 사용.
    • 패턴사용시 작업을 잘 수행하고 있다는 추가 정보를 포함시켜야 함.
    • 이 절에서는 단순화를 위해 고루틴의 작동여부만을 고려
  • 나쁜 상태에서 스스로 회복하는 방법을 알아내기 위한 작업을 수행하는것이 고루틴의 관심사가 되어서는 안된다.
  • 하트비트
    • 동시 프로세스가 외부로 생존 신호를 보내는 방법. 잘 진행중이고, 침묵이 예상된 것이라는 사실을 리스너에게 알리는 방법.
    • 종류는 두가지가 있다.
      1. 일정 시간마다 발생하는 하트비트. → 여기선 1번을 사용한다.
      2. 일정 단계마다 발생하는 하트비트

일정 단계마다 프로그레스바가 채워진다
컴퓨터의 사양이 좋지 않았을 땐 프로그레스 바 위에 마우스를 올려두어 상태를 체크했다.

스튜어드와 와드

  • 스튜어드(관리인, steward)
    • 와드의 하트비트를 받아 비정상 상태가 되었을 때 재시작하는 책임을 갖는다.
      • 이를 위해 해당 고루틴을 시작시킬 수 있는 함수에 대한 참조가 필요하다.
    • 치료하는 역할
  • 와드(피후견인, ward)
    • 모니터링 대상
    • 작업을 수행하는 역할

Code

  • or
    • 선착순
    • 여러 개의 채널을 한 개의 채널로 결합해, 여러 채널 중에 하나라도 닫히거나 채널에 데이터가 쓰여지면 모든 채널이 닫힌다.
    ```go
    var or func(channels ...<-chan interface{}) <-chan interface{}
    or = func(channels ...<-chan interface{}) <-chan interface{} {
        switch len(channels) {
        case 0:
            return nil
        case 1:
            return channels[0]
        }

        orDone := make(chan interface{})
        go func() {
            defer close(orDone)

            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                select {
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                case <-or(append(channels[3:], orDone)...):
                }
            }
        }()
        return orDone
    }
    ```
func mRecover() {
    ... or ...

    // 모니터링 대상이 될 고루틴의 시그니처를 파라미터로 받는다.
    type startGoroutineFn func(done <-chan interface{}, pulseInterval time.Duration) (heartbeat <-chan interface{})

    newSteward := func(timeout time.Duration, ward startGoroutineFn) startGoroutineFn {
        return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
            heartbeat := make(chan interface{})

            go func() {
                defer close(heartbeat)

                var wardDone chan interface{}
                var wardHeartbeat <-chan interface{}

                startWard := func() {
                    wardDone = make(chan interface{})
                    wardHeartbeat = ward(or(wardDone, done), timeout/2)
                }
                startWard()

                pulse := time.Tick(pulseInterval)

            monitorLoop:
                for {
                    timeoutSignal := time.After(timeout)

                    for {
                        select {
                        case <-pulse:
                            select {
                            case heartbeat <- struct{}{}:
                            default:
                            }
                        case <-wardHeartbeat:
                            continue monitorLoop
                        case <-timeoutSignal:
                            log.Println("[steward] : ward unhealthy; restarting (와드가 비정상, 재시작)")
                            // done채널을 사용해서 ward를 종료하고 새로 시작한다.
                            close(wardDone) 
                            startWard()
                            continue monitorLoop
                        case <-done:
                            return
                        }
                    }
                }
            }()

            return heartbeat
        }
    }

    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)

    // 아무것도 안하고 취소만 기다린다. (펄스를 보내지 않음)
    ward := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
        log.Println("[ward] : Hello, I'm irresponsible! (응답할 수 없습니다)")
        go func() {
            <-done 
            log.Println("[ward] : I am halting.(중단합니다)")
        }()
        return nil
    }

    wardWithSteward := newSteward(4*time.Second, ward)

    // 흐름상 프로그램을 종료시키기 위해 포함되어있는 코드이다.
    done := make(chan interface{})
    time.AfterFunc(9*time.Second, func() {
        log.Println("[main] : halting steward and ward.(스튜어드와 와드 중단합니다.)")
        close(done)
    })

    // 예제가 멈추는 것을 막기 위해 스튜어드를 시작시키고 펄스들을 순회한다.
    for range wardWithSteward(done, 4*time.Second) {
    } 

    log.Println("Done")
}
  • 와드로부터 하트비트가 오지 않으면 스튜어드가 와드를 종료, 재시작한다.

코드의 개선 포인트

매번 와드에 맞게 스튜어드를 재작성하거나 생성하는 일은 번거롭고 불필요하다. 대신 클로저를 사용한다.

스튜어드는 동일하다. 와드를 감싸는 함수를 하나 더 만들어서 이를 해결한다.

  • or-done

    • 어떤 채널이 닫힌건가에 따른 동작

      • 예) done 채널이 닫혔을때, 값을 읽는 채널이 닫혔을때를 구분
      orDone := func(done, c <-chan interface{}) <-chan interface{} {
          valStream := make(chan interface{})
          go func() {
              defer close(valStream)
              for {
                  select {
                  case <-done:
                      return
                  case v, ok := <-c:
                      if ok == false {
                          return
                      }
                      select {
                      case valStream <- v:
                      case <-done:
      
                      }
                  }
              }
          }()
          return valStream
      }
  • Bridge

    • 연속된 채널의 값을 읽을때 단순한 채널로 변경
    • c ←chan ←chan interface{}
    ```go
    bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                stream := make(<-chan interface{})
                select {
                case <-done:
                    return
                case maybeStream, ok := <-chanStream:
                    if ok == false {
                        return
                    }
                    stream = maybeStream
                }
                for val := range orDone(done, stream) {
                    select {
                    case <-done:
                        return
                    case valStream <- val:
                    }
                }
            }
        }()
        return valStream
    }
    ```
  • Take
    • 채널에서 n개만큼의 값을 읽고 종료
    ```go
    take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
        takeStream := make(chan interface{})
        go func() {
            defer close(takeStream)
            for i := 0; i < num; i++ {
                select {
                case <-done:
                    return
                case takeStream <- <-valueStream:
                }
            }
        }()
        return takeStream
    }
    ```
func withClosure() {
  ... 여러 함수들 ...

    doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {
        intChanStream := make(chan (<-chan interface{}))
        intStream := bridge(done, intChanStream)

        doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} { // 스튜어드가 시작시키고 감시할 클로저
            intStream := make(chan interface{}) // 와드의 고루틴 내에서 통신할 채널
            heartbeat := make(chan interface{})

            go func() {
                defer close(intStream)
                select {
                case intChanStream <- intStream:
                case <-done:
                    return
                }

                pulse := time.Tick(pulseInterval)

                for {
                valueLoop:
                    for _, intVal := range intList {
                        if intVal < 0 {
                            log.Printf("negative value: %v\n", intVal) // [비정상상태]를 정의한다. 음수를 받았을때
                            return
                        }

                        for {
                            select {
                            case <-pulse:
                                select {
                                case heartbeat <- struct{}{}:
                                default:
                                }

                            case intStream <- intVal:
                                continue valueLoop
                            case <-done:
                                return
                            }
                        }
                    }
                }
            }()
            return heartbeat
        }
        return doWork, intStream

    }

    log.SetOutput(os.Stdout)
    log.SetFlags(log.Ltime | log.LUTC)

    done := make(chan interface{})
    defer close(done)

    ward, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5)
    wardWithSteward := newSteward(1*time.Millisecond, ward)
    wardWithSteward(done, 1*time.Hour)

    for intVal := range take(done, intStream, 6) {
        fmt.Printf("Received: %v\n", intVal)
    }

    log.Println("Done")
}

생성기가 상태를 유지하게 만들기

valueLoop:
//for _, intVal := range intList {
for {
    intVal := intList[0]
    intList = intList[1:]

이 패턴을 사용하면 장기간 실행되는 고루틴의 상태를 정상적으로 유지하면서 지속적으로 실행되도록 할 수 있다.