task.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. package executor
  2. import (
  3. "errors"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. fw "git.buran.team/main/fairwind"
  8. schemepkg "git.buran.team/main/cep/scheme"
  9. )
  10. var ErrContainerNotFound = errors.New("container not found")
  11. type Task struct {
  12. global *Global
  13. ticket *Ticket
  14. id string
  15. network *Network
  16. containers []Container
  17. checker Checker
  18. }
  19. func NewTask(global *Global, ticket *Ticket, taskScheme schemepkg.Task) (*Task, error) {
  20. this := &Task{
  21. global: global,
  22. ticket: ticket,
  23. id: taskScheme.ID,
  24. }
  25. // Build hosts
  26. hosts := []Host{}
  27. for containerIndex, containerScheme := range taskScheme.Containers {
  28. hosts = append(
  29. hosts,
  30. Host{
  31. Name: containerScheme.ID,
  32. IP: containerIP(ticket.Index, containerIndex),
  33. },
  34. )
  35. }
  36. // Create network
  37. network, err := NewNetwork(global, ticket, hosts)
  38. if err != nil {
  39. return nil, fmt.Errorf("can't create network: %w", err)
  40. }
  41. this.network = network
  42. // Create containers
  43. containers := []Container{}
  44. for containerIndex, containerScheme := range taskScheme.Containers {
  45. container, err := NewContainer(global, ticket, network, containerIndex, containerScheme)
  46. if err != nil {
  47. return nil, fmt.Errorf("can't create container: %w", err)
  48. }
  49. containers = append(containers, container)
  50. }
  51. this.containers = containers
  52. // Create checker
  53. checker, err := NewChecker(
  54. global,
  55. ticket,
  56. func(ID string) (Container, error) {
  57. for _, container := range this.containers {
  58. if container.ID() == ID {
  59. return container, nil
  60. }
  61. }
  62. return nil, ErrContainerNotFound
  63. },
  64. taskScheme.Checker,
  65. )
  66. if err != nil {
  67. return nil, fmt.Errorf("can't create checker: %w", err)
  68. }
  69. this.checker = checker
  70. // Done
  71. return this, nil
  72. }
  73. func (this *Task) Execute() *TaskResult {
  74. // Create/Delete network
  75. this.global.Log.Information("creating network", fw.LogValue("uuid", this.ticket.UUID))
  76. err := this.network.Create()
  77. if err != nil {
  78. this.global.Log.Information("network creation failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
  79. return NewTaskResultFailed()
  80. } else {
  81. this.global.Log.Information("network created", fw.LogValue("uuid", this.ticket.UUID))
  82. }
  83. defer func() {
  84. this.global.Log.Information("deleting network", fw.LogValue("uuid", this.ticket.UUID))
  85. err := this.network.Delete()
  86. if err != nil {
  87. this.global.Log.Information("network deletion failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
  88. } else {
  89. this.global.Log.Information("network deleted", fw.LogValue("uuid", this.ticket.UUID))
  90. }
  91. }()
  92. containerStartResults := map[string]*ContainerResult{}
  93. start := func() bool {
  94. var errorCounter atomic.Int64
  95. var waitGroup sync.WaitGroup
  96. errorCounter.Store(0)
  97. this.global.Log.Information("starting containers", fw.LogValue("uuid", this.ticket.UUID))
  98. waitGroup.Add(len(this.containers))
  99. for _, container := range this.containers {
  100. go func(container Container) {
  101. defer waitGroup.Done()
  102. this.global.Log.Information("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
  103. containerResult := container.Start()
  104. if containerResult.Error != nil {
  105. this.global.Log.Information("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err))
  106. errorCounter.Add(1)
  107. } else {
  108. this.global.Log.Information("container started successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
  109. }
  110. containerStartResults[container.ID()] = containerResult
  111. }(container)
  112. }
  113. waitGroup.Wait()
  114. if errorCounter.Load() == 0 {
  115. this.global.Log.Information("containers started successfully", fw.LogValue("uuid", this.ticket.UUID))
  116. } else {
  117. this.global.Log.Information("containers started with errors", fw.LogValue("uuid", this.ticket.UUID))
  118. }
  119. return errorCounter.Load() == 0
  120. }
  121. containerStopResults := map[string]*ContainerResult{}
  122. stop := func() bool {
  123. var errorCounter atomic.Int64
  124. var waitGroup sync.WaitGroup
  125. errorCounter.Store(0)
  126. this.global.Log.Information("stopping containers", fw.LogValue("uuid", this.ticket.UUID))
  127. waitGroup.Add(len(this.containers))
  128. for _, container := range this.containers {
  129. go func(container Container) {
  130. defer waitGroup.Done()
  131. this.global.Log.Information("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
  132. containerResult := container.Stop()
  133. if containerResult.Error != nil {
  134. this.global.Log.Information("container stopping failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err))
  135. errorCounter.Add(1)
  136. } else {
  137. this.global.Log.Information("container stopped successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
  138. }
  139. containerStopResults[container.ID()] = containerResult
  140. }(container)
  141. }
  142. waitGroup.Wait()
  143. if errorCounter.Load() == 0 {
  144. this.global.Log.Information("containers stopped successfully", fw.LogValue("uuid", this.ticket.UUID))
  145. } else {
  146. this.global.Log.Information("containers stopped with errors", fw.LogValue("uuid", this.ticket.UUID))
  147. }
  148. return errorCounter.Load() == 0
  149. }
  150. // Start
  151. if !start() {
  152. if !stop() {
  153. // ...
  154. }
  155. return NewTaskResultFailed()
  156. }
  157. // Check
  158. this.global.Log.Information("starting checker", fw.LogValue("uuid", this.ticket.UUID))
  159. result := this.checker.Check()
  160. result.Start = containerStartResults
  161. this.global.Log.Information("checker finished", fw.LogValue("uuid", this.ticket.UUID))
  162. // Stop
  163. if !stop() {
  164. return result
  165. }
  166. // Done
  167. result.Stop = containerStopResults
  168. result.Clean = true
  169. return result
  170. }