package cep import ( "context" "errors" "fmt" "sync" "time" fw "git.buran.team/main/fairwind" dockerpkg "git.buran.team/main/cep/docker" executorpkg "git.buran.team/main/cep/executor" schemepkg "git.buran.team/main/cep/scheme" ) var ErrPlatformOutOfCapacity = errors.New("platform out of capacity") var ErrPlatformTicketNotFound = errors.New("platform ticket not found") var ErrPlatformTaskNotComplete = errors.New("platform task not complete") type CEP struct { mutex sync.Mutex waitGroup sync.WaitGroup ctxLocal context.Context ctxGlobal context.Context cancel func() global *executorpkg.Global tvm *executorpkg.TVM status map[string]*executorpkg.TaskResult } func NewCEP(ctxGlobal context.Context, log *fw.Log, capacity int, registry schemepkg.Registry) (*CEP, error) { ctxLocal, cancel := context.WithCancel(context.Background()) client, err := dockerpkg.NewDocker() if err != nil { cancel() return nil, fmt.Errorf("can't create cep: %w", err) } return &CEP{ ctxLocal: ctxLocal, ctxGlobal: ctxGlobal, cancel: cancel, global: &executorpkg.Global{ Ctx: ctxGlobal, Log: log, Docker: client, Registry: dockerpkg.NewRegistry( ctxLocal, log, client, registry.Address, registry.Login, registry.Password, ), }, tvm: executorpkg.NewTVM(capacity), status: map[string]*executorpkg.TaskResult{}, }, nil } func (this *CEP) Start() error { this.global.Log.Information("starting cep") defer this.global.Log.Information("cep started") this.waitGroup.Add(1) go this.workerWatch() return nil } func (this *CEP) Stop() error { this.global.Log.Information("stopping cep") defer this.global.Log.Information("cep stopped") this.cancel() this.waitGroup.Wait() return nil } func (this *CEP) Capacity() int { return this.tvm.Capacity() } func (this *CEP) Schedule(task schemepkg.Task) (string, error) { this.mutex.Lock() defer this.mutex.Unlock() if this.tvm.Capacity() == 0 { return "", ErrPlatformOutOfCapacity } ticket, err := this.tvm.AcquireTicket() if err != nil { return "", fmt.Errorf("can't schedule task: %w", err) } this.waitGroup.Add(1) go this.workerExecute(ticket, task) return ticket.UUID, nil } func (this *CEP) Status(UUID string) (*executorpkg.TaskResult, error) { this.mutex.Lock() defer this.mutex.Unlock() if !this.tvm.HasTicket(UUID) { return nil, ErrPlatformTicketNotFound } ticket, err := this.tvm.FindTicket(UUID) if err != nil { return nil, fmt.Errorf("can't obtain task status: %w", err) } status, ok := this.status[UUID] if !ok { return nil, ErrPlatformTaskNotComplete } err = this.tvm.ReleaseTicket(ticket.UUID) if err != nil { return nil, fmt.Errorf("can't obtain task status: %w", err) } delete(this.status, ticket.UUID) return status, nil } func (this *CEP) workerExecute(ticket *executorpkg.Ticket, taskScheme schemepkg.Task) { defer this.waitGroup.Done() this.global.Log.Information("task execution started", fw.LogValue("uuid", ticket.UUID)) // Create task, err := executorpkg.NewTask(this.global, ticket, taskScheme) if err != nil { this.global.Log.Information("task execution finished with errors", fw.LogValue("uuid", ticket.UUID), fw.LogError(err)) return } // Execute result := task.Execute() // Store this.mutex.Lock() this.status[ticket.UUID] = result this.mutex.Unlock() // Done this.global.Log.Information("task execution finished successfully", fw.LogValue("uuid", ticket.UUID)) } func (this *CEP) workerWatch() { defer this.waitGroup.Done() for { select { case <-this.ctxGlobal.Done(): this.cancel() return case <-time.After(10 * time.Microsecond): continue } } }