reactive_queue.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. package ultraviolet
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. )
  7. type State struct {
  8. globalCtx context.Context
  9. localCtx context.Context
  10. cancel func()
  11. mutex sync.RWMutex
  12. waitGroup sync.WaitGroup
  13. eventsIn chan any
  14. eventsOut chan any
  15. queue []any
  16. }
  17. func NewStateWithOpts(globalCtx context.Context) *State {
  18. localCtx, cancel := context.WithCancel(context.Background())
  19. return &State{
  20. globalCtx: globalCtx,
  21. localCtx: localCtx,
  22. cancel: cancel,
  23. eventsIn: make(chan any),
  24. eventsOut: make(chan any),
  25. queue: []any{},
  26. }
  27. }
  28. func (this *State) Start() error {
  29. this.waitGroup.Add(2)
  30. go this.workerIn()
  31. go this.workerOut()
  32. return nil
  33. }
  34. func (this *State) Stop() error {
  35. this.cancel()
  36. close(this.eventsIn)
  37. close(this.eventsOut)
  38. this.waitGroup.Wait()
  39. return nil
  40. }
  41. func (this *State) In() chan<- any {
  42. return this.eventsIn
  43. }
  44. func (this *State) Out() <-chan any {
  45. return this.eventsOut
  46. }
  47. func (this *State) enqueue(event any) {
  48. this.mutex.Lock()
  49. defer this.mutex.Unlock()
  50. this.queue = append(this.queue, event)
  51. }
  52. func (this *State) dequeue() any {
  53. this.mutex.RLock()
  54. defer this.mutex.RUnlock()
  55. if len(this.queue) == 0 {
  56. return nil
  57. }
  58. result := this.queue[0]
  59. this.queue = this.queue[1:]
  60. return result
  61. }
  62. func (this *State) workerIn() {
  63. defer this.waitGroup.Done()
  64. for {
  65. select {
  66. case <-this.globalCtx.Done():
  67. return
  68. case <-this.localCtx.Done():
  69. return
  70. case event, ok := <-this.eventsIn:
  71. if !ok {
  72. return
  73. }
  74. this.enqueue(event)
  75. continue
  76. }
  77. }
  78. }
  79. func (this *State) workerOut() {
  80. defer this.waitGroup.Done()
  81. for {
  82. select {
  83. case <-this.localCtx.Done():
  84. return
  85. case <-time.After(1 * time.Microsecond):
  86. }
  87. event := this.dequeue()
  88. if event == nil {
  89. continue
  90. }
  91. go func(event any) {
  92. this.eventsOut <- event
  93. }(event)
  94. }
  95. }