Golem Python API.
get_version() -> str
Returns:
the version of the yapapi library package
windows_event_loop_fix()
Set up asyncio to use ProactorEventLoop implementation for new event loops on Windows.
This work-around is only needed for Python 3.6 and 3.7. With Python 3.8, ProactorEventLoop
is already the default on Windows.
Utilities for logging computation events via the standard logging
module.
Functions in this module fall into two categories:
Functions that convert computation events generated by the Executor.submit
method to calls to the standard Python logging
module, using the loggers in the "yapapi" namespace (e.g. logging.getLogger("yapapi.executor")
). These functions should be passed as event_consumer
arguments to Executor()
.
Functions that perform configuration of the logging
module itself. Since logging configuration is in general a responsibility of the code that uses yapapi
as a library, we only provide the enable_default_logger
function in this category, that enables logging to stderr with level logging.INFO
and, optionally, to a given file with level logging.DEBUG
.
Several functions from this module can be passed as event_consumer
callback to yapapi.Executor()
.
For detailed, human-readable output use the log_event
function:
Executor(..., event_consumer=yapapi.log.log_event)
For even more detailed, machine-readable output use log_event_repr
:
Executor(..., event_consumer=yapapi.log.log_event_repr)
For summarized, human-readable output use log_summary()
:
Executor(..., event_consumer=yapapi.log.log_summary())
Summary output can be combined with a detailed one by passing the detailed logger as an argument to log_summary
:
Executor(...event_consumer=yapapi.log.log_summary(yapapi.log.log_event_repr))
enable_default_logger(format_: str = "[%(asctime)s %(levelname)s %(name)s] %(message)s", log_file: Optional[str] = None, debug_activity_api: bool = False, debug_market_api: bool = False, debug_payment_api: bool = False)
Enable the default logger that logs messages to stderr with level INFO
.
If log_file
is specified, the logger with output messages with level DEBUG
to the given file.
log_event(event: events.Event) -> None
Log event
with a human-readable description.
log_event_repr(event: events.Event) -> None
Log the result of calling repr(event)
.
class SummaryLogger()
Aggregates information from computation events to provide a high-level summary.
The logger's log()
method can be used as event_consumer
callback to Executor()
. It will aggregate the events generated by Executor.submit()
and output some summary information.
The optional wrapped_emitter
argument can be used for chaining event emitters: each event logged with log()
is first passed to wrapped_emitter
.
For example, with the following setup, each event emitted by executor
will be logged by log_event_repr
, and additionally, certain events will cause summary messages to be logged.
detailed_logger = log_event_reprsummary_logger = SummaryLogger(wrapped_emitter=detailed_logger).logexecutor = Executor(..., event_consumer=summary_logger)
| __init__(wrapped_emitter: Optional[Callable[[events.Event], None]] = None)
Create a SummaryLogger.
| log(event: events.Event) -> None
Register an event.
log_summary(wrapped_emitter: Optional[Callable[[events.Event], None]] = None)
Output a summary of computation.
This is a utility function that creates a SummaryLogger
instance wrapping an optional wrapped_emitter
and returns its log
method.
See the documentation of SummaryLogger
for more information.
@dataclassclass NodeInfo(Model)
Properties describing the information regarding the node.
human-readable name of the Golem node
the name of the subnet within which the Demands and Offers are matched
@dataclass()class Activity(Model)
Activity-related Properties
Sets a Hard cap on total cost of the Activity (regardless of the usage vector or pricing function). The Provider is entitled to 'kill' an Activity which exceeds the capped cost amount indicated by Requestor.
Sets a Soft cap on total cost of the Activity (regardless of the usage vector or pricing function). When the cost_warning amount is reached for the Activity, the Provider is expected to send a Debit Note to the Requestor, indicating the current amount due
A timeout value for batch computation (eg. used for container-based batch processes). This property allows to set the timeout to be applied by the Provider when running a batch computation: the Requestor expects the Activity to take no longer than the specified timeout value - which implies that eg. the golem.usage.duration_sec counter shall not exceed the specified timeout value.
Whether client supports multi_activity (executing more than one activity per agreement).
class DemandBuilder()
Builds a dictionary of properties and constraints from high-level models.
The dictionary represents a Demand object, which is later matched by the new Golem's market implementation against Offers coming from providers to find those providers who can satisfy the requestor's demand.
example usage:
>>> import yapapi>>> from yapapi import properties as yp>>> from yapapi.props.builder import DemandBuilder>>> from datetime import datetime, timezone>>> builder = DemandBuilder()>>> builder.add(yp.NodeInfo(name="a node", subnet_tag="testnet"))>>> builder.add(yp.Activity(expiration=datetime.now(timezone.utc)))>>> builder.__repr__>>> print(builder){'properties':{'golem.node.id.name': 'a node','golem.node.debug.subnet': 'testnet','golem.srv.comp.expiration': 1601655628772},'constraints': []}
| @property| properties() -> dict
List of properties for this demand.
| @property| constraints() -> str
List of constraints for this demand.
| ensure(constraint: str)
Add a constraint to the demand definition.
| add(m: Model)
Add properties from the specified model to this demand definition.
| async subscribe(market: Market) -> Subscription
Create a Demand on the market and subscribe to Offers that will match that Demand.
Payment-related properties.
Infrastructural properties.
class InvalidPropertiesError(Exception)
Raised by Model.from_properties(cls, properties)
when given invalid properties
.
class Model(abc.ABC)
Base class from which all property models inherit.
Provides helper methods to load the property model data from a dictionary and to get a mapping of all the keys available in the given model.
| @classmethod| from_properties(cls: Type[ME], props: Props) -> ME
Initialize the model from a dictionary representation.
When provided with a dictionary of properties, it will find the matching keys within it and fill the model fields with the values from the dictionary.
It ignores non-matching keys - i.e. doesn't require filtering of the properties' dictionary before the model is fed with the data. Thus, several models can be initialized from the same dictionary and all models will only load their own data.
| @classmethod| keys(cls)
Returns:
a mapping between the model's field names and the property keys
example:
>>> import dataclasses>>> import typing>>> from yapapi.properties.base import Model>>> @dataclasses.dataclass... class NodeInfo(Model):... name: typing.Optional[str] = \... dataclasses.field(default=None, metadata={"key": "golem.node.id.name"})...>>> NodeInfo.keys().name'golem.node.id.name'
Storage models.
class OutputStorageProvider(abc.ABC)
| @abc.abstractmethod| async new_destination(destination_file: Optional[PathLike] = None) -> Destination
Creates slot for receiving file.
destination_file: Optional hint where received data should be placed.
Golem File Transfer Storage Provider
class PubLink(TypedDict)
GFTP linking information.
file on local filesystem.
GFTP url as which local files is exposed.
class GftpDriver(Protocol)
Golem FTP service API.
| async version() -> str
Gets driver version.
| async publish(*, files: List[str]) -> List[PubLink]
Exposes local file as GFTP url.
files
: local files to be exposed
| async close(*, urls: List[str]) -> CommandStatus
Stops exposing GFTP urls created by publish(files=[..]).
| async receive(*, output_file: str) -> PubLink
Creates GFTP url for receiving file.
: output_file
-
| async shutdown() -> CommandStatus
Stops GFTP service.
After shutdown all generated urls will be unavailable.
class Demand()
Offer subscription management.
| @async_run| async list(only_expired: bool = False)
Lists all active demands
| @async_run| async clear(only_expired: bool = False)
Removes demands.
By default removes all demands.
Arguments:
only_expired
: removes only expired demands.
class Allocation()
Payment allocation management.
| @async_run| async list(details: bool = False)
Lists current active payment allocation
| @async_run| async clear()
Removes all active payment allocations
class Invoices()
Invoice management.
| @async_run| async accept(allocation_id: str, invoice_id: str)
Accepts given invoice_id
with allocation_id
Arguments:
allocation_id
: Allocation from which invoice will be paid. see
allocation list
.
invoice_id
: Invoice identifier.
| @async_run| async list(by_status: Optional[str] = None)
Lists all invoices.
An implementation of the new Golem's task executor.
Shortest debit note acceptance timeout the requestor will accept.
Time to receive invoice from provider after tasks ended.
class NoPaymentAccountError(Exception)
The error raised if no payment account for the required driver/network is available.
Payment driver required for the account.
Network required for the account.
class Executor(AsyncContextManager)
Task executor.
Used to run tasks using the specified application package within providers' execution units.
| __init__(*, package: Package, max_workers: int = 5, timeout: timedelta = timedelta(minutes=5), budget: Union[float, Decimal], strategy: MarketStrategy = LeastExpensiveLinearPayuMS(), subnet_tag: Optional[str] = None, driver: Optional[str] = None, network: Optional[str] = None, event_consumer: Optional[Callable[[Event], None]] = None)
Create a new executor.
Arguments:
package
: a package common for all tasks; vm.repo() function may be
used to return package from a repository
max_workers
: maximum number of workers doing the computation
timeout
: timeout for the whole computation
budget
: maximum budget for payments
strategy
: market strategy used to select providers from the market
(e.g. LeastExpensiveLinearPayuMS or DummyMS)
subnet_tag
: use only providers in the subnet with the subnet_tag name
driver
: name of the payment driver to use or None
to use the default driver;
only payment platforms with the specified driver will be used
network
: name of the network to use or None
to use the default network;
only payment platforms with the specified network will be used
event_consumer
: a callable that processes events related to the
computation; by default it is a function that logs all events
| async submit(worker: Callable[[WorkContext, AsyncIterator[Task[D, R]]], AsyncGenerator[Work, None]], data: Iterable[Task[D, R]]) -> AsyncIterator[Task[D, R]]
Submit a computation to be executed on providers.
Arguments:
worker
: a callable that takes a WorkContext object and a list o tasks,
adds commands to the context object and yields committed commands
data
: an iterator of Task objects to be computed on providers
Returns:
yields computation progress events
YAPAPI internal module. This is not a part of the public API. It can change at any time.
class SmartQueue(Generic[Item], object)
| has_new_items() -> bool
Check whether this queue has any items that were not retrieved by any consumer yet.
| has_unassigned_items() -> bool
Check whether this queue has any unassigned items.
An item is unassigned if it's new (hasn't been retrieved yet by any consumer) or it has been rescheduled and is not in progress.
A queue has unassigned items iff get()
will immediately return some item, without waiting for an item that is currently "in progress" to be rescheduled.
class Task(Generic[TaskData, TaskResult])
One computation unit.
Represents one computation unit that will be run on the provider (e.g. rendering of one frame of an animation).
| __init__(data: TaskData)
Create a new Task object.
Arguments:
data
: contains information needed to prepare command list for the provider
| @property| running_time() -> Optional[timedelta]
Return the running time of the task (if in progress) or time it took to complete it.
| accept_result(result: Optional[TaskResult] = None) -> None
Accept the result of this task.
Must be called when the result is correct to mark this task as completed.
Arguments:
result
: task computation result (optional)
Returns:
None
| reject_result(reason: Optional[str] = None, retry: bool = False) -> None
Reject the result of this task.
Must be called when the result is not correct to indicate that the task should be retried.
Arguments:
reason
: task rejection description (optional)
Returns:
None
@dataclassclass BufferedAgreement()
Confirmed agreement with additional local metadata
class AgreementsPool()
Manages proposals and agreements pool
| async cycle()
Performs cyclic tasks.
Should be called regularly.
| async add_proposal(score: float, proposal: OfferProposal) -> None
Adds providers' proposal to the pool of available proposals
| async use_agreement(cbk)
Gets an agreement and performs cbk() on it
| async release_agreement(agreement_id: str) -> None
Marks agreement as ready for reuse
| async terminate(reason: dict) -> None
Terminates all agreements
| async on_agreement_terminated(agr_id: str, reason: dict) -> None
Reacts to agreement termination event
Should be called when AgreementTerminated event is received.
Representing events in Golem computation.
@dataclass(init=False)class Event()
An abstract base class for types of events emitted by Executor.submit()
.
| extract_exc_info() -> Tuple[Optional[ExcInfo], "Event"]
Extract exception information from this event.
Return the extracted exception information and a copy of the event without the exception information.
@dataclass(init=False)class HasExcInfo(Event)
A base class for types of events that carry an optional exception info.
| extract_exc_info() -> Tuple[Optional[ExcInfo], "Event"]
Return the exc_info
field and a copy of this event with the field set to None
.
@dataclassclass ComputationFinished(HasExcInfo)
Indicates successful completion if exc_info
is None
and a failure otherwise.
@dataclassclass WorkerFinished(HasExcInfo, AgreementEvent)
Indicates successful completion if exc_info
is None
and a failure otherwise.
@dataclassclass ShutdownFinished(HasExcInfo)
Indicates the completion of Executor shutdown sequence
Implementation of strategies for choosing offers from market.
class MarketStrategy(abc.ABC)
Abstract market strategy.
| async decorate_demand(demand: DemandBuilder) -> None
Optionally add relevant constraints to a Demand.
| async score_offer(offer: rest.market.OfferProposal) -> float
Score offer
. Better offers should get higher scores.
@dataclassclass DummyMS(MarketStrategy, object)
A default market strategy implementation.
Its score_offer()
method returns SCORE_NEUTRAL
for every offer with prices that do not exceed maximum prices specified for each counter. For other offers, returns SCORE_REJECTED
.
| async decorate_demand(demand: DemandBuilder) -> None
Ensure that the offer uses PriceModel.LINEAR
price model.
| async score_offer(offer: rest.market.OfferProposal) -> float
Score offer
. Returns either SCORE_REJECTED
or SCORE_NEUTRAL
.
@dataclassclass LeastExpensiveLinearPayuMS(MarketStrategy, object)
A strategy that scores offers according to cost for given computation time.
| async decorate_demand(demand: DemandBuilder) -> None
Ensure that the offer uses PriceModel.LINEAR
price model.
| async score_offer(offer: rest.market.OfferProposal) -> float
Score offer
according to cost for expected computation time.
Utility functions and classes used within the yapapi.executor
package.
class AsyncWrapper()
Wraps a given callable to provide asynchronous calls.
Example usage:
with AsyncWrapper(func) as wrapper: wrapper.async_call("Hello", world=True) wrapper.async_call("Bye!")
The above code will make two asynchronous calls to func
. The results of the calls, if any, are discarded, so this class is most useful for wrapping callables that return None
.
| async stop() -> None
Stop the wrapper, process queued calls but do not accept any new ones.
| async_call(*args, **kwargs) -> None
Schedule an asynchronous call to the wrapped callable.
class Work(abc.ABC)
| async prepare()
A hook to be executed on requestor's end before the script is sent to the provider.
| register(commands: CommandContainer)
A hook which adds the required command to the exescript.
| async post()
A hook to be executed on requestor's end after the script has finished.
| @property| timeout() -> Optional[timedelta]
Return the optional timeout set for execution of this work.
class _Steps(Work)
| @property| timeout() -> Optional[timedelta]
Return the optional timeout set for execution of all steps.
| async prepare()
Execute the prepare
hook for all the defined steps.
| register(commands: CommandContainer)
Execute the register
hook for all the defined steps.
| async post()
Execute the post
step for all the defined steps.
class WorkContext()
An object used to schedule commands to be sent to provider.
Unique identifier for this work context.
| @property| provider_name() -> Optional[str]
Return the name of the provider associated with this work context.
| send_json(json_path: str, data: dict)
Schedule sending JSON data to the provider.
Arguments:
json_path
: remote (provider) path
data
: dictionary representing JSON data
Returns:
None
| send_file(src_path: str, dst_path: str)
Schedule sending file to the provider.
Arguments:
src_path
: local (requestor) path
dst_path
: remote (provider) path
Returns:
None
| run(cmd: str, *args: Iterable[str], *, env: Optional[Dict[str, str]] = None)
Schedule running a command.
Arguments:
cmd
: command to run on the provider, e.g. /my/dir/run.sh
args
: command arguments, e.g. "input1.txt", "output1.txt"
env
: optional dictionary with environmental variables
Returns:
None
| download_file(src_path: str, dst_path: str)
Schedule downloading remote file from the provider.
Arguments:
src_path
: remote (provider) path
dst_path
: local (requestor) path
Returns:
None
| commit(timeout: Optional[timedelta] = None) -> Work
Creates sequence of commands to be sent to provider.
Returns:
Work object (the latter contains sequence commands added before calling this method)
Mid-level binding for Golem REST API.
Serves as a more convenient interface between the agent code and the REST API.
class Configuration(object)
REST API's setup and top-level access utility.
By default, it expects the yagna daemon to be available locally and listening on the default port. The urls for the specific APIs are then based on this default base URL.
It requires one external argument, namely Yagna's application key, which is used to authenticate with the daemon. The application key must be either specified explicitly using the app_key
argument or provided by the YAGNA_APPKEY
environment variable.
If YAGNA_API_URL
environment variable exists, it will be used as a base URL for all REST API URLs. Example value: http://127.0.10.10:7500 (no trailing slash).
Other than that, the URLs of each specific REST API can be overridden using the following environment variables:
YAGNA_MARKET_URL
YAGNA_PAYMENT_URL
YAGNA_ACTIVITY_URL
| @property| app_key() -> str
Yagna daemon's application key used to access the REST API.
| @property| market_url() -> str
The URL of the Market REST API
| @property| payment_url() -> str
The URL of the Payment REST API
| @property| activity_url() -> str
The URL of the Activity REST API
| market() -> ya_market.ApiClient
Return a REST client for the Market API.
| payment() -> ya_payment.ApiClient
Return a REST client for the Payment API.
| activity() -> ya_activity.ApiClient
Return a REST client for the Activity API.
class AgreementDetails(object)
@dataclassclass View()
A certain fragment of an agreement's properties.
| extract(m: Type[_ModelType]) -> _ModelType
Extract properties for the given model from this view's properties.
| @property| provider_view() -> View
Get the view of provider's properties in this Agreement.
| @property| requestor_view() -> View
Get the view of requestor's properties in this Agreement.
class Agreement(object)
Mid-level interface to the REST's Agreement model.
| async confirm() -> bool
Sign and send the agreement to the provider and then wait for it to be approved.
Returns:
True if the agreement has been confirmed, False otherwise
class OfferProposal(object)
Mid-level interface to handle the negotiation phase between the parties.
| async reject(reason: Optional[str] = None)
Reject the Offer.
| async respond(props: dict, constraints: str) -> str
Create an agreeement Proposal for a received Offer, based on our Demand.
| async create_agreement(timeout=timedelta(hours=1)) -> Agreement
Create an Agreement based on this Proposal.
class Subscription(object)
Mid-level interface to REST API's Subscription model.
| @property| details() -> models.Demand
Returns:
the Demand for which the Subscription has been registered.
| async delete()
Unsubscribe this Demand from the market.
| async events() -> AsyncIterator[OfferProposal]
Yield counter-proposals based on the incoming, matching Offers.
class Market(object)
Mid-level interface to the Market REST API.
| subscribe(props: dict, constraints: str) -> AsyncResource[Subscription]
Create a subscription for a demand specified by the supplied properties and constraints.
| async subscriptions() -> AsyncIterator[Subscription]
Yield all the subscriptions that this requestor agent has on the market.
class ActivityService(object)
A convenience helper to facilitate the creation of an Activity.
| async new_activity(agreement_id: str) -> "Activity"
Create an activity within bounds of the specified agreement.
Returns:
the object that represents the Activity and allows to query and control its state :rtype: Activity
class Activity(AsyncContextManager["Activity"])
Mid-level wrapper for REST's Activity endpoint
| async state() -> yaa.ActivityState
Query the state of the activity.
| async send(script: List[dict], stream: bool = False, deadline: Optional[datetime] = None)
Send the execution script to the provider's execution unit.
| async __aexit__(exc_type, exc_val, exc_tb) -> None
Call DestroyActivity API operation.
class CommandExecutionError(Exception)
An exception that indicates that a command failed on a provider.
The command that failed.
The command's output, if any.
class BatchTimeoutError(Exception)
An exception that indicates that an execution of a batch of commands timed out.
class Batch(abc.ABC, AsyncIterable[events.CommandEventContext])
Abstract base class for iterating over events related to a batch running on provider.
| seconds_left() -> float
Return how many seconds are left until the deadline.
| @property| id()
Return the ID of this batch.
class PollingBatch(Batch)
A Batch
implementation that polls the server repeatedly for command status.
class StreamingBatch(Batch)
A Batch
implementation that uses event streaming to return command status.
@dataclassclass Allocation(_Link)
Payment reservation for task processing.
Allocation object id
Total amount allocated
Payment platform, e.g. zksync-rinkeby-tglm
Payment address, e.g. 0x123...
Allocation expiration timestamp
class Payment(object)
| new_allocation(amount: Decimal, payment_platform: str, payment_address: str, *, expires: Optional[datetime] = None, make_deposit: bool = False) -> ResourceCtx[Allocation]
Creates new allocation.
amount
: Allocation amount.
expires
: expiration timestamp. by default 30 minutes from now.
make_deposit
: (unimplemented).
| async allocations() -> AsyncIterator[Allocation]
Lists all active allocations.
Example:
Listing all active allocations
from yapapi import rest
async def list_allocations(payment_api: rest.Payment): async for allocation in payment_api.allocations():
print(f'''allocation
- {allocation.id}
amount={allocation.amount},
expires={allocation.expires}''')
class PackageException(Exception)
Exception raised on any problems related to the package repository.
class Package(abc.ABC)
Information on task package to be used for running tasks on providers.
| @abc.abstractmethod| async resolve_url() -> str
Return package URL.
| @abc.abstractmethod| async decorate_demand(demand: DemandBuilder)
Add package information to a Demand.
async repo(*, image_hash: str, min_mem_gib: float = 0.5, min_storage_gib: float = 2.0) -> Package
Build reference to application package.
image_hash: finds package by its contents hash.
min_mem_gib: minimal memory required to execute application code.
min_storage_gib minimal disk storage to execute tasks.
resolve_repo_srv(repo_srv, fallback_url=_FALLBACK_REPO_URL) -> str
Get the url of the package repository based on its SRV record address.
Arguments:
repo_srv
: the SRV domain name
fallback_url
: temporary hardcoded fallback url in case there's a problem resolving SRV
Returns:
the url of the package repository containing the port :raises: PackageException if no valid service could be reached
Utilities for yapapi example scripts.