API Reference

yapapi

Golem Python API.

get_version

get_version() -> str

Returns:

the version of the yapapi library package

windows_event_loop_fix

windows_event_loop_fix()

Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.

This workaround is only needed for Python 3.6 and 3.7. Starting with Python 3.8, ProactorEventLoop is already the default on Windows.
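A minimal sketch of how such a fix could be written. This is an assumption about the approach, not yapapi's actual implementation; the names proactor_fix_sketch and _ProactorPolicy are hypothetical.

```python
import asyncio
import sys


def proactor_fix_sketch() -> bool:
    """Install a proactor-based policy where needed; return True if installed."""
    if sys.platform != "win32" or sys.version_info >= (3, 8):
        # Not Windows, or ProactorEventLoop is already the default there.
        return False

    class _ProactorPolicy(asyncio.DefaultEventLoopPolicy):  # hypothetical helper
        def new_event_loop(self):
            # ProactorEventLoop is only defined in Windows builds of asyncio.
            return asyncio.ProactorEventLoop()

    asyncio.set_event_loop_policy(_ProactorPolicy())
    return True
```

Like the documented function, the sketch is meant to be called once, before any event loop is created.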

yapapi.strategy

Implementation of strategies for choosing offers from market.

ComputationHistory Objects

class ComputationHistory(Protocol)

A protocol for objects that provide information about the history of current computation.

rejected_last_agreement

| rejected_last_agreement(issuer_id) -> bool

Return True if and only if the previous agreement proposed to issuer_id has been rejected.

MarketStrategy Objects

class MarketStrategy(DemandDecorator, abc.ABC)

Abstract market strategy.

decorate_demand

| async decorate_demand(demand: DemandBuilder) -> None

Optionally add relevant constraints to a Demand.

score_offer

| async score_offer(offer: rest.market.OfferProposal, history: Optional[ComputationHistory] = None) -> float

Score offer. Better offers should get higher scores.

DummyMS Objects

@dataclass
class DummyMS(MarketStrategy, object)

A default market strategy implementation.

Its score_offer() method returns SCORE_NEUTRAL for every offer with prices that do not exceed maximum prices specified for each counter. For other offers, returns SCORE_REJECTED.
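The rule described above can be sketched with plain dictionaries. This is an illustration only: the numeric values of SCORE_NEUTRAL and SCORE_REJECTED, the counter names and the helper dummy_score are assumptions, not the library's definitions.

```python
from typing import Mapping

SCORE_NEUTRAL = 0.0    # assumed value, for illustration only
SCORE_REJECTED = -1.0  # assumed value, for illustration only


def dummy_score(offer_prices: Mapping[str, float], max_prices: Mapping[str, float]) -> float:
    """Reject an offer as soon as one counter price exceeds its configured cap."""
    for counter, price in offer_prices.items():
        cap = max_prices.get(counter)
        if cap is not None and price > cap:
            return SCORE_REJECTED
    return SCORE_NEUTRAL


# Hypothetical counters and caps.
caps = {"cpu_sec": 0.002, "duration_sec": 0.001}
cheap = dummy_score({"cpu_sec": 0.001, "duration_sec": 0.001}, caps)
pricey = dummy_score({"cpu_sec": 0.010, "duration_sec": 0.001}, caps)
```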

decorate_demand

| async decorate_demand(demand: DemandBuilder) -> None

Ensure that the offer uses PriceModel.LINEAR price model.

score_offer

| async score_offer(offer: rest.market.OfferProposal, history: Optional[ComputationHistory] = None) -> float

Score offer. Returns either SCORE_REJECTED or SCORE_NEUTRAL.

LeastExpensiveLinearPayuMS Objects

@dataclass
class LeastExpensiveLinearPayuMS(MarketStrategy, object)

A strategy that scores offers according to cost for given computation time.

decorate_demand

| async decorate_demand(demand: DemandBuilder) -> None

Ensure that the offer uses PriceModel.LINEAR price model.

score_offer

| async score_offer(offer: rest.market.OfferProposal, history: Optional[ComputationHistory] = None) -> float

Score offer according to cost for expected computation time.

DecreaseScoreForUnconfirmedAgreement Objects

class DecreaseScoreForUnconfirmedAgreement(MarketStrategy)

A market strategy that modifies a base strategy based on history of agreements.

__init__

| __init__(base_strategy, factor)

Arguments:

  • base_strategy: the base strategy around which this strategy is wrapped

  • factor: the factor by which the score of an offer for a provider which

    failed to confirm the last agreement proposed to them will be multiplied

decorate_demand

| async decorate_demand(demand: DemandBuilder) -> None

Decorate demand using the base strategy.

score_offer

| async score_offer(offer: rest.market.OfferProposal, history: Optional[ComputationHistory] = None) -> float

Score offer using the base strategy and apply penalty if needed.

If the offer issuer failed to approve the previous agreement (if any) then the base score is multiplied by self._factor.
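The wrapping behaviour can be sketched with stand-in classes. FixedScore, SetHistory and PenalizeUnconfirmed are hypothetical, and representing an offer as a dictionary with an "issuer" key is an assumption made to keep the example self-contained.

```python
import asyncio
from typing import Set


class FixedScore:
    """Hypothetical base strategy that gives every offer the same score."""

    def __init__(self, score: float) -> None:
        self._score = score

    async def score_offer(self, offer, history=None) -> float:
        return self._score


class SetHistory:
    """Hypothetical history: a set of issuers that rejected their last agreement."""

    def __init__(self, rejecting: Set[str]) -> None:
        self._rejecting = rejecting

    def rejected_last_agreement(self, issuer_id) -> bool:
        return issuer_id in self._rejecting


class PenalizeUnconfirmed:
    """Multiply the base score by `factor` for issuers with a rejected agreement."""

    def __init__(self, base_strategy, factor: float) -> None:
        self._base_strategy = base_strategy
        self._factor = factor

    async def score_offer(self, offer, history=None) -> float:
        score = await self._base_strategy.score_offer(offer, history)
        # Assumption: the issuer id is stored under the "issuer" key.
        if history is not None and history.rejected_last_agreement(offer["issuer"]):
            score *= self._factor
        return score


strategy = PenalizeUnconfirmed(FixedScore(10.0), factor=0.5)
history = SetHistory({"provider-a"})
penalized = asyncio.run(strategy.score_offer({"issuer": "provider-a"}, history))
unchanged = asyncio.run(strategy.score_offer({"issuer": "provider-b"}, history))
```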

yapapi.ctx

Work Objects

class Work(abc.ABC)

prepare

| async prepare()

A hook to be executed on requestor's end before the script is sent to the provider.

register

| register(commands: CommandContainer)

A hook which adds the required command to the exescript.

post

| async post()

A hook to be executed on requestor's end after the script has finished.

timeout

| @property
| timeout() -> Optional[timedelta]

Return the optional timeout set for execution of this work.

Steps Objects

class Steps(Work)

__init__

| __init__(*steps: Work, timeout: Optional[timedelta] = None)

Create a Work item consisting of a sequence of steps (subitems).

Arguments:

  • steps: sequence of steps to be executed

  • timeout: timeout for waiting for the steps' results

timeout

| @property
| timeout() -> Optional[timedelta]

Return the optional timeout set for execution of all steps.

prepare

| async prepare()

Execute the prepare hook for all the defined steps.

register

| register(commands: CommandContainer)

Execute the register hook for all the defined steps.

post

| async post()

Execute the post hook for all the defined steps.
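The delegation performed by Steps can be sketched as a small composite. The Work base class below is a simplified stand-in for the documented one, and the Echo step and its list-of-strings command container are hypothetical.

```python
import abc
import asyncio
from datetime import timedelta
from typing import List, Optional


class Work(abc.ABC):
    """Simplified stand-in for the documented Work hooks."""

    async def prepare(self) -> None:
        pass

    def register(self, commands: List[str]) -> None:
        pass

    async def post(self) -> None:
        pass

    @property
    def timeout(self) -> Optional[timedelta]:
        return None


class Echo(Work):
    """Hypothetical step that records when each of its hooks runs."""

    def __init__(self, text: str, log: List[str]) -> None:
        self._text, self._log = text, log

    async def prepare(self) -> None:
        self._log.append(f"prepare {self._text}")

    def register(self, commands: List[str]) -> None:
        commands.append(f"echo {self._text}")

    async def post(self) -> None:
        self._log.append(f"post {self._text}")


class Steps(Work):
    """Run every hook of every sub-step, in the order the steps were given."""

    def __init__(self, *steps: Work, timeout: Optional[timedelta] = None) -> None:
        self._steps = steps
        self._timeout = timeout

    @property
    def timeout(self) -> Optional[timedelta]:
        return self._timeout

    async def prepare(self) -> None:
        for step in self._steps:
            await step.prepare()

    def register(self, commands: List[str]) -> None:
        for step in self._steps:
            step.register(commands)

    async def post(self) -> None:
        for step in self._steps:
            await step.post()


async def run_all(work: Work, commands: List[str]) -> None:
    await work.prepare()
    work.register(commands)
    await work.post()


log: List[str] = []
commands: List[str] = []
steps = Steps(Echo("a", log), Echo("b", log), timeout=timedelta(seconds=30))
asyncio.run(run_all(steps, commands))
```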

ExecOptions Objects

@dataclass
class ExecOptions()

Options related to command batch execution.

WorkContext Objects

class WorkContext()

An object used to schedule commands to be sent to provider.

id

Unique identifier for this work context.

provider_name

| @property
| provider_name() -> Optional[str]

Return the name of the provider associated with this work context.

deploy

| deploy()

Schedule a Deploy command.

start

| start(*args: str)

Schedule a Start command.

terminate

| terminate()

Schedule a Terminate command.

send_json

| send_json(json_path: str, data: dict)

Schedule sending JSON data to the provider.

Arguments:

  • json_path: remote (provider) path

  • data: dictionary representing JSON data

Returns:

None

send_bytes

| send_bytes(dst_path: str, data: bytes)

Schedule sending bytes data to the provider.

Arguments:

  • dst_path: remote (provider) path

  • data: bytes to send

Returns:

None

send_file

| send_file(src_path: str, dst_path: str)

Schedule sending file to the provider.

Arguments:

  • src_path: local (requestor) path

  • dst_path: remote (provider) path

Returns:

None

run

| run(cmd: str, *args: Iterable[str], env: Optional[Dict[str, str]] = None)

Schedule running a command.

Arguments:

  • cmd: command to run on the provider, e.g. /my/dir/run.sh

  • args: command arguments, e.g. "input1.txt", "output1.txt"

  • env: optional dictionary with environmental variables

Returns:

None

download_file

| download_file(src_path: str, dst_path: str)

Schedule downloading remote file from the provider.

Arguments:

  • src_path: remote (provider) path

  • dst_path: local (requestor) path

Returns:

None

download_bytes

| download_bytes(src_path: str, on_download: Callable[[bytes], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT)

Schedule downloading a remote file as bytes.

Arguments:

  • src_path: remote (provider) path

  • on_download: the callable to run on the received data

  • limit: the maximum length of the expected byte string

Returns:

None

download_json

| download_json(src_path: str, on_download: Callable[[Any], Awaitable], limit: int = DOWNLOAD_BYTES_LIMIT_DEFAULT)

Schedule downloading a remote file as JSON.

Arguments:

  • src_path: remote (provider) path

  • on_download: the callable to run on the received JSON data

  • limit: the maximum length of the expected remote file

Returns:

None

commit

| commit(timeout: Optional[timedelta] = None) -> Work

Creates a sequence of commands to be sent to provider.

Returns:

a Work object containing the sequence of commands scheduled within this work context before this method was called
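The scheduling model (methods only queue commands, commit() turns the queue into one unit of work) can be sketched with a recording stand-in. RecordingContext is hypothetical, the remote paths are made up, and clearing the queue on commit is an assumption about the behaviour.

```python
from typing import Dict, List, Optional


class RecordingContext:
    """Hypothetical stand-in for WorkContext that only records scheduled commands."""

    def __init__(self) -> None:
        self._pending: List[Dict] = []

    def send_file(self, src_path: str, dst_path: str) -> None:
        self._pending.append({"op": "send_file", "src": src_path, "dst": dst_path})

    def run(self, cmd: str, *args: str, env: Optional[Dict[str, str]] = None) -> None:
        self._pending.append({"op": "run", "cmd": cmd, "args": list(args), "env": env})

    def download_file(self, src_path: str, dst_path: str) -> None:
        self._pending.append({"op": "download_file", "src": src_path, "dst": dst_path})

    def commit(self) -> List[Dict]:
        # Hand over everything scheduled so far and start a fresh queue (assumed).
        batch, self._pending = self._pending, []
        return batch


ctx = RecordingContext()
ctx.send_file("input1.txt", "/golem/work/input1.txt")  # hypothetical remote path
ctx.run("/my/dir/run.sh", "input1.txt", "output1.txt")
ctx.download_file("/golem/work/output1.txt", "output1.txt")
batch = ctx.commit()
```

Nothing is sent to a provider while the commands are being scheduled; in this sketch the batch returned by commit() is the only place where they are collected.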

yapapi.executor

An implementation of the task executor for the new Golem.

CFG_INVOICE_TIMEOUT

Time to receive invoice from provider after tasks ended.

DEFAULT_EXECUTOR_TIMEOUT

Joint timeout for all tasks submitted to an executor.

Executor Objects

class Executor(AsyncContextManager)

Task executor.

Used to run batch tasks using the specified application package within providers' execution units.

__init__

| @overload
| __init__(*, payload: Optional[Payload] = None, max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, _engine: _Engine)

Initialize the Executor to use a specific Golem _engine.

__init__

| @overload
| __init__(*, budget: Union[float, Decimal], strategy: Optional[MarketStrategy] = None, subnet_tag: Optional[str] = None, driver: Optional[str] = None, network: Optional[str] = None, event_consumer: Optional[Callable[[Event], None]] = None, stream_output: bool = False, payload: Optional[Payload] = None, max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT)

Initialize the Executor for standalone usage, with payload parameter.

__init__

| @overload
| __init__(*, budget: Union[float, Decimal], strategy: Optional[MarketStrategy] = None, subnet_tag: Optional[str] = None, driver: Optional[str] = None, network: Optional[str] = None, event_consumer: Optional[Callable[[Event], None]] = None, stream_output: bool = False, max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, package: Optional[Payload] = None)

Initialize the Executor for standalone usage, with package parameter.

__init__

| __init__(*, budget: Optional[Union[float, Decimal]] = None, strategy: Optional[MarketStrategy] = None, subnet_tag: Optional[str] = None, driver: Optional[str] = None, network: Optional[str] = None, event_consumer: Optional[Callable[[Event], None]] = None, stream_output: bool = False, max_workers: int = 5, timeout: timedelta = DEFAULT_EXECUTOR_TIMEOUT, package: Optional[Payload] = None, payload: Optional[Payload] = None, _engine: Optional[_Engine] = None)

Initialize an Executor.

Arguments:

  • budget: [DEPRECATED use Golem instead] maximum budget for payments

  • strategy: [DEPRECATED use Golem instead] market strategy used to

    select providers from the market (e.g. LeastExpensiveLinearPayuMS or DummyMS)

  • subnet_tag: [DEPRECATED use Golem instead] use only providers in the

    subnet with the subnet_tag name

  • driver: [DEPRECATED use Golem instead] name of the payment driver

    to use or None to use the default driver;

    only payment platforms with the specified driver will be used

  • network: [DEPRECATED use Golem instead] name of the network

    to use or None to use the default network;

    only payment platforms with the specified network will be used

  • event_consumer: [DEPRECATED use Golem instead] a callable that

    processes events related to the computation;

    by default it is a function that logs all events

  • stream_output: [DEPRECATED use Golem instead]

    stream computation output from providers

  • max_workers: maximum number of concurrent workers performing the computation

  • payload: specification of payload (for example a VM package) that needs to be

    deployed on providers in order to compute tasks with this Executor

  • timeout: timeout for the whole computation

driver

| @property
| driver() -> str

Return the payment driver used for this Executor's engine.

network

| @property
| network() -> str

Return the payment network used for this Executor's engine.

__aenter__

| async __aenter__() -> "Executor"

Start computation using this Executor.

__aexit__

| async __aexit__(exc_type, exc_val, exc_tb)

Release resources used by this Executor.

emit

| emit(event: events.Event) -> None

Emit a computation event using this Executor's engine.

submit

| async submit(worker: Callable[
| [WorkContext, AsyncIterator[Task[D, R]]],
| AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]],
| ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]]) -> AsyncIterator[Task[D, R]]

Submit a computation to be executed on providers.

Arguments:

  • worker: a callable that takes a WorkContext object and a list of tasks,

    adds commands to the context object and yields committed commands

  • data: an iterable or an async generator iterator of Task objects to be computed

    on providers

Returns:

yields computation progress events
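The shape of a worker callable can be sketched without a Golem node. StubTask, StubContext and the drive() loop are hypothetical stand-ins: a real executor would send each yielded item to a provider before resuming the worker, whereas the driver here only collects the yielded batches.

```python
import asyncio
from typing import AsyncIterator, Iterable, List, Tuple


class StubTask:
    """Hypothetical task holding input data and, once accepted, a result."""

    def __init__(self, data: int) -> None:
        self.data = data
        self.result = None

    def accept_result(self, result=None) -> None:
        self.result = result


class StubContext:
    """Hypothetical context that queues commands until commit() is called."""

    def __init__(self) -> None:
        self._pending: List[Tuple[str, ...]] = []

    def run(self, cmd: str, *args: str) -> None:
        self._pending.append((cmd, *args))

    def commit(self) -> List[Tuple[str, ...]]:
        batch, self._pending = self._pending, []
        return batch


async def worker(ctx: StubContext, tasks: AsyncIterator[StubTask]):
    """Schedule commands for each task, yield them, then accept the result."""
    async for task in tasks:
        ctx.run("/bin/echo", str(task.data))
        yield ctx.commit()
        # Execution resumes here once the yielded batch has been handled.
        task.accept_result(result=task.data)


async def as_async_iter(items: Iterable[StubTask]) -> AsyncIterator[StubTask]:
    for item in items:
        yield item


async def drive():
    tasks = [StubTask(n) for n in range(3)]
    batches = [batch async for batch in worker(StubContext(), as_async_iter(tasks))]
    return tasks, batches


tasks, batches = asyncio.run(drive())
```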

yapapi.executor.strategy

yapapi.executor.ctx

yapapi.executor._smartq

YAPAPI internal module. This is not a part of the public API. It can change at any time.

Handle Objects

class Handle(Generic[Item])

Handle of a queue item, in other words, a binding between a queue item and a specific consumer.

Additionally it keeps track of the previously used consumers of the given item to prevent them from being assigned to this item again.

SmartQueue Objects

class SmartQueue(Generic[Item])

__init__

| __init__(items: AsyncIterator[Item])

Arguments:

  • items: the items to be iterated over

has_unassigned_items

| has_unassigned_items() -> bool

Check if this queue has a new or rescheduled item immediately available.

get

| async get(consumer: "Consumer[Item]") -> Handle[Item]

Get a handle to the next item to be processed (either a new one or rescheduled).

mark_done

| async mark_done(handle: Handle[Item]) -> None

Mark an item, referred to by handle, as done.

reschedule

| async reschedule(handle: Handle[Item]) -> None

Free the item for reassignment to another consumer.

reschedule_all

| async reschedule_all(consumer: "Consumer[Item]")

Make all items currently assigned to the consumer available for reassignment.

wait_until_done

| async wait_until_done() -> None

Wait until all items in the queue are processed.

Consumer Objects

class Consumer(
Generic[Item],
AsyncIterator[Handle[Item]],
AsyncIterable[Handle[Item]],
ContextManager["Consumer[Item]"])

Provides an interface to asynchronously iterate over items in the given queue while cooperating with other consumers attached to this queue.

current_item

| @property
| current_item() -> Optional[Item]

The most-recent queue item that has been fetched to be processed by this consumer.

yapapi.executor.task

Task Objects

class Task(Generic[TaskData, TaskResult])

One computation unit.

Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).

__init__

| __init__(data: TaskData)

Create a new Task object.

Arguments:

  • data: contains information needed to prepare command list for the provider

running_time

| @property
| running_time() -> Optional[timedelta]

Return the running time of the task (if in progress) or time it took to complete it.

accept_result

| accept_result(result: Optional[TaskResult] = None) -> None

Accept the result of this task.

Must be called when the result is correct to mark this task as completed.

Arguments:

  • result: task computation result (optional)

Returns:

None

reject_result

| reject_result(reason: Optional[str] = None, retry: bool = False) -> None

Reject the result of this task.

Must be called when the result is not correct to indicate that the task should be retried.

Arguments:

  • reason: task rejection description (optional)

Returns:

None

yapapi.executor.events

yapapi.log

Utilities for logging computation events via the standard logging module.

Functions in this module fall into two categories:

  • Functions that convert computation events generated by the Executor.submit method to calls to the standard Python logging module, using the loggers in the "yapapi" namespace (e.g. logging.getLogger("yapapi.executor")). These functions should be passed as event_consumer arguments to Executor().

  • Functions that perform configuration of the logging module itself. Since logging configuration is in general a responsibility of the code that uses yapapi as a library, we only provide the enable_default_logger function in this category, that enables logging to stderr with level logging.INFO and, optionally, to a given file with level logging.DEBUG.

Functions for handling events

Several functions from this module can be passed as event_consumer callback to yapapi.Executor().

For detailed, human-readable output use the log_event function:

Executor(..., event_consumer=yapapi.log.log_event)

For even more detailed, machine-readable output use log_event_repr:

Executor(..., event_consumer=yapapi.log.log_event_repr)

For summarized, human-readable output use log_summary():

Executor(..., event_consumer=yapapi.log.log_summary())

Summary output can be combined with a detailed one by passing the detailed logger as an argument to log_summary:

Executor(
    ...,
    event_consumer=yapapi.log.log_summary(yapapi.log.log_event_repr)
)

enable_default_logger

enable_default_logger(format_: str = "[%(asctime)s %(levelname)s %(name)s] %(message)s", log_file: Optional[str] = None, debug_activity_api: bool = False, debug_market_api: bool = False, debug_payment_api: bool = False)

Enable the default logger that logs messages to stderr with level INFO.

If log_file is specified, the logger will also output messages with level DEBUG to the given file.
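The described behaviour maps onto the standard logging module roughly as follows. This is a sketch under assumptions, not the library's code: the function name enable_logger_sketch, the name parameter and the logger name are hypothetical, and the debug_* flags are left out.

```python
import logging
import sys
from typing import Optional


def enable_logger_sketch(
    format_: str = "[%(asctime)s %(levelname)s %(name)s] %(message)s",
    log_file: Optional[str] = None,
    name: str = "yapapi_sketch",  # hypothetical logger name
) -> logging.Logger:
    """Log INFO and above to stderr and, optionally, DEBUG and above to a file."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)  # let each handler decide what it keeps
    formatter = logging.Formatter(format_)

    stderr_handler = logging.StreamHandler(sys.stderr)
    stderr_handler.setLevel(logging.INFO)
    stderr_handler.setFormatter(formatter)
    logger.addHandler(stderr_handler)

    if log_file is not None:
        file_handler = logging.FileHandler(log_file, encoding="utf-8")
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger
```

Setting the logger itself to DEBUG and filtering per handler is what lets the file receive more detail than stderr.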

log_event

log_event(event: events.Event) -> None

Log event with a human-readable description.

log_event_repr

log_event_repr(event: events.Event) -> None

Log the result of calling repr(event).

SummaryLogger Objects

class SummaryLogger()

Aggregates information from computation events to provide a high-level summary.

The logger's log() method can be used as event_consumer callback to Executor(). It will aggregate the events generated by Executor.submit() and output some summary information.

The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with log() is first passed to wrapped_emitter.

For example, with the following setup, each event emitted by executor will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.

detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
executor = Executor(..., event_consumer=summary_logger)
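The chaining itself can be sketched independently of yapapi. CountingLogger is a hypothetical stand-in that only counts events, and the string event names are made up; the point is the order of operations, with the wrapped emitter receiving each event before it is aggregated.

```python
from collections import Counter
from typing import Callable, List, Optional

Event = str  # hypothetical: real events are objects, not strings


class CountingLogger:
    """Forward each event to an optional wrapped emitter, then aggregate it."""

    def __init__(self, wrapped_emitter: Optional[Callable[[Event], None]] = None) -> None:
        self._wrapped_emitter = wrapped_emitter
        self.counts: Counter = Counter()

    def log(self, event: Event) -> None:
        if self._wrapped_emitter is not None:
            self._wrapped_emitter(event)  # the detailed logger sees the event first
        self.counts[event] += 1


seen: List[Event] = []
logger = CountingLogger(wrapped_emitter=seen.append)
for event in ["TaskStarted", "TaskFinished", "TaskStarted"]:
    logger.log(event)
```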

__init__

| __init__(wrapped_emitter: Optional[Callable[[events.Event], None]] = None)

Create a SummaryLogger.

log

| log(event: events.Event) -> None

Register an event.

log_summary

log_summary(wrapped_emitter: Optional[Callable[[events.Event], None]] = None)

Output a summary of computation.

This is a utility function that creates a SummaryLogger instance wrapping an optional wrapped_emitter and returns its log method.

See the documentation of SummaryLogger for more information.

pluralize

pluralize(num: int, thing: str) -> str

Return the string f"1 {thing}" or f"{num} {thing}s", depending on num.
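A one-line sketch consistent with this description (the actual implementation may differ in detail):

```python
def pluralize(num: int, thing: str) -> str:
    """Return "1 thing" for num == 1 and "<num> things" for any other num."""
    return f"1 {thing}" if num == 1 else f"{num} {thing}s"
```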

str_capped

str_capped(object: Any, max_len: int) -> str

Return the string representation of object trimmed to max_len.

Trailing ellipsis is added to the returned string if the original had to be trimmed.
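A possible sketch of this behaviour. Whether the ellipsis counts toward max_len is an assumption here (the sketch makes it count, so the result never exceeds max_len), and the parameter is renamed to obj to avoid shadowing the built-in.

```python
from typing import Any


def str_capped(obj: Any, max_len: int) -> str:
    """Return str(obj), shortened to max_len characters with a trailing "..."."""
    text = str(obj)
    if len(text) <= max_len:
        return text
    # Assumption: the three dots are part of the max_len budget.
    return text[: max_len - 3] + "..."
```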

yapapi.rest

Mid-level binding for Golem REST API.

Serves as a more convenient interface between the agent code and the REST API.

yapapi.rest.payment

Allocation Objects

@dataclass
class Allocation(_Link)

Payment reservation for task processing.

id

Allocation object id

amount

Total amount allocated

payment_platform

Payment platform, e.g. NGNT

payment_address

Payment address, e.g. 0x123...

expires

Allocation expiration timestamp

Payment Objects

class Payment(object)

new_allocation

| new_allocation(amount: Decimal, payment_platform: str, payment_address: str, *, expires: Optional[datetime] = None, make_deposit: bool = False) -> ResourceCtx[Allocation]

Creates a new allocation.

Arguments:

  • amount: allocation amount

  • expires: expiration timestamp; by default, 30 minutes from now

  • make_deposit: (unimplemented)

allocations

| async allocations() -> AsyncIterator[Allocation]

Lists all active allocations.

Example:

Listing all active allocations

from yapapi import rest

async def list_allocations(payment_api: rest.Payment):
    async for allocation in payment_api.allocations():
        print(f'''allocation - {allocation.id}
            amount={allocation.amount},
            expires={allocation.expires}''')

yapapi.rest.configuration

Configuration Objects

class Configuration(object)

REST API's setup and top-level access utility.

By default, it expects the yagna daemon to be available locally and listening on the default port. The urls for the specific APIs are then based on this default base URL.

It requires one external argument, namely Yagna's application key, which is used to authenticate with the daemon. The application key must be either specified explicitly using the app_key argument or provided by the YAGNA_APPKEY environment variable.

If YAGNA_API_URL environment variable exists, it will be used as a base URL for all REST API URLs. Example value: http://127.0.10.10:7500 (no trailing slash).

Other than that, the URLs of each specific REST API can be overridden using the following environment variables:

  • YAGNA_MARKET_URL

  • YAGNA_PAYMENT_URL

  • YAGNA_ACTIVITY_URL
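The precedence described above (a per-API variable first, then YAGNA_API_URL, then the local default) can be sketched as a pure function. The default base URL and the path suffixes below are assumptions for illustration, as is the helper name resolve_api_urls.

```python
from typing import Dict, Mapping

DEFAULT_BASE_URL = "http://127.0.0.1:7465"  # assumed default, for illustration
API_PATHS = {                                # assumed path suffixes
    "MARKET": "/market-api/v1",
    "PAYMENT": "/payment-api/v1",
    "ACTIVITY": "/activity-api/v1",
}


def resolve_api_urls(env: Mapping[str, str]) -> Dict[str, str]:
    """Pick each API URL: specific override, else base URL plus the API path."""
    base = env.get("YAGNA_API_URL", DEFAULT_BASE_URL)
    return {
        name.lower(): env.get(f"YAGNA_{name}_URL", base + path)
        for name, path in API_PATHS.items()
    }


urls = resolve_api_urls({
    "YAGNA_API_URL": "http://127.0.10.10:7500",
    "YAGNA_PAYMENT_URL": "http://example.invalid/pay",  # hypothetical override
})
```

Passing the environment in as a mapping (for example os.environ) keeps the function easy to test.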

app_key

| @property
| app_key() -> str

Yagna daemon's application key used to access the REST API.

market_url

| @property
| market_url() -> str

The URL of the Market REST API

payment_url

| @property
| payment_url() -> str

The URL of the Payment REST API

activity_url

| @property
| activity_url() -> str

The URL of the Activity REST API

market

| market() -> ya_market.ApiClient

Return a REST client for the Market API.

payment

| payment() -> ya_payment.ApiClient

Return a REST client for the Payment API.

activity

| activity() -> ya_activity.ApiClient

Return a REST client for the Activity API.

yapapi.rest.resource

yapapi.rest.activity

ActivityService Objects

class ActivityService(object)

A convenience helper to facilitate the creation of an Activity.

new_activity

| async new_activity(agreement_id: str, stream_events: bool = False) -> "Activity"

Create an activity within bounds of the specified agreement.

Returns:

the Activity object that represents the activity and allows querying and controlling its state

Activity Objects

class Activity(AsyncContextManager["Activity"])

Mid-level wrapper for REST's Activity endpoint

state

| async state() -> yaa.ActivityState

Query the state of the activity.

send

| async send(script: List[dict], deadline: Optional[datetime] = None) -> "Batch"

Send the execution script to the provider's execution unit.

__aexit__

| async __aexit__(exc_type, exc_val, exc_tb) -> None

Call DestroyActivity API operation.

CommandExecutionError Objects

class CommandExecutionError(Exception)

An exception that indicates that a command failed on a provider.

command

The command that failed.

message

Optional error message from exe unit.

stderr

Stderr produced by the command running on provider.

BatchTimeoutError Objects

class BatchTimeoutError(Exception)

An exception that indicates that an execution of a batch of commands timed out.

Batch Objects

class Batch(abc.ABC, AsyncIterable[events.CommandEventContext])

Abstract base class for iterating over events related to a batch running on provider.

seconds_left

| seconds_left() -> float

Return how many seconds are left until the deadline.

id

| @property
| id()

Return the ID of this batch.

PollingBatch Objects

class PollingBatch(Batch)

A Batch implementation that polls the server repeatedly for command status.

StreamingBatch Objects

class StreamingBatch(Batch)

A Batch implementation that uses event streaming to return command status.

yapapi.rest.market

AgreementDetails Objects

class AgreementDetails(object)

View Objects

@dataclass
class View()

A certain fragment of an agreement's properties.

extract

| extract(m: Type[_ModelType]) -> _ModelType

Extract properties for the given model from this view's properties.

provider_view

| @property
| provider_view() -> View

Get the view of provider's properties in this Agreement.

requestor_view

| @property
| requestor_view() -> View

Get the view of requestor's properties in this Agreement.

Agreement Objects

class Agreement(object)

Mid-level interface to the REST's Agreement model.

confirm

| async confirm() -> bool

Sign and send the agreement to the provider and then wait for it to be approved.

Returns:

True if the agreement has been confirmed, False otherwise

terminate

| async terminate(reason: dict) -> bool

Terminate the agreement.

Returns:

True if the agreement has been successfully terminated, False otherwise.

OfferProposal Objects

class OfferProposal(object)

Mid-level interface to handle the negotiation phase between the parties.

reject

| async reject(reason: str = "Rejected")

Reject the Offer.

respond

| async respond(props: dict, constraints: str) -> str

Create an agreement Proposal for a received Offer, based on our Demand.

create_agreement

| async create_agreement(timeout=timedelta(hours=1)) -> Agreement

Create an Agreement based on this Proposal.

Subscription Objects

class Subscription(object)

Mid-level interface to REST API's Subscription model.

details

| @property
| details() -> models.Demand

Returns:

the Demand for which the Subscription has been registered.

delete

| async delete()

Unsubscribe this Demand from the market.

events

| async events() -> AsyncIterator[OfferProposal]

Yield counter-proposals based on the incoming, matching Offers.

Market Objects

class Market(object)

Mid-level interface to the Market REST API.

subscribe

| subscribe(props: dict, constraints: str) -> AsyncResource[Subscription]

Create a subscription for a demand specified by the supplied properties and constraints.

subscriptions

| async subscriptions() -> AsyncIterator[Subscription]

Yield all the subscriptions that this requestor agent has on the market.

yapapi._cli

yapapi._cli.payment

Allocation Objects

class Allocation()

Payment allocation management.

list

| @async_run
| async list(details: bool = False)

Lists currently active payment allocations.

clear

| @async_run
| async clear()

Removes all active payment allocations

Invoices Objects

class Invoices()

Invoice management.

accept

| @async_run
| async accept(allocation_id: str, invoice_id: str)

Accepts given invoice_id with allocation_id

Arguments:

  • allocation_id: Allocation from which the invoice will be paid. See

    allocation list.

  • invoice_id: Invoice identifier.

list

| @async_run
| async list(by_status: Optional[str] = None)

Lists all invoices.

yapapi._cli.run

yapapi._cli.market

Demand Objects

class Demand()

Offer subscription management.

list

| @async_run
| async list(only_expired: bool = False)

Lists all active demands

clear

| @async_run
| async clear(only_expired: bool = False)

Removes demands.

By default removes all demands.

Arguments:

  • only_expired: removes only expired demands.

yapapi.engine

DEBIT_NOTE_MIN_TIMEOUT

Shortest debit note acceptance timeout the requestor will accept.

NoPaymentAccountError Objects

class NoPaymentAccountError(Exception)

The error raised if no payment account for the required driver/network is available.

required_driver

Payment driver required for the account.

required_network

Network required for the account.

__init__

| __init__(required_driver: str, required_network: str)

Initialize NoPaymentAccountError.

Arguments:

  • required_driver: payment driver for which initialization was required

  • required_network: payment network for which initialization was required

WorkItem

The type of items yielded by a generator created by the worker function supplied by user.

exescript_ids

An iterator providing unique ids used to correlate events related to a single exe script.

_Engine Objects

class _Engine(AsyncContextManager)

Base execution engine containing functions common to all modes of operation.

__init__

| __init__(*, budget: Union[float, Decimal], strategy: Optional[MarketStrategy] = None, subnet_tag: Optional[str] = None, driver: Optional[str] = None, network: Optional[str] = None, event_consumer: Optional[Callable[[events.Event], None]] = None, stream_output: bool = False, app_key: Optional[str] = None)

Initialize the engine.

Arguments:

  • budget: maximum budget for payments

  • strategy: market strategy used to select providers from the market

    (e.g. LeastExpensiveLinearPayuMS or DummyMS)

  • subnet_tag: use only providers in the subnet with the subnet_tag name

  • driver: name of the payment driver to use or None to use the default driver;

    only payment platforms with the specified driver will be used

  • network: name of the network to use or None to use the default network;

    only payment platforms with the specified network will be used

  • event_consumer: a callable that processes events related to the

    computation; by default it is a function that logs all events

  • stream_output: stream computation output from providers

  • app_key: optional Yagna application key. If not provided, the default is to

    get the value from YAGNA_APPKEY environment variable

create_demand_builder

| async create_demand_builder(expiration_time: datetime, payload: Payload) -> DemandBuilder

Create a DemandBuilder for given payload and expiration_time.

driver

| @property
| driver() -> str

Return the name of the payment driver used by this engine.

network

| @property
| network() -> str

Return the name of the payment network used by this engine.

storage_manager

| @property
| storage_manager()

Return the storage manager used by this engine.

strategy

| @property
| strategy() -> MarketStrategy

Return the instance of MarketStrategy used by this engine.

emit

| emit(event: events.Event) -> None

Emit an event to be consumed by this engine's event consumer.

__aenter__

| async __aenter__() -> "_Engine"

Initialize resources and start background services used by this engine.

accept_payments_for_agreement

| async accept_payments_for_agreement(agreement_id: str, *, partial: bool = False) -> None

Add given agreement to the set of agreements for which invoices should be accepted.

accept_debit_notes_for_agreement

| accept_debit_notes_for_agreement(agreement_id: str) -> None

Add given agreement to the set of agreements for which debit notes should be accepted.

add_job

| add_job(job: "Job")

Register a job with this engine.

finalize_job

| @staticmethod
| finalize_job(job: "Job")

Mark a job as finished.

PaymentDecorator Objects

@dataclass
class PaymentDecorator(DemandDecorator)

A DemandDecorator that adds payment-related constraints and properties to a Demand.

decorate_demand

| async decorate_demand(demand: DemandBuilder)

Add properties and constraints to a Demand.

create_activity

| async create_activity(agreement_id: str)

Create an activity for given agreement_id.

process_batches

| async process_batches(agreement_id: str, activity: rest.activity.Activity, command_generator: AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]]) -> None

Send command batches produced by command_generator to activity.

Job Objects

class Job()

Functionality related to a single job.

Responsible for posting a Demand to market and collecting Offer proposals for the Demand.

__init__

| __init__(engine: _Engine, expiration_time: datetime, payload: Payload)

Initialize a Job instance.

Arguments:

  • engine: a Golem engine which will run this job

  • expiration_time: expiration time for the job; all agreements created for this job must expire before this date

  • payload: definition of a service runtime or a runtime package that needs to be deployed on providers for executing this job

find_offers

| async find_offers() -> None

Create demand subscription and process offers.

When the subscription expires, create a new one, and repeat.

yapapi.__main__

yapapi.package

yapapi.package.vm

repo

@deprecated(version="0.6.0", reason="moved to yapapi.payload.vm.repo")
async repo(*, image_hash: str, min_mem_gib: float = 0.5, min_storage_gib: float = 2.0) -> Package

Build reference to application package.

  • image_hash: finds package by its contents hash.

  • min_mem_gib: minimal memory required to execute application code.

  • min_storage_gib: minimal disk storage to execute tasks.

resolve_repo_srv

@deprecated(version="0.6.0", reason="moved to yapapi.payload.vm.resolve_repo_srv")
resolve_repo_srv(repo_srv, fallback_url=_FALLBACK_REPO_URL) -> str

Get the url of the package repository based on its SRV record address.

Arguments:

  • repo_srv: the SRV domain name

  • fallback_url: temporary hardcoded fallback url in case there's a problem resolving SRV

Returns:

the URL of the package repository, including the port

Raises:

PackageException if no valid service could be reached

yapapi.golem

Golem Objects

class Golem(_Engine)

The main entrypoint of Golem's high-level API.

Provides two methods that reflect the two modes of operation, or two types of jobs that can currently be executed on the Golem network.

The first one - execute_tasks - instructs Golem to take a sequence of tasks that the user wishes to compute on Golem and distributes those among the providers.

The second one - run_service - instructs Golem to spawn a certain number of instances of a service based on a single service specification (a specialized implementation inheriting from yapapi.Service).

While the two models are not necessarily completely disjoint - in that we can create a service that exists to process a certain number of computations and, similarly, we can use the task model to run some service - the main difference lies in the lifetime of such a job.

Whereas a task-based job exists for the purpose of computing the specific sequence of tasks and is done once the whole sequence has been processed, the service-based job is created for a potentially indefinite period and the services spawned within it are kept alive for as long as they're needed.

Additionally, the service interface provides a way to easily define handlers for the discrete phases of each service instance's lifetime: startup, running and shutdown.

As Golem's job includes tracking and executing payments for activities spawned by either mode of operation, it's usually good to have just one instance of Golem active at any given time.

execute_tasks

| async execute_tasks(worker: Callable[
| [WorkContext, AsyncIterator[Task[D, R]]],
| AsyncGenerator[WorkItem, Awaitable[List[events.CommandEvent]]],
| ], data: Union[AsyncIterator[Task[D, R]], Iterable[Task[D, R]]], payload: Payload, max_workers: Optional[int] = None, timeout: Optional[timedelta] = None, budget: Optional[Union[float, Decimal]] = None) -> AsyncIterator[Task[D, R]]

Submit a sequence of tasks to be executed on providers.

Internally, this method creates an instance of yapapi.executor.Executor and calls its submit() method with given worker function and sequence of tasks.

Arguments:

  • worker: an async generator that takes a WorkContext object and a sequence of tasks, and generates a sequence of work items to be executed on providers in order to compute the given tasks

  • data: an iterable or an async generator of Task objects to be computed on providers

  • payload: specification of the payload that needs to be deployed on providers (for example, a VM runtime package) in order to compute the tasks, passed to the created Executor instance

  • max_workers: maximum number of concurrent workers, passed to the Executor instance

  • timeout: timeout for computing all tasks, passed to the Executor instance

  • budget: budget for computing all tasks, passed to the Executor instance

Returns:

an iterator that yields completed Task objects

example usage:

async def worker(context: WorkContext, tasks: AsyncIterable[Task]):
    async for task in tasks:
        context.run("/bin/sh", "-c", "date")
        future_results = yield context.commit()
        results = await future_results
        task.accept_result(result=results[-1])

# The statements below use `await` and `async with`, so they must run inside a coroutine.
package = await vm.repo(
    image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
)

async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
    async for completed in golem.execute_tasks(worker, [Task(data=None)], payload=package):
        print(completed.result.stdout)
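The worker in the usage example relies on the two-way async generator protocol: it yields a work item and receives, at the same yield, an awaitable that resolves to the command results. The following sketch reproduces that handshake with plain asyncio so it can be run without a Golem node. The drive function, the string "batches" and the "ran: ..." results are hypothetical stand-ins, not part of yapapi.

```python
import asyncio
from typing import AsyncGenerator, Awaitable, List

collected: List[str] = []


async def worker(commands: List[str]) -> AsyncGenerator[str, Awaitable[List[str]]]:
    """Yield one batch per command and receive an awaitable with its results."""
    for command in commands:
        future_results = yield command   # hand the batch to the driver
        results = await future_results   # wait until the driver has "run" it
        collected.append(results[-1])    # keep the last result, as in the example


async def drive(commands: List[str]) -> None:
    """Hypothetical driver: pretends to execute each batch and sends results back."""
    loop = asyncio.get_running_loop()
    gen = worker(commands)
    try:
        batch = await gen.__anext__()    # run the worker up to its first yield
        while True:
            future = loop.create_future()
            future.set_result([f"ran: {batch}"])  # simulated provider output
            batch = await gen.asend(future)       # resume worker, get next batch
    except StopAsyncIteration:
        pass                             # the worker has no more batches


asyncio.run(drive(["date", "uname"]))
```

The value passed to asend becomes the result of the yield expression inside the worker, which is why the worker can await it on the next line.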

run_service

| async run_service(service_class: Type[Service], num_instances: int = 1, payload: Optional[Payload] = None, expiration: Optional[datetime] = None) -> Cluster

Run a number of instances of a service represented by a given Service subclass.

Arguments:

  • service_class: a subclass of Service that represents the service to be run

  • num_instances: optional number of service instances to run, defaults to a single instance

  • payload: optional runtime definition for the service; if not provided, the payload specified by the get_payload() method of service_class is used

  • expiration: optional expiration datetime for the service

Returns:

a Cluster of service instances

example usage:

DATE_OUTPUT_PATH = "/golem/work/date.txt"
REFRESH_INTERVAL_SEC = 5

class DateService(Service):
    @staticmethod
    async def get_payload():
        return await vm.repo(
            image_hash="d646d7b93083d817846c2ae5c62c72ca0507782385a2e29291a3d376",
        )

    async def start(self):
        # every `REFRESH_INTERVAL_SEC` seconds, write output of `date` to `DATE_OUTPUT_PATH`
        self._ctx.run(
            "/bin/sh",
            "-c",
            f"while true; do date > {DATE_OUTPUT_PATH}; sleep {REFRESH_INTERVAL_SEC}; done &",
        )
        yield self._ctx.commit()

    async def run(self):
        while True:
            await asyncio.sleep(REFRESH_INTERVAL_SEC)
            self._ctx.run(
                "/bin/sh",
                "-c",
                f"cat {DATE_OUTPUT_PATH}",
            )
            future_results = yield self._ctx.commit()
            results = await future_results
            print(results[0].stdout.strip())

async def main():
    async with Golem(budget=1.0, subnet_tag="devnet-beta.2") as golem:
        cluster = await golem.run_service(DateService, num_instances=1)
        start_time = datetime.now()
        while datetime