-
Notifications
You must be signed in to change notification settings - Fork 0
/
util.py
357 lines (303 loc) · 11.4 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import subprocess
import os
from PIL import Image, ImageDraw
import multiprocessing as mp
import time
import glob
import shutil
def scan_quadseeds(qx, qy, search_time, quadfile):
    """Run ``./find_quadhuts`` for a limited time and keep the seeds it prints.

    The binary is started with ``qx`` and ``qy`` as its arguments and its
    standard output redirected into ``quadfile``. If it is still running
    after ``search_time`` seconds it is killed. Afterwards the last line of
    ``quadfile`` is dropped, the file is rewritten, and the number of
    remaining lines is printed.

    Args:
        qx: First argument for the binary (reported to the user times 512).
        qy: Second argument for the binary (reported to the user times 512).
        search_time: Maximum number of seconds to let the binary run.
        quadfile: Path of the file that receives the binary's output.
    """
    # The context manager closes our copy of the output handle even when the
    # binary cannot be started.
    with open(quadfile, 'w') as seed_output:
        process = subprocess.Popen(
            ['./find_quadhuts', str(qx), str(qy)], stdout=seed_output)
        try:
            print(
                f'Generating quad witch hut seeds at ({qx * 512}, {qy * 512}) for {search_time} seconds...')
            process.wait(timeout=search_time)
        except subprocess.TimeoutExpired:
            process.kill()
            # Reap the child: this avoids a zombie and guarantees it has
            # stopped writing before the file is read back below.
            process.wait()

    # Remove last line from file
    # NOTE(review): presumably because a killed process can leave the final
    # line half-written; the line is dropped unconditionally, as before.
    with open(quadfile) as qf:
        lines = qf.readlines()[:-1]
    with open(quadfile, 'w') as qf:
        qf.writelines(lines)
    print(f'Found {len(lines)} quad witch hut seeds!')
def _render_seed_png(seed, folder, xsize, ysize):
    """Render one seed with ./image_generator and turn the ppm into a png."""
    ppm_path = f'{folder}{seed}.ppm'
    png_path = f'{folder}{seed}.png'
    # Argument list instead of a shell string, so folder names containing
    # spaces or shell metacharacters reach the binary unchanged.
    command = ['./image_generator', str(seed), folder + '/',
               str(xsize), str(ysize)]
    try:
        result = subprocess.run(command)
    except OSError as err:
        print(f'Warning: could not run ./image_generator for seed {seed}: {err}')
        return
    # The generator has exited at this point; a missing file means it failed,
    # so skip the seed instead of polling for the file forever.
    if not os.path.exists(ppm_path):
        print(f'Warning: no image generated for seed {seed} '
              f'(exit code {result.returncode}), skipping')
        return
    # Convert ppm to png, remove ppm to save space
    with Image.open(ppm_path) as im:
        center_x, center_y = im.width // 2, im.height // 2
        draw = ImageDraw.Draw(im)
        # Red 40x40 square around the centre of the image.
        draw.rectangle([center_x - 20, center_y - 20,
                        center_x + 20, center_y + 20],
                       width=4, outline="#ff0000")
        im.save(png_path)
    os.remove(ppm_path)


def _render_seed_batch(seeds, folder, xsize, ysize):
    """Worker entry point: render every non-blank seed of one chunk."""
    for raw_seed in seeds:
        text = str(raw_seed).strip()
        if text:
            _render_seed_png(int(text), folder, xsize, ysize)


def convert_all_ppm_to_png(seedlist, folder, xsize=1024, ysize=512):
    """Create a png image in ``folder`` for every seed in ``seedlist``.

    Each seed is rendered to ``<folder>/<seed>.ppm`` by ``./image_generator``,
    marked with a red square at the image centre, saved as
    ``<folder>/<seed>.png``, and the ppm is deleted. The seeds are spread over
    up to ``multiprocessing.cpu_count()`` worker processes and the call
    returns once all of them have finished.

    Args:
        seedlist: Sequence of seeds; entries are stripped and parsed with
            ``int``. Blank entries are ignored.
        folder: Output directory; created if missing.
        xsize: Passed to ``./image_generator`` as its third argument.
        ysize: Passed to ``./image_generator`` as its fourth argument.
    """
    # Create folder, make sure it exists
    folder = folder.strip().rstrip('/') + "/"
    os.makedirs(folder, exist_ok=True)

    seeds = list(seedlist)
    num_processes = mp.cpu_count()
    workers = []
    for idx in range(num_processes):
        # Striding keeps the chunks balanced even when there are fewer seeds
        # than CPUs; empty chunks get no process at all.
        chunk = seeds[idx::num_processes]
        if not chunk:
            continue
        # The target is a module-level function so it can be pickled under
        # the spawn and forkserver start methods, not only under fork.
        worker = mp.Process(target=_render_seed_batch,
                            args=(chunk, folder, xsize, ysize))
        worker.start()
        workers.append(worker)

    # Wait for every worker to finish
    for worker in workers:
        worker.join()
def get_lookup_table():
    """Return a fresh dict mapping biome names to their numeric ids."""
    biome_pairs = (
        # Oceans
        ("ocean", 0),
        ("deep_ocean", 24),
        ("frozen_ocean", 10),
        ("deep_frozen_ocean", 50),
        ("cold_ocean", 46),
        ("deep_cold_ocean", 49),
        ("lukewarm_ocean", 45),
        ("deep_lukewarm_ocean", 48),
        ("warm_ocean", 44),
        ("deep_warm_ocean", 47),
        # Rivers and shores
        ("river", 7),
        ("frozen_river", 11),
        ("beach", 16),
        ("stone_shore", 25),
        ("snowy_beach", 26),
        # Forests
        ("forest", 4),
        ("wooded_hills", 18),
        ("flower_forest", 132),
        ("birch_forest", 27),
        ("birch_forest_hills", 28),
        ("tall_birch_forest", 155),
        ("tall_birch_hills", 156),
        ("dark_forest", 29),
        ("dark_forest_hills", 157),
        # Jungles
        ("jungle", 21),
        ("jungle_hills", 22),
        ("modified_jungle", 149),
        ("jungle_edge", 23),
        ("modified_jungle_edge", 151),
        ("bamboo_jungle", 168),
        ("bamboo_jungle_hills", 169),
        # Taigas
        ("taiga", 5),
        ("taiga_hills", 19),
        ("taiga_mountains", 133),
        ("snowy_taiga", 30),
        ("snowy_taiga_hills", 31),
        ("snowy_taiga_mountains", 158),
        ("giant_tree_taiga", 32),
        ("giant_tree_taiga_hills", 33),
        ("giant_spruce_taiga", 160),
        ("giant_spruce_taiga_hills", 161),
        # Mushroom fields and swamps
        ("mushroom_fields", 14),
        ("mushroom_field_shore", 15),
        ("swamp", 6),
        ("swamp_hills", 134),
        # Savannas, plains and deserts
        ("savanna", 35),
        ("savanna_plateau", 36),
        ("shattered_savanna", 163),
        ("shattered_savanna_plateau", 164),
        ("plains", 1),
        ("sunflower_plains", 129),
        ("desert", 2),
        ("desert_hills", 17),
        ("desert_lakes", 130),
        # Snow and mountains
        ("snowy_tundra", 12),
        ("snowy_mountains", 13),
        ("ice_spikes", 140),
        ("mountains", 3),
        ("wooded_mountains", 34),
        ("gravelly_mountains", 131),
        ("modified_gravelly_mountains", 162),
        ("mountain_edge", 20),
        # Badlands
        ("badlands", 37),
        ("badlands_plateau", 39),
        ("modified_badlands_plateau", 167),
        ("wooded_badlands_plateau", 38),
        ("modified_wooded_badlands_plateau", 166),
        ("eroded_badlands", 165),
        # Nether
        ("nether_wastes", 8),
        ("crimson_forest", 171),
        ("warped_forest", 172),
        ("soul_sand_valley", 170),
        ("basalt_deltas", 173),
        # End and void
        ("the_end", 9),
        ("small_end_islands", 40),
        ("end_midlands", 41),
        ("end_highlands", 42),
        ("end_barrens", 43),
        ("the_void", 127),
    )
    # A new dict on every call, so callers may mutate the result freely.
    return dict(biome_pairs)
def get_filter(given_filter=None, filter_path='biome_filters.txt'):
    """Read the user-defined filters from a file and pick one of them.

    Every line that is neither blank nor a ``#`` comment is expected to look
    like ``<number>: <value>, <value>, ...``. If ``filter_path`` does not
    exist, an empty file is created there first.

    Args:
        given_filter: Number of the filter to use. When ``None`` the
            available filters are printed and the number is read from
            standard input.
        filter_path: Path of the filter definition file.

    Returns:
        A ``(values, selection)`` tuple: the stripped values of the chosen
        filter as a list of strings, and the number that selected it.

    Raises:
        SystemExit: With code 1 when the file defines no filters or the
            selected number is not defined in it.
        ValueError: When a line's key, or the interactive answer, is not an
            integer.
    """
    if not os.path.exists(filter_path):
        open(filter_path, 'w').close()

    filters = {}
    with open(filter_path) as filter_file:
        for raw_line in filter_file:
            entry = raw_line.strip()
            # Blank lines used to crash int(''); skip them like comments.
            if not entry or entry.startswith('#'):
                continue
            key, *rest = entry.split(': ')
            filters[int(key)] = ", ".join(rest)

    if not filters:
        print("Error: no filters found in", filter_path)
        raise SystemExit(1)

    # Only query user if option is not used
    if given_filter is None:
        print("Select a filter out of the following:")
        for idx, description in filters.items():
            print(f" {idx}: {description.strip()}")
        filter_selection = int(input())
    else:
        filter_selection = given_filter

    if filter_selection not in filters:
        print(f"Error: filter {filter_selection} not found in {filter_path}")
        raise SystemExit(1)

    user_filter = [val.strip() for val in filters[filter_selection].split(', ')]
    return user_filter, filter_selection
def get_search_coords(bank_folder="quad_bank/"):
    """
    Get all coords from seed bank files.

    The coordinate tag of a bank file is the part of its file name between
    the first underscore and the following dot.
    Example filename: "quad_bank/quadbank_8x0y.txt" gives "8x0y".

    ``bank_folder`` is created if it does not exist. It may be relative or
    absolute, nested, and given with or without a trailing slash.

    Returns:
        A list with one coordinate tag per ``*.txt`` file in ``bank_folder``,
        in the order ``glob`` reports the files.

    Raises:
        SystemExit: With code 1 when the folder contains no ``*.txt`` files.
        IndexError: When a ``*.txt`` file name contains no underscore.
    """
    os.makedirs(bank_folder, exist_ok=True)

    # Parse the file name only, so directory components (however many, and
    # whatever characters they contain) cannot confuse the extraction.
    s_coords = [
        os.path.basename(bank_file).split("_")[1].split(".")[0]
        for bank_file in glob.glob(os.path.join(bank_folder, "*.txt"))
    ]

    if not s_coords:
        print(
            f"Error: no seed files found in '{bank_folder}', generate some with ./find_quadhuts")
        raise SystemExit(1)
    return s_coords
def make_splits(master_file, search_coords, tmp_dir="biome_scans/tmp/"):
    """Generate equal splits of master seedfile in tmp_dir.

    ``tmp_dir`` is wiped and recreated, then the lines of ``master_file`` are
    written, in their original order, to ``multiprocessing.cpu_count()``
    files named ``<search_coords>_split<idx>.txt``. The split sizes differ by
    at most one line; a split may be empty when there are fewer lines than
    CPUs.

    Args:
        master_file: Path of the seed file to split.
        search_coords: Prefix used in the split file names.
        tmp_dir: Directory that receives the splits. Any existing content is
            deleted.
    """
    if os.path.exists(tmp_dir):
        shutil.rmtree(tmp_dir)
    os.makedirs(tmp_dir)

    # Read in master file
    with open(master_file) as master:
        master_lines = master.readlines()

    # Split master file to separate files: the first `extra` splits carry
    # one additional line, so the remainder is spread out instead of being
    # piled onto the last split.
    num_splits = mp.cpu_count()
    base_size, extra = divmod(len(master_lines), num_splits)
    start = 0
    for idx in range(num_splits):
        stop = start + base_size + (1 if idx < extra else 0)
        split_path = os.path.join(tmp_dir, f"{search_coords}_split{idx}.txt")
        with open(split_path, 'w') as outfile:
            outfile.writelines(master_lines[start:stop])
        start = stop
def ensure_scan_structure(filter_id, search_range, scan_folder="biome_scans/", tmp_dir="tmp/"):
    """Ensures base folder structure exists.

    Creates ``scan_folder`` and ``scan_folder/tmp_dir`` when missing (their
    existing content is kept), then recreates
    ``scan_folder/filter<filter_id>_<search_range>r/`` from scratch with an
    empty ``all/`` directory inside it.

    Args:
        filter_id: Filter number used in the result folder name.
        search_range: Search range used in the result folder name.
        scan_folder: Root folder of all scan output.
        tmp_dir: Name of the scratch directory below ``scan_folder``.
    """
    # makedirs also creates scan_folder itself and any missing parents, and
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs(os.path.join(scan_folder, tmp_dir), exist_ok=True)

    filter_path = os.path.join(
        scan_folder, f"filter{filter_id}_{search_range}r")
    # Results of an earlier run with the same filter and range are discarded.
    if os.path.exists(filter_path):
        shutil.rmtree(filter_path)
    os.makedirs(os.path.join(filter_path, "all"))
def run_biome_scan(biome_ids, search_coords, search_range, tmp_dir="biome_scans/tmp/"):
    """
    Scans splits of seedbank file in C.
    Writes to filtered files per split in tmp_dir, waits until complete.

    Runs ``./find_filtered_biomes`` with, in order: the CPU count,
    ``search_range``, the path ``<tmp_dir><search_coords>.txt`` and every
    entry of ``biome_ids``. The exit status of the binary is not inspected.

    Args:
        biome_ids: Iterable of biome ids; each entry is converted with
            ``str``, so ints and strings are both accepted.
        search_coords: Coordinate tag used to build the path argument.
        search_range: Passed to the binary as its second argument.
        tmp_dir: Directory prefix (with trailing slash) of the path argument.
    """
    args = [
        './find_filtered_biomes',
        str(mp.cpu_count()),
        str(search_range),
        tmp_dir + search_coords + ".txt"
    ]
    # Every argv entry must be a string; converting here lets callers pass
    # numeric ids without the launch failing with a TypeError.
    args.extend(str(biome_id) for biome_id in biome_ids)
    subprocess.run(args)
def aggregate_scan(filter_id, search_coords, search_range, scan_folder="biome_scans/", tmp_dir="biome_scans/tmp/"):
    """Aggregate filtered splits and return path.

    Concatenates ``<tmp_dir><search_coords>_split<idx>_filtered.txt`` for
    ``idx`` in ``range(multiprocessing.cpu_count())``, in that order, into
    ``<scan_folder>filter<filter_id>_<search_range>r/<search_coords>/<search_coords>_filtered.txt``.
    Missing output directories are created.

    Args:
        filter_id: Filter number used in the result folder name.
        search_coords: Coordinate tag used in folder and file names.
        search_range: Search range used in the result folder name.
        scan_folder: Root folder of all scan output (with trailing slash).
        tmp_dir: Directory holding the filtered splits (with trailing slash).

    Returns:
        The path of the aggregated file.

    Raises:
        FileNotFoundError: When one of the expected split files is missing.
    """
    base_dir = f"{scan_folder}filter{filter_id}_{search_range}r/"
    export_folder = base_dir + search_coords + "/"
    os.makedirs(export_folder, exist_ok=True)

    export_path = export_folder + search_coords + "_filtered.txt"
    # Both handles are managed by `with`, so they are closed even when a
    # split file turns out to be missing halfway through.
    with open(export_path, "w") as export_file:
        for idx in range(mp.cpu_count()):
            split_path = f"{tmp_dir}{search_coords}_split{idx}_filtered.txt"
            with open(split_path) as split_file:
                export_file.writelines(split_file)
    return export_path
def generate_images(export_filepath, s_coord):
    """Generate png images of matched seeds.

    The images are produced by ``convert_ppm_parallelized`` in a freshly
    recreated ``generated/`` directory next to ``export_filepath``. Every file
    in it is then also copied into the ``all/`` directory one level further
    up.

    Args:
        export_filepath: Path of the file listing the matched seeds.
        s_coord: Coordinate tag, forwarded to ``convert_ppm_parallelized``.
    """
    export_dir = os.path.dirname(export_filepath)
    # The trailing "" keeps the final slash the converter relies on when it
    # appends file names. Unlike `dirname + "/generated/"`, join does not
    # turn into an absolute "/generated/" when export_filepath is a bare
    # file name.
    generated_path = os.path.join(export_dir, "generated", "")

    with open(export_filepath) as export_file:
        filtered_lines = export_file.readlines()

    if os.path.exists(generated_path):
        shutil.rmtree(generated_path)
    os.mkdir(generated_path)
    convert_ppm_parallelized(
        filtered_lines, export_filepath, generated_path, s_coord)

    # Copy images to all/ directory also
    all_path = os.path.join(os.path.dirname(export_dir), "all", "")
    # No matches simply means nothing to copy. Copying in Python instead of
    # through a shell `cp` keeps paths with spaces or metacharacters intact.
    for image_path in glob.glob(glob.escape(generated_path) + "*"):
        if not os.path.isfile(image_path):
            continue
        try:
            shutil.copy(image_path, all_path)
        except OSError as err:
            # Best effort, as before, but no longer silent.
            print(f"Warning: could not copy {image_path} to {all_path}: {err}")
def _ppm_to_marked_png(seed, generated_path):
    """Convert one seed's ppm to a png with a red centre marker."""
    ppm_path = f'{generated_path}{seed}.ppm'
    png_path = f'{generated_path}{seed}.png'
    if not os.path.exists(ppm_path):
        # One missing image should not abort the rest of the batch.
        print(f'Warning: {ppm_path} not found, skipping')
        return
    # Convert ppm to png, remove ppm to save space
    with Image.open(ppm_path) as im:
        center_x, center_y = im.width // 2, im.height // 2
        draw = ImageDraw.Draw(im)
        # Red 40x40 square around the centre of the image.
        draw.rectangle([center_x - 20, center_y - 20,
                        center_x + 20, center_y + 20],
                       width=4, outline="#ff0000")
        im.save(png_path)
    os.remove(ppm_path)


def _convert_ppm_batch(seed_lines, generated_path):
    """Worker entry point: convert every non-blank seed line of one chunk."""
    for raw_line in seed_lines:
        text = str(raw_line).strip()
        if text:
            _ppm_to_marked_png(int(text), generated_path)


def convert_ppm_parallelized(filtered_lines, export_filepath, generated_path, s_coord, tmp_dir='biome_scans/tmp/'):
    """Convert all filtered seeds to ppm, then png images.

    First runs ``./generate_images`` and waits for it, then converts
    ``<generated_path><seed>.ppm`` to ``<generated_path><seed>.png`` for every
    seed in ``filtered_lines``, spread over up to
    ``multiprocessing.cpu_count()`` worker processes. Each ppm is deleted
    after conversion.

    Args:
        filtered_lines: Seed lines; each is stripped and parsed with ``int``.
            Blank lines are ignored.
        export_filepath: Unused; kept for interface compatibility.
        generated_path: Directory prefix (with trailing slash) of the images.
        s_coord: Coordinate tag used to build the binary's second argument.
        tmp_dir: Directory prefix (with trailing slash) of that argument.
    """
    # Call C code to convert to ppm images
    # NOTE(review): 1024 and 512 look like the image width and height (they
    # match the defaults of convert_all_ppm_to_png); the meaning of 16 is not
    # visible from this file - confirm against the C source.
    subprocess.run([
        './generate_images',
        generated_path,
        tmp_dir + s_coord + ".txt",
        str(1024),
        str(512),
        str(16)
    ])

    # Convert ppms to pngs with parallelism
    seed_lines = list(filtered_lines)
    num_processes = mp.cpu_count()
    workers = []
    for idx in range(num_processes):
        # Striding keeps the chunks balanced even when there are fewer seeds
        # than CPUs; empty chunks get no process at all.
        chunk = seed_lines[idx::num_processes]
        if not chunk:
            continue
        # The target is a module-level function so it can be pickled under
        # the spawn and forkserver start methods, not only under fork.
        worker = mp.Process(target=_convert_ppm_batch,
                            args=(chunk, generated_path))
        worker.start()
        workers.append(worker)

    # Wait for every worker to finish
    for worker in workers:
        worker.join()