package executor import ( "errors" "fmt" "sync" "sync/atomic" fw "git.buran.team/main/fairwind" schemepkg "git.buran.team/main/cep/scheme" ) var ErrContainerNotFound = errors.New("container not found") type Task struct { global *Global ticket *Ticket id string network *Network containers []Container checker Checker } func NewTask(global *Global, ticket *Ticket, taskScheme schemepkg.Task) (*Task, error) { this := &Task{ global: global, ticket: ticket, id: taskScheme.ID, } // Build hosts hosts := []Host{} for containerIndex, containerScheme := range taskScheme.Containers { hosts = append( hosts, Host{ Name: containerScheme.ID, IP: containerIP(ticket.Index, containerIndex), }, ) } // Create network network, err := NewNetwork(global, ticket, hosts) if err != nil { return nil, fmt.Errorf("can't create network: %w", err) } this.network = network // Create containers containers := []Container{} for containerIndex, containerScheme := range taskScheme.Containers { container, err := NewContainer(global, ticket, network, containerIndex, containerScheme) if err != nil { return nil, fmt.Errorf("can't create container: %w", err) } containers = append(containers, container) } this.containers = containers // Create checker checker, err := NewChecker( global, ticket, func(ID string) (Container, error) { for _, container := range this.containers { if container.ID() == ID { return container, nil } } return nil, ErrContainerNotFound }, taskScheme.Checker, ) if err != nil { return nil, fmt.Errorf("can't create checker: %w", err) } this.checker = checker // Done return this, nil } func (this *Task) Execute() *TaskResult { // Create/Delete network this.global.Log.Information("creating network", fw.LogValue("uuid", this.ticket.UUID)) err := this.network.Create() if err != nil { this.global.Log.Information("network creation failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err)) return NewTaskResultFailed() } else { this.global.Log.Information("network created", fw.LogValue("uuid", this.ticket.UUID)) } defer func() { this.global.Log.Information("deleting network", fw.LogValue("uuid", this.ticket.UUID)) err := this.network.Delete() if err != nil { this.global.Log.Information("network deletion failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err)) } else { this.global.Log.Information("network deleted", fw.LogValue("uuid", this.ticket.UUID)) } }() containerStartResults := map[string]*ContainerResult{} start := func() bool { var errorCounter atomic.Int64 var waitGroup sync.WaitGroup errorCounter.Store(0) this.global.Log.Information("starting containers", fw.LogValue("uuid", this.ticket.UUID)) waitGroup.Add(len(this.containers)) for _, container := range this.containers { go func(container Container) { defer waitGroup.Done() this.global.Log.Information("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID())) containerResult := container.Start() if containerResult.Error != nil { this.global.Log.Information("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err)) errorCounter.Add(1) } else { this.global.Log.Information("container started successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID())) } containerStartResults[container.ID()] = containerResult }(container) } waitGroup.Wait() if errorCounter.Load() == 0 { this.global.Log.Information("containers started successfully", fw.LogValue("uuid", this.ticket.UUID)) } else { this.global.Log.Information("containers started with errors", fw.LogValue("uuid", this.ticket.UUID)) } return errorCounter.Load() == 0 } containerStopResults := map[string]*ContainerResult{} stop := func() bool { var errorCounter atomic.Int64 var waitGroup sync.WaitGroup errorCounter.Store(0) this.global.Log.Information("stopping containers", fw.LogValue("uuid", this.ticket.UUID)) waitGroup.Add(len(this.containers)) for _, container := range this.containers { go func(container Container) { defer waitGroup.Done() this.global.Log.Information("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID())) containerResult := container.Stop() if containerResult.Error != nil { this.global.Log.Information("container stopping failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err)) errorCounter.Add(1) } else { this.global.Log.Information("container stopped successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID())) } containerStopResults[container.ID()] = containerResult }(container) } waitGroup.Wait() if errorCounter.Load() == 0 { this.global.Log.Information("containers stopped successfully", fw.LogValue("uuid", this.ticket.UUID)) } else { this.global.Log.Information("containers stopped with errors", fw.LogValue("uuid", this.ticket.UUID)) } return errorCounter.Load() == 0 } // Start if !start() { if !stop() { // ... } return NewTaskResultFailed() } // Check this.global.Log.Information("starting checker", fw.LogValue("uuid", this.ticket.UUID)) result := this.checker.Check() result.Start = containerStartResults this.global.Log.Information("checker finished", fw.LogValue("uuid", this.ticket.UUID)) // Stop if !stop() { return result } // Done result.Stop = containerStopResults result.Clean = true return result }