| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package ultraviolet
- import (
- "context"
- "sync"
- )
// Subscribers maps a caller-chosen subscription key to the channel on which
// that subscriber receives events.
type Subscribers map[any]chan any
- type Source struct {
- globalCtx context.Context
- localCtx context.Context
- cancel func()
- mutex sync.Mutex
- waitGroup sync.WaitGroup
- queue *State
- subscribers Subscribers
- }
- func NewSource(globalCtx context.Context, queue *State) *Source {
- localCtx, cancel := context.WithCancel(context.Background())
- return &Source{
- globalCtx: globalCtx,
- localCtx: localCtx,
- cancel: cancel,
- queue: queue,
- subscribers: Subscribers{},
- }
- }
- func (this *Source) Start() error {
- this.waitGroup.Add(1)
- go this.worker()
- return nil
- }
- func (this *Source) Stop() error {
- this.cancel()
- for _, subscriber := range this.subscribers {
- close(subscriber)
- }
- this.waitGroup.Wait()
- return nil
- }
- func (this *Source) Subscribe(key any) <-chan any {
- this.mutex.Lock()
- defer this.mutex.Unlock()
- result := make(chan any)
- this.subscribers[key] = result
- return result
- }
- func (this *Source) Unsubscribe(key any) error {
- this.mutex.Lock()
- defer this.mutex.Unlock()
- _, ok := this.subscribers[key]
- if !ok {
- return ErrIdInvalid
- }
- delete(this.subscribers, key)
- return nil
- }
- func (this *Source) worker() {
- defer this.waitGroup.Done()
- for {
- select {
- case <-this.globalCtx.Done():
- return
- case <-this.localCtx.Done():
- return
- case event, ok := <-this.queue.Out():
- if !ok {
- continue
- }
- this.mutex.Lock()
- for _, subscriber := range this.subscribers {
- go func(subscriber chan any, event any) {
- subscriber <- event
- }(subscriber, event)
- }
- this.mutex.Unlock()
- }
- }
- }
|