API Reference

yapapi

Golem Python API.

get_version

get_version() -> str

Returns:

the version of the yapapi library package

yapapi.log

Utilities for logging computation events via the standard logging module.

Functions in this module fall into two categories:

  • Functions that convert computation events generated by the Engine.map method to calls to the standard Python logging module, using the loggers in the "yapapi" namespace (e.g. logging.getLogger("yapapi.runner")). These functions should be passed as event_emitter arguments to Engine().

  • Functions that perform configuration of the logging module itself. Since logging configuration is in general a responsibility of the code that uses yapapi as a library, we only provide the enable_default_logger function in this category, that enables logging to stderr with level logging.INFO and, optionally, to a given file with level logging.DEBUG.

Functions for handling events

Several functions from this module can be passed as event_emitter callback to yapapi.runner.Engine().

For detailed, human-readable output use the log_event function:

Engine(..., event_emitter=yapapi.log.log_event)

For even more detailed, machine-readable output use log_event_repr:

Engine(..., event_emitter=yapapi.log.log_event_repr)

For summarized, human-readable output use log_summary():

Engine(..., event_emitter=yapapi.log.log_summary())

Summary output can be combined with a detailed one by passing the detailed logger as an argument to log_summary:

Engine(
...
event_emitter=yapapi.log.log_summary(yapapi.log.log_event_repr)
)

enable_default_logger

enable_default_logger(format_: str = "[%(asctime)s %(levelname)s %(name)s] %(message)s", log_file: Optional[str] = None)

Enable the default logger that logs messages to stderr with level INFO.

If log_file is specified, the logger with output messages with level DEBUG to the given file.

log_event

log_event(event: events.Event) -> None

Log an event in human-readable representation.

log_event_repr

log_event_repr(event: events.Event) -> None

Log the result of calling __repr__() for the event.

SummaryLogger Objects

class SummaryLogger()

Aggregates information from computation events to provide a high-level summary.

The logger's log() method can be used as event_emitter callback to Engine(). It will aggregate the events generated by Engine.map() and output some summary information.

The optional wrapped_emitter argument can be used for chaining event emitters: each event logged with log() is first passed to wrapped_emitter.

For example, with the following setup, each event emitted by engine will be logged by log_event_repr, and additionally, certain events will cause summary messages to be logged.

detailed_logger = log_event_repr
summary_logger = SummaryLogger(wrapped_emitter=detailed_logger).log
engine = Engine(..., event_emitter=summary_logger)

__init__

| __init__(wrapped_emitter: Optional[Callable[[events.Event], None]] = None)

Create a SummaryLogger.

log

| log(event: events.Event) -> None

Register an event.

log_summary

log_summary(wrapped_emitter: Optional[Callable[[events.Event], None]] = None)

Output a summary of computation.

This is a utility function that creates a SummaryLogger instance wrapping an optional wrapped_emitter and returns its log method.

See the documentation of SummaryLogger for more information.

yapapi.props

Identification Objects

@dataclass
class Identification(Model)

Properties describing the node's identity

name

human-readable name of the Golem node

subnet_tag

the name of the subnet within which the Demands and Offers are matched

Activity Objects

@dataclass()
class Activity(Model)

Activity-related Properties

cost_cap

Sets a Hard cap on total cost of the Activity (regardless of the usage vector or pricing function). The Provider is entitled to 'kill' an Activity which exceeds the capped cost amount indicated by Requestor.

cost_warning

Sets a Soft cap on total cost of the Activity (regardless of the usage vector or pricing function). When the cost_warning amount is reached for the Activity, the Provider is expected to send a Debit Note to the Requestor, indicating the current amount due

timeout_secs

A timeout value for batch computation (eg. used for container-based batch processes). This property allows to set the timeout to be applied by the Provider when running a batch computation: the Requestor expects the Activity to take no longer than the specified timeout value - which implies that eg. the golem.usage.duration_sec counter shall not exceed the specified timeout value.

yapapi.props.builder

DemandBuilder Objects

class DemandBuilder()

Builds a dictionary of properties and constraints from high-level models.

The dictionary represents a Demand object, which is later matched by the new Golem's market implementation against Offers coming from providers to find those providers who can satisfy the requestor's demand.

example usage:

>>> import yapapi
>>> from yapapi import props as yp
>>> from yapapi.props.builder import DemandBuilder
>>> from datetime import datetime, timezone
>>> builder = DemandBuilder()
>>> builder.add(yp.Identification(name="a node", subnet_tag="testnet"))
>>> builder.add(yp.Activity(expiration=datetime.now(timezone.utc)))
>>> builder.__repr__
>>> print(builder)
{'props':
{'golem.node.id.name': 'a node',
'golem.node.debug.subnet': 'testnet',
'golem.srv.comp.expiration': 1601655628772},
'constraints': []}

props

| @property
| props() -> dict

List of properties for this demand.

cons

| @property
| cons() -> str

List of constraints for this demand.

ensure

| ensure(constraint: str)

Add a constraint to the demand definition.

add

| add(m: Model)

Add properties from the specified model to this demand definition.

subscribe

| async subscribe(market: Market) -> Subscription

Create a Demand on the market and subscribe to Offers that will match that Demand.

yapapi.props.com

Payment-related properties.

yapapi.props.inf

Infrastructural properties.

yapapi.props.base

InvalidPropertiesError Objects

class InvalidPropertiesError(Exception)

Raised by Model.from_props(cls, props) when given invalid props.

Model Objects

class Model(abc.ABC)

Base class from which all property models inherit.

Provides helper methods to load the property model data from a dictionary and to get a mapping of all the keys available in the given model.

from_props

| @classmethod
| from_props(cls: Type[ME], props: Props) -> ME

Initialize the model from a dictionary representation.

When provided with a dictionary of properties, it will find the matching keys within it and fill the model fields with the values from the dictionary.

It ignores non-matching keys - i.e. doesn't require filtering of the properties' dictionary before the model is fed with the data. Thus, several models can be initialized from the same dictionary and all models will only load their own data.

keys

| @classmethod
| keys(cls)

Returns:

a mapping between the model's field names and the property keys

example:

>>> import dataclasses
>>> import typing
>>> from yapapi.props.base import Model
>>> @dataclasses.dataclass
... class Identification(Model):
... name: typing.Optional[str] = \
... dataclasses.field(default=None, metadata={"key": "golem.node.id.name"})
...
>>> Identification.keys().name
'golem.node.id.name'

yapapi.runner

An implementation of the new Golem's requestor engine.

CFG_INVOICE_TIMEOUT

Time to receive invoice from provider after tasks ended.

MarketStrategy Objects

class MarketStrategy(abc.ABC)

Abstract market strategy

Engine Objects

class Engine(AsyncContextManager)

Requestor engine.

Used to run tasks using the specified application package within providers' execution units.

__init__

| __init__(*, package: "Package", max_workers: int = 5, timeout: timedelta = timedelta(minutes=5), budget: Union[float, Decimal], strategy: MarketStrategy = DummyMS(), subnet_tag: Optional[str] = None, event_emitter: Optional[Callable[[Event], None]] = None)

Create a new requestor engine.

Arguments:

  • package: a package common for all tasks; vm.repo() function may be

    used to return package from a repository

  • max_workers: maximum number of workers doing the computation

  • timeout: timeout for the whole computation

  • budget: maximum budget for payments

  • strategy: market strategy used to select providers from the market

    (e.g. LeastExpensiveLinearPayuMS or DummyMS)

  • subnet_tag: use only providers in the subnet with the subnet_tag name

  • event_emitter: a callable that emits events related to the

    computation; by default it is a function that logs all events

map

| async map(worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]], data: Iterable[Task[D, R]]) -> AsyncIterator[Task[D, R]]

Run computations on providers.

In other words, map the computation's inputs specified by task data to the ouput resulting from running the execution script on a providers' execution units.

Arguments:

  • worker: a callable that takes a WorkContext object and a list o tasks,

    adds commands to the context object and yields committed commands

  • data: an iterator of Task objects to be computed on providers

Returns:

yields computation progress events

yapapi.runner._smartq

YAPAPI internal module. This is not a part of the public API. It can change at any time.

yapapi.runner.task

Task Objects

class Task(Generic[TaskData, TaskResult])

One computation unit.

Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).

__init__

| __init__(data: TaskData, *, expires: Optional[datetime] = None, timeout: Optional[timedelta] = None)

Create a new Task object.

Arguments:

  • data: contains information needed to prepare command list for the provider

  • expires: expiration datetime

  • timeout: timeout from now; overrides expires parameter if provided

accept_task

| accept_task(result: Optional[TaskResult] = None) -> None

Accept task that was completed.

Must be called when the results of a task are correct.

Arguments:

  • result: computation result (optional)

Returns:

None

reject_task

| reject_task(reason: Optional[str] = None, retry: bool = False) -> None

Reject task.

Must be called when the results of the task are not correct and it should be retried.

Arguments:

  • reason: task rejection description (optional)

Returns:

None

yapapi.runner.vm

repo

async repo(*, image_hash: str, min_mem_gib: float = 0.5, min_storage_gib: float = 2.0) -> Package

Builds reference to application package.

  • image_hash: finds package by its contents hash.

  • min_mem_gib: minimal memory required to execute application code.

  • min_storage_gib minimal disk storage to execute tasks.

yapapi.runner.events

Representing events in Golem computation.

Event Objects

@dataclass(init=False)
class Event()

An abstract base class for types of events emitted by Engine.map().

WorkerFinished Objects

@dataclass
class WorkerFinished(AgreementEvent)

exception

Exception thrown by worker script.

None if worker returns without error.

yapapi.runner.utils

Utility functions and classes used within the yapapi.runner package.

AsyncWrapper Objects

class AsyncWrapper(AsyncContextManager)

Wraps a given callable to provide asynchronous calls.

Example usage:

with AsyncWrapper(func) as wrapper: wrapper.async_call("Hello", world=True) wrapper.async_call("Bye!")

The above code will make two asynchronous calls to func. The results of the calls, if any, are discarded, so this class is most useful for wrapping callables that return None.

async_call

| async_call(*args, **kwargs) -> None

Schedule an asynchronous call to the wrapped callable.

yapapi.runner.ctx

Work Objects

class Work(abc.ABC)

prepare

| async prepare()

A hook to be executed on requestor's end before the script is sent to the provider.

register

| register(commands: CommandContainer)

A hook which adds the required command to the exescript.

post

| async post()

A hook to be executed on requestor's end after the script has finished.

_Steps Objects

class _Steps(Work)

prepare

| async prepare()

Execute the prepare hook for all the defined steps.

register

| register(commands: CommandContainer)

Execute the register hook for all the defined steps.

post

| async post()

Execute the post step for all the defined steps.

WorkContext Objects

class WorkContext()

An object used to schedule commands to be sent to provider.

send_json

| send_json(json_path: str, data: dict)

Schedule sending JSON data to the provider.

Arguments:

  • json_path: remote (provider) path

  • data: dictionary representing JSON data

Returns:

None

send_file

| send_file(src_path: str, dst_path: str)

Schedule sending file to the provider.

Arguments:

  • src_path: local (requestor) path

  • dst_path: remote (provider) path

Returns:

None

run

| run(cmd: str, *args: Iterable[str], *, env: Optional[Dict[str, str]] = None)

Schedule running a command.

Arguments:

  • cmd: command to run on the provider, e.g. /my/dir/run.sh

  • args: command arguments, e.g. "input1.txt", "output1.txt"

  • env: optional dictionary with environmental variables

Returns:

None

download_file

| download_file(src_path: str, dst_path: str)

Schedule downloading remote file from the provider.

Arguments:

  • src_path: remote (provider) path

  • dst_path: local (requestor) path

Returns:

None

commit

| commit() -> Work

Creates sequence of commands to be send to provider.

Returns:

Work object (the latter contains sequence commands added before calling this method)

yapapi.storage

Storage models.

OutputStorageProvider Objects

class OutputStorageProvider(abc.ABC)

new_destination

| @abc.abstractmethod
| async new_destination(destination_file: Optional[PathLike] = None) -> Destination

Creates slot for receiving file.

Parameters

destination_file: Optional hint where received data should be placed.

yapapi.storage.webdav

yapapi.storage.gftp

Golem File Transfer Storage Provider

class PubLink(TypedDict)

GFTP linking information.

file

file on local filesystem.

url

GFTP url as which local files is exposed.

GftpDriver Objects

class GftpDriver(Protocol)

Golem FTP service API.

version

| async version() -> str

Gets driver version.

publish

| async publish(*, files: List[str]) -> List[PubLink]

Exposes local file as GFTP url.

files : local files to be exposed

close

| async close(*, urls: List[str]) -> CommandStatus

Stops exposing GFTP urls created by publish\(files=\[..\]\)

receive

| async receive(*, output_file: str) -> PubLink

Creates GFTP url for receiving file.

: output_file -

shutdown

| async shutdown() -> CommandStatus

Stops GFTP service.

After shutdown all generated urls will be unavailable.

yapapi._cli

yapapi._cli.run

yapapi._cli.market

Demand Objects

class Demand()

Offer subscription management.

list

| @async_run
| async list(only_expired: bool = False)

Lists all active demands

clear

| @async_run
| async clear(only_expired: bool = False)

Removes demands.

By default removes all demands.

Arguments:

  • only_expired: removes only expired demands.

yapapi._cli.payment

Allocation Objects

class Allocation()

Payment allocation management.

list

| @async_run
| async list(details: bool = False)

Lists current active payment allocation

clear

| @async_run
| async clear()

Removes all active payment allocations

Invoices Objects

class Invoices()

Invoice management.

accept

| @async_run
| async accept(allocation_id: str, invoice_id: str)

Accepts given invoice_id with allocation_id

Arguments:

  • allocation_id: Allocation from which invoice will be paid. see

    allocation list.

  • invoice_id: Invoice identifier.

list

| @async_run
| async list(by_status: Optional[str] = None)

Lists all invoices.

yapapi.__main__

yapapi.rest

Mid-level binding for Golem REST API.

Serves as a more convenient interface between the agent code and the REST API.

yapapi.rest.configuration

Configuration Objects

class Configuration(object)

REST API's setup and top-level access utility.

By default, it expects the yagna daemon to be available locally and listening on the default port. The urls for the specific APIs are then based on this default base URL.

It requires one external argument, namely Yagna's application key, which is used to authenticate with the daemon. The application key must be either specified explicitly using the app_key argument or provided by the YAGNA_APPKEY environment variable.

Other than that, the URLs of each specific REST API can be overridden using the following environment variables:

  • YAGNA_MARKET_URL

  • YAGNA_PAYMENT_URL

  • YAGNA_ACTIVITY_URL

app_key

| @property
| app_key() -> str

Yagna daemon's application key used to access the REST API.

market_url

| @property
| market_url() -> str

The URL of the Market REST API

payment_url

| @property
| payment_url() -> str

The URL of the Payment REST API

activity_url

| @property
| activity_url() -> str

The URL of the Activity REST API

market

| market() -> ya_market.ApiClient

Return a REST client for the Market API.

payment

| payment() -> ya_payment.ApiClient

Return a REST client for the Payment API.

activity

| activity() -> ya_activity.ApiClient

Return a REST client for the Activity API.

yapapi.rest.market

Agreement Objects

class Agreement(object)

Mid-level interface to the REST's Agreement model.

confirm

| async confirm() -> bool

Sign and send the agreement to the provider and then wait for it to be approved.

Returns:

True if the agreement has been confirmed, False otherwise

OfferProposal Objects

class OfferProposal(object)

Mid-level interface to handle the negotiation phase between the parties.

reject

| async reject(reason: Optional[str] = None)

Reject the Offer.

respond

| async respond(props: dict, constraints: str) -> str

Create an agreeement Proposal for a received Offer, based on our Demand.

agreement

| async agreement(timeout=timedelta(hours=1)) -> Agreement

Create an Agreement based on this Proposal.

Subscription Objects

class Subscription(object)

Mid-level interface to REST API's Subscription model.

details

| @property
| details() -> models.Demand

Returns:

the Demand for which the Subscription has been registered.

delete

| async delete()

Unsubscribe this Demand from the market.

events

| async events() -> AsyncIterator[OfferProposal]

Yield counter-proposals based on the incoming, matching Offers.

Market Objects

class Market(object)

Mid-level interface to the Market REST API.

subscribe

| subscribe(props: dict, constraints: str) -> AsyncResource[Subscription]

Create a subscription for a demand specified by the supplied properties and constraints.

subscriptions

| async subscriptions() -> AsyncIterator[Subscription]

Yield all the subscriptions that this requestor agent has on the market.

yapapi.rest.activity

ActivityService Objects

class ActivityService(object)

A convenience helper to facilitate the creation of an Activity.

new_activity

| async new_activity(agreement_id: str) -> "Activity"

Create an activity within bounds of the specified agreement.

Returns:

the object that represents the Activity and allows to query and control its state :rtype: Activity

Activity Objects

class Activity(AsyncContextManager["Activity"])

Mid-level wrapper for REST's Activity endpoint

state

| async state() -> yaa.ActivityState

Query the state of the activity.

send

| async send(script: List[dict])

Send the execution script to the provider's execution unit.

yapapi.rest.resource

yapapi.rest.payment

Allocation Objects

@dataclass
class Allocation(_Link)

Payment reservation for task processing.

id

Allocation object id

amount

Total amount allocated

expires

Allocation expiration timestamp

Payment Objects

class Payment(object)

new_allocation

| new_allocation(amount: Decimal, *, expires: Optional[datetime] = None, make_deposit: bool = False) -> ResourceCtx[Allocation]

Creates new allocation.

  • amount: Allocation amount.

  • expires: expiration timestamp. by default 30 minutes from now.

  • make_deposit: (unimplemented).

allocations

| async allocations() -> AsyncIterator[Allocation]

Lists all active allocations.

Example:

Listing all active allocations

from yapapi import rest

async def list_allocations(payment_api: rest.Payment): async for allocation in payment_api.allocations():

  • print(f'''allocation - {allocation.id}

    amount={allocation.amount},

    expires={allocation.expires}''')