package ultraviolet

import (
	"context"
	"sync"
)

// Subscribers maps a caller-chosen subscription key to the channel on which
// that subscriber receives events.
type Subscribers map[any]chan any

// Source reads events from a State queue and fans each one out to every
// registered subscriber channel.
type Source struct {
	globalCtx   context.Context // supplied to NewSource; its cancellation stops the worker
	localCtx    context.Context // cancelled by Stop
	cancel      func()          // cancels localCtx
	mutex       sync.Mutex      // guards subscribers
	waitGroup   sync.WaitGroup  // tracks the worker and every in-flight delivery goroutine
	closeOnce   sync.Once       // ensures Stop closes subscriber channels only once
	queue       *State          // event source; only its Out method is used here
	subscribers Subscribers     // current subscribers, keyed by the value passed to Subscribe
}

// NewSource returns a Source that will read events from queue once Start is
// called. The worker stops when globalCtx is cancelled or Stop is called.
func NewSource(globalCtx context.Context, queue *State) *Source {
	localCtx, cancel := context.WithCancel(context.Background())
	return &Source{
		globalCtx:   globalCtx,
		localCtx:    localCtx,
		cancel:      cancel,
		queue:       queue,
		subscribers: Subscribers{},
	}
}

// Start launches the background worker goroutine. It always returns nil.
func (s *Source) Start() error {
	s.waitGroup.Add(1)
	go s.worker()
	return nil
}

// Stop cancels the worker, waits for it and for all in-flight deliveries to
// finish, and then closes every subscriber channel that is registered at that
// point. Events still waiting for a receiver when Stop is called are dropped.
// Stop is safe to call more than once and always returns nil.
func (s *Source) Stop() error {
	s.cancel()

	// Wait before closing: closing a channel while a delivery goroutine is
	// still trying to send on it would panic with "send on closed channel".
	s.waitGroup.Wait()

	// Close at most once so that a repeated Stop does not panic on an
	// already-closed channel.
	s.closeOnce.Do(func() {
		s.mutex.Lock()
		defer s.mutex.Unlock()
		for _, subscriber := range s.subscribers {
			close(subscriber)
		}
	})
	return nil
}

// Subscribe registers a new unbuffered channel under key and returns its
// receive side. Subscribing again with the same key replaces the previous
// registration; the previous channel is not closed.
func (s *Source) Subscribe(key any) <-chan any {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	result := make(chan any)
	s.subscribers[key] = result
	return result
}

// Unsubscribe removes the subscription registered under key. It returns
// ErrIdInvalid when no such subscription exists. The subscriber's channel is
// not closed, because deliveries started before the removal may still be
// sending on it.
func (s *Source) Unsubscribe(key any) error {
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if _, ok := s.subscribers[key]; !ok {
		return ErrIdInvalid
	}
	delete(s.subscribers, key)
	return nil
}

// worker forwards every event read from the queue to all current subscribers
// until either context is cancelled or the queue's output channel is closed.
func (s *Source) worker() {
	defer s.waitGroup.Done()

	for {
		select {
		case <-s.globalCtx.Done():
			return
		case <-s.localCtx.Done():
			return
		case event, ok := <-s.queue.Out():
			if !ok {
				// A closed channel is always ready to receive, so looping
				// here would spin at full CPU. Nothing more can arrive.
				return
			}
			s.mutex.Lock()
			for _, subscriber := range s.subscribers {
				// The worker still holds its own WaitGroup count here, so
				// this Add cannot race with a Wait that sees zero.
				s.waitGroup.Add(1)
				go s.deliver(subscriber, event)
			}
			s.mutex.Unlock()
		}
	}
}

// deliver sends event to a single subscriber, giving up as soon as either
// context is cancelled so that a subscriber that never reads cannot leak the
// goroutine past Stop.
func (s *Source) deliver(subscriber chan<- any, event any) {
	defer s.waitGroup.Done()

	select {
	case subscriber <- event:
	case <-s.globalCtx.Done():
	case <-s.localCtx.Done():
	}
}