reactive_value_reactive.go 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package ultraviolet
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. )
  7. type ValueReactive struct {
  8. ctxGlobal context.Context
  9. ctxLocal context.Context
  10. cancel func()
  11. waitGroup sync.WaitGroup
  12. source *Source
  13. out <-chan any
  14. selector Selector
  15. value any
  16. change bool
  17. }
  18. func NewValueReactive(ctxGlobal context.Context, source *Source, selector Selector) *ValueReactive {
  19. ctxLocal, cancel := context.WithCancel(context.Background())
  20. return &ValueReactive{
  21. ctxGlobal: ctxGlobal,
  22. ctxLocal: ctxLocal,
  23. cancel: cancel,
  24. source: source,
  25. selector: selector,
  26. value: nil,
  27. change: true,
  28. }
  29. }
  30. func (this *ValueReactive) Start() error {
  31. this.out = this.source.Subscribe(this)
  32. this.waitGroup.Add(1)
  33. go this.worker()
  34. return nil
  35. }
  36. func (this *ValueReactive) Stop() error {
  37. err := this.source.Unsubscribe(this)
  38. if err != nil {
  39. return fmt.Errorf("can't stop reactive value: %w", err)
  40. }
  41. this.cancel()
  42. this.waitGroup.Wait()
  43. return nil
  44. }
  45. func (this *ValueReactive) Value() any {
  46. return this.value
  47. }
  48. func (this *ValueReactive) Invalidate() bool {
  49. result := this.change
  50. this.change = false
  51. return result
  52. }
  53. func (this *ValueReactive) worker() {
  54. defer this.waitGroup.Done()
  55. for {
  56. select {
  57. case <-this.ctxLocal.Done():
  58. return
  59. case event, ok := <-this.out:
  60. if !ok {
  61. return
  62. }
  63. value := this.selector(event).Value()
  64. if this.value != value {
  65. this.value = value
  66. this.change = true
  67. }
  68. }
  69. }
  70. }