| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167 |
- package cep
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "time"
- fw "git.buran.team/main/fairwind"
- dockerpkg "git.buran.team/main/cep/docker"
- executorpkg "git.buran.team/main/cep/executor"
- schemepkg "git.buran.team/main/cep/scheme"
- )
- var ErrPlatformOutOfCapacity = errors.New("platform out of capacity")
- var ErrPlatformTicketNotFound = errors.New("platform ticket not found")
- var ErrPlatformTaskNotComplete = errors.New("platform task not complete")
- type CEP struct {
- mutex sync.Mutex
- waitGroup sync.WaitGroup
- ctxLocal context.Context
- ctxGlobal context.Context
- cancel func()
- global *executorpkg.Global
- tvm *executorpkg.TVM
- status map[string]*executorpkg.TaskResult
- }
- func NewCEP(ctxGlobal context.Context, log *fw.Log, capacity int, registry schemepkg.Registry) (*CEP, error) {
- ctxLocal, cancel := context.WithCancel(context.Background())
- client, err := dockerpkg.NewDocker()
- if err != nil {
- cancel()
- return nil, fmt.Errorf("can't create cep: %w", err)
- }
- return &CEP{
- ctxLocal: ctxLocal,
- ctxGlobal: ctxGlobal,
- cancel: cancel,
- global: &executorpkg.Global{
- Ctx: ctxGlobal,
- Log: log,
- Docker: client,
- Registry: dockerpkg.NewRegistry(
- ctxLocal,
- log,
- client,
- registry.Address,
- registry.Login,
- registry.Password,
- ),
- },
- tvm: executorpkg.NewTVM(capacity),
- status: map[string]*executorpkg.TaskResult{},
- }, nil
- }
- func (this *CEP) Start() error {
- this.global.Log.Information("starting cep")
- defer this.global.Log.Information("cep started")
- this.waitGroup.Add(1)
- go this.workerWatch()
- return nil
- }
- func (this *CEP) Stop() error {
- this.global.Log.Information("stopping cep")
- defer this.global.Log.Information("cep stopped")
- this.cancel()
- this.waitGroup.Wait()
- return nil
- }
- func (this *CEP) Capacity() int {
- return this.tvm.Capacity()
- }
- func (this *CEP) Schedule(task schemepkg.Task) (string, error) {
- this.mutex.Lock()
- defer this.mutex.Unlock()
- if this.tvm.Capacity() == 0 {
- return "", ErrPlatformOutOfCapacity
- }
- ticket, err := this.tvm.AcquireTicket()
- if err != nil {
- return "", fmt.Errorf("can't schedule task: %w", err)
- }
- this.waitGroup.Add(1)
- go this.workerExecute(ticket, task)
- return ticket.UUID, nil
- }
- func (this *CEP) Status(UUID string) (*executorpkg.TaskResult, error) {
- this.mutex.Lock()
- defer this.mutex.Unlock()
- if !this.tvm.HasTicket(UUID) {
- return nil, ErrPlatformTicketNotFound
- }
- ticket, err := this.tvm.FindTicket(UUID)
- if err != nil {
- return nil, fmt.Errorf("can't obtain task status: %w", err)
- }
- status, ok := this.status[UUID]
- if !ok {
- return nil, ErrPlatformTaskNotComplete
- }
- err = this.tvm.ReleaseTicket(ticket.UUID)
- if err != nil {
- return nil, fmt.Errorf("can't obtain task status: %w", err)
- }
- delete(this.status, ticket.UUID)
- return status, nil
- }
- func (this *CEP) workerExecute(ticket *executorpkg.Ticket, taskScheme schemepkg.Task) {
- defer this.waitGroup.Done()
- this.global.Log.Information("task execution started", fw.LogValue("uuid", ticket.UUID))
- // Create
- task, err := executorpkg.NewTask(this.global, ticket, taskScheme)
- if err != nil {
- this.global.Log.Information("task execution finished with errors", fw.LogValue("uuid", ticket.UUID), fw.LogError(err))
- return
- }
- // Execute
- result := task.Execute()
- // Store
- this.mutex.Lock()
- this.status[ticket.UUID] = result
- this.mutex.Unlock()
- // Done
- this.global.Log.Information("task execution finished successfully", fw.LogValue("uuid", ticket.UUID))
- }
- func (this *CEP) workerWatch() {
- defer this.waitGroup.Done()
- for {
- select {
- case <-this.ctxGlobal.Done():
- this.cancel()
- return
- case <-time.After(10 * time.Microsecond):
- continue
- }
- }
- }
|