| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- package ultraviolet
- import (
- "context"
- "sync"
- "time"
- )
// State is an unbounded FIFO buffer that decouples event producers from a
// single consumer. Producers send on In(); one worker appends each event to an
// internal queue, and a second worker forwards queued events, in arrival
// order, on Out().
//
// A State must be created with NewStateWithOpts and must not be copied after
// first use (it contains sync primitives).
type State struct {
	// globalCtx is the caller-supplied context; cancelling it stops both workers.
	globalCtx context.Context
	// localCtx is cancelled by Stop via cancel.
	localCtx context.Context
	cancel   func()

	// mutex guards queue.
	mutex     sync.RWMutex
	waitGroup sync.WaitGroup
	startOnce sync.Once
	stopOnce  sync.Once

	eventsIn  chan any
	eventsOut chan any
	// wake carries at most one pending "queue is non-empty" notification from
	// enqueue to workerOut.
	wake  chan struct{}
	queue []any
}

// NewStateWithOpts returns a State whose workers also stop when globalCtx is
// cancelled. A nil globalCtx is treated as context.Background(). The workers
// are not running until Start is called.
func NewStateWithOpts(globalCtx context.Context) *State {
	if globalCtx == nil {
		globalCtx = context.Background()
	}
	localCtx, cancel := context.WithCancel(context.Background())
	return &State{
		globalCtx: globalCtx,
		localCtx:  localCtx,
		cancel:    cancel,
		eventsIn:  make(chan any),
		eventsOut: make(chan any),
		wake:      make(chan struct{}, 1),
	}
}

// Start launches the input and output workers. Only the first call has an
// effect, and it is a no-op if Stop has already been called. It always
// returns nil.
func (s *State) Start() error {
	s.startOnce.Do(func() {
		if s.localCtx.Err() != nil {
			// Already stopped: Out() is closed, so no worker may run.
			return
		}
		s.waitGroup.Add(2)
		go s.workerIn()
		go s.workerOut()
	})
	return nil
}

// Stop cancels the workers, waits for them to exit and then closes the Out()
// channel so that consumers ranging over it terminate. Events still queued at
// that point are discarded. Stop is idempotent and always returns nil.
//
// The In() channel is deliberately left open: it belongs to the producers,
// and closing it here would panic any in-flight send (or double-close it if a
// producer already closed it).
func (s *State) Stop() error {
	s.stopOnce.Do(func() {
		s.cancel()
		// Wait before closing so that no worker can still be sending on
		// eventsOut when it is closed.
		s.waitGroup.Wait()
		close(s.eventsOut)
	})
	return nil
}

// In returns the send-only channel on which producers submit events. A
// producer may close it to signal end of input. Nil events are dropped.
func (s *State) In() chan<- any {
	return s.eventsIn
}

// Out returns the receive-only channel on which queued events are delivered
// in FIFO order. It is closed by Stop.
func (s *State) Out() <-chan any {
	return s.eventsOut
}

// enqueue appends event to the queue and notifies workerOut. Nil events are
// dropped because dequeue uses nil to report an empty queue.
func (s *State) enqueue(event any) {
	if event == nil {
		return
	}

	s.mutex.Lock()
	s.queue = append(s.queue, event)
	s.mutex.Unlock()

	// Non-blocking: a notification that is already pending is sufficient.
	select {
	case s.wake <- struct{}{}:
	default:
	}
}

// dequeue removes and returns the oldest queued event, or nil when the queue
// is empty.
func (s *State) dequeue() any {
	// A write lock is required: popping mutates the slice header.
	s.mutex.Lock()
	defer s.mutex.Unlock()

	if len(s.queue) == 0 {
		return nil
	}
	event := s.queue[0]
	s.queue[0] = nil // drop the reference so the backing array does not pin it
	s.queue = s.queue[1:]
	return event
}

// workerIn moves events from eventsIn into the queue until either context is
// cancelled or eventsIn is closed.
func (s *State) workerIn() {
	defer s.waitGroup.Done()

	for {
		select {
		case <-s.globalCtx.Done():
			return
		case <-s.localCtx.Done():
			return
		case event, ok := <-s.eventsIn:
			if !ok {
				return
			}
			s.enqueue(event)
		}
	}
}

// workerOut forwards queued events to eventsOut, one at a time and in order,
// until either context is cancelled. Sending inline (rather than from a
// goroutine per event) preserves FIFO order and guarantees that no sender
// outlives this worker.
func (s *State) workerOut() {
	defer s.waitGroup.Done()

	// Fallback wake-up for queue entries that were added without a
	// notification on wake (for example by appending to queue directly).
	const fallbackPoll = 10 * time.Millisecond
	poll := time.NewTicker(fallbackPoll)
	defer poll.Stop()

	for {
		event := s.dequeue()
		if event == nil {
			// Nothing to deliver: sleep until notified, polled or cancelled.
			select {
			case <-s.globalCtx.Done():
				return
			case <-s.localCtx.Done():
				return
			case <-s.wake:
			case <-poll.C:
			}
			continue
		}

		select {
		case <-s.globalCtx.Done():
			return
		case <-s.localCtx.Done():
			return
		case s.eventsOut <- event:
		}
	}
}
|