import numpy as np # physical/external base state of all entites class EntityState(object): def __init__(self): # physical position self.p_pos = None # physical velocity self.p_vel = None # state of agents (including communication and internal/mental state) class AgentState(EntityState): def __init__(self): super(AgentState, self).__init__() # communication utterance self.c = None # action of the agent class Action(object): def __init__(self): # physical action self.u = None # communication action self.c = None # properties and state of physical world entity class Entity(object): def __init__(self): # name self.name = '' # properties: self.size = 0.050 # entity can move / be pushed self.movable = False # entity collides with others self.collide = True # material density (affects mass) self.density = 25.0 # color self.color = None # max speed and accel self.max_speed = None self.accel = None # state self.state = EntityState() # mass self.initial_mass = 1.0 @property def mass(self): return self.initial_mass # properties of landmark entities class Landmark(Entity): def __init__(self): super(Landmark, self).__init__() # properties of agent entities class Agent(Entity): def __init__(self): super(Agent, self).__init__() # agents are movable by default self.movable = True # cannot send communication signals self.silent = False # cannot observe the world self.blind = False # physical motor noise amount self.u_noise = None # communication noise amount self.c_noise = None # control range self.u_range = 1.0 # state self.state = AgentState() # action self.action = Action() # script behavior to execute self.action_callback = None # multi-agent world class World(object): def __init__(self): # list of agents and entities (can change at execution-time!) self.agents = [] self.landmarks = [] # communication channel dimensionality self.dim_c = 0 # position dimensionality self.dim_p = 2 # color dimensionality self.dim_color = 3 # simulation timestep self.dt = 0.1 # physical damping self.damping = 0.25 # contact response parameters self.contact_force = 1e+2 self.contact_margin = 1e-3 # return all entities in the world @property def entities(self): return self.agents + self.landmarks # return all agents controllable by external policies @property def policy_agents(self): return [agent for agent in self.agents if agent.action_callback is None] # return all agents controlled by world scripts @property def scripted_agents(self): return [agent for agent in self.agents if agent.action_callback is not None] # update state of the world def step(self): # set actions for scripted agents for agent in self.scripted_agents: agent.action = agent.action_callback(agent, self) # gather forces applied to entities p_force = [None] * len(self.entities) # apply agent physical controls p_force = self.apply_action_force(p_force) # apply environment forces p_force = self.apply_environment_force(p_force) # integrate physical state self.integrate_state(p_force) # update agent state for agent in self.agents: self.update_agent_state(agent) # gather agent action forces def apply_action_force(self, p_force): # set applied forces for i,agent in enumerate(self.agents): if agent.movable: noise = np.random.randn(*agent.action.u.shape) * agent.u_noise if agent.u_noise else 0.0 p_force[i] = agent.action.u + noise return p_force # gather physical forces acting on entities def apply_environment_force(self, p_force): # simple (but inefficient) collision response for a,entity_a in enumerate(self.entities): for b,entity_b in enumerate(self.entities): if(b <= a): continue [f_a, f_b] = self.get_collision_force(entity_a, entity_b) if(f_a is not None): if(p_force[a] is None): p_force[a] = 0.0 p_force[a] = f_a + p_force[a] if(f_b is not None): if(p_force[b] is None): p_force[b] = 0.0 p_force[b] = f_b + p_force[b] return p_force # integrate physical state def integrate_state(self, p_force): for i,entity in enumerate(self.entities): if not entity.movable: continue entity.state.p_vel = entity.state.p_vel * (1 - self.damping) if (p_force[i] is not None): entity.state.p_vel += (p_force[i] / entity.mass) * self.dt if entity.max_speed is not None: speed = np.sqrt(np.square(entity.state.p_vel[0]) + np.square(entity.state.p_vel[1])) if speed > entity.max_speed: entity.state.p_vel = entity.state.p_vel / np.sqrt(np.square(entity.state.p_vel[0]) + np.square(entity.state.p_vel[1])) * entity.max_speed entity.state.p_pos += entity.state.p_vel * self.dt def update_agent_state(self, agent): # set communication state (directly for now) if agent.silent: agent.state.c = np.zeros(self.dim_c) else: noise = np.random.randn(*agent.action.c.shape) * agent.c_noise if agent.c_noise else 0.0 agent.state.c = agent.action.c + noise # get collision forces for any contact between two entities def get_collision_force(self, entity_a, entity_b): if (not entity_a.collide) or (not entity_b.collide): return [None, None] # not a collider if (entity_a is entity_b): return [None, None] # don't collide against itself # compute actual distance between entities delta_pos = entity_a.state.p_pos - entity_b.state.p_pos dist = np.sqrt(np.sum(np.square(delta_pos))) # minimum allowable distance dist_min = entity_a.size + entity_b.size # softmax penetration k = self.contact_margin penetration = np.logaddexp(0, -(dist - dist_min)/k)*k force = self.contact_force * delta_pos / dist * penetration force_a = +force if entity_a.movable else None force_b = -force if entity_b.movable else None return [force_a, force_b]