package executor import ( "context" "errors" "fmt" "sync" "sync/atomic" "time" dockerpkg "git.buran.team/main/cep/docker" schemepkg "git.buran.team/main/cep/scheme" fw "git.buran.team/main/fairwind" ) var ErrContainerStartConditionsNotMeet = errors.New("container start conditions not meet") var ErrContainerStopConditionsNotMeet = errors.New("container stop conditions not meet") type ContiniousContainer struct { global *Global ticket *Ticket id string container *dockerpkg.Container startTimeout int stopTimeout int startConditions []Condition stopConditions []Condition } func NewContiniousContainer( global *Global, ticket *Ticket, network *Network, containerIndex int, containerScheme schemepkg.Container, ) (*ContiniousContainer, error) { variables := []dockerpkg.Variable{} for _, variable := range containerScheme.Variables { variables = append( variables, dockerpkg.Variable{ Key: variable.Key, Value: variable.Value, }, ) } hosts := []dockerpkg.ContainerSettingsHost{} for _, host := range network.Hosts() { hosts = append( hosts, dockerpkg.ContainerSettingsHost{ Name: host.Name, IP: host.IP, }, ) } container := dockerpkg.NewContainer( global.Ctx, global.Log, global.Docker, network.Network(), dockerpkg.NewImage( global.Ctx, global.Log, global.Docker, global.Registry, containerScheme.Image.Tag, containerScheme.Image.Version, containerScheme.Image.Pull, ), dockerpkg.ContainerSettings{ Name: containerName( ticket.Index, containerIndex, ), Command: containerScheme.Command, Variables: variables, Hosts: hosts, Binds: containerScheme.Binds, Permissions: dockerpkg.ContainerSettingsPermissions{ Privileged: containerScheme.Permissions.Privileged, Capabilities: containerScheme.Permissions.Capabilities, }, Network: dockerpkg.ContainerSettingsNetwork{ IP: containerIP( ticket.Index, containerIndex, ), }, Resources: dockerpkg.ContainerResources{ CPU: containerScheme.Resources.CPU, Memory: containerScheme.Resources.Memory, }, }, ) this := &ContiniousContainer{ global: global, ticket: ticket, id: containerScheme.ID, container: container, startTimeout: containerScheme.KindContinious.Start.Timeout, stopTimeout: containerScheme.KindContinious.Stop.Timeout, } startConditions := []Condition{} if containerScheme.KindContinious.Start.Conditions != nil { for _, conditionScheme := range containerScheme.KindContinious.Start.Conditions { condition, err := NewCondition(global, ticket, this, conditionScheme) if err != nil { return nil, fmt.Errorf("can't create container: %w", err) } startConditions = append(startConditions, condition) } } stopConditions := []Condition{} if containerScheme.KindContinious.Stop.Conditions != nil { for _, conditionScheme := range containerScheme.KindContinious.Stop.Conditions { condition, err := NewCondition(global, ticket, this, conditionScheme) if err != nil { return nil, fmt.Errorf("can't create container: %w", err) } stopConditions = append(stopConditions, condition) } } this.startConditions = startConditions this.stopConditions = stopConditions return this, nil } func (this *ContiniousContainer) ID() string { return this.id } func (this *ContiniousContainer) Start() *ContainerResult { var err error this.global.Log.Debug("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id)) defer func() { if err != nil { this.global.Log.Debug("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err)) } else { this.global.Log.Debug("container started", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id)) } }() err = this.container.Start() if err != nil { return NewContainerResultError( fmt.Errorf("can't start container: %w", err), ) } success, conditionsStatus := checkConditions(this.startTimeout, this.startConditions) if !success { err = ErrContainerStartConditionsNotMeet return NewContainerResultError(err) } return NewContainerResultSuccess( conditionsStatus, 0, []byte{}, []byte{}, ) } func (this *ContiniousContainer) Stop() *ContainerResult { var err error this.global.Log.Debug("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id)) defer func() { if err != nil { this.global.Log.Debug("container stop failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err)) } else { this.global.Log.Debug("container stopped", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id)) } }() containerStatus := this.container.Status(false) // NOTE: shutdown messages ommited if containerStatus.Error != nil { err = containerStatus.Error return NewContainerResultError( containerStatus.Error, ) } err = this.container.Stop() if err != nil { return NewContainerResultError( fmt.Errorf("can't stop container: %w", err), ) } success, conditionsStatus := checkConditions(this.stopTimeout, this.stopConditions) if !success { err = ErrContainerStopConditionsNotMeet return NewContainerResultError(err) } return NewContainerResultSuccess( conditionsStatus, 0, containerStatus.Stdout, containerStatus.Stderr, ) } func (this *ContiniousContainer) Execute(timeout int, command string) ExecutionResult { result := this.container.Execute(timeout, command) return ExecutionResult{ Error: result.Error, Code: result.Code, Stdout: result.Stdout, Stderr: result.Stderr, } } func (this *ContiniousContainer) Read(timeout int, path string) ([]byte, error) { return this.container.Read(timeout, path) } func checkConditions(timeout int, conditions []Condition) (bool, map[string]bool) { if len(conditions) == 0 { return true, map[string]bool{} } ctx, cancel := context.WithTimeout( context.Background(), time.Duration(timeout)*time.Millisecond, ) defer cancel() var mutex sync.Mutex status := map[string]bool{} var successCounter atomic.Int64 successCounter.Store(0) var waitGroup sync.WaitGroup waitGroup.Add(len(conditions)) for _, condition := range conditions { status[condition.ID()] = false } for _, condition := range conditions { go func(condition Condition) { defer waitGroup.Done() for { select { case <-ctx.Done(): return case <-time.After(100 * time.Millisecond): // NOTE: basic 100-millisecond startup delay } if !condition.Check() { continue } successCounter.Add(1) mutex.Lock() status[condition.ID()] = true mutex.Unlock() return } }(condition) } waitGroup.Wait() return successCounter.Load() == int64(len(conditions)), status }