container_oneshot.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package executor
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "time"
  7. dockerpkg "git.buran.team/main/cep/docker"
  8. schemepkg "git.buran.team/main/cep/scheme"
  9. fw "git.buran.team/main/fairwind"
  10. )
  11. var ErrContainerContextCancelled = errors.New("container context cancelled")
  12. var ErrContainerContextTimeout = errors.New("container context timeout")
  13. var ErrContainerResultInvalid = errors.New("container result invalid")
  14. // TODO: add start/stop conditions
  15. type OneshotContainer struct {
  16. global *Global
  17. ticket *Ticket
  18. id string
  19. container *dockerpkg.Container
  20. timeout int
  21. result *Result
  22. }
  23. func NewOneshotContainer(
  24. global *Global,
  25. ticket *Ticket,
  26. network *Network,
  27. containerIndex int,
  28. containerScheme schemepkg.Container,
  29. ) (*OneshotContainer, error) {
  30. varialbles := []dockerpkg.Variable{}
  31. for _, variable := range containerScheme.Variables {
  32. varialbles = append(
  33. varialbles,
  34. dockerpkg.Variable{
  35. Key: variable.Key,
  36. Value: variable.Value,
  37. },
  38. )
  39. }
  40. hosts := []dockerpkg.ContainerSettingsHost{}
  41. for _, host := range network.Hosts() {
  42. hosts = append(
  43. hosts,
  44. dockerpkg.ContainerSettingsHost{
  45. Name: host.Name,
  46. IP: host.IP,
  47. },
  48. )
  49. }
  50. container := dockerpkg.NewContainer(
  51. global.Ctx,
  52. global.Log,
  53. global.Docker,
  54. network.Network(),
  55. dockerpkg.NewImage(
  56. global.Ctx,
  57. global.Log,
  58. global.Docker,
  59. global.Registry,
  60. containerScheme.Image.Tag,
  61. containerScheme.Image.Version,
  62. containerScheme.Image.Pull,
  63. ),
  64. dockerpkg.ContainerSettings{
  65. Name: containerName(
  66. ticket.Index,
  67. containerIndex,
  68. ),
  69. Command: containerScheme.Command,
  70. Variables: varialbles,
  71. Hosts: hosts,
  72. Binds: containerScheme.Binds,
  73. Permissions: dockerpkg.ContainerSettingsPermissions{
  74. Privileged: containerScheme.Permissions.Privileged,
  75. Capabilities: containerScheme.Permissions.Capabilities,
  76. },
  77. Network: dockerpkg.ContainerSettingsNetwork{
  78. IP: containerIP(
  79. ticket.Index,
  80. containerIndex,
  81. ),
  82. },
  83. Resources: dockerpkg.ContainerResources{},
  84. },
  85. )
  86. result, err := NewResult(containerScheme.KindOneshot.Result)
  87. if err != nil {
  88. return nil, fmt.Errorf("can't create oneshot container: %w", err)
  89. }
  90. return &OneshotContainer{
  91. global: global,
  92. ticket: ticket,
  93. id: containerScheme.ID,
  94. container: container,
  95. timeout: containerScheme.KindOneshot.Timeout,
  96. result: result,
  97. }, nil
  98. }
  99. func (this *OneshotContainer) ID() string {
  100. return this.id
  101. }
  102. func (this *OneshotContainer) Start() *ContainerResult {
  103. var err error
  104. this.global.Log.Debug("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  105. defer func() {
  106. if err != nil {
  107. this.global.Log.Debug("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
  108. } else {
  109. this.global.Log.Debug("container started", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  110. }
  111. }()
  112. // Start
  113. err = this.container.Start()
  114. if err != nil {
  115. return NewContainerResultError(
  116. fmt.Errorf("can't start container: %w", err),
  117. )
  118. }
  119. // Wait
  120. ctx, cancel := context.WithTimeout(
  121. context.Background(),
  122. time.Duration(this.timeout)*time.Millisecond,
  123. )
  124. defer cancel()
  125. var status dockerpkg.ExecutionResult
  126. for {
  127. select {
  128. case <-this.global.Ctx.Done():
  129. err = ErrContainerContextCancelled
  130. return NewContainerResultError(err)
  131. case <-ctx.Done():
  132. err = ErrContainerContextTimeout
  133. return NewContainerResultError(err)
  134. case <-time.After(100 * time.Millisecond): // NOTE: basic 100-millisecond startup delay
  135. }
  136. this.global.Log.Debug("check container status", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  137. status = this.container.Status(true)
  138. if status.Error != nil {
  139. this.global.Log.Information("container startup error", fw.LogError(status.Error))
  140. continue
  141. }
  142. this.global.Log.Debug("check container result", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  143. if !this.result.Check(
  144. status.Code,
  145. status.Stdout,
  146. status.Stderr,
  147. ) {
  148. this.global.Log.Information("container startup check failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  149. err = ErrContainerResultInvalid
  150. return NewContainerResultError(err)
  151. }
  152. this.global.Log.Debug("container checks succeed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  153. break
  154. }
  155. // Done
  156. return NewContainerResultSuccess(
  157. map[string]bool{},
  158. status.Code,
  159. status.Stdout,
  160. status.Stderr,
  161. )
  162. }
  163. func (this *OneshotContainer) Stop() *ContainerResult {
  164. var err error
  165. this.global.Log.Debug("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  166. defer func() {
  167. if err != nil {
  168. this.global.Log.Debug("container stop failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
  169. } else {
  170. this.global.Log.Debug("container stopped", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
  171. }
  172. }()
  173. err = this.container.Stop()
  174. if err != nil {
  175. return NewContainerResultError(
  176. fmt.Errorf("can't stop container: %w", err),
  177. )
  178. }
  179. return NewContainerResultSuccess(
  180. map[string]bool{},
  181. 0,
  182. []byte{},
  183. []byte{},
  184. )
  185. }
  186. func (this *OneshotContainer) Execute(timeout int, command string) ExecutionResult {
  187. result := this.container.Execute(timeout, command)
  188. return ExecutionResult{
  189. Error: result.Error,
  190. Code: result.Code,
  191. Stdout: result.Stdout,
  192. Stderr: result.Stderr,
  193. }
  194. }
  195. func (this *OneshotContainer) Read(timeout int, path string) ([]byte, error) {
  196. return this.container.Read(timeout, path)
  197. }