
Pub/Sub Concurrency Pattern
Story time
I was trying to build a Werewolf Telegram bot with three components: the Game, the Game Manager, and the Notifier. As you may have guessed already, their responsibilities are:
- Game Manager: the manager of all games. It spawns new games, cleans up finished ones, and stores stats and state in the repository layer.
- Game: a spawned game instance managed by the Game Manager. It is a state machine that tracks and reacts to the states that occur during the game loop, and it sends game events to the Notifier and the Game Manager.
- Notifier: listens for events from the game instance and tells the Telegram group or individual users about game phases and events.
NB: if you are wondering why the game instance (the inner layer) doesn't just call functions on the outer layers, that would create a circular dependency, since the outer layers already depend on the inner one.
One Channel Per Listener

I prefer not to create one channel per listener and hand those channels to the game instance. That creates a lot of coupling, since the game instance has to hold every channel created by the listeners, and it leads to ugly code, redundant event-sending logic and, most importantly, a design that does not scale. In our case it would be just two channels, one for the Game Manager and one for the Notifier, but what if I want to add other clients, for example a web client I made? It would quickly become an ugly, poor implementation. A cleaner way is to create a broadcaster that listens to one source (in our case the game instance) and forwards events to the listeners (the Game Manager and the Notifier for now).
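To make the coupling concrete, here is a minimal sketch of what the game would look like with one channel per listener. The `GameEvent` and `Game` types and the field names are hypothetical stand-ins for illustration, not the bot's real code.

```go
package main

import "fmt"

// GameEvent is a hypothetical stand-in for domain.GameEvent.
type GameEvent struct {
	Kind string
}

// Game holds one channel per listener, so every new listener means a new
// field, a new constructor argument, and a new send in emit.
type Game struct {
	toManager  chan<- GameEvent
	toNotifier chan<- GameEvent
	// toWebClient chan<- GameEvent // ...and so on for every new listener
}

// emit repeats the same send once per listener.
func (g *Game) emit(e GameEvent) {
	g.toManager <- e
	g.toNotifier <- e
}

func main() {
	manager := make(chan GameEvent, 1)
	notifier := make(chan GameEvent, 1)
	g := &Game{toManager: manager, toNotifier: notifier}

	g.emit(GameEvent{Kind: "night_started"})
	fmt.Println((<-manager).Kind, (<-notifier).Kind) // night_started night_started
}
```

Every extra client would touch the struct, its construction, and `emit`, which is the redundancy the game instance should not have to carry.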

And that is where the Pub/Sub pattern comes into play.
Naive Pub/Sub
package event

import (
	"sync"

	"github.com/kaleabAlemayehu/werewolf/internal/domain"
)

// EventBroker fans out game events to every subscriber.
type EventBroker struct {
	subscribers []chan domain.GameEvent
	mutex       sync.RWMutex
	isClosed    bool
}

func NewEventBroker() *EventBroker {
	return &EventBroker{}
}

// Subscribe registers a new buffered channel and returns it.
// It returns nil if the broker is already closed.
func (b *EventBroker) Subscribe() chan domain.GameEvent {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	if b.isClosed {
		return nil
	}
	ch := make(chan domain.GameEvent, 10)
	b.subscribers = append(b.subscribers, ch)
	return ch
}

// Unsubscribe removes the channel from the broker and closes it.
func (b *EventBroker) Unsubscribe(channel chan domain.GameEvent) {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	if b.isClosed {
		return
	}
	for i, ch := range b.subscribers {
		if ch == channel {
			b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
			close(ch)
			break
		}
	}
}

// Publish sends the event to every subscriber.
// The send blocks when a subscriber's buffer is full.
func (b *EventBroker) Publish(event domain.GameEvent) {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	if b.isClosed {
		return
	}
	for _, ch := range b.subscribers {
		ch <- event
	}
}

// Close closes every subscriber channel and marks the broker as closed.
func (b *EventBroker) Close() {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	if b.isClosed {
		return
	}
	b.isClosed = true
	for _, ch := range b.subscribers {
		close(ch)
	}
	b.subscribers = nil
}
I'm not going to show how I use it in the Notifier and Game Manager components, since that is unnecessary and would bloat the post, but you could use it like this:
package main

import (
	"log"
	"sync"

	"github.com/kaleabAlemayehu/werewolf/internal/domain"
	event "github.com/kaleabAlemayehu/werewolf/internal/events"
)

func main() {
	broker := event.NewEventBroker()
	ch := broker.Subscribe()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// the loop ends when the broker closes the channel
		for ev := range ch {
			log.Printf("event occurred: %v", ev)
		}
	}()

	// publish a couple of times
	for i := 0; i < 5; i++ {
		broker.Publish(domain.GameEvent{
			// event data goes here
		})
	}

	// close the broker, otherwise the listener loop runs forever and
	// wg.Wait() below deadlocks
	broker.Close()
	wg.Wait()
}
You may have seen a similar implementation in a video by Mario, one of my favorite Go YouTubers. He said that this code is for demonstration purposes only, to show how pub/sub works, and you may have noticed a couple of defects:
- The Publish method is blocking. If one listener goes down and stops reading from the channel it subscribed to, Publish keeps working only until that channel's buffer is full. After that the send blocks, and because Publish holds the read lock for the whole loop, the lock is never released. Subscribe, Unsubscribe and Close, which need the write lock, get stuck as well, and the whole EventBroker deadlocks.
- Subscribers are stored in a slice instead of a hash map, so Unsubscribe has O(n) time complexity instead of O(1), where n is the number of subscribers.
Let's Address the Shortcomings
Let's address the shortcomings of the implementation above one by one.
The Publish Method
The problem with the Publish method is that it blocks if one subscriber stops listening for events. What if we start a new goroutine for each send when we publish an event? That would keep one stuck subscriber from blocking the others, right? We can do it like this:
func (b *EventBroker) Publish(event domain.GameEvent) {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	if b.isClosed {
		return
	}
	for _, ch := range b.subscribers {
		// one goroutine per send, so a stuck subscriber cannot block the loop
		go func(c chan domain.GameEvent) {
			c <- event
		}(ch)
	}
}
This may look like a decent solution since it prevents blocking other events when a subscriber blocks. But there’s a catch: those blocked goroutines are a time bomb, they never terminate unless the program exits, leading to goroutine leaks. Imagine if there are n failed subscribers and m events occur during the game: that would create n × m goroutines stuck forever.
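The leak can be observed with `runtime.NumGoroutine`. This is a minimal sketch with plain `int` events instead of `domain.GameEvent`; `leakyPublish` and `countLeaked` are hypothetical names used only for this demonstration.

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// leakyPublish starts one goroutine per subscriber, like the Publish
// variant above. Each goroutine blocks forever if nobody receives.
func leakyPublish(subscribers []chan int, event int) {
	for _, ch := range subscribers {
		go func(c chan int) {
			c <- event
		}(ch)
	}
}

// countLeaked publishes m events to n subscribers that never read and
// returns how many extra goroutines exist afterwards.
func countLeaked(n, m int) int {
	subscribers := make([]chan int, n)
	for i := range subscribers {
		subscribers[i] = make(chan int) // unbuffered and never read
	}
	before := runtime.NumGoroutine()
	for e := 0; e < m; e++ {
		leakyPublish(subscribers, e)
	}
	time.Sleep(50 * time.Millisecond) // let the goroutines start and block
	return runtime.NumGoroutine() - before
}

func main() {
	// 3 dead subscribers x 4 events: expect 12 stuck goroutines
	fmt.Println("leaked goroutines:", countLeaked(3, 4))
}
```

None of those goroutines can ever finish, because nothing will ever receive from their channels.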
Who are you talking to?
How can we solve the goroutine leaks?
The solution is simple. As the great Rob Pike said in a talk, Go's concurrency is inspired by real-world things. What would you do when someone stops listening to you? You stop talking to them, right? So we will drop any event that has no listener ready to receive it.
func (b *EventBroker) Publish(event domain.GameEvent) {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	if b.isClosed {
		return
	}
	for _, ch := range b.subscribers {
		go func(c chan domain.GameEvent) {
			// non-blocking send: if the subscriber's buffer is full,
			// fall through to default instead of waiting
			select {
			case c <- event:
			default:
				// drop the event
			}
		}(ch)
	}
}
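The `select` with a `default` case is what makes the send non-blocking. Isolated into a small helper (the name `trySend` and the `string` event type are assumptions for this sketch), the behavior looks like this:

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It returns false when the channel's
// buffer is full (or nobody is ready to receive), meaning the event is dropped.
func trySend(ch chan<- string, event string) bool {
	select {
	case ch <- event:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1) // room for exactly one pending event

	fmt.Println(trySend(ch, "first"))  // true: fits in the buffer
	fmt.Println(trySend(ch, "second")) // false: buffer full, event dropped
	fmt.Println(<-ch)                  // first
}
```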
Yet there is another defect. Imagine creating and destroying n goroutines for every event, where n is the number of subscribers. That leads to goroutine churn, where the runtime can spend more effort creating and tearing down short-lived goroutines than actually doing useful work.
Goroutine Churn
So what is the solution? The fix for goroutine churn is to reuse an existing goroutine instead of creating a new one for every short-lived task. In fact, we are going to create one goroutine that lives as long as the EventBroker and sends events to the subscribers, and the publisher's only job is to forward the event to that goroutine. That means restructuring the EventBroker struct, since we need a channel on which that goroutine receives events from the publisher. While we are at it, I am also going to address the earlier data structure issue and store the subscribers in a hash map.
type EventBroker struct {
	subscribers map[chan domain.GameEvent]struct{}
	mutex       sync.RWMutex
	// events carries published events to the run goroutine
	events chan domain.GameEvent
}

func NewEventBroker() *EventBroker {
	broker := &EventBroker{
		subscribers: make(map[chan domain.GameEvent]struct{}, 2),
		events:      make(chan domain.GameEvent, 20),
	}
	go broker.run()
	return broker
}

// run is the single long-lived goroutine that delivers events to subscribers.
func (b *EventBroker) run() {
	for {
		select {
		case event := <-b.events:
			b.mutex.RLock()
			for ch := range b.subscribers {
				select {
				case ch <- event:
				default:
					// drop the event
				}
			}
			b.mutex.RUnlock()
		}
	}
}

// Publish only forwards the event to the run goroutine.
func (b *EventBroker) Publish(event domain.GameEvent) {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	b.events <- event
}
This looks good, right? I know this is a lot of defects to digest, but we need just one more tweak: the goroutine that listens for events and sends them to subscribers has to be stopped when the EventBroker is closed. So how do goroutines communicate with each other? Through channels, right? We use a signal channel to indicate that the EventBroker is closed, so our new code looks like this:
type EventBroker struct {
	subscribers map[chan domain.GameEvent]struct{}
	mutex       sync.RWMutex
	events      chan domain.GameEvent
	// done is closed to signal the run goroutine to stop
	done chan struct{}
}

func NewEventBroker() *EventBroker {
	broker := &EventBroker{
		subscribers: make(map[chan domain.GameEvent]struct{}, 2),
		events:      make(chan domain.GameEvent, 20),
		done:        make(chan struct{}),
	}
	go broker.run()
	return broker
}

func (b *EventBroker) run() {
	for {
		select {
		case event := <-b.events:
			b.mutex.RLock()
			for ch := range b.subscribers {
				select {
				case ch <- event:
				default:
					// drop the event
				}
			}
			b.mutex.RUnlock()
		case <-b.done:
			// the broker was closed, stop the goroutine
			return
		}
	}
}

func (b *EventBroker) Close() {
	close(b.done)
	b.mutex.Lock()
	defer b.mutex.Unlock()
	for ch := range b.subscribers {
		close(ch)
	}
	close(b.events)
}
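The done-channel signal can be seen on its own in a smaller program. This is only a sketch of the idea: `loop` plays the role of `run`, and the extra `stopped` channel is an assumption added here so that `main` can confirm the goroutine has exited.

```go
package main

import "fmt"

// loop receives from events until done is closed, then returns.
// stopped is closed on exit so callers can wait for the goroutine to finish.
func loop(events <-chan int, done <-chan struct{}, stopped chan<- struct{}) {
	defer close(stopped)
	for {
		select {
		case e := <-events:
			fmt.Println("got event", e)
		case <-done:
			return
		}
	}
}

func main() {
	events := make(chan int)
	done := make(chan struct{})
	stopped := make(chan struct{})

	go loop(events, done, stopped)

	events <- 1 // unbuffered: returns once the loop has received it
	close(done) // receiving from a closed channel never blocks, so the loop wakes up
	<-stopped   // wait until the goroutine has really exited
	fmt.Println("loop stopped")
}
```

Closing `done` works as a broadcast: every goroutine selecting on it is released, which is why a close is used instead of sending a value.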
Polishing it off
After adding some custom errors, integrating the new done channel, switching to the new data structure (a hash map), and using sync.Once to prevent closing the same channels more than once (by the way, closing an already closed channel causes a panic), our EventBroker looks like this:
package events

import (
	"errors"
	"sync"

	"github.com/kaleabAlemayehu/werewolf/internal/domain"
)

var (
	ErrBrokerClosed   = errors.New("event broker is closed")
	ErrUnknownChannel = errors.New("unknown channel")
)

type EventBroker struct {
	subscribers map[chan domain.GameEvent]struct{}
	mutex       sync.RWMutex
	events      chan domain.GameEvent
	done        chan struct{}
	// prevents closing the channels more than once
	closeOnce sync.Once
}

func NewEventBroker() *EventBroker {
	broker := &EventBroker{
		subscribers: make(map[chan domain.GameEvent]struct{}, 2),
		events:      make(chan domain.GameEvent, 20),
		done:        make(chan struct{}),
	}
	go broker.run()
	return broker
}

// run delivers published events to subscribers until the broker is closed.
func (b *EventBroker) run() {
	for {
		select {
		case event := <-b.events:
			b.mutex.RLock()
			for ch := range b.subscribers {
				select {
				case ch <- event:
				default:
					// drop the event
				}
			}
			b.mutex.RUnlock()
		case <-b.done:
			return
		}
	}
}

// Subscribe registers a new buffered channel, unless the broker is closed.
func (b *EventBroker) Subscribe() (chan domain.GameEvent, error) {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	select {
	case <-b.done:
		return nil, ErrBrokerClosed
	default:
	}
	ch := make(chan domain.GameEvent, 10)
	b.subscribers[ch] = struct{}{}
	return ch, nil
}

// Publish forwards the event to the run goroutine.
func (b *EventBroker) Publish(event domain.GameEvent) error {
	b.mutex.RLock()
	defer b.mutex.RUnlock()
	// refuse to publish once the broker is closed
	select {
	case <-b.done:
		return ErrBrokerClosed
	default:
	}
	// wait for room in the events buffer, but give up if the broker is
	// closed in the meantime so Close is not stuck waiting for the lock
	select {
	case b.events <- event:
		return nil
	case <-b.done:
		return ErrBrokerClosed
	}
}

// Unsubscribe removes the channel from the broker and closes it.
func (b *EventBroker) Unsubscribe(ch chan domain.GameEvent) error {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	if _, ok := b.subscribers[ch]; ok {
		delete(b.subscribers, ch)
		close(ch)
		return nil
	}
	return ErrUnknownChannel
}

// Close stops the run goroutine and closes every channel exactly once.
func (b *EventBroker) Close() {
	b.closeOnce.Do(func() {
		close(b.done)
		b.mutex.Lock()
		defer b.mutex.Unlock()
		for ch := range b.subscribers {
			close(ch)
			// forget the channel so a later Unsubscribe cannot close it again
			delete(b.subscribers, ch)
		}
		close(b.events)
	})
}
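The double-close panic and the `sync.Once` guard can be tried out separately from the broker. In this sketch the `closer` type and the `closeTwice` helper are hypothetical names used only to show the two behaviors side by side.

```go
package main

import (
	"fmt"
	"sync"
)

// closer wraps a channel so that Close can be called any number of times.
type closer struct {
	done chan struct{}
	once sync.Once
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close closes done exactly once; later calls are no-ops.
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// closeTwice closes a bare channel twice and reports whether it panicked.
func closeTwice() (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch := make(chan struct{})
	close(ch)
	close(ch) // panics: close of closed channel
	return false
}

func main() {
	fmt.Println("bare double close panics:", closeTwice()) // true

	c := newCloser()
	c.Close()
	c.Close() // safe: sync.Once skips the second call
	fmt.Println("guarded double close is safe")
}
```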
Conclusion
Here we are at the end of my blog on the publisher/subscriber concurrency pattern, where I walked through how I built the event broker I use for my Werewolf Telegram bot. If you have any suggestions or better ways of doing things, I’d love to hear from you, I’m always open to learning, as Helmut Schmidt said, ‘The largest room in the world is the room for improvement.’ Since this is my first blog, I want to sincerely thank you for reading. See you in the next one!