Stan преди 1 ден
ревизия
a4aa8f99ee

+ 3 - 0
README.md

@@ -0,0 +1,3 @@
+# CEP
+
+Central Execution Platform

+ 2 - 0
TODO.md

@@ -0,0 +1,2 @@
+* Add logging to Result and Processor
+* Add more details (code, stdout, stderr) if container startup failed

+ 167 - 0
cep.go

@@ -0,0 +1,167 @@
+package cep
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	fw "git.buran.team/main/fairwind"
+
+	dockerpkg "git.buran.team/main/cep/docker"
+	executorpkg "git.buran.team/main/cep/executor"
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+var ErrPlatformOutOfCapacity = errors.New("platform out of capacity")
+var ErrPlatformTicketNotFound = errors.New("platform ticket not found")
+var ErrPlatformTaskNotComplete = errors.New("platform task not complete")
+
+type CEP struct {
+	mutex     sync.Mutex
+	waitGroup sync.WaitGroup
+	ctxLocal  context.Context
+	ctxGlobal context.Context
+	cancel    func()
+	global    *executorpkg.Global
+	tvm       *executorpkg.TVM
+	status    map[string]*executorpkg.TaskResult
+}
+
+func NewCEP(ctxGlobal context.Context, log *fw.Log, capacity int, registry schemepkg.Registry) (*CEP, error) {
+	ctxLocal, cancel := context.WithCancel(context.Background())
+
+	client, err := dockerpkg.NewDocker()
+	if err != nil {
+		cancel()
+		return nil, fmt.Errorf("can't create cep: %w", err)
+	}
+
+	return &CEP{
+		ctxLocal:  ctxLocal,
+		ctxGlobal: ctxGlobal,
+		cancel:    cancel,
+		global: &executorpkg.Global{
+			Ctx:    ctxGlobal,
+			Log:    log,
+			Docker: client,
+			Registry: dockerpkg.NewRegistry(
+				ctxLocal,
+				log,
+				client,
+				registry.Address,
+				registry.Login,
+				registry.Password,
+			),
+		},
+		tvm:    executorpkg.NewTVM(capacity),
+		status: map[string]*executorpkg.TaskResult{},
+	}, nil
+}
+
+func (this *CEP) Start() error {
+	this.global.Log.Information("starting cep")
+	defer this.global.Log.Information("cep started")
+
+	this.waitGroup.Add(1)
+	go this.workerWatch()
+	return nil
+}
+
+func (this *CEP) Stop() error {
+	this.global.Log.Information("stopping cep")
+	defer this.global.Log.Information("cep stopped")
+
+	this.cancel()
+	this.waitGroup.Wait()
+	return nil
+}
+
+func (this *CEP) Capacity() int {
+	return this.tvm.Capacity()
+}
+
+func (this *CEP) Schedule(task schemepkg.Task) (string, error) {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	if this.tvm.Capacity() == 0 {
+		return "", ErrPlatformOutOfCapacity
+	}
+
+	ticket, err := this.tvm.AcquireTicket()
+	if err != nil {
+		return "", fmt.Errorf("can't schedule task: %w", err)
+	}
+
+	this.waitGroup.Add(1)
+	go this.workerExecute(ticket, task)
+	return ticket.UUID, nil
+}
+
+func (this *CEP) Status(UUID string) (*executorpkg.TaskResult, error) {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	if !this.tvm.HasTicket(UUID) {
+		return nil, ErrPlatformTicketNotFound
+	}
+
+	ticket, err := this.tvm.FindTicket(UUID)
+	if err != nil {
+		return nil, fmt.Errorf("can't obtain task status: %w", err)
+	}
+
+	status, ok := this.status[UUID]
+	if !ok {
+		return nil, ErrPlatformTaskNotComplete
+	}
+
+	err = this.tvm.ReleaseTicket(ticket.UUID)
+	if err != nil {
+		return nil, fmt.Errorf("can't obtain task status: %w", err)
+	}
+
+	delete(this.status, ticket.UUID)
+	return status, nil
+}
+
+func (this *CEP) workerExecute(ticket *executorpkg.Ticket, taskScheme schemepkg.Task) {
+	defer this.waitGroup.Done()
+
+	this.global.Log.Information("task execution started", fw.LogValue("uuid", ticket.UUID))
+
+	// Create
+	task, err := executorpkg.NewTask(this.global, ticket, taskScheme)
+	if err != nil {
+		this.global.Log.Information("task execution finished with errors", fw.LogValue("uuid", ticket.UUID), fw.LogError(err))
+		return
+	}
+
+	// Execute
+	result := task.Execute()
+
+	// Store
+	this.mutex.Lock()
+	this.status[ticket.UUID] = result
+	this.mutex.Unlock()
+
+	// Done
+	this.global.Log.Information("task execution finished successfully", fw.LogValue("uuid", ticket.UUID))
+}
+
+func (this *CEP) workerWatch() {
+	defer this.waitGroup.Done()
+
+	for {
+		select {
+		case <-this.ctxGlobal.Done():
+			this.cancel()
+			return
+
+		case <-time.After(10 * time.Microsecond):
+			continue
+		}
+	}
+}

+ 388 - 0
docker/container.go

@@ -0,0 +1,388 @@
+package docker
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/netip"
+	"strings"
+	"time"
+
+	fw "git.buran.team/main/fairwind"
+
+	"github.com/docker/docker/pkg/stdcopy"
+	mobycontainer "github.com/moby/moby/api/types/container"
+	mobynetwork "github.com/moby/moby/api/types/network"
+	moby "github.com/moby/moby/client"
+)
+
+var ErrContainerExecReturnedNonZero = errors.New("container exec returned non-zero")
+var ErrContainerNotExited = errors.New("container not exited")
+
+type ExecutionResult struct {
+	Code   int
+	Stdout []byte
+	Stderr []byte
+	Error  error
+}
+
+type ContainerHook func(ctx context.Context) error
+
+type ContainerSettingsPermissions struct {
+	Privileged   bool
+	Capabilities []string
+}
+
+type ContainerSettingsNetwork struct {
+	IP string
+}
+
+type ContainerResources struct {
+	CPU    int64
+	Memory int64
+	// TODO: disk iops, network iops
+}
+
+type Variable struct {
+	Key   string
+	Value string
+}
+
+type ContainerSettingsHost struct {
+	Name string
+	IP   string
+}
+
+type ContainerSettings struct {
+	Name        string
+	Command     string
+	Variables   []Variable
+	Hosts       []ContainerSettingsHost
+	Binds       []string
+	Permissions ContainerSettingsPermissions
+	Network     ContainerSettingsNetwork
+	Resources   ContainerResources
+}
+
+type Container struct {
+	ctx      context.Context
+	log      *fw.Log
+	client   *Docker
+	network  *Network
+	image    *Image
+	settings ContainerSettings
+}
+
+func NewContainer(ctx context.Context, log *fw.Log, client *Docker, network *Network, image *Image, settings ContainerSettings) *Container {
+	return &Container{
+		ctx:      ctx,
+		log:      log,
+		client:   client,
+		network:  network,
+		image:    image,
+		settings: settings,
+	}
+}
+
+func (this *Container) Image() *Image {
+	return this.image
+}
+
+func (this *Container) Start() error {
+	err := this.image.Pull()
+	if err != nil {
+		return fmt.Errorf("can't start container: %w", err)
+	}
+
+	binds := []string{}
+	for _, bind := range this.settings.Binds {
+		binds = append(binds, fmt.Sprintf("%s:%s", bind, bind))
+	}
+
+	command := []string{}
+	if len(this.settings.Command) > 0 {
+		command = strings.Split(this.settings.Command, " ")
+	} else {
+		command = nil
+	}
+
+	env := []string{}
+	if this.settings.Variables != nil {
+		for _, environment := range this.settings.Variables {
+			env = append(
+				env,
+				fmt.Sprintf("%s=%s", environment.Key, environment.Value),
+			)
+		}
+	}
+
+	hosts := []string{}
+	if this.settings.Hosts != nil {
+		for _, host := range this.settings.Hosts {
+			hosts = append(
+				hosts,
+				fmt.Sprintf("%s:%s", host.Name, host.IP),
+			)
+		}
+	}
+
+	resources := mobycontainer.Resources{}
+	if this.settings.Resources.CPU > 0 {
+		resources.NanoCPUs = this.settings.Resources.CPU
+	}
+
+	if this.settings.Resources.Memory > 0 {
+		resources.Memory = this.settings.Resources.Memory
+	}
+
+	// Create container
+	_, err = this.client.Docker.ContainerCreate(
+		this.ctx,
+		moby.ContainerCreateOptions{
+			Name: this.settings.Name,
+			Config: &mobycontainer.Config{
+				Image: this.image.Name(),
+				Cmd:   command,
+				Env:   env,
+			},
+			HostConfig: &mobycontainer.HostConfig{
+				Binds: binds,
+				RestartPolicy: mobycontainer.RestartPolicy{
+					Name: mobycontainer.RestartPolicyDisabled,
+				},
+				Privileged:  this.settings.Permissions.Privileged,
+				CapAdd:      this.settings.Permissions.Capabilities,
+				NetworkMode: "default",
+				Resources:   resources,
+				ExtraHosts:  hosts,
+			},
+			NetworkingConfig: &mobynetwork.NetworkingConfig{
+				EndpointsConfig: map[string]*mobynetwork.EndpointSettings{
+					this.network.Name(): {
+						IPAMConfig: &mobynetwork.EndpointIPAMConfig{
+							IPv4Address: netip.MustParseAddr(this.settings.Network.IP),
+						},
+					},
+				},
+			},
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("can't start container: %w", err)
+	}
+
+	// Start
+	_, err = this.client.Docker.ContainerStart(
+		this.ctx,
+		this.settings.Name,
+		moby.ContainerStartOptions{},
+	)
+	if err != nil {
+		return fmt.Errorf("can't start container: %w", err)
+	}
+
+	return nil
+}
+
+func (this *Container) Stop() error {
+	_, err := this.client.Docker.ContainerRemove(
+		this.ctx,
+		this.settings.Name,
+		moby.ContainerRemoveOptions{
+			Force: true,
+		},
+	)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (this *Container) Status(shouldExit bool) ExecutionResult {
+	// Get status
+	result0, err := this.client.Docker.ContainerInspect(
+		this.ctx,
+		this.settings.Name,
+		moby.ContainerInspectOptions{},
+	)
+	if err != nil {
+		return ExecutionResult{
+			Error: err,
+		}
+	}
+
+	if shouldExit {
+		if result0.Container.State.Status != "exited" {
+			return ExecutionResult{
+				Error: ErrContainerNotExited,
+			}
+		}
+	}
+
+	// Get logs
+	result1, err := this.client.Docker.ContainerLogs(
+		this.ctx,
+		this.settings.Name,
+		moby.ContainerLogsOptions{
+			ShowStdout: true,
+			ShowStderr: true,
+		},
+	)
+	if err != nil {
+		return ExecutionResult{
+			Error: err,
+		}
+	}
+
+	var stdout bytes.Buffer
+	var stderr bytes.Buffer
+	_, err = stdcopy.StdCopy(&stdout, &stderr, result1)
+	if err != nil {
+		return ExecutionResult{
+			Error: err,
+		}
+	}
+
+	// Done
+	return ExecutionResult{
+		Code:   result0.Container.State.ExitCode,
+		Stdout: stdout.Bytes(),
+		Stderr: stderr.Bytes(),
+	}
+}
+
+func (this *Container) Execute(timeout int, command string) ExecutionResult {
+	// Context
+	ctx, cancel := context.WithTimeout(
+		this.ctx,
+		time.Duration(timeout)*time.Millisecond,
+	)
+	defer cancel()
+
+	// Create
+	resultCreate, err := this.client.Docker.ExecCreate(
+		ctx,
+		this.settings.Name,
+		moby.ExecCreateOptions{
+			AttachStdin:  false,
+			AttachStdout: true,
+			AttachStderr: true,
+			Cmd:          strings.Split(command, " "),
+		},
+	)
+	if err != nil {
+		return ExecutionResult{
+			Error: fmt.Errorf("can't exec command in container: %w", err),
+		}
+	}
+
+	resultAttach, err := this.client.Docker.ExecAttach(
+		ctx,
+		resultCreate.ID,
+		moby.ExecAttachOptions{},
+	)
+	if err != nil {
+		return ExecutionResult{
+			Error: fmt.Errorf("can't exec command in container: %w", err),
+		}
+	}
+	defer resultAttach.Close()
+
+	// Start
+	_, err = this.client.Docker.ExecStart(
+		ctx,
+		resultCreate.ID,
+		moby.ExecStartOptions{},
+	)
+	if err != nil {
+		return ExecutionResult{
+			Error: fmt.Errorf("can't exec command in container: %w", err),
+		}
+	}
+
+	// Wait
+	exitCode := 0
+	for {
+		resultInspect, err := this.client.Docker.ExecInspect(
+			ctx,
+			resultCreate.ID,
+			moby.ExecInspectOptions{},
+		)
+		if err != nil {
+			return ExecutionResult{
+				Error: fmt.Errorf("can't exec command in container: %w", err),
+			}
+		}
+
+		if resultInspect.Running {
+			time.Sleep(10 * time.Millisecond)
+			continue
+		}
+
+		exitCode = resultInspect.ExitCode
+		break
+	}
+
+	var stdout bytes.Buffer
+	var stderr bytes.Buffer
+	_, err = stdcopy.StdCopy(&stdout, &stderr, resultAttach.Reader)
+	if err != nil {
+		return ExecutionResult{
+			Error: fmt.Errorf("can't exec command in container: %w", err),
+		}
+	}
+
+	// Done
+	return ExecutionResult{
+		Code:   exitCode,
+		Stdout: stdout.Bytes(),
+		Stderr: stderr.Bytes(),
+	}
+}
+
+func (this *Container) Read(timeout int, path string) ([]byte, error) {
+	// Context
+	ctx, cancel := context.WithTimeout(
+		this.ctx,
+		time.Duration(timeout)*time.Millisecond,
+	)
+	defer cancel()
+
+	// Read
+	result, err := this.client.Docker.CopyFromContainer(
+		ctx,
+		this.settings.Name,
+		moby.CopyFromContainerOptions{
+			SourcePath: path,
+		},
+	)
+	if err != nil {
+		return nil, fmt.Errorf("can't copy from container: %w", err)
+	}
+	defer result.Content.Close()
+
+	// Tar
+	buffer := []byte{}
+	reader := tar.NewReader(result.Content)
+
+	_, err = reader.Next()
+	if err == io.EOF {
+		return nil, fmt.Errorf("can't copy from container: %w", err)
+	}
+
+	if err != nil {
+		return nil, fmt.Errorf("can't copy from container: %w", err)
+	}
+
+	buffer, err = io.ReadAll(reader)
+	if err != nil {
+		return nil, fmt.Errorf("can't copy from container: %w", err)
+	}
+
+	// Done
+	return buffer, nil
+}

+ 22 - 0
docker/docker.go

@@ -0,0 +1,22 @@
+package docker
+
+import (
+	"fmt"
+
+	moby "github.com/moby/moby/client"
+)
+
+type Docker struct {
+	Docker *moby.Client
+}
+
+func NewDocker() (*Docker, error) {
+	client, err := moby.New()
+	if err != nil {
+		return nil, fmt.Errorf("can't create docker: %w", err)
+	}
+
+	return &Docker{
+		Docker: client,
+	}, nil
+}

+ 98 - 0
docker/image.go

@@ -0,0 +1,98 @@
+package docker
+
+import (
+	"context"
+	"encoding/base64"
+	"encoding/json"
+	"fmt"
+
+	fw "git.buran.team/main/fairwind"
+
+	mobyregistry "github.com/moby/moby/api/types/registry"
+	moby "github.com/moby/moby/client"
+)
+
+type Image struct {
+	ctx      context.Context
+	log      *fw.Log
+	client   *Docker
+	registry *Registry
+	tag      string
+	version  string
+	pull     bool
+}
+
+func NewImage(ctx context.Context, log *fw.Log, client *Docker, registry *Registry, tag string, version string, pull bool) *Image {
+	return &Image{
+		ctx:      ctx,
+		log:      log,
+		client:   client,
+		registry: registry,
+		tag:      tag,
+		version:  version,
+		pull:     pull,
+	}
+}
+
+func (this *Image) Registry() *Registry {
+	return this.registry
+}
+
+func (this *Image) Name() string {
+	return fmt.Sprintf("%s/%s:%s", this.registry.address, this.tag, this.version)
+}
+
+func (this *Image) Tag() string {
+	return this.tag
+}
+
+func (this *Image) Version() string {
+	return this.version
+}
+
+func (this *Image) Pull() error {
+	if !this.pull {
+		return nil
+	}
+
+	credentials, err := credentialsToString(
+		this.registry.login,
+		this.registry.password,
+	)
+	if err != nil {
+		return fmt.Errorf("can't pull image: %w", err)
+	}
+
+	response, err := this.client.Docker.ImagePull(
+		this.ctx,
+		this.Name(),
+		moby.ImagePullOptions{
+			RegistryAuth: credentials,
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("can't pull image: %w", err)
+	}
+
+	err = response.Wait(this.ctx)
+	if err != nil {
+		return fmt.Errorf("can't pull image: %w", err)
+	}
+
+	return nil
+}
+
+func credentialsToString(login string, password string) (string, error) {
+	encodedJSON, err := json.Marshal(
+		&mobyregistry.AuthConfig{
+			Username: login,
+			Password: password,
+		},
+	)
+	if err != nil {
+		return "", fmt.Errorf("can't serialize credentials: %w", err)
+	}
+
+	authStr := base64.URLEncoding.EncodeToString(encodedJSON)
+	return authStr, nil
+}

+ 85 - 0
docker/network.go

@@ -0,0 +1,85 @@
+package docker
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/netip"
+
+	fw "git.buran.team/main/fairwind"
+
+	mobynetwork "github.com/moby/moby/api/types/network"
+	moby "github.com/moby/moby/client"
+)
+
+var DEFAULT_NETWORK_CIDR = netip.MustParsePrefix("172.16.0.0/24")
+var DEFAULT_NETWORK_GATEWAY = netip.MustParseAddr("172.16.0.1")
+
+var ErrNetworkAlreadyExists = errors.New("network already exists")
+
+type Network struct {
+	ctx     context.Context
+	log     *fw.Log
+	client  *Docker
+	name    string
+	cidr    netip.Prefix
+	gateway netip.Addr
+}
+
+func NewNetwork(ctx context.Context, log *fw.Log, client *Docker, name string, cidr netip.Prefix, gateway netip.Addr) *Network {
+	return &Network{
+		ctx:     ctx,
+		log:     log,
+		client:  client,
+		name:    name,
+		cidr:    cidr,
+		gateway: gateway,
+	}
+}
+
+func (this *Network) Name() string {
+	return this.name
+}
+
+func (this *Network) Create() error {
+	networks, err := this.client.Docker.NetworkList(this.ctx, moby.NetworkListOptions{})
+	if err != nil {
+		return fmt.Errorf("can't create network: %w", err)
+	}
+
+	for _, network := range networks.Items {
+		if network.Name == this.name {
+			return ErrNetworkAlreadyExists
+		}
+	}
+
+	_, err = this.client.Docker.NetworkCreate(
+		this.ctx,
+		this.name,
+		moby.NetworkCreateOptions{
+			Driver: "bridge",
+			IPAM: &mobynetwork.IPAM{
+				Config: []mobynetwork.IPAMConfig{
+					{
+						Subnet:  this.cidr,
+						Gateway: this.gateway,
+					},
+				},
+			},
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("can't create network: %w", err)
+	}
+
+	return nil
+}
+
+func (this *Network) Delete() error {
+	_, err := this.client.Docker.NetworkRemove(this.ctx, this.name, moby.NetworkRemoveOptions{})
+	if err != nil {
+		return fmt.Errorf("can't delete network: %w", err)
+	}
+
+	return nil
+}

+ 50 - 0
docker/registry.go

@@ -0,0 +1,50 @@
+package docker
+
+import (
+	"context"
+	"fmt"
+
+	fw "git.buran.team/main/fairwind"
+
+	moby "github.com/moby/moby/client"
+)
+
+type Registry struct {
+	ctx      context.Context
+	log      *fw.Log
+	client   *Docker
+	address  string
+	login    string
+	password string
+}
+
+func NewRegistry(ctx context.Context, log *fw.Log, client *Docker, address string, login string, password string) *Registry {
+	return &Registry{
+		ctx:      ctx,
+		log:      log,
+		client:   client,
+		address:  address,
+		login:    login,
+		password: password,
+	}
+}
+
+func (this *Registry) Address() string {
+	return this.address
+}
+
+func (this *Registry) Authorize() error {
+	_, err := this.client.Docker.RegistryLogin(
+		this.ctx,
+		moby.RegistryLoginOptions{
+			ServerAddress: this.address,
+			Username:      this.login,
+			Password:      this.password,
+		},
+	)
+	if err != nil {
+		return fmt.Errorf("can't authorize in registry: %w", err)
+	}
+
+	return nil
+}

+ 22 - 0
executor/check.go

@@ -0,0 +1,22 @@
+package executor
+
+import (
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type Check interface {
+	ID() string
+	Check() CheckResult
+}
+
+func NewCheck(global *Global, ticket *Ticket, checkScheme schemepkg.Check, container Container) (Check, error) {
+	switch checkScheme.Kind {
+	case schemepkg.CHECK_KIND_BASH:
+		return NewCheckBash(global, ticket, container, checkScheme)
+
+	case schemepkg.CHECK_KIND_FILE:
+		return NewCheckFile(global, ticket, container, checkScheme)
+	}
+
+	return nil, ErrKindUnknown
+}

+ 58 - 0
executor/check_bash.go

@@ -0,0 +1,58 @@
+package executor
+
+import (
+	"fmt"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+	fw "git.buran.team/main/fairwind"
+)
+
+type CheckBash struct {
+	global    *Global
+	ticket    *Ticket
+	id        string
+	container Container
+	timeout   int
+	command   string
+	result    *Result
+}
+
+func NewCheckBash(global *Global, ticket *Ticket, container Container, checkScheme schemepkg.Check) (*CheckBash, error) {
+	result, err := NewResult(checkScheme.KindBash.Result)
+	if err != nil {
+		return nil, fmt.Errorf("can't create bash check: %w", err)
+	}
+
+	return &CheckBash{
+		global:    global,
+		ticket:    ticket,
+		id:        checkScheme.ID,
+		container: container,
+		timeout:   checkScheme.Timeout,
+		command:   checkScheme.KindBash.Command,
+		result:    result,
+	}, nil
+}
+
+func (this *CheckBash) ID() string {
+	return this.ID()
+}
+
+func (this *CheckBash) Check() CheckResult {
+	this.global.Log.Debug("beginning bash check", fw.LogValue("uuid", this.ticket.UUID))
+
+	result := this.container.Execute(this.timeout, this.command)
+	if result.Error != nil {
+		this.global.Log.Debug("bash check ended with error", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(result.Error))
+		return CheckResult{
+			Success: false,
+			Data:    nil,
+		}
+	}
+
+	this.global.Log.Debug("bash check ended successfully", fw.LogValue("uuid", this.ticket.UUID))
+	return CheckResult{
+		Success: this.result.Check(result.Code, result.Stdout, result.Stderr),
+		Data:    nil,
+	}
+}

+ 73 - 0
executor/check_file.go

@@ -0,0 +1,73 @@
+package executor
+
+import (
+	"fmt"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+	fw "git.buran.team/main/fairwind"
+)
+
+type CheckFile struct {
+	global     *Global
+	ticket     *Ticket
+	id         string
+	container  Container
+	timeout    int
+	path       string
+	processors []Processor
+}
+
+func NewCheckFile(global *Global, ticket *Ticket, container Container, checkScheme schemepkg.Check) (*CheckFile, error) {
+	processors := []Processor{}
+	for _, processorScheme := range checkScheme.KindFile.Processors {
+		processor, err := NewProcessor(processorScheme)
+		if err != nil {
+			return nil, fmt.Errorf("can't create result: %w", err)
+		}
+
+		processors = append(processors, processor)
+	}
+
+	return &CheckFile{
+		global:     global,
+		ticket:     ticket,
+		id:         checkScheme.ID,
+		container:  container,
+		timeout:    checkScheme.Timeout,
+		path:       checkScheme.KindFile.Path,
+		processors: processors,
+	}, nil
+}
+
+func (this *CheckFile) ID() string {
+	return this.id
+}
+
+func (this *CheckFile) Check() CheckResult {
+	this.global.Log.Debug("beginning file check", fw.LogValue("uuid", this.ticket.UUID))
+
+	buffer, err := this.container.Read(this.timeout, this.path)
+	if err != nil {
+		this.global.Log.Debug("file check ended with error", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
+		return CheckResult{
+			Success: false,
+			Data:    nil,
+		}
+	}
+
+	for _, processor := range this.processors {
+		if !processor.Process(buffer) {
+			this.global.Log.Debug("file check ended with error", fw.LogValue("uuid", this.ticket.UUID))
+			return CheckResult{
+				Success: false,
+				Data:    nil,
+			}
+		}
+	}
+
+	this.global.Log.Debug("file check ended successfully", fw.LogValue("uuid", this.ticket.UUID))
+	return CheckResult{
+		Success: true,
+		Data:    nil,
+	}
+}

+ 55 - 0
executor/checker.go

@@ -0,0 +1,55 @@
+package executor
+
+import (
+	"fmt"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type FindContainer func(string) (Container, error)
+
+type Checker interface {
+	Check() *TaskResult
+}
+
+func NewChecker(global *Global, ticket *Ticket, finder FindContainer, checkerScheme schemepkg.Checker) (Checker, error) {
+	switch checkerScheme.Kind {
+	case schemepkg.CHECKER_KIND_SEQUENTIAL:
+		checks := []Check{}
+		for _, checkScheme := range checkerScheme.KindSequential.Checks {
+			container, err := finder(checkScheme.ContainerID)
+			if err != nil {
+				return nil, fmt.Errorf("can't create checker: %w", err)
+			}
+
+			check, err := NewCheck(global, ticket, checkScheme, container)
+			if err != nil {
+				return nil, fmt.Errorf("can't create checker: %w", err)
+			}
+
+			checks = append(checks, check)
+		}
+
+		return NewSequentialChecker(global, ticket, checks)
+
+	case schemepkg.CHECKER_KIND_PARALLEL:
+		checks := []Check{}
+		for _, checkScheme := range checkerScheme.KindParallel.Checks {
+			container, err := finder(checkScheme.ContainerID)
+			if err != nil {
+				return nil, fmt.Errorf("can't create checker: %w", err)
+			}
+
+			check, err := NewCheck(global, ticket, checkScheme, container)
+			if err != nil {
+				return nil, fmt.Errorf("can't create checker: %w", err)
+			}
+
+			checks = append(checks, check)
+		}
+
+		return NewParallelChecker(global, ticket, checks)
+	}
+
+	return nil, ErrKindUnknown
+}

+ 46 - 0
executor/checker_parallel.go

@@ -0,0 +1,46 @@
+package executor
+
+import (
+	"sync"
+
+	fw "git.buran.team/main/fairwind"
+)
+
+type ParallelChecker struct {
+	global *Global
+	ticket *Ticket
+	checks []Check
+}
+
+func NewParallelChecker(global *Global, ticket *Ticket, checks []Check) (*ParallelChecker, error) {
+	return &ParallelChecker{
+		global: global,
+		ticket: ticket,
+		checks: checks,
+	}, nil
+}
+
+func (this *ParallelChecker) Check() *TaskResult {
+	this.global.Log.Debug("beginning checks parallel", fw.LogValue("uuid", this.ticket.UUID))
+	defer this.global.Log.Debug("parallel checks ended", fw.LogValue("uuid", this.ticket.UUID))
+
+	var mutex sync.Mutex
+	report := map[string]CheckResult{}
+
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(len(this.checks))
+
+	for _, check := range this.checks {
+		go func(check Check) {
+			defer waitGroup.Done()
+			result := check.Check()
+
+			mutex.Lock()
+			report[check.ID()] = result
+			mutex.Unlock()
+		}(check)
+	}
+
+	waitGroup.Wait()
+	return NewTaskResultSuccess(report)
+}

+ 29 - 0
executor/checker_sequential.go

@@ -0,0 +1,29 @@
+package executor
+
+import fw "git.buran.team/main/fairwind"
+
+type SequentialChecker struct {
+	global *Global
+	ticket *Ticket
+	checks []Check
+}
+
+func NewSequentialChecker(global *Global, ticket *Ticket, checks []Check) (*SequentialChecker, error) {
+	return &SequentialChecker{
+		global: global,
+		ticket: ticket,
+		checks: checks,
+	}, nil
+}
+
+func (this *SequentialChecker) Check() *TaskResult {
+	this.global.Log.Debug("beginning checks sequential", fw.LogValue("uuid", this.ticket.UUID))
+	defer this.global.Log.Debug("sequential checks ended", fw.LogValue("uuid", this.ticket.UUID))
+
+	report := map[string]CheckResult{}
+	for _, check := range this.checks {
+		report[check.ID()] = check.Check()
+	}
+
+	return NewTaskResultSuccess(report)
+}

+ 19 - 0
executor/condition.go

@@ -0,0 +1,19 @@
+package executor
+
+import (
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type Condition interface {
+	ID() string
+	Check() bool
+}
+
+func NewCondition(global *Global, ticket *Ticket, container Container, conditionScheme schemepkg.Condition) (Condition, error) {
+	switch conditionScheme.Kind {
+	case schemepkg.CONDITION_KIND_BASH:
+		return NewConditionBash(global, ticket, container, conditionScheme)
+	}
+
+	return nil, ErrKindUnknown
+}

+ 51 - 0
executor/condition_bash.go

@@ -0,0 +1,51 @@
+package executor
+
+import (
+	"fmt"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+	fw "git.buran.team/main/fairwind"
+)
+
+type ConditionBash struct {
+	global    *Global
+	ticket    *Ticket
+	container Container
+	id        string
+	timeout   int
+	command   string
+	result    *Result
+}
+
+func NewConditionBash(global *Global, ticket *Ticket, container Container, conditionScheme schemepkg.Condition) (*ConditionBash, error) {
+	result, err := NewResult(conditionScheme.KindBash.Result)
+	if err != nil {
+		return nil, fmt.Errorf("can't create bash condition: %w", err)
+	}
+
+	return &ConditionBash{
+		global:    global,
+		ticket:    ticket,
+		container: container,
+		id:        conditionScheme.ID,
+		timeout:   conditionScheme.Timeout,
+		command:   conditionScheme.KindBash.Command,
+		result:    result,
+	}, nil
+}
+
+func (this *ConditionBash) ID() string {
+	return this.id
+}
+
+func (this *ConditionBash) Check() bool {
+	this.global.Log.Debug("checking bash condition", fw.LogValue("uuid", this.ticket.UUID))
+	defer this.global.Log.Debug("bash condition checked", fw.LogValue("uuid", this.ticket.UUID))
+
+	result := this.container.Execute(this.timeout, this.command)
+	if result.Error != nil {
+		return false
+	}
+
+	return this.result.Check(result.Code, result.Stdout, result.Stderr)
+}

+ 42 - 0
executor/container.go

@@ -0,0 +1,42 @@
+package executor
+
+import (
+	"fmt"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type ExecutionResult struct {
+	Code   int
+	Stdout []byte
+	Stderr []byte
+	Error  error
+}
+
+type Container interface {
+	ID() string
+	Start() *ContainerResult
+	Stop() *ContainerResult
+	Execute(timeout int, command string) ExecutionResult
+	Read(timeout int, path string) ([]byte, error)
+}
+
+func NewContainer(global *Global, ticket *Ticket, network *Network, containerIndex int, containerScheme schemepkg.Container) (Container, error) {
+	switch containerScheme.Kind {
+	case schemepkg.CONTAINER_KIND_ONESHOT:
+		return NewOneshotContainer(global, ticket, network, containerIndex, containerScheme)
+
+	case schemepkg.CONTAINER_KIND_CONTINIOUS:
+		return NewContiniousContainer(global, ticket, network, containerIndex, containerScheme)
+	}
+
+	return nil, ErrKindUnknown
+}
+
+func containerName(networkIndex int, containerIndex int) string {
+	return fmt.Sprintf("container-%d-%d", networkIndex, containerIndex)
+}
+
+func containerIP(networkIndex int, containerIndex int) string {
+	return fmt.Sprintf("172.16.%d.%d", networkIndex, containerIndex+2)
+}

+ 281 - 0
executor/container_continious.go

@@ -0,0 +1,281 @@
+package executor
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	dockerpkg "git.buran.team/main/cep/docker"
+	schemepkg "git.buran.team/main/cep/scheme"
+	fw "git.buran.team/main/fairwind"
+)
+
+var ErrContainerStartConditionsNotMeet = errors.New("container start conditions not meet")
+var ErrContainerStopConditionsNotMeet = errors.New("container stop conditions not meet")
+
+type ContiniousContainer struct {
+	global          *Global
+	ticket          *Ticket
+	id              string
+	container       *dockerpkg.Container
+	startTimeout    int
+	stopTimeout     int
+	startConditions []Condition
+	stopConditions  []Condition
+}
+
+func NewContiniousContainer(
+	global *Global,
+	ticket *Ticket,
+	network *Network,
+	containerIndex int,
+	containerScheme schemepkg.Container,
+) (*ContiniousContainer, error) {
+	variables := []dockerpkg.Variable{}
+	for _, variable := range containerScheme.Variables {
+		variables = append(
+			variables,
+			dockerpkg.Variable{
+				Key:   variable.Key,
+				Value: variable.Value,
+			},
+		)
+	}
+
+	hosts := []dockerpkg.ContainerSettingsHost{}
+	for _, host := range network.Hosts() {
+		hosts = append(
+			hosts,
+			dockerpkg.ContainerSettingsHost{
+				Name: host.Name,
+				IP:   host.IP,
+			},
+		)
+	}
+
+	container := dockerpkg.NewContainer(
+		global.Ctx,
+		global.Log,
+		global.Docker,
+		network.Network(),
+		dockerpkg.NewImage(
+			global.Ctx,
+			global.Log,
+			global.Docker,
+			global.Registry,
+			containerScheme.Image.Tag,
+			containerScheme.Image.Version,
+			containerScheme.Image.Pull,
+		),
+		dockerpkg.ContainerSettings{
+			Name: containerName(
+				ticket.Index,
+				containerIndex,
+			),
+			Command:   containerScheme.Command,
+			Variables: variables,
+			Hosts:     hosts,
+			Binds:     containerScheme.Binds,
+			Permissions: dockerpkg.ContainerSettingsPermissions{
+				Privileged:   containerScheme.Permissions.Privileged,
+				Capabilities: containerScheme.Permissions.Capabilities,
+			},
+			Network: dockerpkg.ContainerSettingsNetwork{
+				IP: containerIP(
+					ticket.Index,
+					containerIndex,
+				),
+			},
+			Resources: dockerpkg.ContainerResources{
+				CPU:    containerScheme.Resources.CPU,
+				Memory: containerScheme.Resources.Memory,
+			},
+		},
+	)
+
+	this := &ContiniousContainer{
+		global:       global,
+		ticket:       ticket,
+		id:           containerScheme.ID,
+		container:    container,
+		startTimeout: containerScheme.KindContinious.Start.Timeout,
+		stopTimeout:  containerScheme.KindContinious.Stop.Timeout,
+	}
+
+	startConditions := []Condition{}
+	if containerScheme.KindContinious.Start.Conditions != nil {
+		for _, conditionScheme := range containerScheme.KindContinious.Start.Conditions {
+			condition, err := NewCondition(global, ticket, this, conditionScheme)
+			if err != nil {
+				return nil, fmt.Errorf("can't create container: %w", err)
+			}
+
+			startConditions = append(startConditions, condition)
+		}
+	}
+
+	stopConditions := []Condition{}
+	if containerScheme.KindContinious.Stop.Conditions != nil {
+		for _, conditionScheme := range containerScheme.KindContinious.Stop.Conditions {
+			condition, err := NewCondition(global, ticket, this, conditionScheme)
+			if err != nil {
+				return nil, fmt.Errorf("can't create container: %w", err)
+			}
+
+			stopConditions = append(stopConditions, condition)
+		}
+	}
+
+	this.startConditions = startConditions
+	this.stopConditions = stopConditions
+
+	return this, nil
+}
+
+func (this *ContiniousContainer) ID() string {
+	return this.id
+}
+
+func (this *ContiniousContainer) Start() *ContainerResult {
+	var err error
+	this.global.Log.Debug("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+	defer func() {
+		if err != nil {
+			this.global.Log.Debug("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
+		} else {
+			this.global.Log.Debug("container started", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		}
+	}()
+
+	err = this.container.Start()
+	if err != nil {
+		return NewContainerResultError(
+			fmt.Errorf("can't start container: %w", err),
+		)
+	}
+
+	success, conditionsStatus := checkConditions(this.startTimeout, this.startConditions)
+	if !success {
+		err = ErrContainerStartConditionsNotMeet
+		return NewContainerResultError(err)
+	}
+
+	return NewContainerResultSuccess(
+		conditionsStatus,
+		0,
+		[]byte{},
+		[]byte{},
+	)
+}
+
+func (this *ContiniousContainer) Stop() *ContainerResult {
+	var err error
+	this.global.Log.Debug("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+	defer func() {
+		if err != nil {
+			this.global.Log.Debug("container stop failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
+		} else {
+			this.global.Log.Debug("container stopped", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		}
+	}()
+
+	containerStatus := this.container.Status(false) // NOTE: shutdown messages ommited
+	if containerStatus.Error != nil {
+		err = containerStatus.Error
+		return NewContainerResultError(
+			containerStatus.Error,
+		)
+	}
+
+	err = this.container.Stop()
+	if err != nil {
+		return NewContainerResultError(
+			fmt.Errorf("can't stop container: %w", err),
+		)
+	}
+
+	success, conditionsStatus := checkConditions(this.stopTimeout, this.stopConditions)
+	if !success {
+		err = ErrContainerStopConditionsNotMeet
+		return NewContainerResultError(err)
+	}
+
+	return NewContainerResultSuccess(
+		conditionsStatus,
+		0,
+		containerStatus.Stdout,
+		containerStatus.Stderr,
+	)
+}
+
+func (this *ContiniousContainer) Execute(timeout int, command string) ExecutionResult {
+	result := this.container.Execute(timeout, command)
+	return ExecutionResult{
+		Error:  result.Error,
+		Code:   result.Code,
+		Stdout: result.Stdout,
+		Stderr: result.Stderr,
+	}
+}
+
+func (this *ContiniousContainer) Read(timeout int, path string) ([]byte, error) {
+	return this.container.Read(timeout, path)
+}
+
+func checkConditions(timeout int, conditions []Condition) (bool, map[string]bool) {
+	if len(conditions) == 0 {
+		return true, map[string]bool{}
+	}
+
+	ctx, cancel := context.WithTimeout(
+		context.Background(),
+		time.Duration(timeout)*time.Millisecond,
+	)
+	defer cancel()
+
+	var mutex sync.Mutex
+	status := map[string]bool{}
+
+	var successCounter atomic.Int64
+	successCounter.Store(0)
+
+	var waitGroup sync.WaitGroup
+	waitGroup.Add(len(conditions))
+
+	for _, condition := range conditions {
+		status[condition.ID()] = false
+	}
+
+	for _, condition := range conditions {
+		go func(condition Condition) {
+			defer waitGroup.Done()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+
+				case <-time.After(100 * time.Millisecond): // NOTE: basic 100-millisecond startup delay
+				}
+
+				if !condition.Check() {
+					continue
+				}
+
+				successCounter.Add(1)
+
+				mutex.Lock()
+				status[condition.ID()] = true
+				mutex.Unlock()
+
+				return
+			}
+		}(condition)
+	}
+
+	waitGroup.Wait()
+	return successCounter.Load() == int64(len(conditions)), status
+}

+ 222 - 0
executor/container_oneshot.go

@@ -0,0 +1,222 @@
+package executor
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	dockerpkg "git.buran.team/main/cep/docker"
+	schemepkg "git.buran.team/main/cep/scheme"
+	fw "git.buran.team/main/fairwind"
+)
+
+var ErrContainerContextCancelled = errors.New("container context cancelled")
+var ErrContainerContextTimeout = errors.New("container context timeout")
+var ErrContainerResultInvalid = errors.New("container result invalid")
+
+// TODO: add start/stop conditions
+type OneshotContainer struct {
+	global    *Global
+	ticket    *Ticket
+	id        string
+	container *dockerpkg.Container
+	timeout   int
+	result    *Result
+}
+
+func NewOneshotContainer(
+	global *Global,
+	ticket *Ticket,
+	network *Network,
+	containerIndex int,
+	containerScheme schemepkg.Container,
+) (*OneshotContainer, error) {
+	varialbles := []dockerpkg.Variable{}
+	for _, variable := range containerScheme.Variables {
+		varialbles = append(
+			varialbles,
+			dockerpkg.Variable{
+				Key:   variable.Key,
+				Value: variable.Value,
+			},
+		)
+	}
+
+	hosts := []dockerpkg.ContainerSettingsHost{}
+	for _, host := range network.Hosts() {
+		hosts = append(
+			hosts,
+			dockerpkg.ContainerSettingsHost{
+				Name: host.Name,
+				IP:   host.IP,
+			},
+		)
+	}
+
+	container := dockerpkg.NewContainer(
+		global.Ctx,
+		global.Log,
+		global.Docker,
+		network.Network(),
+		dockerpkg.NewImage(
+			global.Ctx,
+			global.Log,
+			global.Docker,
+			global.Registry,
+			containerScheme.Image.Tag,
+			containerScheme.Image.Version,
+			containerScheme.Image.Pull,
+		),
+		dockerpkg.ContainerSettings{
+			Name: containerName(
+				ticket.Index,
+				containerIndex,
+			),
+			Command:   containerScheme.Command,
+			Variables: varialbles,
+			Hosts:     hosts,
+			Binds:     containerScheme.Binds,
+			Permissions: dockerpkg.ContainerSettingsPermissions{
+				Privileged:   containerScheme.Permissions.Privileged,
+				Capabilities: containerScheme.Permissions.Capabilities,
+			},
+			Network: dockerpkg.ContainerSettingsNetwork{
+				IP: containerIP(
+					ticket.Index,
+					containerIndex,
+				),
+			},
+			Resources: dockerpkg.ContainerResources{},
+		},
+	)
+
+	result, err := NewResult(containerScheme.KindOneshot.Result)
+	if err != nil {
+		return nil, fmt.Errorf("can't create oneshot container: %w", err)
+	}
+
+	return &OneshotContainer{
+		global:    global,
+		ticket:    ticket,
+		id:        containerScheme.ID,
+		container: container,
+		timeout:   containerScheme.KindOneshot.Timeout,
+		result:    result,
+	}, nil
+}
+
+func (this *OneshotContainer) ID() string {
+	return this.id
+}
+
+func (this *OneshotContainer) Start() *ContainerResult {
+	var err error
+	this.global.Log.Debug("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+	defer func() {
+		if err != nil {
+			this.global.Log.Debug("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
+		} else {
+			this.global.Log.Debug("container started", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		}
+	}()
+
+	// Start
+	err = this.container.Start()
+	if err != nil {
+		return NewContainerResultError(
+			fmt.Errorf("can't start container: %w", err),
+		)
+	}
+
+	// Wait
+	ctx, cancel := context.WithTimeout(
+		context.Background(),
+		time.Duration(this.timeout)*time.Millisecond,
+	)
+	defer cancel()
+
+	var status dockerpkg.ExecutionResult
+	for {
+		select {
+		case <-this.global.Ctx.Done():
+			err = ErrContainerContextCancelled
+			return NewContainerResultError(err)
+
+		case <-ctx.Done():
+			err = ErrContainerContextTimeout
+			return NewContainerResultError(err)
+
+		case <-time.After(100 * time.Millisecond): // NOTE: basic 100-millisecond startup delay
+		}
+
+		this.global.Log.Debug("check container status", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		status = this.container.Status(true)
+		if status.Error != nil {
+			this.global.Log.Information("container startup error", fw.LogError(status.Error))
+			continue
+		}
+
+		this.global.Log.Debug("check container result", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		if !this.result.Check(
+			status.Code,
+			status.Stdout,
+			status.Stderr,
+		) {
+			this.global.Log.Information("container startup check failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+			err = ErrContainerResultInvalid
+			return NewContainerResultError(err)
+		}
+
+		this.global.Log.Debug("container checks succeed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		break
+	}
+
+	// Done
+	return NewContainerResultSuccess(
+		map[string]bool{},
+		status.Code,
+		status.Stdout,
+		status.Stderr,
+	)
+}
+
+func (this *OneshotContainer) Stop() *ContainerResult {
+	var err error
+	this.global.Log.Debug("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+	defer func() {
+		if err != nil {
+			this.global.Log.Debug("container stop failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id), fw.LogError(err))
+		} else {
+			this.global.Log.Debug("container stopped", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", this.id))
+		}
+	}()
+
+	err = this.container.Stop()
+	if err != nil {
+		return NewContainerResultError(
+			fmt.Errorf("can't stop container: %w", err),
+		)
+	}
+
+	return NewContainerResultSuccess(
+		map[string]bool{},
+		0,
+		[]byte{},
+		[]byte{},
+	)
+}
+
+func (this *OneshotContainer) Execute(timeout int, command string) ExecutionResult {
+	result := this.container.Execute(timeout, command)
+	return ExecutionResult{
+		Error:  result.Error,
+		Code:   result.Code,
+		Stdout: result.Stdout,
+		Stderr: result.Stderr,
+	}
+}
+
+func (this *OneshotContainer) Read(timeout int, path string) ([]byte, error) {
+	return this.container.Read(timeout, path)
+}

+ 5 - 0
executor/error.go

@@ -0,0 +1,5 @@
+package executor
+
+import "errors"
+
+var ErrKindUnknown = errors.New("kind unknown")

+ 16 - 0
executor/global.go

@@ -0,0 +1,16 @@
+package executor
+
+import (
+	"context"
+
+	fw "git.buran.team/main/fairwind"
+
+	dockerpkg "git.buran.team/main/cep/docker"
+)
+
+type Global struct {
+	Ctx      context.Context
+	Log      *fw.Log
+	Docker   *dockerpkg.Docker
+	Registry *dockerpkg.Registry
+}

+ 90 - 0
executor/network.go

@@ -0,0 +1,90 @@
+package executor
+
+import (
+	"fmt"
+	"net/netip"
+
+	fw "git.buran.team/main/fairwind"
+
+	dockerpkg "git.buran.team/main/cep/docker"
+)
+
+type Host struct {
+	Name string
+	IP   string
+}
+
+type Network struct {
+	global  *Global
+	network *dockerpkg.Network
+	hosts   []Host
+}
+
+func NewNetwork(global *Global, ticket *Ticket, hosts []Host) (*Network, error) {
+	cidr, err := networkCIDR(ticket.Index)
+	if err != nil {
+		global.Log.Information("can't create network", fw.LogError(err))
+		return nil, err
+	}
+
+	gateway, err := networkGateway(ticket.Index)
+	if err != nil {
+		global.Log.Information("can't create network", fw.LogError(err))
+		return nil, err
+	}
+
+	network := dockerpkg.NewNetwork(
+		global.Ctx,
+		global.Log,
+		global.Docker,
+		networkName(
+			ticket.Index,
+		),
+		cidr,
+		gateway,
+	)
+
+	return &Network{
+		global:  global,
+		network: network,
+		hosts:   hosts,
+	}, nil
+}
+
+func (this *Network) Create() error {
+	return this.network.Create()
+}
+
+func (this *Network) Delete() error {
+	return this.network.Delete()
+}
+
+func (this *Network) Network() *dockerpkg.Network {
+	return this.network
+}
+
+func (this *Network) Hosts() []Host {
+	return this.hosts
+}
+
+func networkName(index int) string {
+	return fmt.Sprintf("network-%d", index)
+}
+
+func networkCIDR(index int) (netip.Prefix, error) {
+	prefix, err := netip.ParsePrefix(fmt.Sprintf("172.16.%d.0/24", index))
+	if err != nil {
+		return netip.Prefix{}, fmt.Errorf("can't parse prefix: %w", err)
+	}
+
+	return prefix, nil
+}
+
+func networkGateway(index int) (netip.Addr, error) {
+	address, err := netip.ParseAddr(fmt.Sprintf("172.16.%d.1", index))
+	if err != nil {
+		return netip.Addr{}, fmt.Errorf("can't parse address: %w", err)
+	}
+
+	return address, nil
+}

+ 24 - 0
executor/processor.go

@@ -0,0 +1,24 @@
+package executor
+
+import (
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type Processor interface {
+	Process(buffer []byte) bool
+}
+
+func NewProcessor(processorScheme schemepkg.Processor) (Processor, error) {
+	switch processorScheme.Kind {
+	case schemepkg.PROCESSOR_KIND_MATCH:
+		return NewMatchProcessor(processorScheme)
+
+	case schemepkg.PROCESSOR_KIND_NOT_MATCH:
+		return NewNotMatchProcessor(processorScheme)
+
+	case schemepkg.PROCESSOR_KIND_EQUAL:
+		return NewEqualProcessor(processorScheme)
+	}
+
+	return nil, ErrKindUnknown
+}

+ 41 - 0
executor/processor_equal.go

@@ -0,0 +1,41 @@
+package executor
+
+import (
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type EqualProcessor struct {
+	content []byte
+}
+
+func NewEqualProcessor(processorScheme schemepkg.Processor) (*EqualProcessor, error) {
+	// Normalize
+	content := processorScheme.KindEqual.Content
+	if len(content) > 0 && content[len(content)-1] == '\n' {
+		content = content[:len(content)-1]
+	}
+
+	return &EqualProcessor{
+		content: []byte(content),
+	}, nil
+}
+
+func (this *EqualProcessor) Process(buffer []byte) bool {
+	// Normalize
+	if len(buffer) > 0 && buffer[len(buffer)-1] == '\n' {
+		buffer = buffer[:len(buffer)-1]
+	}
+
+	// Compare
+	if len(buffer) != len(this.content) {
+		return false
+	}
+
+	for i := range buffer {
+		if buffer[i] != this.content[i] {
+			return false
+		}
+	}
+
+	return true
+}

+ 44 - 0
executor/processor_match.go

@@ -0,0 +1,44 @@
+package executor
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type MatchProcessor struct {
+	expression *regexp.Regexp
+	count      int
+}
+
+func NewMatchProcessor(processorScheme schemepkg.Processor) (*MatchProcessor, error) {
+	e, err := regexp.Compile(processorScheme.KindMatch.Expression)
+	if err != nil {
+		return nil, fmt.Errorf("can't create match processor: %w", err)
+	}
+
+	return &MatchProcessor{
+		expression: e,
+		count:      processorScheme.KindMatch.Count,
+	}, nil
+}
+
+func (this *MatchProcessor) Process(buffer []byte) bool {
+	lines := strings.Split(string(buffer), "\n")
+	count := 0
+	for _, line := range lines {
+		if !this.expression.Match([]byte(line)) {
+			continue
+		}
+
+		count += 1
+	}
+
+	if this.count == -1 {
+		return count > 0
+	}
+
+	return count == this.count
+}

+ 35 - 0
executor/processor_not_match.go

@@ -0,0 +1,35 @@
+package executor
+
+import (
+	"fmt"
+	"regexp"
+	"strings"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type NotMatchProcessor struct {
+	expression *regexp.Regexp
+}
+
+func NewNotMatchProcessor(processorScheme schemepkg.Processor) (*NotMatchProcessor, error) {
+	e, err := regexp.Compile(processorScheme.KindNotMatch.Expression)
+	if err != nil {
+		return nil, fmt.Errorf("can't create not-match processor: %w", err)
+	}
+
+	return &NotMatchProcessor{
+		expression: e,
+	}, nil
+}
+
+func (this *NotMatchProcessor) Process(buffer []byte) bool {
+	lines := strings.Split(string(buffer), "\n")
+	for _, line := range lines {
+		if this.expression.Match([]byte(line)) {
+			return false
+		}
+	}
+
+	return true
+}

+ 65 - 0
executor/result.go

@@ -0,0 +1,65 @@
+package executor
+
+import (
+	"fmt"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+type Result struct {
+	Code             int
+	StdoutProcessors []Processor
+	StderrProcessors []Processor
+}
+
+func NewResult(resultScheme schemepkg.Result) (*Result, error) {
+	stdoutProcessors := []Processor{}
+	if resultScheme.Stdout != nil {
+		for _, processorScheme := range resultScheme.Stdout {
+			processor, err := NewProcessor(processorScheme)
+			if err != nil {
+				return nil, fmt.Errorf("can't create result: %w", err)
+			}
+
+			stdoutProcessors = append(stdoutProcessors, processor)
+		}
+	}
+
+	stderrProcessors := []Processor{}
+	if resultScheme.Stderr != nil {
+		for _, processorScheme := range resultScheme.Stderr {
+			processor, err := NewProcessor(processorScheme)
+			if err != nil {
+				return nil, fmt.Errorf("can't create result: %w", err)
+			}
+
+			stderrProcessors = append(stderrProcessors, processor)
+		}
+	}
+
+	return &Result{
+		Code:             resultScheme.Code,
+		StdoutProcessors: stdoutProcessors,
+		StderrProcessors: stderrProcessors,
+	}, nil
+}
+
+func (this *Result) Check(code int, stdout []byte, stderr []byte) bool {
+	if code != this.Code {
+		return false
+	}
+
+	for _, processor := range this.StdoutProcessors {
+		if !processor.Process(stdout) {
+			return false
+		}
+	}
+
+	for _, processor := range this.StderrProcessors {
+		if !processor.Process(stderr) {
+			return false
+		}
+	}
+
+	return true
+}

+ 206 - 0
executor/task.go

@@ -0,0 +1,206 @@
+package executor
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+
+	fw "git.buran.team/main/fairwind"
+
+	schemepkg "git.buran.team/main/cep/scheme"
+)
+
+var ErrContainerNotFound = errors.New("container not found")
+
+type Task struct {
+	global     *Global
+	ticket     *Ticket
+	id         string
+	network    *Network
+	containers []Container
+	checker    Checker
+}
+
+func NewTask(global *Global, ticket *Ticket, taskScheme schemepkg.Task) (*Task, error) {
+	this := &Task{
+		global: global,
+		ticket: ticket,
+		id:     taskScheme.ID,
+	}
+
+	// Build hosts
+	hosts := []Host{}
+	for containerIndex, containerScheme := range taskScheme.Containers {
+		hosts = append(
+			hosts,
+			Host{
+				Name: containerScheme.ID,
+				IP:   containerIP(ticket.Index, containerIndex),
+			},
+		)
+	}
+
+	// Create network
+	network, err := NewNetwork(global, ticket, hosts)
+	if err != nil {
+		return nil, fmt.Errorf("can't create network: %w", err)
+	}
+	this.network = network
+
+	// Create containers
+	containers := []Container{}
+	for containerIndex, containerScheme := range taskScheme.Containers {
+		container, err := NewContainer(global, ticket, network, containerIndex, containerScheme)
+		if err != nil {
+			return nil, fmt.Errorf("can't create container: %w", err)
+		}
+
+		containers = append(containers, container)
+	}
+	this.containers = containers
+
+	// Create checker
+	checker, err := NewChecker(
+		global,
+		ticket,
+		func(ID string) (Container, error) {
+			for _, container := range this.containers {
+				if container.ID() == ID {
+					return container, nil
+				}
+			}
+
+			return nil, ErrContainerNotFound
+		},
+		taskScheme.Checker,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("can't create checker: %w", err)
+	}
+	this.checker = checker
+
+	// Done
+	return this, nil
+}
+
+func (this *Task) Execute() *TaskResult {
+	// Create/Delete network
+	this.global.Log.Information("creating network", fw.LogValue("uuid", this.ticket.UUID))
+	err := this.network.Create()
+	if err != nil {
+		this.global.Log.Information("network creation failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
+		return NewTaskResultFailed()
+	} else {
+		this.global.Log.Information("network created", fw.LogValue("uuid", this.ticket.UUID))
+	}
+
+	defer func() {
+		this.global.Log.Information("deleting network", fw.LogValue("uuid", this.ticket.UUID))
+		err := this.network.Delete()
+		if err != nil {
+			this.global.Log.Information("network deletion failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogError(err))
+		} else {
+			this.global.Log.Information("network deleted", fw.LogValue("uuid", this.ticket.UUID))
+		}
+	}()
+
+	containerStartResults := map[string]*ContainerResult{}
+	start := func() bool {
+		var errorCounter atomic.Int64
+		var waitGroup sync.WaitGroup
+		errorCounter.Store(0)
+
+		this.global.Log.Information("starting containers", fw.LogValue("uuid", this.ticket.UUID))
+
+		waitGroup.Add(len(this.containers))
+		for _, container := range this.containers {
+			go func(container Container) {
+				defer waitGroup.Done()
+
+				this.global.Log.Information("starting container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
+				containerResult := container.Start()
+				if containerResult.Error != nil {
+					this.global.Log.Information("container start failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err))
+					errorCounter.Add(1)
+				} else {
+					this.global.Log.Information("container started successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
+				}
+
+				containerStartResults[container.ID()] = containerResult
+			}(container)
+		}
+
+		waitGroup.Wait()
+
+		if errorCounter.Load() == 0 {
+			this.global.Log.Information("containers started successfully", fw.LogValue("uuid", this.ticket.UUID))
+		} else {
+			this.global.Log.Information("containers started with errors", fw.LogValue("uuid", this.ticket.UUID))
+		}
+
+		return errorCounter.Load() == 0
+	}
+
+	containerStopResults := map[string]*ContainerResult{}
+	stop := func() bool {
+		var errorCounter atomic.Int64
+		var waitGroup sync.WaitGroup
+		errorCounter.Store(0)
+
+		this.global.Log.Information("stopping containers", fw.LogValue("uuid", this.ticket.UUID))
+
+		waitGroup.Add(len(this.containers))
+		for _, container := range this.containers {
+			go func(container Container) {
+				defer waitGroup.Done()
+
+				this.global.Log.Information("stopping container", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
+				containerResult := container.Stop()
+				if containerResult.Error != nil {
+					this.global.Log.Information("container stopping failed", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()), fw.LogError(err))
+					errorCounter.Add(1)
+				} else {
+					this.global.Log.Information("container stopped successfully", fw.LogValue("uuid", this.ticket.UUID), fw.LogValue("id", container.ID()))
+				}
+
+				containerStopResults[container.ID()] = containerResult
+			}(container)
+		}
+
+		waitGroup.Wait()
+
+		if errorCounter.Load() == 0 {
+			this.global.Log.Information("containers stopped successfully", fw.LogValue("uuid", this.ticket.UUID))
+		} else {
+			this.global.Log.Information("containers stopped with errors", fw.LogValue("uuid", this.ticket.UUID))
+		}
+
+		return errorCounter.Load() == 0
+	}
+
+	// Start
+	if !start() {
+		if !stop() {
+			// ...
+		}
+
+		return NewTaskResultFailed()
+	}
+
+	// Check
+	this.global.Log.Information("starting checker", fw.LogValue("uuid", this.ticket.UUID))
+	result := this.checker.Check()
+	result.Start = containerStartResults
+	this.global.Log.Information("checker finished", fw.LogValue("uuid", this.ticket.UUID))
+
+	// Stop
+	if !stop() {
+		return result
+	}
+
+	// Done
+	result.Stop = containerStopResults
+	result.Clean = true
+	return result
+}

+ 55 - 0
executor/task_result.go

@@ -0,0 +1,55 @@
+package executor
+
+import "strings"
+
+type ContainerResult struct {
+	Conditions map[string]bool `json:"conditions"`
+	Code       int             `json:"code"`
+	Stdout     []string        `json:"stdout"`
+	Stderr     []string        `json:"stderr"`
+	Error      error           `json:"error"`
+}
+
+func NewContainerResultSuccess(conditions map[string]bool, code int, stdout []byte, stderr []byte) *ContainerResult {
+	return &ContainerResult{
+		Conditions: conditions,
+		Code:       code,
+		Stdout:     strings.Split(string(stdout), "\n"),
+		Stderr:     strings.Split(string(stderr), "\n"),
+	}
+}
+
+func NewContainerResultError(err error) *ContainerResult {
+	return &ContainerResult{
+		Error: err,
+	}
+}
+
+type CheckResult struct {
+	Success bool `json:"success"`
+	Data    any  `json:"data"`
+}
+
+type TaskResult struct {
+	Success bool                        `json:"success"`
+	Clean   bool                        `json:"clean"`
+	Start   map[string]*ContainerResult `json:"start"`
+	Stop    map[string]*ContainerResult `json:"stop"`
+	Checks  map[string]CheckResult      `json:"checks"`
+}
+
+func NewTaskResultSuccess(report map[string]CheckResult) *TaskResult {
+	return &TaskResult{
+		Success: true,
+		Clean:   false,
+		Checks:  report,
+	}
+}
+
+func NewTaskResultFailed() *TaskResult {
+	return &TaskResult{
+		Success: false,
+		Clean:   false,
+		Checks:  map[string]CheckResult{},
+	}
+}

+ 126 - 0
executor/tvm.go

@@ -0,0 +1,126 @@
+package executor
+
+import (
+	"errors"
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+var ErrTicketPoolFull = errors.New("ticket pool full")
+var ErrTicketNotTaken = errors.New("ticket not taken")
+var ErrTicketNotFound = errors.New("ticket not found")
+
+type Ticket struct {
+	Index int
+	UUID  string
+	Taken bool
+}
+
+type TVM struct {
+	mutex   sync.Mutex
+	tickets map[int]*Ticket
+	size    int
+}
+
+func NewTVM(size int) *TVM {
+	tickets := map[int]*Ticket{}
+	for i := 0; i < size; i++ {
+		tickets[i] = &Ticket{
+			Index: i,
+			UUID:  "",
+			Taken: false,
+		}
+	}
+
+	return &TVM{
+		tickets: tickets,
+		size:    size,
+	}
+}
+
+func (this *TVM) AcquireTicket() (*Ticket, error) {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	for i := range len(this.tickets) {
+		if this.tickets[i].Taken {
+			continue
+		}
+
+		this.tickets[i].UUID = uuid.New().String()
+		this.tickets[i].Taken = true
+		return this.tickets[i], nil
+	}
+
+	return nil, ErrTicketPoolFull
+}
+
+func (this *TVM) ReleaseTicket(UUID string) error {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	for index, ticket := range this.tickets {
+		if ticket.UUID != UUID {
+			continue
+		}
+
+		if !ticket.Taken {
+			return ErrTicketNotTaken
+		}
+
+		this.tickets[index].UUID = ""
+		this.tickets[index].Taken = false
+		return nil
+	}
+
+	return ErrTicketNotFound
+}
+
+func (this *TVM) FindTicket(UUID string) (*Ticket, error) {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	for _, ticket := range this.tickets {
+		if ticket.UUID != UUID {
+			continue
+		}
+
+		if !ticket.Taken {
+			return nil, ErrTicketNotTaken
+		}
+
+		return ticket, nil
+	}
+
+	return nil, ErrTicketNotFound
+}
+
+func (this *TVM) Capacity() int {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	counter := 0
+	for i := range len(this.tickets) {
+		if !this.tickets[i].Taken {
+			counter += 1
+		}
+	}
+
+	return counter
+}
+
+func (this *TVM) HasTicket(UUID string) bool {
+	this.mutex.Lock()
+	defer this.mutex.Unlock()
+
+	for _, ticket := range this.tickets {
+		if ticket.UUID != UUID {
+			continue
+		}
+
+		return ticket.Taken
+	}
+
+	return false
+}

+ 32 - 0
go.mod

@@ -0,0 +1,32 @@
+module git.buran.team/main/cep
+
+go 1.26.3
+
+require (
+	git.buran.team/main/fairwind v0.0.0-20260606033541-7899270ab8ca
+	github.com/docker/docker v28.5.2+incompatible
+	github.com/google/uuid v1.6.0
+	github.com/moby/moby/api v1.54.2
+	github.com/moby/moby/client v0.4.1
+)
+
+require (
+	github.com/Microsoft/go-winio v0.6.2 // indirect
+	github.com/containerd/errdefs v1.0.0 // indirect
+	github.com/containerd/errdefs/pkg v0.3.0 // indirect
+	github.com/distribution/reference v0.6.0 // indirect
+	github.com/docker/go-connections v0.7.0 // indirect
+	github.com/docker/go-units v0.5.0 // indirect
+	github.com/felixge/httpsnoop v1.0.4 // indirect
+	github.com/go-logr/logr v1.4.2 // indirect
+	github.com/go-logr/stdr v1.2.2 // indirect
+	github.com/moby/docker-image-spec v1.3.1 // indirect
+	github.com/opencontainers/go-digest v1.0.0 // indirect
+	github.com/opencontainers/image-spec v1.1.1 // indirect
+	go.opentelemetry.io/auto/sdk v1.1.0 // indirect
+	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
+	go.opentelemetry.io/otel v1.35.0 // indirect
+	go.opentelemetry.io/otel/metric v1.35.0 // indirect
+	go.opentelemetry.io/otel/trace v1.35.0 // indirect
+	golang.org/x/sys v0.33.0 // indirect
+)

+ 65 - 0
go.sum

@@ -0,0 +1,65 @@
+git.buran.team/main/fairwind v0.0.0-20260606033541-7899270ab8ca h1:z7QeP6STsIXXqKkwxBrhQh/p+akb1x34OE07En4NT9M=
+git.buran.team/main/fairwind v0.0.0-20260606033541-7899270ab8ca/go.mod h1:1w12qzBrANoxepbCn3vmQpxksWbtbmMK53/xAct7Biw=
+github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
+github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
+github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
+github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M=
+github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE=
+github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
+github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
+github.com/docker/docker v28.5.2+incompatible h1:DBX0Y0zAjZbSrm1uzOkdr1onVghKaftjlSWt4AFexzM=
+github.com/docker/docker v28.5.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
+github.com/docker/go-connections v0.7.0 h1:6SsRfJddP22WMrCkj19x9WKjEDTB+ahsdiGYf0mN39c=
+github.com/docker/go-connections v0.7.0/go.mod h1:no1qkHdjq7kLMGUXYAduOhYPSJxxvgWBh7ogVvptn3Q=
+github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
+github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
+github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
+github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
+github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
+github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
+github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
+github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
+github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0=
+github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo=
+github.com/moby/moby/api v1.54.2 h1:wiat9QAhnDQjA7wk1kh/TqHz2I1uUA7M7t9SAl/JNXg=
+github.com/moby/moby/api v1.54.2/go.mod h1:+RQ6wluLwtYaTd1WnPLykIDPekkuyD/ROWQClE83pzs=
+github.com/moby/moby/client v0.4.1 h1:DMQgisVoMkmMs7fp3ROSdiBnoAu8+vo3GggFl06M/wY=
+github.com/moby/moby/client v0.4.1/go.mod h1:z52C9O2POPOsnxZAy//WtKcQ32P+jT/NGeXu/7nfjGQ=
+github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
+github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
+github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040=
+github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
+github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
+go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
+go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 h1:sbiXRNDSWJOTobXh5HyQKjq6wUC5tNybqjIqDpAY4CU=
+go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0/go.mod h1:69uWxva0WgAA/4bu2Yy70SLDBwZXuQ6PbBpbsa5iZrQ=
+go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
+go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
+go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
+go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
+go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
+go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
+go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
+go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
+go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
+go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
+golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
+golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
+gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA=
+pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk=
+pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=

+ 23 - 0
scheme/check.go

@@ -0,0 +1,23 @@
+package scheme
+
+const CHECK_KIND_BASH = "bash"
+const CHECK_KIND_FILE = "file"
+
+type BashCheck struct {
+	Command string `json:"command" yaml:"command"`
+	Result  Result `json:"result" yaml:"result"`
+}
+
+type FileCheck struct {
+	Path       string      `json:"path" yaml:"path"`
+	Processors []Processor `json:"processors" yaml:"processors"`
+}
+
+type Check struct {
+	ID          string    `json:"id" yaml:"id"`
+	ContainerID string    `json:"container_id" yaml:"container_id"`
+	Timeout     int       `json:"timeout" yaml:"timeout"`
+	Kind        string    `json:"kind" yaml:"kind"`
+	KindBash    BashCheck `json:"kind_bash" yaml:"kind_bash"`
+	KindFile    FileCheck `json:"kind_file" yaml:"kind_file"`
+}

+ 18 - 0
scheme/checker.go

@@ -0,0 +1,18 @@
+package scheme
+
+const CHECKER_KIND_SEQUENTIAL = "sequential"
+const CHECKER_KIND_PARALLEL = "parallel"
+
+type SequentialChecker struct {
+	Checks []Check `json:"checks" yaml:"checks"`
+}
+
+type ParallelChecker struct {
+	Checks []Check `json:"checks" yaml:"checks"`
+}
+
+type Checker struct {
+	Kind           string            `json:"kind" yaml:"kind"`
+	KindSequential SequentialChecker `json:"kind_sequential" yaml:"kind_sequential"`
+	KindParallel   ParallelChecker   `json:"kind_parallel" yaml:"kind_parallel"`
+}

+ 50 - 0
scheme/container.go

@@ -0,0 +1,50 @@
+package scheme
+
+const CONTAINER_KIND_ONESHOT = "oneshot"
+const CONTAINER_KIND_CONTINIOUS = "continious"
+
+type Image struct {
+	Tag     string `json:"tag" yaml:"tag"`
+	Version string `json:"version" yaml:"version"`
+	Pull    bool   `json:"pull" yaml:"pull"`
+}
+
+type Variable struct {
+	Key   string `json:"key" yaml:"key"`
+	Value string `json:"value" yaml:"value"`
+}
+
+type Network struct {
+	Name    string `json:"name" yaml:"name"`
+	CIDR    string `json:"cidr" yaml:"cidr"`
+	Gateway string `json:"gateway" yaml:"gateway"`
+}
+
+type ContainerPermissions struct {
+	Privileged   bool     `json:"privileged" yaml:"privileged"`
+	Capabilities []string `json:"capabilities" yaml:"capabilities"`
+}
+
+type ContainerNetwork struct {
+	IP string `json:"ip" yaml:"ip"`
+}
+
+type ContainerResources struct {
+	CPU    int64 `json:"cpu" yaml:"cpu"`
+	Memory int64 `json:"memory" yaml:"memory"`
+	// TODO: disk iops, network iops
+}
+
+type Container struct {
+	ID             string               `json:"id" yaml:"id"`
+	Image          Image                `json:"image" yaml:"image"`
+	Command        string               `json:"command" yaml:"command"`
+	Variables      []Variable           `json:"environment" yaml:"environment"`
+	Binds          []string             `json:"binds" yaml:"binds"`
+	Permissions    ContainerPermissions `json:"permissions" yaml:"permissions"`
+	Network        ContainerNetwork     `json:"network" yaml:"network"`
+	Resources      ContainerResources   `json:"resources" yaml:"resources"`
+	Kind           string               `json:"kind" yaml:"kind"`
+	KindOneshot    ContainerOneshot     `json:"kind_oneshot" yaml:"kind_oneshot"`
+	KindContinious ContainerContinious  `json:"kind_continious" yaml:"kind_continious"`
+}

+ 24 - 0
scheme/container_continious.go

@@ -0,0 +1,24 @@
+package scheme
+
+const CONDITION_KIND_BASH = "bash"
+
+type BashCondition struct {
+	Command string `json:"command" yaml:"command"`
+	Result  Result `json:"result" yaml:"result"`
+}
+
+type Condition struct {
+	ID       string        `json:"id" yaml:"id"`
+	Timeout  int           `json:"timeout" yaml:"timeout"`
+	Kind     string        `json:"kind" yaml:"kind"`
+	KindBash BashCondition `json:"kind_bash" yaml:"kind_bash"`
+}
+type ContainerContiniousHook struct {
+	Timeout    int         `json:"timeout" yaml:"timeout"`
+	Conditions []Condition `json:"start" yaml:"conditions"`
+}
+
+type ContainerContinious struct {
+	Start ContainerContiniousHook `json:"start" yaml:"start"`
+	Stop  ContainerContiniousHook `json:"stop" yaml:"stop"`
+}

+ 6 - 0
scheme/container_oneshot.go

@@ -0,0 +1,6 @@
+package scheme
+
+type ContainerOneshot struct {
+	Timeout int    `json:"timeout" yaml:"timeout"`
+	Result  Result `json:"result" yaml:"result"`
+}

+ 7 - 0
scheme/registry.go

@@ -0,0 +1,7 @@
+package scheme
+
+type Registry struct {
+	Address  string `json:"address" yaml:"address"`
+	Login    string `json:"login" yaml:"login"`
+	Password string `json:"password" yaml:"password"`
+}

+ 31 - 0
scheme/result.go

@@ -0,0 +1,31 @@
+package scheme
+
+const PROCESSOR_KIND_MATCH = "match"
+const PROCESSOR_KIND_NOT_MATCH = "not_match"
+const PROCESSOR_KIND_EQUAL = "equal"
+
+type MatchProcessor struct {
+	Expression string `json:"expression" yaml:"expression"`
+	Count      int    `json:"count" yaml:"count"`
+}
+
+type NotMatchProcessor struct {
+	Expression string `json:"expression" yaml:"expression"`
+}
+
+type EqualProcessor struct {
+	Content string `json:"content" yaml:"content"`
+}
+
+type Processor struct {
+	Kind         string            `json:"kind" yaml:"kind"`
+	KindMatch    MatchProcessor    `json:"kind_match" yaml:"kind_match"`
+	KindNotMatch NotMatchProcessor `json:"kind_not_match" yaml:"kind_not_match"`
+	KindEqual    EqualProcessor    `json:"kind_equal" yaml:"kind_equal"`
+}
+
+type Result struct {
+	Code   int         `json:"code" yaml:"code"`
+	Stdout []Processor `json:"stdout" yaml:"stdout"`
+	Stderr []Processor `json:"stderr" yaml:"stderr"`
+}

+ 7 - 0
scheme/task.go

@@ -0,0 +1,7 @@
+package scheme
+
+type Task struct {
+	ID         string      `json:"id" yaml:"id"`
+	Containers []Container `json:"containers" yaml:"containers"`
+	Checker    Checker     `json:"checker" yaml:"checker"`
+}