package ultraviolet import ( "context" "fmt" "sync" ) type ValueReactive struct { ctxGlobal context.Context ctxLocal context.Context cancel func() waitGroup sync.WaitGroup source *Source out <-chan any selector Selector value any change bool } func NewValueReactive(ctxGlobal context.Context, source *Source, selector Selector) *ValueReactive { ctxLocal, cancel := context.WithCancel(context.Background()) return &ValueReactive{ ctxGlobal: ctxGlobal, ctxLocal: ctxLocal, cancel: cancel, source: source, selector: selector, value: nil, change: true, } } func (this *ValueReactive) Start() error { this.out = this.source.Subscribe(this) this.waitGroup.Add(1) go this.worker() return nil } func (this *ValueReactive) Stop() error { err := this.source.Unsubscribe(this) if err != nil { return fmt.Errorf("can't stop reactive value: %w", err) } this.cancel() this.waitGroup.Wait() return nil } func (this *ValueReactive) Value() any { return this.value } func (this *ValueReactive) Invalidate() bool { result := this.change this.change = false return result } func (this *ValueReactive) worker() { defer this.waitGroup.Done() for { select { case <-this.ctxLocal.Done(): return case event, ok := <-this.out: if !ok { return } value := this.selector(event).Value() if this.value != value { this.value = value this.change = true } } } }