Essential API#

The functions and classes that are necessary to create a solution and run our environment appropriately are contained on this page. While we do include the API in its entirety, the goal of this page is to assist anyone who needs to know the bare minimum API to run their solution.

AirliftEnv#

The Airlift Environment is the main interface between the actions given to agents by your solution and the appropriate events occurring based on that input.

Step#

airlift.envs.airlift_env.AirliftEnv.step(self, actions)#

Steps the environment based on the given actions and returns a new observation. Based on the new observation, the policy should create another set of actions for the next step.

Parameters:

actions – Dictionary that contains the actions for all agents

Returns:

  • obs – Dictionary that contains the observation for all agents.

  • rewards – Dictionary that contains the rewards for all agents.

  • dones – Dictionary that indicates whether each agent has completed the scenario.

  • info – Dictionary containing a list of warnings for each agent.

Reset#

airlift.envs.airlift_env.AirliftEnv.reset(self, seed=None) → Dict#

Resets the environment and generates a new random realization. If called without a seed, a new realization is generated.

Parameters:

seed – Environment seed

Returns:

A dictionary containing the initial observations for all agents. Individual agent observations can be accessed using the ‘a_0’ … ‘a_n’ keys.
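Since constructing a real AirliftEnv requires scenario parameters, the stand-in dictionary below only mimics the structure that reset returns; the ‘a_i’ keys are taken from this page, while the inner observation contents are placeholders:

```python
# Stand-in for the dictionary returned by env.reset(): one entry per
# agent, keyed 'a_0', 'a_1', ..., each holding that agent's observation.
obs = {f"a_{i}": {"state": 0} for i in range(3)}

# Access an individual agent's observation by key
first_agent_obs = obs["a_0"]
agent_ids = sorted(obs.keys())
```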

State#

airlift.envs.airlift_env.AirliftEnv.state(self)#

Returns the complete state of the environment.

Observe#

airlift.envs.airlift_env.AirliftEnv.observe(self, agent: Optional[str] = None)#

Returns the observation for the given agent. If no agent is specified, returns the observations for all agents.

Action Space#

airlift.envs.airlift_env.AirliftEnv.action_space(self, agent: str) → Space#

Takes in an agent name and returns the action space for that agent.

This method must return the same value for the same agent name.

The default implementation returns the corresponding entry of the action_spaces dict.

Example#

The following code block is an example of an environment being initialized with some parameters. The reset function is called with a seed, actions are created from an observation, and the environment is stepped to the next time step, returning a new observation:

env = AirliftEnv(initialization_parameters)
_done = False
obs = env.reset(seed=379)
solution.reset(obs, seed=462)
while not _done:
    actions = solution.policies(env.observe(), env.dones)
    obs, rewards, dones, _ = env.step(actions)
    _done = all(dones.values())
    env.render()

Solutions#

Using the Solution class is a requirement for your submission. As shown in the example below with LegalRandomAgent, your solution should extend the Solution class.

Reset#

airlift.solutions.solutions.Solution.reset(self, obs, observation_spaces=None, action_spaces=None, seed=None)#

Resets the solution with a seed and initializes the state space, observation space, and action space.

Parameters:
  • obs – A dictionary containing the observation

  • observation_spaces – A dictionary containing all the observation spaces

  • action_spaces – A dictionary containing all the action spaces.

  • seed – Environment seed.

Policies#

airlift.solutions.solutions.Solution.policies(self, obs, dones, infos=None)#

Contains the main policy that returns the actions for each agent.

Parameters:
  • obs – A dictionary containing an observation

  • dones – A dictionary containing the done values for each agent

Returns:

Actions for each agent.
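As an illustration, a minimal policies body could return a “do nothing” action for every agent that is not yet done. The field names below follow the action format shown in the ObservationHelper example on this page and should be treated as assumptions about the exact schema:

```python
# Minimal sketch of a policies() body: issue a "do nothing" action for
# every agent that has not finished the scenario. The field names follow
# the action format shown elsewhere on this page (an assumption, not an
# authoritative schema).
def noop_policies(obs, dones):
    return {agent: {"process": 0,
                    "cargo_to_load": set(),
                    "cargo_to_unload": set(),
                    "destination": 0}
            for agent, done in dones.items() if not done}
```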

Name#

airlift.solutions.solutions.Solution.name()#

A property that contains the name of the solution.

Get State#

airlift.solutions.solutions.Solution.get_state(obs)#

Gets the state for each agent. Assumes that the state space for each agent is the same.

Example#

This example is taken from our random legal agent. We create a new class that extends the Solution class and use the observation to generate a policy for the agents:

class LegalRandomAgent(Solution):
    def __init__(self):
        super().__init__()

    def reset(self, obs, observation_spaces=None, action_spaces=None, seed=None):
        super().reset(obs, observation_spaces, action_spaces, seed)
        self._action_helper = ActionHelper(np_random=self._np_random)

    def policies(self, obs, dones):
        return self._action_helper.sample_legal_actions(observation=obs)

Do Episode#

The solutions module also contains a doepisode function that assists in running the defined solution against an environment.

airlift.solutions.solutions.doepisode(env, solution, render=False, env_seed=None, solution_seed=None, render_sleep_time=0.1, capture_metrics=False, render_mode='human')#

Runs a single episode.

Parameters:
  • env – AirLiftEnv - An initialized Airlift Environment

  • solution – Solution - the solution that is being utilized

  • render – Render options (video, window, none, …)

  • env_seed – int, environment seed,

  • solution_seed – int, solution seed

  • render_sleep_time – float, sleep timer

Returns:

  • env.env_info – A NamedTuple that contains all the environment initialization parameters.

  • env.metrics – A NamedTuple that contains all the environment metrics collected for the solution.
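The loop below is a simplified sketch of what doepisode drives. The real function also handles rendering, seeding, and metrics capture; the stub environment and solution here exist only to make the sketch self-contained:

```python
# Simplified sketch of the episode loop doepisode runs. StubEnv and
# StubSolution are stand-ins, not part of the airlift API.
class StubEnv:
    def __init__(self, steps=3):
        self._remaining = steps
        self.dones = {"a_0": False}

    def reset(self, seed=None):
        self.dones = {"a_0": False}
        return {"a_0": {"state": 0}}

    def observe(self):
        return {"a_0": {"state": 0}}

    def step(self, actions):
        self._remaining -= 1
        self.dones = {"a_0": self._remaining <= 0}
        return self.observe(), {"a_0": 0.0}, self.dones, {"a_0": []}

class StubSolution:
    def reset(self, obs, seed=None):
        pass

    def policies(self, obs, dones):
        # Issue a no-op for every agent still running
        return {a: None for a, done in dones.items() if not done}

def do_episode_sketch(env, solution, env_seed=None, solution_seed=None):
    obs = env.reset(seed=env_seed)
    solution.reset(obs, seed=solution_seed)
    while not all(env.dones.values()):
        actions = solution.policies(env.observe(), env.dones)
        obs, rewards, dones, infos = env.step(actions)
    return dones
```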

Agents#

The Agents class contains all the information pertaining to the individual agents: for example, weight capacity, speed, their current states, and the loading and unloading of cargo. Just like the environment, the agent also has its own step function. This step function is separate from the environment's step function and assists the agent in transitioning appropriately between states.

Plane States#

Airplanes can be in one of four states. These are all contained in the PlaneState enumeration located in agents.py.

airlift.envs.agents.PlaneState(value, names=None, *, module=None, qualname=None, type=None, start=1)#

Enumeration that defines the states an agent can be in

  • 0:

    Waiting - airplane is waiting to process at an airport

  • 1:

    Processing - airplane is refueling and loading/unloading cargo

  • 2:

    Moving - Airplane is in flight to its destination

  • 3:

    Ready for takeoff - airplane is ready for takeoff once it is given a destination

Also see: airplane state machine
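For reference, the listing above can be mirrored with a stand-in enumeration; the authoritative definition lives in airlift.envs.agents, and the values below follow the bullet numbering on this page:

```python
from enum import IntEnum

# Stand-in mirroring the documented plane states. Values follow the
# listing above; consult agents.py for the authoritative definition.
class PlaneState(IntEnum):
    WAITING = 0
    PROCESSING = 1
    MOVING = 2
    READY_FOR_TAKEOFF = 3
```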

Load Cargo#

airlift.envs.agents.EnvAgent.load_cargo(self, cargo_to_load: Collection[Cargo], elapsed_steps, warnings: List[str])#

Checks to make sure the airplane can load the cargo, then loads it. Also checks to ensure that the cargo is assigned to that airplane.

Parameters:
  • cargo_to_load – A list that contains the Cargo to load

  • warnings – List of warnings issued by the environment. Ex: If an action is given to an unavailable route

Unload Cargo#

airlift.envs.agents.EnvAgent.unload_cargo(self, cargo_to_unload: Collection[Cargo], warnings: List[str])#

Does a check to ensure the correct cargo is being unloaded and removes the cargo from the airplane.

Parameters:
  • cargo_to_unload – A list that contains the Cargo to unload

  • warnings – List of warnings issued by the environment. Ex: If an action is given to an unavailable route

Current Cargo Weight#

airlift.envs.agents.EnvAgent.current_cargo_weight()#

Gets the total current cargo weight on an airplane

Returns:

current cargo weight on an airplane
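A typical use of this value is checking remaining capacity before issuing a load action. The helper below is purely illustrative and not part of the airlift API:

```python
# Hypothetical helper (not part of the airlift API): decide whether a
# cargo item fits, given the airplane's current load and weight capacity.
def can_load(current_cargo_weight, cargo_weight, max_weight):
    return current_cargo_weight + cargo_weight <= max_weight
```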

Action Helper#

This class will assist you in making actions in the environment.

Load Action#

airlift.envs.airlift_env.ActionHelper.load_action(cargo_to_load) → dict#

Loads Cargo by ID

Parameters:

cargo_to_load – int, cargo ID to load

Returns:

A dictionary containing all the actions the agent will take that includes the cargo to load.

Unload Action#

airlift.envs.airlift_env.ActionHelper.unload_action(cargo_to_unload) → dict#

Unloads Cargo by ID

Parameters:

cargo_to_unload – int, cargo ID to unload

Returns:

A dictionary containing all the actions the agent will take that includes the cargo to unload.

Process Action#

airlift.envs.airlift_env.ActionHelper.process_action(cargo_to_load=[], cargo_to_unload=[]) → dict#

Processes an action

Parameters:
  • cargo_to_load – A list containing the cargo IDs to load

  • cargo_to_unload – A list containing cargo IDs to unload

Returns:

A dictionary that contains all the cargo to load and unload.
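As a sketch, the dictionary such a combined action produces plausibly looks like the action format used in the ObservationHelper example on this page; the exact field names and the "stay put" destination value are assumptions:

```python
# Sketch of the action dictionary a combined load/unload action builds.
# Field names follow the action format shown elsewhere on this page
# (an assumption about the exact schema).
def build_process_action(cargo_to_load=(), cargo_to_unload=()):
    return {"process": 1,
            "cargo_to_load": set(cargo_to_load),
            "cargo_to_unload": set(cargo_to_unload),
            "destination": 0}  # 0: no destination, i.e. stay put (assumption)
```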

Takeoff Action#

airlift.envs.airlift_env.ActionHelper.takeoff_action(destination: int) → dict#

Gives the next destination

Parameters:

destination – int, destination ID airport

Returns:

A dictionary containing the destination for the airplane to take off to.

Noop Action#

airlift.envs.airlift_env.ActionHelper.noop_action() → dict#

No-Op Action, “Do nothing” (except refuel based on priority)

Returns:

A dictionary where the values contained will make the airplane take no action.

Is Noop Action#

airlift.envs.airlift_env.ActionHelper.is_noop_action(action) → bool#

Checks if an action is a No-Op action.

Parameters:

action – dictionary, contains actions of a single agent

Returns:

Boolean, True if No-Op action.
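The check can be pictured as a comparison against an "empty" action. The field names and empty values below are assumptions based on the action format used on this page, not the library's internal representation:

```python
# Sketch of a no-op check against the action format used on this page
# (field names and "empty" values are assumptions about the schema).
NOOP_ACTION = {"process": 0,
               "cargo_to_load": set(),
               "cargo_to_unload": set(),
               "destination": 0}

def is_noop(action):
    return action is None or action == NOOP_ACTION
```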

Example#

Using the ActionHelper we can issue actions to the agents:

from airlift.envs.airlift_env import ActionHelper as ah

# We create a function that issues one action to a single agent and steps
# the environment until the agent reaches the done state
def do_single_agent_action(env, agent, action, interimstate, donestate):
    env.step({agent: action})
    while env.observe(agent)["state"] != donestate and not env.dones[agent]:
        env.step({agent: None})

# Load Cargo #0
do_single_agent_action(env, agent, ah.load_action(0), PlaneState.PROCESSING, PlaneState.READY_FOR_TAKEOFF)

# Unload Cargo #0
do_single_agent_action(env, agent, ah.unload_action(0), PlaneState.PROCESSING, PlaneState.READY_FOR_TAKEOFF)

Observation Helper#

This is a helper class to assist with utilizing the state and observation. Includes helper functions such as getting available destinations or the shortest path route.

Airplane Idle#

airlift.envs.airlift_env.ObservationHelper.is_airplane_idle(airplane_obs)#

Checks whether an airplane is idle, i.e. it has no actions in progress.

Parameters:

airplane_obs – Airplane Observation

Returns:

True if the airplane is idle.

Available Destinations#

airlift.envs.airlift_env.ObservationHelper.available_destinations(state, airplane_obs, plane_type: int)#

Returns the available destinations from an airport node.

Parameters:
  • state – The environment state (as returned by AirliftEnv.state)

  • airplane_obs – Airplane Observation

  • plane_type – Airplane Model by PlaneTypeID

Returns:

Returns a list of all available destinations that the agent can travel to from its current node.

Get Lowest Cost Path#

airlift.envs.airlift_env.ObservationHelper.get_lowest_cost_path(state, airport1, airport2, plane_type: int)#

Gets the shortest path from airport1 to airport2 based on the plane model.

Parameters:
  • state – The environment state (as returned by AirliftEnv.state)

  • airport1 – From Airport

  • airport2 – To Airport

  • plane_type – airplane model by PlaneTypeID

Returns:

A list containing the shortest path from airport1 to airport2.
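The search this helper performs can be sketched as a standard lowest-cost (Dijkstra-style) traversal. The real helper reads the route graph from the environment state; the sketch below uses a plain dictionary graph instead:

```python
import heapq

# Illustrative lowest-cost path search, similar in spirit to what
# get_lowest_cost_path computes. The graph here is a plain dict
# {node: [(neighbor, cost), ...]}, not the airlift route map.
def lowest_cost_path(graph, airport1, airport2):
    queue = [(0, airport1, [airport1])]
    visited = set()
    while queue:
        cost, node, path = heapq.heappop(queue)
        if node == airport2:
            return path
        if node in visited:
            continue
        visited.add(node)
        for neighbor, edge_cost in graph.get(node, []):
            if neighbor not in visited:
                heapq.heappush(queue, (cost + edge_cost, neighbor, path + [neighbor]))
    return None  # no path exists
```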

Get Active Cargo Info#

airlift.envs.airlift_env.ObservationHelper.get_active_cargo_info(state, cargoid)#

Gets info about the given active cargo. Active cargo is cargo that has not yet been delivered.

Parameters:
  • state – The environment state (as returned by AirliftEnv.state)

  • cargoid – Cargo by ID

Returns:

The cargo info for the given cargo ID, if that cargo is currently active.

Get MultiGraph#

airlift.envs.airlift_env.ObservationHelper.get_multidigraph(state) → MultiDiGraph#

Gets the routemap as a multigraph.

Parameters:

state – The environment state (as returned by AirliftEnv.state)

Returns:

A route map as a MultiDiGraph

Example#

Using the ObservationHelper, we can get important information that helps us issue the proper agent actions. The example below uses the observation helper to check whether an agent is idle and has available routes, and then uses the get_active_cargo_info function to get active cargo information for that agent:

# Import the ObservationHelper
from airlift.envs.airlift_env import ObservationHelper as oh
import numpy

state = env.state()
obs = env.observe(a)

# Note: available_destinations may return an empty list
if oh.is_airplane_idle(obs) and oh.available_destinations(state, obs, obs["plane_type"]):
    available_destinations = oh.available_destinations(state, obs, obs["plane_type"])
    actions[a] = {"process": 1,
                  "cargo_to_load": set(),
                  "cargo_to_unload": set(),
                  "destination": numpy.random.choice(available_destinations)}

# Get info about a cargo that needs to be picked up
ca = oh.get_active_cargo_info(state, cargo_to_pick_up)