reactive_source.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package ultraviolet
  2. import (
  3. "context"
  4. "sync"
  5. )
  6. type Subscribers map[any]chan any
  7. type Source struct {
  8. globalCtx context.Context
  9. localCtx context.Context
  10. cancel func()
  11. mutex sync.Mutex
  12. waitGroup sync.WaitGroup
  13. queue *State
  14. subscribers Subscribers
  15. }
  16. func NewSource(globalCtx context.Context, queue *State) *Source {
  17. localCtx, cancel := context.WithCancel(context.Background())
  18. return &Source{
  19. globalCtx: globalCtx,
  20. localCtx: localCtx,
  21. cancel: cancel,
  22. queue: queue,
  23. subscribers: Subscribers{},
  24. }
  25. }
  26. func (this *Source) Start() error {
  27. this.waitGroup.Add(1)
  28. go this.worker()
  29. return nil
  30. }
  31. func (this *Source) Stop() error {
  32. this.cancel()
  33. for _, subscriber := range this.subscribers {
  34. close(subscriber)
  35. }
  36. this.waitGroup.Wait()
  37. return nil
  38. }
  39. func (this *Source) Subscribe(key any) <-chan any {
  40. this.mutex.Lock()
  41. defer this.mutex.Unlock()
  42. result := make(chan any)
  43. this.subscribers[key] = result
  44. return result
  45. }
  46. func (this *Source) Unsubscribe(key any) error {
  47. this.mutex.Lock()
  48. defer this.mutex.Unlock()
  49. _, ok := this.subscribers[key]
  50. if !ok {
  51. return ErrIdInvalid
  52. }
  53. delete(this.subscribers, key)
  54. return nil
  55. }
  56. func (this *Source) worker() {
  57. defer this.waitGroup.Done()
  58. for {
  59. select {
  60. case <-this.globalCtx.Done():
  61. return
  62. case <-this.localCtx.Done():
  63. return
  64. case event, ok := <-this.queue.Out():
  65. if !ok {
  66. continue
  67. }
  68. this.mutex.Lock()
  69. for _, subscriber := range this.subscribers {
  70. go func(subscriber chan any, event any) {
  71. subscriber <- event
  72. }(subscriber, event)
  73. }
  74. this.mutex.Unlock()
  75. }
  76. }
  77. }