| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package ultraviolet
- import (
- "context"
- "fmt"
- "sync"
- )
- type ValueReactive struct {
- ctxGlobal context.Context
- ctxLocal context.Context
- cancel func()
- waitGroup sync.WaitGroup
- source *Source
- out <-chan any
- selector Selector
- value any
- change bool
- }
- func NewValueReactive(ctxGlobal context.Context, source *Source, selector Selector) *ValueReactive {
- ctxLocal, cancel := context.WithCancel(context.Background())
- return &ValueReactive{
- ctxGlobal: ctxGlobal,
- ctxLocal: ctxLocal,
- cancel: cancel,
- source: source,
- selector: selector,
- value: nil,
- change: true,
- }
- }
- func (this *ValueReactive) Start() error {
- this.out = this.source.Subscribe(this)
- this.waitGroup.Add(1)
- go this.worker()
- return nil
- }
- func (this *ValueReactive) Stop() error {
- err := this.source.Unsubscribe(this)
- if err != nil {
- return fmt.Errorf("can't stop reactive value: %w", err)
- }
- this.cancel()
- this.waitGroup.Wait()
- return nil
- }
- func (this *ValueReactive) Value() any {
- return this.value
- }
- func (this *ValueReactive) Invalidate() bool {
- result := this.change
- this.change = false
- return result
- }
- func (this *ValueReactive) worker() {
- defer this.waitGroup.Done()
- for {
- select {
- case <-this.ctxLocal.Done():
- return
- case event, ok := <-this.out:
- if !ok {
- return
- }
- value := this.selector(event).Value()
- if this.value != value {
- this.value = value
- this.change = true
- }
- }
- }
- }
|