Notification by closing a channel is a common concurrency pattern in Golang. In this post I describe how it works and a common trap I ran into with it.

Introduction to the pattern

This pattern is useful when one piece of work produces a value that several other goroutines are waiting to consume. Imagine a family: mum is cooking dinner. When she finishes, she announces to the children that dinner is ready, and the children, hearing this, come and eat.

How could such a thing be implemented in Golang?

I'm assuming you are familiar with Go channels and with closing them.

To implement such a notification system, start with a struct:

  • with a variable that will be read,
  • with a channel for the notification that the value is ready.

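When the value is set, the producer goroutine closes the associated channel (numberChan in the example below). Once a channel is closed, every receive on it returns immediately. This applies both to goroutines already blocked on it and to any that arrive later, so all the consumers are notified at once that the value they were waiting for is ready. The Go memory model also guarantees that closing a channel happens before a receive that returns because the channel is closed, so consumers see the value written before the close.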

As an illustration, take a look at the Foo struct below. It comes up with a number that 10 different goroutines can read once it's ready (the full example is also at https://play.golang.org/p/tolS8Qgeja).

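package main

import (
    "fmt"
    "time"
)

// Foo produces a number once and lets many goroutines wait for it.
type Foo struct {
    number     int
    numberChan chan struct{} // closed once number has been set
}

// ComeUpWithANumber simulates slow work, stores the result and then
// closes numberChan to notify every waiting reader.
func (f *Foo) ComeUpWithANumber() {
    time.Sleep(1 * time.Second)
    f.number = 42
    close(f.numberChan)
}

// Number blocks until numberChan is closed and then returns the number.
func (f *Foo) Number() (int, error) {
    <-f.numberChan
    return f.number, nil
}

func main() {
    f := &Foo{numberChan: make(chan struct{})}
    go f.ComeUpWithANumber()
    for i := 0; i < 10; i++ {
        go func() {
            n, err := f.Number()
            if err != nil {
                fmt.Println("Error:", err)
            } else {
                fmt.Println("Number was", n)
            }
        }()
    }
    // Crude wait so the goroutines get to print before main returns.
    time.Sleep(2 * time.Second)
}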

When you run it, the result is as expected:

Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42

Ordering trap

Cool! The code works as expected. Now imagine that, after a while, users of your software start complaining that Foo sometimes takes too long to generate the number. They ask you for cancellation and timeout support.

Sure, no problem: you add the feature. Accessing the number becomes a bit more complex (full example at https://play.golang.org/p/9JR-OONNsH):

func (f *Foo) Number() (int, error) {
    select {
    case <-f.doneChan:
        return 0, errors.New("canceled")
    case <-f.numberChan:
        return f.number, nil
    }
}

You test your code and get an unexpected result. There is a bug!

Error: canceled
Number was 42
Error: canceled
Error: canceled
Number was 42
Number was 42
Number was 42
Error: canceled
Error: canceled
Error: canceled

This surprises you. There was no timeout, and in your test scenario you close doneChan only after the number has been set, like this:

func (f *Foo) ComeUpWithANumber() {
    time.Sleep(1 * time.Second)
    f.number = 42
    close(f.numberChan)
    close(f.doneChan)
}

What happened? When more than one case of a select statement is ready, Go picks one of them via uniform pseudo-random selection. That is not necessarily the case that became ready first. There is no guaranteed order.

So even though the number was ready before doneChan was closed, select may still take the doneChan case once both channels are closed.

To fix this, don't assume any order in which closed channels are picked; handle every possible ordering. The improved Number function looks like this:

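func (f *Foo) Number() (int, error) {
    select {
    case <-f.doneChan:
        // Canceled, but the number may be ready too: select gives no
        // ordering guarantee, so check numberChan before giving up.
        select {
        case <-f.numberChan:
            return f.number, nil
        default:
            return 0, errors.New("canceled")
        }
    case <-f.numberChan:
        return f.number, nil
    }
}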

Real-life example

This might all seem a bit far-fetched, but sadly it isn't. Here is an example from grpc-go, the Go implementation of gRPC.

The original code in the implementation of the Stream struct used a select on multiple channels:

select {
case <-s.ctx.Done():
    return nil, ContextErr(s.ctx.Err())
case <-s.goAway:
    return nil, ErrStreamDrain
case <-s.headerChan:
    return s.header.Copy(), nil
}

So when headerChan and one of the other channels were both ready, you would sometimes receive the header value and sometimes you would not...