package ultraviolet

import (
	"context"
	"sync"
)

// State is an unbounded in-memory FIFO event buffer. Producers send events
// on In, one worker goroutine appends them to an internal queue, and a
// second worker goroutine delivers them, in arrival order, on Out.
type State struct {
	globalCtx context.Context
	localCtx  context.Context
	cancel    func()
	mutex     sync.RWMutex
	waitGroup sync.WaitGroup
	stopOnce  sync.Once
	eventsIn  chan any
	eventsOut chan any
	// wake carries at most one pending "queue is non-empty" signal from
	// enqueue to workerOut, so the output worker can sleep instead of poll.
	wake  chan struct{}
	queue []any
}

// NewStateWithOpts returns a State that is ready to be started with Start.
//
// Cancelling globalCtx stops the input worker only; the output worker keeps
// delivering already-queued events until Stop is called. A nil globalCtx is
// treated as context.Background().
func NewStateWithOpts(globalCtx context.Context) *State {
	if globalCtx == nil {
		globalCtx = context.Background()
	}
	localCtx, cancel := context.WithCancel(context.Background())
	return &State{
		globalCtx: globalCtx,
		localCtx:  localCtx,
		cancel:    cancel,
		eventsIn:  make(chan any),
		eventsOut: make(chan any),
		wake:      make(chan struct{}, 1),
	}
}

// Start launches the input and output worker goroutines. It always returns
// nil.
func (s *State) Start() error {
	s.waitGroup.Add(2)
	go s.workerIn()
	go s.workerOut()
	return nil
}

// Stop cancels both workers, waits for them to exit and then closes the
// channel returned by Out so that consumers ranging over it terminate.
// Events still queued at that point are discarded. Stop is safe to call more
// than once and always returns nil.
//
// The input channel is deliberately left open: it is written to by external
// producers, and closing it here would make a concurrent send panic.
func (s *State) Stop() error {
	s.stopOnce.Do(func() {
		s.cancel()
		// Only close the output once no worker can still be sending on it.
		s.waitGroup.Wait()
		close(s.eventsOut)
	})
	return nil
}

// In returns the channel on which producers submit events. Nil events are
// never delivered. After Stop (or after globalCtx is cancelled) nothing
// receives from this channel any more, so producers should stop sending or
// guard their sends with a select. A producer may close the channel to shut
// down the input worker.
func (s *State) In() chan<- any {
	return s.eventsIn
}

// Out returns the channel on which queued events are delivered in FIFO
// order. It is closed by Stop.
func (s *State) Out() <-chan any {
	return s.eventsOut
}

// enqueue appends event to the queue and wakes the output worker. Nil events
// are dropped because dequeue uses nil to report an empty queue.
func (s *State) enqueue(event any) {
	if event == nil {
		return
	}

	s.mutex.Lock()
	s.queue = append(s.queue, event)
	s.mutex.Unlock()

	// Non-blocking: a signal that is already pending is sufficient, since
	// the output worker drains the queue until it is empty.
	select {
	case s.wake <- struct{}{}:
	default:
	}
}

// dequeue removes and returns the oldest queued event, or nil when the queue
// is empty.
func (s *State) dequeue() any {
	// Popping mutates the queue, so the exclusive lock is required.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if len(s.queue) == 0 {
		return nil
	}
	head := s.queue[0]
	s.queue[0] = nil // drop the reference so the backing array does not pin it
	s.queue = s.queue[1:]
	return head
}

// workerIn moves events from the input channel into the queue until either
// context is cancelled or the input channel is closed.
func (s *State) workerIn() {
	defer s.waitGroup.Done()

	for {
		select {
		case <-s.globalCtx.Done():
			return
		case <-s.localCtx.Done():
			return
		case event, ok := <-s.eventsIn:
			if !ok {
				return
			}
			s.enqueue(event)
		}
	}
}

// workerOut delivers queued events on the output channel, one at a time and
// in order, until the local context is cancelled by Stop.
func (s *State) workerOut() {
	defer s.waitGroup.Done()

	for {
		// Checked first so a worker started after Stop never reaches the
		// send on the already-closed output channel.
		if s.localCtx.Err() != nil {
			return
		}

		event := s.dequeue()
		if event == nil {
			// Queue is empty: sleep until enqueue signals or Stop cancels.
			select {
			case <-s.localCtx.Done():
				return
			case <-s.wake:
			}
			continue
		}

		// Send from this goroutine (not a spawned one) so ordering is kept
		// and Stop can wait for every sender before closing the channel.
		select {
		case <-s.localCtx.Done():
			return
		case s.eventsOut <- event:
		}
	}
}