Notifications on channels in Golang
Notification on a channel is a common concurrency pattern in Golang. In this post I describe how it works and a common trap I found with it.
Introduction to the pattern
This pattern fits situations where one piece of work produces a value that multiple other goroutines are waiting to consume. Imagine a family: mum is cooking dinner; when she finishes, she announces to the children that dinner is ready, and the children, having heard it, come and eat.
How could such a thing be implemented in Golang?
I'm assuming you are familiar with Go channels and with closing them.
To implement such a notification system, start with a struct that has:
- a variable that will be read,
- a channel that signals that the value is ready.
When the value is updated, the producer goroutine closes the associated channel. Once a channel is closed, every receive on it completes immediately, both the receives already blocked and any that come later, so all consumer goroutines are notified that the value they were waiting for is ready.
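A minimal sketch of the channel property this pattern relies on: a single close releases every goroutine waiting on the channel, and any later receive returns immediately with the zero value (the two-value form reports ok == false). The helper name wakeAll is made up for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// wakeAll (hypothetical helper) starts n goroutines that wait on ready,
// closes ready once, and returns how many goroutines got past the receive.
func wakeAll(n int) int {
	ready := make(chan struct{})
	var woken int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-ready // blocks until ready is closed (or passes if already closed)
			atomic.AddInt32(&woken, 1)
		}()
	}
	close(ready) // a single close releases every waiting receiver
	wg.Wait()
	return int(atomic.LoadInt32(&woken))
}

func main() {
	fmt.Println("woken:", wakeAll(10)) // prints "woken: 10"

	// A receive on an already closed channel never blocks: it yields the
	// zero value, and the two-value form reports ok == false.
	done := make(chan struct{})
	close(done)
	v, ok := <-done
	fmt.Println(v, ok) // prints "{} false"
}
```

Without the close(ready) call, wg.Wait would block forever, which is exactly the "waiting for dinner" state.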
As an illustration, take a look at the Foo struct, which comes up with a number that can later be read by 10 different goroutines when it's ready (full example also at https://play.golang.org/p/tolS8Qgeja).
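package main

import (
	"fmt"
	"time"
)

type Foo struct {
	number     int
	numberChan chan struct{}
}

// ComeUpWithANumber simulates slow work, stores the result and then
// closes numberChan to notify every waiting reader.
func (f *Foo) ComeUpWithANumber() {
	time.Sleep(1 * time.Second)
	f.number = 42
	close(f.numberChan)
}

// Number blocks until numberChan is closed, then returns the value.
func (f *Foo) Number() (int, error) {
	<-f.numberChan
	return f.number, nil
}

func main() {
	f := &Foo{numberChan: make(chan struct{})}
	go f.ComeUpWithANumber()
	for i := 0; i < 10; i++ {
		go func() {
			n, err := f.Number()
			if err != nil {
				fmt.Println("Error:", err)
			} else {
				fmt.Println("Number was", n)
			}
		}()
	}
	// Crude wait: give the readers time to print before main exits.
	time.Sleep(2 * time.Second)
}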
When you run it, the result is as expected:
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
Number was 42
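The main function above waits with a fixed time.Sleep(2 * time.Second), which only works because two seconds happens to be longer than the producer's one-second delay. Below is a sketch of the same program that waits for the readers with sync.WaitGroup instead; readAll is a hypothetical helper, and the producer delay is shortened to 10 ms as an assumption for quick runs. Reading f.number without a mutex is safe because the Go memory model guarantees that closing a channel is synchronized before a receive that returns because the channel is closed, so the write to f.number is visible to every reader.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Foo struct {
	number     int
	numberChan chan struct{}
}

func (f *Foo) ComeUpWithANumber() {
	time.Sleep(10 * time.Millisecond) // shortened from the post's 1s (assumption)
	f.number = 42
	close(f.numberChan) // publishes f.number to every reader
}

func (f *Foo) Number() (int, error) {
	<-f.numberChan // the close is synchronized before this receive returns
	return f.number, nil
}

// readAll (hypothetical helper) starts n readers and waits for all of
// them with a WaitGroup instead of sleeping for a guessed duration.
func readAll(n int) []int {
	f := &Foo{numberChan: make(chan struct{})}
	go f.ComeUpWithANumber()

	results := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			v, _ := f.Number() // this Number never returns an error
			results[i] = v     // each goroutine writes only its own slot
		}(i)
	}
	wg.Wait()
	return results
}

func main() {
	for _, v := range readAll(10) {
		fmt.Println("Number was", v)
	}
}
```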
Ordering trap
Cool! The code works as expected. However, imagine that after some time users of your software start complaining that Foo sometimes takes too long to generate the number. They ask you for cancellation and timeout support.
Sure, no problem, you add the feature: Foo gets a second channel, doneChan, that signals cancellation. Accessing the number becomes a bit more complex (full example at: https://play.golang.org/p/9JR-OONNsH).
func (f *Foo) Number() (int, error) {
	select {
	case <-f.doneChan:
		return 0, errors.New("canceled")
	case <-f.numberChan:
		return f.number, nil
	}
}
You test your code and get an unexpected result. There is a bug!
Error: canceled
Number was 42
Error: canceled
Error: canceled
Number was 42
Number was 42
Number was 42
Error: canceled
Error: canceled
Error: canceled
This result surprises you, because you know there was no timeout here, and in your test scenario you close doneChan only after you've updated the number, just like below:
func (f *Foo) ComeUpWithANumber() {
	time.Sleep(1 * time.Second)
	f.number = 42
	close(f.numberChan)
	close(f.doneChan)
}
What happened? The answer is that when several cases of a select are ready, Go picks one of them pseudo-randomly, not necessarily the one that became ready first. There is no guaranteed order. So even though numberChan was closed before doneChan, a goroutine that reaches the select after both are closed may end up in the doneChan case.
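A small experiment makes the randomness visible. Both channels below are closed before the loop starts, numberChan first, and the same select runs many times; countChoices is a hypothetical helper. The Go specification says that when several communications can proceed, a single one is chosen via uniform pseudo-random selection, so both counters end up well above zero.

```go
package main

import "fmt"

// countChoices (hypothetical helper) closes two channels up front, then
// runs a select over both trials times and counts which case was taken.
func countChoices(trials int) (numberWins, doneWins int) {
	numberChan := make(chan struct{})
	doneChan := make(chan struct{})
	close(numberChan) // closed first...
	close(doneChan)   // ...but select does not remember that order

	for i := 0; i < trials; i++ {
		select {
		case <-doneChan:
			doneWins++
		case <-numberChan:
			numberWins++
		}
	}
	return numberWins, doneWins
}

func main() {
	n, d := countChoices(10000)
	// Roughly half each; the exact split varies from run to run.
	fmt.Println("numberChan:", n, "doneChan:", d)
}
```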
To fix this issue, don't assume any order among channels that are ready at the same time; be ready for every possible ordering.
The improved version of the Number function looks as follows:
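func (f *Foo) Number() (int, error) {
	select {
	case <-f.doneChan:
		// doneChan won the race, but the number may be ready as well:
		// check numberChan once more without blocking before giving up.
		select {
		case <-f.numberChan:
			return f.number, nil
		default:
			return 0, errors.New("canceled")
		}
	case <-f.numberChan:
		return f.number, nil
	}
}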
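To check the fix against the failing scenario, here is a sketch that publishes the number, closes numberChan and then doneChan, and calls the fixed Number many times. The Foo struct with doneChan is reconstructed from the snippets above (its full definition is only in the playground link, so the layout is an assumption), and countCanceled is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// Foo is reconstructed from the post's snippets; the exact layout is an
// assumption, since the full version lives in the playground example.
type Foo struct {
	number     int
	numberChan chan struct{}
	doneChan   chan struct{}
}

// Number is the fixed version: if doneChan wins the outer select, it
// still checks, without blocking, whether the number is available.
func (f *Foo) Number() (int, error) {
	select {
	case <-f.doneChan:
		select {
		case <-f.numberChan:
			return f.number, nil
		default:
			return 0, errors.New("canceled")
		}
	case <-f.numberChan:
		return f.number, nil
	}
}

// countCanceled (hypothetical helper) calls Number trials times after the
// number has been published and doneChan closed, and counts failures.
func countCanceled(trials int) int {
	f := &Foo{numberChan: make(chan struct{}), doneChan: make(chan struct{})}
	f.number = 42
	close(f.numberChan)
	close(f.doneChan)

	failures := 0
	for i := 0; i < trials; i++ {
		if n, err := f.Number(); err != nil || n != 42 {
			failures++
		}
	}
	return failures
}

func main() {
	fmt.Println("failures:", countCanceled(10000)) // prints "failures: 0"
}
```

A genuine cancellation still works: if only doneChan is closed, the inner select falls through to default and Number returns the "canceled" error.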
Real life example
This all might seem a bit far-fetched to you, but sadly it isn't. Here is an example from the Go gRPC implementation.
The original code in the implementation of the Stream struct used a select on multiple channels:
select {
case <-s.ctx.Done():
	return nil, ContextErr(s.ctx.Err())
case <-s.goAway:
	return nil, ErrStreamDrain
case <-s.headerChan:
	return s.header.Copy(), nil
}
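So even when the header had already arrived, if the context was also done or the stream was draining, a caller would sometimes receive the header and sometimes an error, depending on which ready case select happened to pick.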