| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- package executor
- import (
- "errors"
- "fmt"
- "sync"
- "sync/atomic"
- fw "git.buran.team/main/fairwind"
- schemepkg "git.buran.team/main/cep/scheme"
- )
// Sentinel errors reported by this package.
var (
	// ErrContainerNotFound is returned by the container lookup that NewTask
	// hands to the checker when no container of the task has the requested ID.
	ErrContainerNotFound = errors.New("container not found")
)
- type Task struct {
- global *Global
- ticket *Ticket
- id string
- network *Network
- containers []Container
- checker Checker
- }
- func NewTask(global *Global, ticket *Ticket, taskScheme schemepkg.Task) (*Task, error) {
- this := &Task{
- global: global,
- ticket: ticket,
- id: taskScheme.ID,
- }
- // Build hosts
- hosts := []Host{}
- for containerIndex, containerScheme := range taskScheme.Containers {
- hosts = append(
- hosts,
- Host{
- Name: containerScheme.ID,
- IP: containerIP(ticket.Index, containerIndex),
- },
- )
- }
- // Create network
- network, err := NewNetwork(global, ticket, hosts)
- if err != nil {
- return nil, fmt.Errorf("can't create network: %w", err)
- }
- this.network = network
- // Create containers
- containers := []Container{}
- for containerIndex, containerScheme := range taskScheme.Containers {
- container, err := NewContainer(global, ticket, network, containerIndex, containerScheme)
- if err != nil {
- return nil, fmt.Errorf("can't create container: %w", err)
- }
- containers = append(containers, container)
- }
- this.containers = containers
- // Create checker
- checker, err := NewChecker(
- global,
- ticket,
- func(ID string) (Container, error) {
- for _, container := range this.containers {
- if container.ID() == ID {
- return container, nil
- }
- }
- return nil, ErrContainerNotFound
- },
- taskScheme.Checker,
- )
- if err != nil {
- return nil, fmt.Errorf("can't create checker: %w", err)
- }
- this.checker = checker
- // Done
- return this, nil
- }
- func (this *Task) Execute() *TaskResult {
- // Create/Delete network
- this.global.Log.Information("creating network", fw.LogValue("uuid", this.ticket.UUID))
- err := this.network.Create()
- if err != nil {
- this.global.Log.Information("network creation failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
- return NewTaskResultFailed()
- } else {
- this.global.Log.Information("network created", fw.LogValue("uuid", this.ticket.UUID))
- }
- defer func() {
- this.global.Log.Information("deleting network", fw.LogValue("uuid", this.ticket.UUID))
- err := this.network.Delete()
- if err != nil {
- this.global.Log.Information("network deletion failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
- } else {
- this.global.Log.Information("network deleted", fw.LogValue("uuid", this.ticket.UUID))
- }
- }()
- containerStartResults := map[string]*ContainerResult{}
- start := func() bool {
- var errorCounter atomic.Int64
- var waitGroup sync.WaitGroup
- errorCounter.Store(0)
- this.global.Log.Information("starting containers", fw.LogValue("uuid", this.ticket.UUID))
- waitGroup.Add(len(this.containers))
- for _, container := range this.containers {
- go func(container Container) {
- defer waitGroup.Done()
- this.global.Log.Information("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
- containerResult := container.Start()
- if containerResult.Error != nil {
- this.global.Log.Information("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err))
- errorCounter.Add(1)
- } else {
- this.global.Log.Information("container started successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
- }
- containerStartResults[container.ID()] = containerResult
- }(container)
- }
- waitGroup.Wait()
- if errorCounter.Load() == 0 {
- this.global.Log.Information("containers started successfully", fw.LogValue("uuid", this.ticket.UUID))
- } else {
- this.global.Log.Information("containers started with errors", fw.LogValue("uuid", this.ticket.UUID))
- }
- return errorCounter.Load() == 0
- }
- containerStopResults := map[string]*ContainerResult{}
- stop := func() bool {
- var errorCounter atomic.Int64
- var waitGroup sync.WaitGroup
- errorCounter.Store(0)
- this.global.Log.Information("stopping containers", fw.LogValue("uuid", this.ticket.UUID))
- waitGroup.Add(len(this.containers))
- for _, container := range this.containers {
- go func(container Container) {
- defer waitGroup.Done()
- this.global.Log.Information("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
- containerResult := container.Stop()
- if containerResult.Error != nil {
- this.global.Log.Information("container stopping failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err))
- errorCounter.Add(1)
- } else {
- this.global.Log.Information("container stopped successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
- }
- containerStopResults[container.ID()] = containerResult
- }(container)
- }
- waitGroup.Wait()
- if errorCounter.Load() == 0 {
- this.global.Log.Information("containers stopped successfully", fw.LogValue("uuid", this.ticket.UUID))
- } else {
- this.global.Log.Information("containers stopped with errors", fw.LogValue("uuid", this.ticket.UUID))
- }
- return errorCounter.Load() == 0
- }
- // Start
- if !start() {
- if !stop() {
- // ...
- }
- return NewTaskResultFailed()
- }
- // Check
- this.global.Log.Information("starting checker", fw.LogValue("uuid", this.ticket.UUID))
- result := this.checker.Check()
- result.Start = containerStartResults
- this.global.Log.Information("checker finished", fw.LogValue("uuid", this.ticket.UUID))
- // Stop
- if !stop() {
- return result
- }
- // Done
- result.Stop = containerStopResults
- result.Clean = true
- return result
- }
|