3.4. Storage interface
Up: 3. Developer guide
Prev: 3.3. Database
Next: 3.5. User interface
Sections:
- Introduction
- How do we read from storage?
- How do we write to storage?
- How do we add new storage functionality?
- How do we use outcomes?
- What about audit logging?
- How is the filesystem organized?
- How should the filesystem be backed up?
- Outcome design patterns
Introduction
All database writes, and some reads, in ATR go through the storage interface. This interface enforces permissions, centralizes audit logging, and provides type-safe access to the database. In practice, this means route handlers should avoid calling db directly wherever possible.
The storage interface recognizes several permission levels: general public (unauthenticated visitors), foundation committer (any ASF account), committee participant (committers and PMC members), committee member (PMC members only), and foundation admin (infrastructure administrators). Each level inherits from the previous one, so for example committee members can do everything committee participants can do, plus additional operations.
The storage interface does not make it impossible to bypass authorization, because you can always import db directly and write to the database. But it makes bypassing authorization an explicit choice that requires deliberate action, and it makes the safer path the easier path. This is a pragmatic approach to security: we cannot prevent all mistakes, but we can make it harder to make them accidentally.
How do we read from storage?
Reading from storage is a work in progress. There are some existing methods, but most of the read functionality currently lives in db or db.interaction, and much work is required to migrate it to the storage interface. We have given this lower priority because reads are generally safe, with the exception of a few components, such as user tokens, which should be migrated sooner.
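As a purely hypothetical sketch of where this migration is heading, a read session could mirror the write API described below. None of the names here (storage.read, as_foundation_committer on a read session, keys.get) are confirmed by the current code; they are illustrative assumptions only.
async with storage.read(session) as read:
    # Hypothetical: request read permissions, then use a domain reader
    rafc = read.as_foundation_committer()
    key = await rafc.keys.get(fingerprint)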
How do we write to storage?
To write to storage we open a write session, request specific permissions, use the exposed functionality, and then handle the outcome. Here is an actual example from post/start.py:
async with storage.write(session) as write:
    wacp = await write.as_project_committee_participant(project_name)
    new_release, _project = await wacp.release.start(project_name, version)
The wacp object, short for write as committee participant, provides access to domain-specific writers: announce, checks, distributions, keys, policy, project, release, sbom, ssh, tokens, and vote.
The write session takes an optional Committer or ASF UID, typically session.uid from the logged-in user. If you omit the UID, the session determines it automatically from the current request context. The write object checks LDAP memberships and raises storage.AccessError if the user is not authorized for the requested permission level.
Because projects belong to committees, we provide write.as_project_committee_member(project_name) and write.as_project_committee_participant(project_name), which look up the project's committee and authenticate the user as a member or participant of that committee. This is convenient when, for example, the URL provides a project name.
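For instance, a handler that takes a project name from the URL might authenticate at the member level before performing a member-only operation. This is a sketch: the release.delete method name is an assumption, although the deletion operation itself is mentioned in the audit logging section below.
async with storage.write(session) as write:
    # Member-level access derived from a URL-supplied project name
    wacm = await write.as_project_committee_member(project_name)
    await wacm.release.delete(project_name, version)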
Here is a more complete example from api/__init__.py that shows the classic three step pattern:
async with storage.write(asf_uid) as write:
    # 1. Request permissions
    wafc = write.as_foundation_committer()
    # 2. Use the exposed functionality
    outcome = await wafc.keys.ensure_stored_one(data.key)
    # 3. Handle the outcome
    key = outcome.result_or_raise()
In this case we decide to raise as soon as there is any error. We could also choose to display a warning, ignore the error, collect multiple outcomes for batch processing, or handle it in any other way appropriate for the situation.
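For example, a handler could downgrade the failure to a warning instead of raising. The logger here is illustrative; ok and error_or_raise are described in the outcomes section below.
if outcome.ok:
    key = outcome.result_or_raise()
else:
    # Keep going, but record why the key was not stored
    logger.warning("key not stored: %s", outcome.error_or_raise())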
How do we add new storage functionality?
Add methods to classes in the storage/writers or storage/readers directories. Code to perform any action associated with public keys that involves writing to storage, for example, goes in storage/writers/keys.py.
Classes in writer and reader modules must be named to match the permission hierarchy:
class GeneralPublic:
    def __init__(
        self,
        write: storage.Write,
        write_as: storage.WriteAsGeneralPublic,
        data: db.Session,
    ) -> None:
        self.__write = write
        self.__write_as = write_as
        self.__data = data

class FoundationCommitter(GeneralPublic):
    def __init__(
        self,
        write: storage.Write,
        write_as: storage.WriteAsFoundationCommitter,
        data: db.Session,
    ) -> None:
        super().__init__(write, write_as, data)
        self.__write = write
        self.__write_as = write_as
        self.__data = data

class CommitteeParticipant(FoundationCommitter):
    def __init__(
        self,
        write: storage.Write,
        write_as: storage.WriteAsCommitteeParticipant,
        data: db.Session,
        committee_name: str,
    ) -> None:
        super().__init__(write, write_as, data)
        self.__committee_name = committee_name

class CommitteeMember(CommitteeParticipant):
    ...
The hierarchy this creates is: GeneralPublic → FoundationCommitter → CommitteeParticipant → CommitteeMember. You can add methods at any level. A method on CommitteeMember is available only to committee members, while a method on FoundationCommitter is available to everyone who has logged in.
Use __private_methods for helper code that is not part of the public interface. Use public_methods for operations that should be available to callers at the appropriate permission level. Consider returning Outcome types to allow callers flexibility in error handling. Refer to the section on using outcomes for more details.
After adding a new writer module, register it in the appropriate WriteAs* classes in storage/__init__.py. For example, when adding the distributions writer, it was necessary to add self.distributions = writers.distributions.CommitteeMember(write, self, data, committee_name) to the WriteAsCommitteeMember class.
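A minimal sketch of that registration, assuming a constructor shape consistent with the hierarchy above; only the self.distributions assignment comes from the actual code, and the constructor signature is an assumption.
class WriteAsCommitteeMember(WriteAsCommitteeParticipant):
    def __init__(self, write: Write, data: db.Session, committee_name: str) -> None:
        super().__init__(write, data, committee_name)
        # Expose the distributions writer at the committee member level
        self.distributions = writers.distributions.CommitteeMember(
            write, self, data, committee_name
        )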
How do we use outcomes?
Consider using outcome types from storage.outcome when returning results from writer methods. Outcomes let you represent both success and failure without raising exceptions, which gives callers flexibility in how they handle errors.
An Outcome[T] is either a Result[T] wrapping a successful value, or an Error[T] wrapping an exception. You can check which it is with the ok property or pattern matching, extract the value with result_or_raise(), or extract the error with error_or_raise().
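For example, using pattern matching on the two variants, assuming storage.outcome is imported as outcome and that key_outcome is an Outcome[str]:
match key_outcome:
    case outcome.Result():
        value = key_outcome.result_or_raise()  # Safe: cannot raise here
    case outcome.Error():
        exc = key_outcome.error_or_raise()  # The wrapped exception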
Here is an example from post/keys.py that processes multiple keys and collects outcomes:
async with storage.write() as write:
    wacm = write.as_committee_member(selected_committee)
    outcomes = await wacm.keys.ensure_associated(keys_text)
    success_count = outcomes.result_count
    error_count = outcomes.error_count
The ensure_associated method returns an outcome.List, which is a collection of outcomes. Some keys might import successfully, and others might fail because they are malformed or already exist. The caller can inspect the list to see how many succeeded and how many failed, and present that information to the user.
The outcome.List class provides many useful methods: results() to get only the successful values, errors() to get only the exceptions, result_count and error_count to count them, and results_or_raise() to extract all values or raise on the first error.
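For example, to report failures while keeping the successful imports, using the outcomes list from the example above (the logger is illustrative):
for error in outcomes.errors():
    logger.warning("key import failed: %s", error)
imported_keys = outcomes.results()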
Use outcomes when an operation might fail for some items but succeed for others, or when you want to give the caller control over error handling. Do not use them when failure should always raise an exception, such as authorization failures or database connection errors. Those should be raised immediately.
What about audit logging?
Storage write operations can be logged to config.AppConfig.STORAGE_AUDIT_LOG_FILE, which is state/storage-audit.log by default. Each log entry is a JSON object containing the timestamp, the action name, and relevant parameters. When you write a storage method that should be audited, call self.__write_as.append_to_audit_log(**kwargs) with whatever parameters are relevant to that specific operation. The action name is extracted automatically from the call stack using log.caller_name(), so if the method is called i_am_a_teapot, the audit log will show i_am_a_teapot without you having to pass the name explicitly.
Audit logging must be done manually because the values to log are often computed during method execution, not just passed as arguments (which could otherwise be logged automatically). When deleting a release, for example, we log asf_uid (an instance attribute), project_name (an argument), and version (an argument), but when issuing a JWT from a PAT, we log asf_uid (an instance attribute) and pat_hash (a computed value). Each operation logs what makes sense for that operation.
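A sketch of the release deletion example in that style; the method body, its signature, and the self.__asf_uid attribute name are assumptions based on the description above.
async def delete(self, project_name: str, version: str) -> None:
    # ... perform the deletion using self.__data ...
    # The action name "delete" is derived automatically via log.caller_name()
    self.__write_as.append_to_audit_log(
        asf_uid=self.__asf_uid,
        project_name=project_name,
        version=version,
    )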
How is the filesystem organized?
The storage interface writes to both the database and the filesystem. There is one shared state directory for all of ATR, configured by the STATE_DIR parameter in atr/config.py. By default this is $PROJECT_ROOT/state, where PROJECT_ROOT is another ATR configuration parameter.
Only a small number of subdirectories of the state directory are written to by the storage interface, and many of these locations are also configurable. These directories, and their configuration variables, are:
- attestable, configured by ATTESTABLE_STORAGE_DIR
- downloads, configured by DOWNLOADS_STORAGE_DIR
- finished, configured by FINISHED_STORAGE_DIR
- subversion, configured by SVN_STORAGE_DIR
- temporary, which is unconfigurable
- unfinished, configured by UNFINISHED_STORAGE_DIR
The purposes of these directories are as follows. Note that "immutable" here means that existing files cannot be modified; it does not preclude new files from being added.
- attestable [immutable] holds JSON files of data that ATR has automatically verified and which must now be held immutably. (We could store this data in the database, but the aim is to eventually write attestation files here, so this prepares for that approach.)
- downloads [mutable] holds hard links to released artifacts in the finished directory. The finished directory contains the files exactly as they were arranged by the release managers upon announcing the release, separated strictly into one directory per release. The downloads directory, on the other hand, has no restrictions on its organisation and can be rearranged.
- finished [immutable, except for moving to external archive] contains, as mentioned above, all of the files of a release as they were when announced. It therefore constitutes a historical record, and allows us to rewrite the hard links in the downloads directory without worrying about accidentally deleting files by removing their last references.
- subversion [mutable] is designed to mirror two subdirectories, dev and release, of https://dist.apache.org/repos/dist. It is currently unused.
- temporary [mutable] holds temporary files during operations where the data cannot be modified in place. One important example is creating a staging directory for a new revision: a subdirectory with a random name is made in this directory, the files of the prior revision are hard linked into it, and the modifications take place in this staging area before the directory is finalised and moved to unfinished.
- unfinished [immutable, except for moving to finished] contains all of the files in a release before it is announced. In other words, while the release managers compose a release, while the committee votes on it, and after it has been voted on but not yet announced, the files for that release are in this directory.
This list does not include any configuration files, logs, or log directories.
How should the filesystem be backed up?
Only the attestable, downloads, finished, and unfinished directories need to be backed up. The subversion directory is unused, and the temporary directory is for temporary staging.
The structure of the directories that need backing up is as follows. An ellipsis, ..., means any number of further files and subdirectories, nested recursively.
- attestable/PROJECT/VERSION/REVISION.json
- downloads/COMMITTEE/PATH/...
- finished/PROJECT/VERSION/...
- unfinished/PROJECT/VERSION/REVISION/
Because of the versioning scheme used for the attestable, finished, and unfinished directories, these can be incrementally updated by simple copying without deletion. The downloads directory, however, must be snapshotted, because its organization is arbitrary and can change at any time.
This list does not include any configuration files, logs, or log directories. All configuration files and the audit logs, at a minimum, should also be backed up.
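A minimal backup sketch under these assumptions: for the add-only directories, copying without deletion is enough, while downloads needs a fresh snapshot each time. The paths and the timestamped snapshot scheme are illustrative, not part of ATR.
import datetime
import shutil

STATE = "state"
BACKUP = "/backup/atr"

# Add-only directories: copying without deleting anything already
# backed up is sufficient, per the versioning scheme described above.
for name in ("attestable", "finished", "unfinished"):
    shutil.copytree(f"{STATE}/{name}", f"{BACKUP}/{name}", dirs_exist_ok=True)

# The downloads directory can be rearranged at any time, so snapshot it
# into a fresh timestamped copy rather than updating a prior copy in place.
stamp = datetime.datetime.now(datetime.UTC).strftime("%Y%m%dT%H%M%S")
shutil.copytree(f"{STATE}/downloads", f"{BACKUP}/downloads-{stamp}")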
Outcome design patterns
A common question when designing outcome types is how to handle an exception after a success, and how to handle a warning during success:
- An exception after a success is when an object is processed in multiple stages, and the first few stages succeed but then subsequently there is an exception.
- A warning during success is when an object is processed in multiple stages, an exception is raised, but we determine that we can proceed to subsequent stages as long as we keep a note of the exception.
Both of these workflows appear incompatible with outcomes. An outcome can record either a successful result or an exception. But in an exception after a success we want to record the successes achieved up to the exception, and in a warning during success we want to record the exception even though we return a success result.
The solution is similar in both cases: create a wrapper of the primary type which can hold an instance of the secondary type.
In an exception after a success, the primary type is the exception, and the secondary type is the result obtained up to that exception. The type will look like this:
class AfterSuccessError(Exception):
    def __init__(self, result_before_error: Result):
        self.result_before_error = result_before_error
In a warning during success, the primary type is the result, and the secondary type is the exception raised during otherwise successful processing, which we treat as a warning. This is the inverse of the above, and the types are therefore inverted too.
@dataclasses.dataclass
class Result:
    value: Value
    warning: Exception | None
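A hypothetical usage sketch of this wrapper: both parse_value and validate_metadata are illustrative names, and a failure in the side stage is downgraded to a warning while the main value is still returned.
def process(data: bytes) -> Result:
    value = parse_value(data)  # Hypothetical main stage; failure here raises
    try:
        validate_metadata(data)  # Hypothetical side stage
    except ValueError as exc:
        # Downgrade the side failure to a warning and still succeed
        return Result(value=value, warning=exc)
    return Result(value=value, warning=None)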
The Result wrapper could just as easily be a Pydantic class, or whatever is appropriate in the situation, as long as it can hold the warning. If the warning is generated during an additional or side task, we can use Outcome[SideValue] instead. We do this, for example, in the type representing a linked committee:
@dataclasses.dataclass
class LinkedCommittee:
    name: str
    autogenerated_keys_file: Outcome[str]
In this case, if the autogenerated keys file call succeeded without an error, the Outcome will be a Result[str] in which the str is the full path to the autogenerated file.
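Callers can then consume the embedded outcome with the usual methods. A sketch, assuming committee is a LinkedCommittee returned by a writer:
if committee.autogenerated_keys_file.ok:
    path = committee.autogenerated_keys_file.result_or_raise()
else:
    # The KEYS file could not be generated; surface the reason as a warning
    warning = committee.autogenerated_keys_file.error_or_raise()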