container_continious.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. package executor
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "sync/atomic"
  8. "time"
  9. dockerpkg "git.buran.team/main/cep/docker"
  10. schemepkg "git.buran.team/main/cep/scheme"
  11. fw "git.buran.team/main/fairwind"
  12. )
  13. var ErrContainerStartConditionsNotMeet = errors.New("container start conditions not meet")
  14. var ErrContainerStopConditionsNotMeet = errors.New("container stop conditions not meet")
  15. type ContiniousContainer struct {
  16. global *Global
  17. ticket *Ticket
  18. id string
  19. container *dockerpkg.Container
  20. startTimeout int
  21. stopTimeout int
  22. startConditions []Condition
  23. stopConditions []Condition
  24. }
  25. func NewContiniousContainer(
  26. global *Global,
  27. ticket *Ticket,
  28. network *Network,
  29. containerIndex int,
  30. containerScheme schemepkg.Container,
  31. ) (*ContiniousContainer, error) {
  32. variables := []dockerpkg.Variable{}
  33. for _, variable := range containerScheme.Variables {
  34. variables = append(
  35. variables,
  36. dockerpkg.Variable{
  37. Key: variable.Key,
  38. Value: variable.Value,
  39. },
  40. )
  41. }
  42. hosts := []dockerpkg.ContainerSettingsHost{}
  43. for _, host := range network.Hosts() {
  44. hosts = append(
  45. hosts,
  46. dockerpkg.ContainerSettingsHost{
  47. Name: host.Name,
  48. IP: host.IP,
  49. },
  50. )
  51. }
  52. container := dockerpkg.NewContainer(
  53. global.Ctx,
  54. global.Log,
  55. global.Docker,
  56. network.Network(),
  57. dockerpkg.NewImage(
  58. global.Ctx,
  59. global.Log,
  60. global.Docker,
  61. global.Registry,
  62. containerScheme.Image.Tag,
  63. containerScheme.Image.Version,
  64. containerScheme.Image.Pull,
  65. ),
  66. dockerpkg.ContainerSettings{
  67. Name: containerName(
  68. ticket.Index,
  69. containerIndex,
  70. ),
  71. Command: containerScheme.Command,
  72. Variables: variables,
  73. Hosts: hosts,
  74. Binds: containerScheme.Binds,
  75. Permissions: dockerpkg.ContainerSettingsPermissions{
  76. Privileged: containerScheme.Permissions.Privileged,
  77. Capabilities: containerScheme.Permissions.Capabilities,
  78. },
  79. Network: dockerpkg.ContainerSettingsNetwork{
  80. IP: containerIP(
  81. ticket.Index,
  82. containerIndex,
  83. ),
  84. },
  85. Resources: dockerpkg.ContainerResources{
  86. CPU: containerScheme.Resources.CPU,
  87. Memory: containerScheme.Resources.Memory,
  88. },
  89. },
  90. )
  91. this := &ContiniousContainer{
  92. global: global,
  93. ticket: ticket,
  94. id: containerScheme.ID,
  95. container: container,
  96. startTimeout: containerScheme.KindContinious.Start.Timeout,
  97. stopTimeout: containerScheme.KindContinious.Stop.Timeout,
  98. }
  99. startConditions := []Condition{}
  100. if containerScheme.KindContinious.Start.Conditions != nil {
  101. for _, conditionScheme := range containerScheme.KindContinious.Start.Conditions {
  102. condition, err := NewCondition(global, ticket, this, conditionScheme)
  103. if err != nil {
  104. return nil, fmt.Errorf("can't create container: %w", err)
  105. }
  106. startConditions = append(startConditions, condition)
  107. }
  108. }
  109. stopConditions := []Condition{}
  110. if containerScheme.KindContinious.Stop.Conditions != nil {
  111. for _, conditionScheme := range containerScheme.KindContinious.Stop.Conditions {
  112. condition, err := NewCondition(global, ticket, this, conditionScheme)
  113. if err != nil {
  114. return nil, fmt.Errorf("can't create container: %w", err)
  115. }
  116. stopConditions = append(stopConditions, condition)
  117. }
  118. }
  119. this.startConditions = startConditions
  120. this.stopConditions = stopConditions
  121. return this, nil
  122. }
  123. func (this *ContiniousContainer) ID() string {
  124. return this.id
  125. }
  126. func (this *ContiniousContainer) Start() *ContainerResult {
  127. var err error
  128. this.global.Log.Debug("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  129. defer func() {
  130. if err != nil {
  131. this.global.Log.Debug("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
  132. } else {
  133. this.global.Log.Debug("container started", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  134. }
  135. }()
  136. err = this.container.Start()
  137. if err != nil {
  138. return NewContainerResultError(
  139. fmt.Errorf("can't start container: %w", err),
  140. )
  141. }
  142. success, conditionsStatus := checkConditions(this.startTimeout, this.startConditions)
  143. if !success {
  144. err = ErrContainerStartConditionsNotMeet
  145. return NewContainerResultError(err)
  146. }
  147. return NewContainerResultSuccess(
  148. conditionsStatus,
  149. 0,
  150. []byte{},
  151. []byte{},
  152. )
  153. }
  154. func (this *ContiniousContainer) Stop() *ContainerResult {
  155. var err error
  156. this.global.Log.Debug("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  157. defer func() {
  158. if err != nil {
  159. this.global.Log.Debug("container stop failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
  160. } else {
  161. this.global.Log.Debug("container stopped", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  162. }
  163. }()
  164. containerStatus := this.container.Status(false) // NOTE: shutdown messages ommited
  165. if containerStatus.Error != nil {
  166. err = containerStatus.Error
  167. return NewContainerResultError(
  168. containerStatus.Error,
  169. )
  170. }
  171. err = this.container.Stop()
  172. if err != nil {
  173. return NewContainerResultError(
  174. fmt.Errorf("can't stop container: %w", err),
  175. )
  176. }
  177. success, conditionsStatus := checkConditions(this.stopTimeout, this.stopConditions)
  178. if !success {
  179. err = ErrContainerStopConditionsNotMeet
  180. return NewContainerResultError(err)
  181. }
  182. return NewContainerResultSuccess(
  183. conditionsStatus,
  184. 0,
  185. containerStatus.Stdout,
  186. containerStatus.Stderr,
  187. )
  188. }
  189. func (this *ContiniousContainer) Execute(timeout int, command string) ExecutionResult {
  190. result := this.container.Execute(timeout, command)
  191. return ExecutionResult{
  192. Error: result.Error,
  193. Code: result.Code,
  194. Stdout: result.Stdout,
  195. Stderr: result.Stderr,
  196. }
  197. }
  198. func (this *ContiniousContainer) Read(timeout int, path string) ([]byte, error) {
  199. return this.container.Read(timeout, path)
  200. }
  201. func checkConditions(timeout int, conditions []Condition) (bool, map[string]bool) {
  202. if len(conditions) == 0 {
  203. return true, map[string]bool{}
  204. }
  205. ctx, cancel := context.WithTimeout(
  206. context.Background(),
  207. time.Duration(timeout)*time.Millisecond,
  208. )
  209. defer cancel()
  210. var mutex sync.Mutex
  211. status := map[string]bool{}
  212. var successCounter atomic.Int64
  213. successCounter.Store(0)
  214. var waitGroup sync.WaitGroup
  215. waitGroup.Add(len(conditions))
  216. for _, condition := range conditions {
  217. status[condition.ID()] = false
  218. }
  219. for _, condition := range conditions {
  220. go func(condition Condition) {
  221. defer waitGroup.Done()
  222. for {
  223. select {
  224. case <-ctx.Done():
  225. return
  226. case <-time.After(100 * time.Millisecond): // NOTE: basic 100-millisecond startup delay
  227. }
  228. if !condition.Check() {
  229. continue
  230. }
  231. successCounter.Add(1)
  232. mutex.Lock()
  233. status[condition.ID()] = true
  234. mutex.Unlock()
  235. return
  236. }
  237. }(condition)
  238. }
  239. waitGroup.Wait()
  240. return successCounter.Load() == int64(len(conditions)), status
  241. }