| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- package executor
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- dockerpkg "git.buran.team/main/cep/docker"
- schemepkg "git.buran.team/main/cep/scheme"
- fw "git.buran.team/main/fairwind"
- )
- var ErrContainerStartConditionsNotMeet = errors.New("container start conditions not meet")
- var ErrContainerStopConditionsNotMeet = errors.New("container stop conditions not meet")
- type ContiniousContainer struct {
- global *Global
- ticket *Ticket
- id string
- container *dockerpkg.Container
- startTimeout int
- stopTimeout int
- startConditions []Condition
- stopConditions []Condition
- }
- func NewContiniousContainer(
- global *Global,
- ticket *Ticket,
- network *Network,
- containerIndex int,
- containerScheme schemepkg.Container,
- ) (*ContiniousContainer, error) {
- variables := []dockerpkg.Variable{}
- for _, variable := range containerScheme.Variables {
- variables = append(
- variables,
- dockerpkg.Variable{
- Key: variable.Key,
- Value: variable.Value,
- },
- )
- }
- hosts := []dockerpkg.ContainerSettingsHost{}
- for _, host := range network.Hosts() {
- hosts = append(
- hosts,
- dockerpkg.ContainerSettingsHost{
- Name: host.Name,
- IP: host.IP,
- },
- )
- }
- container := dockerpkg.NewContainer(
- global.Ctx,
- global.Log,
- global.Docker,
- network.Network(),
- dockerpkg.NewImage(
- global.Ctx,
- global.Log,
- global.Docker,
- global.Registry,
- containerScheme.Image.Tag,
- containerScheme.Image.Version,
- containerScheme.Image.Pull,
- ),
- dockerpkg.ContainerSettings{
- Name: containerName(
- ticket.Index,
- containerIndex,
- ),
- Command: containerScheme.Command,
- Variables: variables,
- Hosts: hosts,
- Binds: containerScheme.Binds,
- Permissions: dockerpkg.ContainerSettingsPermissions{
- Privileged: containerScheme.Permissions.Privileged,
- Capabilities: containerScheme.Permissions.Capabilities,
- },
- Network: dockerpkg.ContainerSettingsNetwork{
- IP: containerIP(
- ticket.Index,
- containerIndex,
- ),
- },
- Resources: dockerpkg.ContainerResources{
- CPU: containerScheme.Resources.CPU,
- Memory: containerScheme.Resources.Memory,
- },
- },
- )
- this := &ContiniousContainer{
- global: global,
- ticket: ticket,
- id: containerScheme.ID,
- container: container,
- startTimeout: containerScheme.KindContinious.Start.Timeout,
- stopTimeout: containerScheme.KindContinious.Stop.Timeout,
- }
- startConditions := []Condition{}
- if containerScheme.KindContinious.Start.Conditions != nil {
- for _, conditionScheme := range containerScheme.KindContinious.Start.Conditions {
- condition, err := NewCondition(global, ticket, this, conditionScheme)
- if err != nil {
- return nil, fmt.Errorf("can't create container: %w", err)
- }
- startConditions = append(startConditions, condition)
- }
- }
- stopConditions := []Condition{}
- if containerScheme.KindContinious.Stop.Conditions != nil {
- for _, conditionScheme := range containerScheme.KindContinious.Stop.Conditions {
- condition, err := NewCondition(global, ticket, this, conditionScheme)
- if err != nil {
- return nil, fmt.Errorf("can't create container: %w", err)
- }
- stopConditions = append(stopConditions, condition)
- }
- }
- this.startConditions = startConditions
- this.stopConditions = stopConditions
- return this, nil
- }
- func (this *ContiniousContainer) ID() string {
- return this.id
- }
- func (this *ContiniousContainer) Start() *ContainerResult {
- var err error
- this.global.Log.Debug("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
- defer func() {
- if err != nil {
- this.global.Log.Debug("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
- } else {
- this.global.Log.Debug("container started", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
- }
- }()
- err = this.container.Start()
- if err != nil {
- return NewContainerResultError(
- fmt.Errorf("can't start container: %w", err),
- )
- }
- success, conditionsStatus := checkConditions(this.startTimeout, this.startConditions)
- if !success {
- err = ErrContainerStartConditionsNotMeet
- return NewContainerResultError(err)
- }
- return NewContainerResultSuccess(
- conditionsStatus,
- 0,
- []byte{},
- []byte{},
- )
- }
- func (this *ContiniousContainer) Stop() *ContainerResult {
- var err error
- this.global.Log.Debug("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
- defer func() {
- if err != nil {
- this.global.Log.Debug("container stop failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
- } else {
- this.global.Log.Debug("container stopped", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
- }
- }()
- containerStatus := this.container.Status(false) // NOTE: shutdown messages ommited
- if containerStatus.Error != nil {
- err = containerStatus.Error
- return NewContainerResultError(
- containerStatus.Error,
- )
- }
- err = this.container.Stop()
- if err != nil {
- return NewContainerResultError(
- fmt.Errorf("can't stop container: %w", err),
- )
- }
- success, conditionsStatus := checkConditions(this.stopTimeout, this.stopConditions)
- if !success {
- err = ErrContainerStopConditionsNotMeet
- return NewContainerResultError(err)
- }
- return NewContainerResultSuccess(
- conditionsStatus,
- 0,
- containerStatus.Stdout,
- containerStatus.Stderr,
- )
- }
- func (this *ContiniousContainer) Execute(timeout int, command string) ExecutionResult {
- result := this.container.Execute(timeout, command)
- return ExecutionResult{
- Error: result.Error,
- Code: result.Code,
- Stdout: result.Stdout,
- Stderr: result.Stderr,
- }
- }
- func (this *ContiniousContainer) Read(timeout int, path string) ([]byte, error) {
- return this.container.Read(timeout, path)
- }
- func checkConditions(timeout int, conditions []Condition) (bool, map[string]bool) {
- if len(conditions) == 0 {
- return true, map[string]bool{}
- }
- ctx, cancel := context.WithTimeout(
- context.Background(),
- time.Duration(timeout)*time.Millisecond,
- )
- defer cancel()
- var mutex sync.Mutex
- status := map[string]bool{}
- var successCounter atomic.Int64
- successCounter.Store(0)
- var waitGroup sync.WaitGroup
- waitGroup.Add(len(conditions))
- for _, condition := range conditions {
- status[condition.ID()] = false
- }
- for _, condition := range conditions {
- go func(condition Condition) {
- defer waitGroup.Done()
- for {
- select {
- case <-ctx.Done():
- return
- case <-time.After(100 * time.Millisecond): // NOTE: basic 100-millisecond startup delay
- }
- if !condition.Check() {
- continue
- }
- successCounter.Add(1)
- mutex.Lock()
- status[condition.ID()] = true
- mutex.Unlock()
- return
- }
- }(condition)
- }
- waitGroup.Wait()
- return successCounter.Load() == int64(len(conditions)), status
- }
|