cep.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package cep
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. fw "git.buran.team/main/fairwind"
  9. dockerpkg "git.buran.team/main/cep/docker"
  10. executorpkg "git.buran.team/main/cep/executor"
  11. schemepkg "git.buran.team/main/cep/scheme"
  12. )
  13. var ErrPlatformOutOfCapacity = errors.New("platform out of capacity")
  14. var ErrPlatformTicketNotFound = errors.New("platform ticket not found")
  15. var ErrPlatformTaskNotComplete = errors.New("platform task not complete")
  16. type CEP struct {
  17. mutex sync.Mutex
  18. waitGroup sync.WaitGroup
  19. ctxLocal context.Context
  20. ctxGlobal context.Context
  21. cancel func()
  22. global *executorpkg.Global
  23. tvm *executorpkg.TVM
  24. status map[string]*executorpkg.TaskResult
  25. }
  26. func NewCEP(ctxGlobal context.Context, log *fw.Log, capacity int, registry schemepkg.Registry) (*CEP, error) {
  27. ctxLocal, cancel := context.WithCancel(context.Background())
  28. client, err := dockerpkg.NewDocker()
  29. if err != nil {
  30. cancel()
  31. return nil, fmt.Errorf("can't create cep: %w", err)
  32. }
  33. return &CEP{
  34. ctxLocal: ctxLocal,
  35. ctxGlobal: ctxGlobal,
  36. cancel: cancel,
  37. global: &executorpkg.Global{
  38. Ctx: ctxGlobal,
  39. Log: log,
  40. Docker: client,
  41. Registry: dockerpkg.NewRegistry(
  42. ctxLocal,
  43. log,
  44. client,
  45. registry.Address,
  46. registry.Login,
  47. registry.Password,
  48. ),
  49. },
  50. tvm: executorpkg.NewTVM(capacity),
  51. status: map[string]*executorpkg.TaskResult{},
  52. }, nil
  53. }
  54. func (this *CEP) Start() error {
  55. this.global.Log.Information("starting cep")
  56. defer this.global.Log.Information("cep started")
  57. this.waitGroup.Add(1)
  58. go this.workerWatch()
  59. return nil
  60. }
  61. func (this *CEP) Stop() error {
  62. this.global.Log.Information("stopping cep")
  63. defer this.global.Log.Information("cep stopped")
  64. this.cancel()
  65. this.waitGroup.Wait()
  66. return nil
  67. }
  68. func (this *CEP) Capacity() int {
  69. return this.tvm.Capacity()
  70. }
  71. func (this *CEP) Schedule(task schemepkg.Task) (string, error) {
  72. this.mutex.Lock()
  73. defer this.mutex.Unlock()
  74. if this.tvm.Capacity() == 0 {
  75. return "", ErrPlatformOutOfCapacity
  76. }
  77. ticket, err := this.tvm.AcquireTicket()
  78. if err != nil {
  79. return "", fmt.Errorf("can't schedule task: %w", err)
  80. }
  81. this.waitGroup.Add(1)
  82. go this.workerExecute(ticket, task)
  83. return ticket.UUID, nil
  84. }
  85. func (this *CEP) Status(UUID string) (*executorpkg.TaskResult, error) {
  86. this.mutex.Lock()
  87. defer this.mutex.Unlock()
  88. if !this.tvm.HasTicket(UUID) {
  89. return nil, ErrPlatformTicketNotFound
  90. }
  91. ticket, err := this.tvm.FindTicket(UUID)
  92. if err != nil {
  93. return nil, fmt.Errorf("can't obtain task status: %w", err)
  94. }
  95. status, ok := this.status[UUID]
  96. if !ok {
  97. return nil, ErrPlatformTaskNotComplete
  98. }
  99. err = this.tvm.ReleaseTicket(ticket.UUID)
  100. if err != nil {
  101. return nil, fmt.Errorf("can't obtain task status: %w", err)
  102. }
  103. delete(this.status, ticket.UUID)
  104. return status, nil
  105. }
  106. func (this *CEP) workerExecute(ticket *executorpkg.Ticket, taskScheme schemepkg.Task) {
  107. defer this.waitGroup.Done()
  108. this.global.Log.Information("task execution started", fw.LogValue("uuid", ticket.UUID))
  109. // Create
  110. task, err := executorpkg.NewTask(this.global, ticket, taskScheme)
  111. if err != nil {
  112. this.global.Log.Information("task execution finished with errors", fw.LogValue("uuid", ticket.UUID), fw.LogError(err))
  113. return
  114. }
  115. // Execute
  116. result := task.Execute()
  117. // Store
  118. this.mutex.Lock()
  119. this.status[ticket.UUID] = result
  120. this.mutex.Unlock()
  121. // Done
  122. this.global.Log.Information("task execution finished successfully", fw.LogValue("uuid", ticket.UUID))
  123. }
  124. func (this *CEP) workerWatch() {
  125. defer this.waitGroup.Done()
  126. for {
  127. select {
  128. case <-this.ctxGlobal.Done():
  129. this.cancel()
  130. return
  131. case <-time.After(10 * time.Microsecond):
  132. continue
  133. }
  134. }
  135. }