import re, sys
from copy import deepcopy
from gym_nethack.nhdata import *
from gym_nethack.fileio import DIR_CHAR
from gym_nethack.conn import send_msg, rcv_msg, nethack_dir
from gym_nethack.misc import to_matrix, VERBOSE, verboseprint
[docs]def unpack_msg(msg, base_map, ignore_monsters=False, parse_ammo=True, update_base=True, parse_monsters=True):
attrstat = msg[(21*COLNO):]
if 'Dlvl' not in attrstat:
raise Exception("Unexpected NH message: " + msg)
align_pos = attrstat.find("Dlvl")
assert align_pos >= 0
top_delim_pos = attrstat[align_pos:].find("--")
assert top_delim_pos >= 0
top_delim_pos += align_pos
ppos_pos = attrstat[top_delim_pos+2:].find("**")
assert ppos_pos >= 0
ppos_pos += top_delim_pos+2
attmsg = attrstat[0:align_pos].replace("18/**", "21")
sttmsg = attrstat[align_pos:top_delim_pos]
topmsg = attrstat[top_delim_pos+2:ppos_pos]
posmsg = attrstat[ppos_pos+2:-8].replace('\x00', "")
uy, ux = posmsg.split('-')
cur_pos = (int(ux), int(uy))
back_glyph = int(attrstat[-6:-2])
if 'laden with moisture' in topmsg or 'engulfs' in topmsg: # map obscured
update_base = False
full_map = to_matrix(list(msg[:(21*COLNO)]), COLNO) # 20x120
ammo_positions = []
monster_positions = []
item_positions = set()
food_positions = set()
misc_positions = []
critical_positions = []
concrete_positions = set()
num_explored_squares = 0
for i, row in enumerate(full_map):
for j, col in enumerate(row):
if col != ' ':
num_explored_squares += 1
if update_base and base_map is not None and col in TOPOGRAPHICAL_CHARS:
base_map[i][j] = full_map[i][j]
if col in '+.': #|-
critical_positions.append((i, j))
elif col in '|-':
concrete_positions.add((i, j))
#elif col in '`^<' and slim_charset:
# base_map[i][j] = '.'
# full_map[i][j] = '.'
elif col == ')' and parse_ammo:
ammo_positions.append((i, j))
elif col in MONS_CHARS and (i, j) != cur_pos:
monster_positions.append((i, j))
if col in '$[(%?/=!"*):':
item_positions.add((i, j))
if not parse_monsters: # or slim_charset:
if base_map is not None:
base_map[i][j] = '.'
full_map[i][j] = '.'
else:
if update_base and base_map is not None and base_map[i][j] == ' ':
base_map[i][j] = '^'
full_map[i][j] = 'i'
if col == '%':
food_positions.add((i, j))
elif col in '_{\}': # room features
misc_positions.append((i, j))
if base_map is not None:
base_map[i][j] = '.'
full_map[i][j] = '.'
elif col == '`' or (col in MONS_CHARS and (i, j) != cur_pos):
if col not in MONS_CHARS:
misc_positions.append((i, j))
if not parse_monsters: # or slim_charset:
if base_map is not None:
base_map[i][j] = '.'
full_map[i][j] = '.'
else:
if update_base and base_map is not None and base_map[i][j] == ' ':
base_map[i][j] = '^'
if update_base and base_map is None:
base_map = deepcopy(full_map)
base_map[cur_pos[0]][cur_pos[1]] = '.' # player always starts in room.
for ipos in list(item_positions) + misc_positions:
base_map[ipos[0]][ipos[1]] = '.'
for ipos in monster_positions:
base_map[ipos[0]][ipos[1]] = '^'
concrete_positions = set() #TODO
return base_map, full_map, attmsg, sttmsg, topmsg, cur_pos, monster_positions, ammo_positions, item_positions, food_positions, back_glyph, critical_positions, concrete_positions, num_explored_squares
[docs]def get_inventory(socket):
try:
send_msg(socket, CMD.INVENTORY)
raw = rcv_msg(socket)
except zmq.error.Again:
raise Exception("Error occurred communicating with NetHack to get inventory.")
items = raw.split("--")
inventory = []
matched_names = []
for item_str in items[1:-1]:
if len(item_str.split(",")) != 2:
print("Couldn't interpret:", item_str)
item_name, item_char = item_str.split(",")
item_name = item_name.replace("-2", "-1").replace("+2", "+1").replace("-3", "-1").replace("+3", "+1").replace("+4", "+1").replace("-4", "-1").replace("thoroughly ", "").replace("very ", "")
qty, stripped_name = get_stripped_itemname(item_name)
if stripped_name in IGNORED_ITEMS:
continue
matched_item = None
for i, weap in enumerate(ALL_ITEMS):
if item_match(weap.full_name, stripped_name):
#if weap.full_name in matched_names:
# print(weap.full_name, "already present in inven! :", inventory)
matched_names.append(weap.full_name)
matched_item = weap
break
if matched_item is None:
verboseprint("\nMatched item was none! Item_str:", item_str, "and stripped:", stripped_name)
continue
assert item_char is not None
if ':' in item_name:
ind = item_name.index(':')
qty = item_name[ind+1:ind+2]
inventory.append((item_name, item_char, stripped_name, matched_item, 1 if len(qty) == 0 else int(qty)))
return inventory
[docs]def get_stripped_itemname(inventory_name):
# get rid of opening digits (quantities)
qty = ""
while inventory_name[0].isdigit():
qty += inventory_name[0]
inventory_name = inventory_name[1:]
while inventory_name[0] == ' ':
inventory_name = inventory_name[1:]
strip_strs = ['a ', 'an ', 'the ']
for sstr in strip_strs:
if inventory_name.startswith(sstr):
inventory_name = inventory_name.replace(sstr, "", 1)
# get rid of brackets
if '(' in inventory_name:
inventory_name = inventory_name[:inventory_name.index('(')-1]
# get rid of plurals
for x, y in PLURALS:
inventory_name = inventory_name.replace(x, y)
# named items (artifacts)
if ' named ' in inventory_name:
assert False #TODO-incorporate buc status
inventory_name = inventory_name.split(' named ')[1]
if 'ring of' in inventory_name:
inventory_name = inventory_name.replace("+0 ", "")
# only needed when artifacts are included
#SPECIAL_ARTIFACT_NAMES = ['Staff of Aesculapius', 'Tsurugi of Muramasa', 'Sceptre of Might']
#if any(art_name in inventory_name for art_name in SPECIAL_ARTIFACT_NAMES):
# inventory_name = inventory_name.replace('The ', '')
return qty, inventory_name
[docs]def item_match(item_name, inventory_name):
if 'cursed' not in inventory_name and 'blessed' not in inventory_name and 'holy water' not in inventory_name:
inventory_name = 'uncursed ' + inventory_name # default to uncursed
#if VERBOSE:
# print("matching '", item_name, "' to '", inventory_name, "'")
return item_name == inventory_name
[docs]def update_attrs(attr_line, attributes):
m = re.search(r'the (?P<role_title>'+NH_ROLE_TITLES_FLAT+')\s*'
r'St:(?P<st>[/\d\*]+)\s*'
r'Dx:(?P<dx>\d+)\s*'
r'Co:(?P<co>\d+)\s*'
r'In:(?P<in>\d+)\s*'
r'Wi:(?P<wi>\d+)\s*'
r'Ch:(?P<ch>\d+)\s*'
r'S:(?P<sc>\d+)\s*'
r'(I:(?P<inv>\d+)\s*)'
r'(?P<align>\S+)', attr_line)
if m:
prev_attributes = deepcopy(attributes)
attributes = m.groupdict()
for k, v in attributes.items():
if v and v.isdigit():
attributes[k] = int(v)
if '/' in str(attributes['st']):
strength = str(attributes['st']).split("/")
strength = [int(s) for s in strength]
assert strength[0] == 18
if 0 <= strength[1] <= 31:
strength = 19
elif 32 <= strength[1] <= 81:
strength = 20
elif 82 <= strength[1]:
strength = 21
attributes['st'] = strength
return prev_attributes, attributes
else:
raise Exception("No attributes! Attr line was:\n" + attr_line)
[docs]def update_stats(stat_line, stats):
m = re.search(r'Dlvl:(?P<dlvl>\S+)\s*'
r'\\\w+:(?P<money>\d+)\s*'
r'HP:(?P<hp>\d+)\((?P<hp_max>\d+)\)\s*'
r'Pw:(?P<pw>\d+)\((?P<pw_max>\d+)\)\s*'
r'AC:(?P<ac>[+-]?\d+)\s*'
r'R:(?P<rooms>\d+)\s*'
r'SD:(?P<sdoor>\d+)\s*'
r'Exp:(?P<exp>\d+)\s*'
r'(?P<hunger>Satiated|Hungry|Weak|Fainting)?\s*'
r'(?P<stun>Stun)?\s*'
r'(?P<conf>Conf)?\s*'
r'(?P<blind>Blind)?\s*'
r'(?P<burden>Burdened|Stressed|Strained|Overtaxed|Overloaded)?\s*'
r'(?P<hallu>Hallu)?\s*', stat_line)
if m:
prev_stats = deepcopy(stats)
stats = m.groupdict()
for k, v in stats.items():
if v and v.isdigit():
stats[k] = int(v)
return prev_stats, stats
else:
raise Exception("No statistics! Stat line was: ", stat_line)
[docs]def assert_setup(starting_items, inventory, cur_monster, top_line, monster_positions, stats, start_ac):
verboseprint("Asserting setup...")
# check that we have all 5 items in our inventory...
for item in starting_items:
found = False
if item[0].isdigit():
item = item[item.index(" ")+1:-1] # get rid of ammo quantity
for _, _, stripped_inven_item, matched_item, _ in inventory:
#print("Trying to match the starting item,", item, "to the thing in our inventory, stripped name:", stripped_inven_item)
itemname = matched_item.full_name.replace("poisoned ", "")
if itemname == item.full_name: # item_match(item.full_name, stripped_inven_item):
found = True
break
if not found:
print("\nCould not find", item, "in inventory:")
for inven_name, _, _, it, _ in inventory:
print(inven_name, it.full_name.replace("poisoned ", ""))
#append("Could not find " + item.full_name + " in inventory: " + str(inventory) + " (starting items: " + str(starting_items) + ")", self.savedir+"errors")
#raise Exception
# check that there is at least one monster...
if len(monster_positions) < 1 and stats['hallu'] != 'Hallu':
#append("Could not find monster " + cur_monster, self.savedir+"errors")
raise Exception("No monster (was looking for " + cur_monster + ")!")
# check that the monster is the correct char
mon_names = [cur_monster.replace("_", "-").lower(), cur_monster.replace("_", " ").lower()]
if all(mname not in top_line.lower() for mname in mon_names) and stats['hallu'] != 'Hallu':
print("Monster (" + cur_monster + ") not in top line (" + top_line.lower() + ")!")
#append("Monster (" + cur_monster + ") not in top line (" + top_line.lower() + ")", self.savedir+"errors")
#return False
verboseprint("Setup looks good.")
return True
[docs]def save_nh_conf(proc_id, secret_rooms=False, character="Bar", race="Human", clvl=1, st=0, dx=0, mtype=None, create_mons=False, ac=999, inven=[], dlvl=1, lyc=None, stateffs=1, adj_mlvl=True, create_items=True, seed=-1):
if sys.platform == "win32":
sysconf_fname = nethack_dir + DIR_CHAR + "defaults.nh"
else:
sysconf_fname = nethack_dir + DIR_CHAR + "sysconf" + str(proc_id)
verboseprint("Writing to sysconf file:", sysconf_fname)
with open(sysconf_fname, 'w') as sysconf:
sysconf.write("OPTIONS=!autopickup, !bones, pushweapon, pettype:none, time, disclose:-i -a -v -g -c -o, ")
if secret_rooms:
sysconf.write("secret_rooms, ")
if not adj_mlvl:
sysconf.write("!")
sysconf.write("adjust_mlvl, ")
if not create_items:
sysconf.write("!")
sysconf.write("create_items, ")
if mtype is not None:
sysconf.write("combat_setup, create_mons, ")
elif create_mons:
sysconf.write("!combat_setup, create_mons, ")
else:
sysconf.write("!combat_setup, !create_mons, ")
sysconf.write("character:"+character+", race:"+race+", gender:male, name:Merlin, align:chaotic, ")
sysconf.write("reqlevel:"+str(clvl)+", reqstr:"+str(st)+", reqdex:"+str(dx)+", reqdlvl:"+str(dlvl))
if ac < 999:
sysconf.write(", reqac:"+str(ac))
if lyc is not None:
sysconf.write(", reqlyc:"+str(int(lyc)))
if stateffs > 1:
sysconf.write(", stateffs:"+str(stateffs))
if mtype is not None:
sysconf.write(", mtypeid:"+str(mtype))
if seed > -1:
sysconf.write(", seed:"+str(seed))
sysconf.write("\nWIZKIT=wizkit" + str(proc_id) + ".txt\n")
wizkit_fname = nethack_dir + DIR_CHAR + "wizkit" + str(proc_id) + ".txt"
with open(wizkit_fname, 'w') as wizkit:
for item in inven:
#enc = 3 if 'wand' in item else 0 # charges for wand
#buc = 'uncursed' if 'holy water' not in item else 'blessed'
#
#if item[0].isdigit(): # if quantity specified
# quantity = item.split(" ")[0]
# item = item[item.index(" "):]
#else:
# quantity = 1
#
#wizkit.write(str(quantity) + " " + buc + " +" + str(enc) + " " + item + "\n")
wizkit.write(item + "\n")
[docs]def get_cmd_from_delta(dx, dy):
for delta, cmd in DIR_MAPPING:
if dx == delta[0] and dy == delta[1]:
return cmd
raise Exception("Couldn't find a CMD mapping from " + str(dx) + "," + str(dy))
[docs]class Room(object):
def __init__(self, nh):
self.nh = nh
self.wall_positions = set()
self.wall_openings = set()
self.corners = set()
self.positions = set()
self.top_left_corner = None
self.__get_wall_infos()
self.centroid = (sum([p[0] for p in self.positions]) // len(self.positions), sum([p[1] for p in self.positions]) // len(self.positions))
def __get_dists_to_walls(self):
dists = []
for dx, dy in DIRS:
cur_x, cur_y = self.nh.cur_pos
d = 0
while self.nh.basemap_char(cur_x+dx, cur_y+dy) not in WALL_CHARS + DOOR_CHARS and self.nh.basemap_char(cur_x+dx, cur_y+dy) not in DOOR_CHARS and (cur_x+dx, cur_y+dy) not in self.nh.room_openings:
# probably still in a room
adjacent_chars = self.nh.get_chars_adjacent_to(cur_x+dx, cur_y+dy)
if (adjacent_chars.count('.') + adjacent_chars.count('<') + adjacent_chars.count('>') + adjacent_chars.count('^')) < 2:
break
cur_x += dx
cur_y += dy
d += 1
dists.append(d)
return dists
def __get_wall_positions(self, fixed, c1, c2, x_axis=True):
positions, openings = [], []
for c in range(c1, c2):
if x_axis:
wall = self.nh.base_map[c][fixed] == '|'
coord = (c, fixed)
else:
wall = self.nh.base_map[fixed][c] == '-'
coord = (fixed, c)
if wall:
positions.append(coord)
else:
openings.append(coord)
self.wall_positions.update(positions)
self.wall_openings.update(openings)
def __get_wall_infos(self):
x, y = self.nh.cur_pos
wall_dists = self.__get_dists_to_walls()
topx = x - wall_dists[0] # (-1, 0)
topy = y - wall_dists[2] # (0, -1)
bottomx = x + wall_dists[1] # (1, 0)
bottomy = y + wall_dists[3] # (0, 1)
self.corners = [(topx-1, topy-1), (bottomx+1, bottomy+1), (topx-1, bottomy+1), (bottomx+1, topy-1)]
self.__get_wall_positions(topy-1, topx, bottomx+1)
self.__get_wall_positions(bottomy+1, topx, bottomx+1)
self.__get_wall_positions(topx-1, topy, bottomy+1, x_axis=False)
self.__get_wall_positions(bottomx+1, topy, bottomy+1, x_axis=False)
for px in range(topx, bottomx+1):
for py in range(topy, bottomy+1):
self.positions.add((px, py))
self.top_left_corner = (topx, topy)
[docs] def count_char(self, char):
count = 0
for x, y in self.positions:
if self.nh.map[x][y] == char:
count += 1
return count
[docs] def find_char(self, char):
for x, y in self.positions:
if self.nh.map[x][y] == char:
return (True, x, y)
return (False, -1, -1)
[docs] def get_lined_positions(self, mpos):
lined_positions = set()
# get possible positions
for dir_x, dir_y in DIRS_DIAG:
for mrange in range(13): # TODO
lx = mpos[0] + dir_x*mrange
ly = mpos[1] + dir_y*mrange
if (lx, ly) not in self.positions:
break
if (mpos[0] != lx and mpos[1] != ly):
lined_positions.add((lx, ly))
return lined_positions
[docs]class Passage():
"""Helper class to maintain info about map corridors and what rooms they lead to."""
next_id = 0
def __init__(self, first_room, first_position):
self.positions = set()
self.connected_rooms = set()
self.connected_room_openings = set()
self.connect_room(first_room)
self.add_position(first_position)
self.id = Passage.next_id
Passage.next_id += 1
#verboseprint("Creating a new passage with first pos", self.positions, "and first room", self.connected_rooms)
def __repr__(self):
return str(self.id) + str(self.positions) + str(self.connected_rooms)
[docs] def connect_room(self, room_pos):
self.connected_rooms.add((room_pos))
[docs] def add_position(self, passage_pos):
self.positions.add((passage_pos))
[docs] @staticmethod
def merge(p1, p2):
for room in p2.connected_rooms:
p1.connect_room(room)
for pos in p2.positions:
p1.add_position(pos)
for pos in p2.connected_room_openings:
p1.connected_room_openings.add((pos))
return p1