玩转Golang的channel,二百行代码实现PubSub模式

两百行代码实现生产级PubSub工具类

引言

PubSub(Publish/Subscribe)模式,,意为“发布/订阅”模式,是为了解决一对多的依赖关系,使多个消费者同时监听某一个主题,不仅可以让生产者和消费者解耦,同时也让不同的消费者之间相互解耦(注:有些反模式依赖订阅者执行的先后顺序,使用共享数据来传递状态,是需要避免的,因为这样会使消费者耦合在一起,不能独立变化)。这其中的关键就在于需要有中介来维护订阅关系,并负责把生产的消息,传递给订阅方。

在 Golang 这门语言中,channel 天然就适合来当这个中介,下面就让我们一步步根据 PubSub 模式,实现工具类 EventBus.

定义类型

首先,让我们先定义一些基本类型和核心操作。

//EventID是Event的唯一标识
type EventID int64

//Event
type Event interface {
	ID() EventID
}

//EventHandler
type EventHandler interface {
	OnEvent(ctx context.Context, event Event) error
	CanAutoRetry(err error) bool
}

// JobStatus holds information related to a job status
type JobStatus struct {
	RunAt      time.Time
	FinishedAt time.Time
	Err        error
}

//EventBus ...
type EventBus struct {}

func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) { }

func (eb *EventBus) Unsubscribe(eventID EventID, handlers ...EventHandler) { }

func (eb *EventBus) Publish(evt Event) <-chan JobStatus { }

重点拆解

首先,消费者要通过 Subscribe 来订阅相关的主题,这其中的重点就是需要根据 EventID 维护订阅的消费者,很自然的想到 map,我们选择用handlers map[EventID][]EventHandler来维护,考虑到并发问题,还需要来加个锁。

//Subscribe ...
func (eb *EventBus) Subscribe(eventID EventID, handlers ...EventHandler) {
	eb.mu.Lock()
	defer eb.mu.Unlock()

	eb.handlers[eventID] = append(eb.handlers[eventID], handlers...)
}

这里实现的比较简单,没有考虑一个消费者,重复订阅的问题,留给了使用方自己处理。(但同一个消费者为什么要多次调用 subcribe,订阅同一个主题呢,感觉是在写 bug

下面就是最核心的 Publish 函数了,一方面一定是需要一个 channel(最好是有 buffer 的)来传递 Event 数据,另一方面,为了保证性能,需要有一些常驻协程,来监听消息,并启动相关的消费者。以下是相关代码(在完整版代码里,添加了日志、错误处理等,这里为了展示重点,暂且隐去)

func (eb *EventBus) Start() {
	if eb.started {
		return
	}

	for i := 0; i < eb.eventWorkers; i++ {
		eb.wg.Add(1)
		go eb.eventWorker(eb.eventJobQueue)
	}

	eb.started = true
}


func (eb *EventBus) eventWorker(jobQueue <-chan EventJob) {
loop:
	for {
		select {
		case job := <-jobQueue:
			jobStatus := JobStatus{
				RunAt: time.Now(),
			}

			ctx, cancel := context.WithTimeout(context.Background(), eb.timeout)
			g, _ := errgroup.WithContext(ctx)
			for index := range job.handlers {
				handler := job.handlers[index]
				g.Go(func() error {
					return eb.runHandler(ctx, handler, job.event)
				})
			}
			jobStatus.Err = g.Wait()

			jobStatus.FinishedAt = time.Now()

			select {
			case job.resultChan <- jobStatus:
			default:
			}
		    cancel()
		}
	}
}

做好上面的准备工作后,以下就是真正的 Publish 代码了。

// EventJob ...
type EventJob struct {
	event      Event
	handlers   []EventHandler
	resultChan chan JobStatus
}

//Publish ...
func (eb *EventBus) Publish(evt Event) <-chan JobStatus {
	eb.mu.RLock()
	defer eb.mu.RUnlock()
	if ehs, ok := eb.handlers[evt.ID()]; ok {
		handlers := make([]EventHandler, len(ehs))
		copy(handlers, ehs) //snapshot一份当时的消费者
		job := EventJob{
			event:      evt,
			handlers:   handlers,
			resultChan: make(chan JobStatus, 1),
		}

		var jobQueue = eb.eventJobQueue
		select {
		case jobQueue <- job:
		default:
		}

		return job.resultChan
	} else {
		err := fmt.Errorf("no handlers for event(%d)", evt.ID())
		resultChan := make(chan JobStatus, 1)
		resultChan <- JobStatus{
			Err: err,
		}
		return resultChan
	}
}

这里没有在 eventWorker 中直接从 handlers 中根据 ID 拿到相关的消费者,一方面是为了让 eventWorker 更加通用,另一方面也是为减少因为锁操作引起的阻塞。

至此,我们已经把最核心的代码一一拆解完成,完整代码,请参见 channelx 项目中的event_bus.go

使用示例

没有例子的工具类是不完整的,下面就提供一个例子。

先定义一个 event,这里把id定义成私有的,然后在构造函数中,强制指定。

const ExampleEventID channelx.EventID = 1

type ExampleEvent struct {
	id channelx.EventID
}

func NewExampleEvent() ExampleEvent {
	return ExampleEvent{id:ExampleEventID}
}

func (evt ExampleEvent) ID() channelx.EventID  {
	return evt.id
}

接下来是 event handler,需要根据实际的需要,在OnEvent中检查接收到的事件是否是订阅的事件,以及接收到事件结构是否能转换成特定的类型。在防御编程后,就可以处理事件逻辑了。

type ExampleHandler struct {
	logger channelx.Logger
}

func NewExampleHandler(logger channelx.Logger) *ExampleHandler {
	return &ExampleHandler{
		logger: logger,
	}
}

func (h ExampleHandler) Logger() channelx.Logger{
	return h.logger
}

func (h ExampleHandler) CanAutoRetry(err error) bool {
	return false
}

func (h ExampleHandler) OnEvent(ctx context.Context, event channelx.Event) error {
	if event.ID() != ExampleEventID {
		return fmt.Errorf("subscribe wrong event(%d)", event.ID())
	}

	_, ok := event.(ExampleEvent)
	if !ok {
		return fmt.Errorf("failed to convert received event to ExampleEvent")
	}

    // handle the event here
	h.Logger().Infof("event handled")

	return nil
}

最后,就是EventBus的启动,事件的订阅和发布了。

eventBus := channelx.NewEventBus(logger, "test", 4,4,2, time.Second, 5 * time.Second)
eventBus.Start()

handler := NewExampleHandler(logger)
eventBus.Subscribe(ExampleEventID, handler)
eventBus.Publish(NewExampleEvent())