import os
from pathlib import Path
from copy import deepcopy
from collections import OrderedDict
import argparse
import numpy as np
import traceback
import json, jsonschema
import gc
import networkx as nx
from saltproc import SerpentDepcode, OpenMCDepcode, Simulation, Reactor
from saltproc import Process, Sparger, Separator, Materialflow
# Validator that fills default values of JSON schema before validating
from saltproc._schema_default import DefaultFillingValidator
# Maps the 'codename' input option to its depletion-code interface class.
CODENAME_MAP = {'serpent': SerpentDepcode,
'openmc': OpenMCDepcode}
# Accepted spellings for each timestep unit in the main input file.
SECOND_UNITS = ('s', 'sec')
MINUTE_UNITS = ('min', 'minute')
HOUR_UNITS = ('h', 'hr', 'hour')
DAY_UNITS = ('d', 'day')
YEAR_UNITS = ('a', 'year', 'yr')
# Factors for converting timesteps to days (Serpent's native time unit).
_SECONDS_PER_DAY = 60 * 60 * 24
_MINUTES_PER_DAY = 60 * 24
_HOURS_PER_DAY = 24
_DAYS_PER_YEAR = 365.25
def run():
    """Initializes and runs the main SaltProc sequence.

    Parses command-line arguments, reads the main input file, constructs
    the depletion-code, simulation, and reactor objects, then executes the
    deplete -> reprocess -> refill loop for every depletion timestep.
    """
    threads, saltproc_input = parse_arguments()
    input_path, process_file, dot_file, mpi_args, \
        rebuild_saltproc_results, run_without_reprocessing, object_input = \
        read_main_input(saltproc_input)
    _print_simulation_input_info(object_input[1], object_input[0])
    # Initializing objects
    depcode = _create_depcode_object(object_input[0])
    simulation = _create_simulation_object(
        object_input[1], depcode)
    msr = _create_reactor_object(object_input[2])
    # Check: Restarting previous simulation or starting new?
    failed_step = simulation.check_restart()
    # Run sequence
    for step_idx in range(failed_step, len(msr.depletion_timesteps)):
        print("\n\n\nStep #%i has been started" % (step_idx + 1))
        simulation.sim_depcode.write_runtime_input(msr,
                                                   step_idx,
                                                   simulation.restart_flag)
        if rebuild_saltproc_results:
            # Reuse existing depletion-code output instead of re-running it.
            simulation.sim_depcode.rebuild_simulation_files(step_idx)
        else:
            depcode.run_depletion_step(mpi_args, threads)
        if step_idx == 0 and simulation.restart_flag is False:  # First step
            # Read general simulation data which never changes
            simulation.store_depcode_metadata()
            # Parse and store data for initial state (beginning of step_idx)
            mats = depcode.read_depleted_materials(False)
            simulation.store_mat_data(mats, step_idx - 1, False)
        # Main sequence: store end-of-step material and neutronics data
        mats = depcode.read_depleted_materials(True)
        simulation.store_mat_data(mats, step_idx, False)
        simulation.store_step_neutronics_parameters()
        simulation.store_step_metadata()
        # Reprocessing here
        if run_without_reprocessing:
            waste_and_feed_streams = None
            waste_streams = None
            extracted_mass = None
        else:
            for key in mats.keys():
                print('\nMass and volume of '
                      f'{key} before reproc: {mats[key].mass} g, ',
                      f'{mats[key].volume} cm3')
            waste_streams, extracted_mass = reprocess_materials(mats,
                                                                process_file,
                                                                dot_file)
            for key in mats.keys():
                print('\nMass and volume of '
                      f'{key} after reproc: {mats[key].mass} g, ',
                      f'{mats[key].volume} cm3')
            waste_and_feed_streams = refill_materials(mats,
                                                      extracted_mass,
                                                      waste_streams,
                                                      process_file)
            for key in mats.keys():
                print('\nMass and volume of '
                      f'{key} after refill: {mats[key].mass} g, ',
                      f'{mats[key].volume} cm3')
            print("Removed mass [g]:", extracted_mass)
        # Store in DB after reprocessing and refill (right before next depl)
        simulation.store_after_repr(mats, waste_and_feed_streams, step_idx)
        depcode.update_depletable_materials(mats, simulation.burn_time)
        # Preserve depletion and transport result and input files
        if not rebuild_saltproc_results:
            depcode.preserve_simulation_files(step_idx)
        # Free per-step material data before the next iteration.
        del mats, waste_streams, waste_and_feed_streams, extracted_mass
        gc.collect()
        # Switch to another geometry?
        if simulation.adjust_geo and simulation.read_k_eds_delta(step_idx):
            depcode.switch_to_next_geometry()
        print("\nTime at the end of current depletion step: %fd" %
              simulation.burn_time)
    print("Simulation succeeded.\n")
def parse_arguments():
    """Parses arguments from command line.

    Returns
    -------
    threads : int or None
        Number of threads to use for shared-memory parallelism
        (``-s``/``--threads``); ``None`` when not supplied.
    saltproc_input : str or None
        Path and name of main SaltProc input file (json format) from
        ``-i``; ``None`` when not supplied.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--threads',
                        type=int,
                        default=None,
                        help='Number of threads to use for shared-memory \
                        parallelism.')
    parser.add_argument('-i',  # main input file
                        type=str,
                        default=None,
                        help='Path and name of SaltProc main input file')
    args = parser.parse_args()
    return args.threads, args.i
def read_main_input(main_inp_file):
    """Reads main SaltProc input file (json format).

    Parameters
    ----------
    main_inp_file : str
        Path to SaltProc main input file and name of this file.

    Returns
    -------
    input_path : Path
        Path to main input file.
    process_file : str
        Path to the `.json` file describing the fuel reprocessing components.
    dot_file : str
        Path to the `.dot` describing the fuel reprocessing paths.
    mpi_args : list of str
        Arguments for running simulations on supercomputers using mpiexec or
        similar programs.
    rebuild_saltproc_results : bool
        Flag to indicate whether or not to rebuild SaltProc results file
        from existing depcode results.
    run_without_reprocessing : bool
        Flag to indicate whether or not to run the depletion code in
        SaltProc without applying the reprocessing system.
    object_inputs : 3-tuple of dict
        tuple containing the inputs for constructing the
        :class:`~saltproc.Depcode`, :class:`~saltproc.Simulation`, and
        :class:`~saltproc.Reactor` objects.

    Raises
    ------
    ValueError
        If the depletion code name is not 'serpent' or 'openmc'.

    Notes
    -----
    Changes the process working directory to the directory containing
    `main_inp_file`; all relative paths in the input are resolved
    against it.
    """
    input_schema = (Path(__file__).parents[0] / 'input_schema.json')
    with open(main_inp_file) as f:
        input_parameters = json.load(f)
    with open(input_schema) as s:
        schema = json.load(s)
    try:
        # Validation also fills in defaults declared in the schema.
        DefaultFillingValidator(schema).validate(input_parameters)
    except jsonschema.exceptions.ValidationError:
        print("Your input file is improperly structured: \n")
        traceback.print_exc()
    except jsonschema.exceptions.SchemaError:
        traceback.print_exc()
    except Exception:
        # Narrowed from a bare `except:` so SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        print("Something went wrong during schema validation.")
        traceback.print_exc()
    # Global input path
    input_path = (Path.cwd() / Path(f.name).parents[0]).resolve()
    os.chdir(input_path)
    # Saltproc settings
    process_file = str((input_path /
                        input_parameters['proc_input_file']).resolve())
    dot_file = str((
        input_path /
        input_parameters['dot_input_file']).resolve())
    output_path = input_parameters['output_path']
    n_depletion_steps = input_parameters['n_depletion_steps']
    mpi_args = input_parameters['mpi_args']
    # Global output path
    output_path = (input_path / output_path)
    input_parameters['output_path'] = output_path.resolve()
    # Create output directory if it doesn't exist
    if not Path(input_parameters['output_path']).exists():
        Path(input_parameters['output_path']).mkdir(parents=True)
    # Rebuild saltproc results?
    rebuild_saltproc_results = input_parameters['rebuild_saltproc_results']
    # Run without reprocessing?
    run_without_reprocessing = input_parameters['run_without_reprocessing']
    # Class settings
    depcode_input = input_parameters['depcode']
    simulation_input = input_parameters['simulation']
    reactor_input = input_parameters['reactor']

    codename = depcode_input['codename'].lower()
    if codename == 'serpent':
        depcode_input['template_input_file_path'] = str((
            input_path /
            depcode_input['template_input_file_path']).resolve())
    elif codename == 'openmc':
        # OpenMC takes a dict of template files plus a depletion chain file.
        for key in depcode_input['template_input_file_path']:
            value = depcode_input['template_input_file_path'][key]
            depcode_input['template_input_file_path'][key] = str((
                input_path / value).resolve())
        depcode_input['chain_file_path'] = \
            str((input_path /
                 depcode_input['chain_file_path']).resolve())
        # process depletion_settings
        depletion_settings = depcode_input['depletion_settings']
        operator_kwargs = depletion_settings['operator_kwargs']
        if operator_kwargs != {}:
            fission_q_path = operator_kwargs['fission_q']
            if fission_q_path is not None:
                operator_kwargs['fission_q'] = str(input_path / fission_q_path)
            depletion_settings['operator_kwargs'] = operator_kwargs
            depcode_input['depletion_settings'] = depletion_settings
    else:
        raise ValueError(f'{codename} is not a supported depletion code.'
                         ' Accepts: "serpent" or "openmc".')

    depcode_input['codename'] = codename
    depcode_input['output_path'] = output_path
    geo_list = depcode_input['geo_file_paths']
    # Global geometry file paths
    geo_file_paths = []
    for g in geo_list:
        geo_file_paths += [str((input_path / g).resolve())]
    depcode_input['geo_file_paths'] = geo_file_paths
    # Global output file paths
    db_name = (output_path / simulation_input['db_name'])
    simulation_input['db_name'] = str(db_name.resolve())

    reactor_input = _process_main_input_reactor_params(
        reactor_input, n_depletion_steps, depcode_input['codename'])

    return input_path, process_file, dot_file, mpi_args, \
        rebuild_saltproc_results, run_without_reprocessing, \
        (depcode_input, simulation_input, reactor_input)
def _print_simulation_input_info(simulation_input, depcode_input):
"""Helper function for `run()` """
print('Initiating Saltproc:\n'
'\tRestart = ' +
str(simulation_input['restart_flag']) +
'\n'
'\tTemplate File Path = ' +
str(depcode_input['template_input_file_path']) +
'\n'
'\tOutput HDF5 database Path = ' +
simulation_input['db_name'] +
'\n')
def _create_depcode_object(depcode_input):
    """Instantiate the depletion-code interface selected by 'codename'.

    The 'codename' key is popped so the remaining keys can be passed as
    constructor keyword arguments, then restored afterwards.
    """
    codename = depcode_input.pop('codename')
    depcode_class = CODENAME_MAP[codename]
    instance = depcode_class(**depcode_input)
    depcode_input['codename'] = codename
    return instance
def _create_simulation_object(simulation_input, depcode):
    """Build the Simulation object used by `run()`."""
    return Simulation(sim_name='Super test',
                      sim_depcode=depcode,
                      restart_flag=simulation_input['restart_flag'],
                      adjust_geo=simulation_input['adjust_geo'],
                      db_path=simulation_input['db_name'])
def _create_reactor_object(reactor_input):
    """Build the Reactor object used by `run()`."""
    return Reactor(**reactor_input)
def _process_main_input_reactor_params(reactor_input,
                                       n_depletion_steps,
                                       codename):
    """Normalize the reactor-block parameters of the main input file.

    Validates `depletion_timesteps` and `power_levels` against
    `n_depletion_steps`, converts cumulative timesteps to stepwise
    durations, and rescales the timestep values for the depletion code.
    Raises if the inputs are inconsistent.
    """
    timesteps = np.array(reactor_input['depletion_timesteps'])
    powers = np.array(reactor_input['power_levels'])
    timesteps, powers = _validate_depletion_timesteps_power_levels(
        n_depletion_steps, timesteps, powers)
    # Cumulative input lists elapsed time at each step; convert to durations.
    if reactor_input['timestep_type'] == 'cumulative':
        timesteps = _convert_cumulative_to_stepwise(timesteps)
    timesteps = _scale_depletion_timesteps(reactor_input['timestep_units'],
                                           timesteps,
                                           codename)
    reactor_input['depletion_timesteps'] = timesteps.tolist()
    reactor_input['power_levels'] = powers.tolist()
    return reactor_input
def _validate_depletion_timesteps_power_levels(n_depletion_steps,
depletion_timesteps,
power_levels):
"""Ensure that the number of depletion timesteps and power levels match
`n_depletion_steps` if it is given. Otherwise, compare the lengths of
`depletion_timesteps` and `power_levels`"""
if n_depletion_steps is not None:
if n_depletion_steps < 0.0 or not int:
raise ValueError('There must be a positive integer number'
' of depletion steps. Provided'
f' n_depletion_steps: {n_depletion_steps}')
if len(depletion_timesteps) == 1:
depletion_timesteps = np.repeat(depletion_timesteps, n_depletion_steps)
if len(power_levels) == 1:
power_levels = np.repeat(power_levels, n_depletion_steps)
if len(depletion_timesteps) != len(power_levels):
raise ValueError('depletion_timesteps and power_levels length mismatch:'
f' depletion_timesteps has {len(depletion_timesteps)}'
f' entries and power_levels has {len(power_levels)}'
' entries.')
else:
return depletion_timesteps, power_levels
def _convert_cumulative_to_stepwise(depletion_timesteps):
ts = np.diff(depletion_timesteps)
return np.concatenate(([depletion_timesteps[0]], ts))
def _scale_depletion_timesteps(timestep_units, depletion_timesteps, codename):
    """Scale `depletion_timesteps` to the correct value based on `timestep_units`.

    Serpent's base timestep units are days (or MWd/kg for burnup-type
    steps), so for Serpent any other recognized time unit is converted to
    days. For other codes the values pass through unchanged.

    Parameters
    ----------
    timestep_units : str
        Unit of the provided timesteps (see the *_UNITS module constants).
    depletion_timesteps : numpy.ndarray
        Timestep sizes expressed in `timestep_units`.
    codename : str
        Name of the depletion code in use.

    Returns
    -------
    numpy.ndarray
        Scaled timesteps. The input array is never modified in place.

    Raises
    ------
    IOError
        If `timestep_units` is not recognized (Serpent only).
    """
    if (codename == 'serpent'
            and timestep_units not in DAY_UNITS
            and timestep_units.lower() != 'mwd/kg'):
        # Out-of-place arithmetic: the original in-place `/=`/`*=` raised a
        # casting error whenever the timestep array had an integer dtype.
        if timestep_units in SECOND_UNITS:
            depletion_timesteps = depletion_timesteps / _SECONDS_PER_DAY
        elif timestep_units in MINUTE_UNITS:
            depletion_timesteps = depletion_timesteps / _MINUTES_PER_DAY
        elif timestep_units in HOUR_UNITS:
            depletion_timesteps = depletion_timesteps / _HOURS_PER_DAY
        elif timestep_units in YEAR_UNITS:
            depletion_timesteps = depletion_timesteps * _DAYS_PER_YEAR
        else:
            raise IOError(f'Unrecognized time unit: {timestep_units}')
    return depletion_timesteps
def reprocess_materials(mats, process_file, dot_file):
    """Applies extraction reprocessing scheme to burnable materials.

    Parameters
    ----------
    mats : dict of str to Materialflow
        Dictionary that contains :class:`saltproc.materialflow.Materialflow`
        objects with burnable material data right after irradiation in the
        core. Entries are replaced in place with the post-reprocessing flows.
    process_file : str
        Path to the `.json` file describing the fuel reprocessing components.
    dot_file : str
        Path to the `.dot` describing the fuel reprocessing paths.

    Returns
    -------
    waste_streams : dict of str to dict
        Dictionary mapping material names to waste streams.

        ``key``
            Material name.
        ``value``
            Dictionary mapping waste stream names to
            :class:`saltproc.materialflow.Materialflow`
            objects representing waste streams.
    extracted_mass : dict of str to float
        Dictionary mapping material names to the mass in [g] of that material
        removed via reprocessing.
    """
    inmass = {}
    extracted_mass = {}
    waste_streams = OrderedDict()
    thru_flows = OrderedDict()
    extraction_processes = get_extraction_processes(process_file)
    material_for_extraction, extraction_process_paths = \
        get_extraction_process_paths(dot_file)
    # iterate over materials
    for mat_name, processes in extraction_processes.items():
        initial_material = mats[mat_name]
        waste_streams[mat_name] = {}
        thru_flows[mat_name] = []
        inmass[mat_name] = float(initial_material.mass)
        print(f"Mass of material '{mat_name}' before reprocessing: "
              f"{inmass[mat_name]} g")
        if mat_name == material_for_extraction:
            for i, path in enumerate(extraction_process_paths):
                # Each path starts from the full un-reprocessed material.
                thru_flows[mat_name].append(initial_material)
                for proc in path:
                    # Calculate fraction of the flow going to the process proc
                    divisor = float(processes[proc].mass_flowrate /
                                    processes['core_outlet'].mass_flowrate)
                    # Calculate waste stream and thru flow on process proc
                    thru_flow, waste_stream = \
                        processes[proc].process_material(
                            divisor * thru_flows[mat_name][i])
                    waste_streams[mat_name]['waste_' + proc] = waste_stream
                    thru_flows[mat_name][i] = thru_flow
            # Sum thru flows from all paths together
            mats[mat_name] = thru_flows[mat_name][0]
            print(f'1 Materal mass on path 0: '
                  f'{thru_flows[mat_name][0].mass}')
            # NOTE(review): the prints below reuse the final enumerate index
            # `i` rather than `idx` — looks like they were meant to report
            # each path; confirm before changing the output format.
            for idx in range(1, i + 1):
                mats[mat_name] += thru_flows[mat_name][idx]
                print(f'{i + 1} Materal mass on path {i}: '
                      f'{thru_flows[mat_name][i].mass}')
            extracted_mass[mat_name] = \
                inmass[mat_name] - float(mats[mat_name].mass)
    # Clear memory
    # NOTE(review): these names are unbound (NameError) when
    # extraction_processes is empty or no path was walked — verify inputs
    # always contain at least one material and path.
    del extraction_processes, inmass, mat_name, processes, thru_flows
    del material_for_extraction, extraction_process_paths, divisor
    del thru_flow, waste_stream
    return waste_streams, extracted_mass
def get_extraction_processes(process_file):
    """Parses ``extraction_processes`` objects from the `.json` file describing
    processing system objects.

    ``extraction_processes`` objects describe components that would perform
    fuel processing in a real reactor, such as a gas sparger or a nickel
    filter.

    Parameters
    ----------
    process_file : str
        Path to the `.json` file describing the fuel reprocessing components.

    Returns
    -------
    extraction_processes : dict of str to dict
        Dictionary mapping material names to extraction processes.

        ``key``
            Name of burnable material.
        ``value``
            Dictionary mapping process names to
            :class:`saltproc.process.Process` objects.
    """
    extraction_processes = OrderedDict()
    with open(process_file) as f:
        process_spec = json.load(f)
    for mat_name, mat_spec in process_spec.items():
        mat_processes = OrderedDict()
        for proc_name, proc_kwargs in mat_spec['extraction_processes'].items():
            efficiency = proc_kwargs['efficiency']
            # An efficiency of "self" selects the physics-based component
            # model for the two processes that have one.
            if efficiency == "self" and proc_name == 'sparger':
                proc_class = Sparger
            elif efficiency == "self" and proc_name == 'entrainment_separator':
                proc_class = Separator
            else:
                proc_class = Process
            mat_processes[proc_name] = proc_class(**proc_kwargs)
        extraction_processes[mat_name] = mat_processes
    gc.collect()
    return extraction_processes
def get_extraction_process_paths(dot_file):
    """Reads directed graph that describes fuel reprocessing system structure
    from a `*.dot` file.

    Parameters
    ----------
    dot_file : str
        Path to the `.dot` describing the fuel reprocessing paths.

    Returns
    -------
    mat_name : str
        Name of burnable material which the reprocessing scheme applies to.
    extraction_process_paths : list
        List of lists containing all possible paths between `core_outlet` and
        `core_inlet`.
    """
    digraph = nx.drawing.nx_pydot.read_dot(dot_file)
    # Collect every simple path between the core outlet and inlet nodes.
    extraction_process_paths = list(
        nx.all_simple_paths(digraph,
                            source='core_outlet',
                            target='core_inlet'))
    return digraph.name, extraction_process_paths
def refill_materials(mats, extracted_mass, waste_streams, process_file):
    """Makes up material loss in removal processes by adding fresh fuel.

    Parameters
    ----------
    mats : dict of str to Materialflow
        Dictionary mapping material names to
        :class:`saltproc.materialflow.Materialflow` objects that have already
        been reprocessed by `reprocess_materials`. Entries are refilled
        in place.
    extracted_mass : dict of str to float
        Dictionary mapping material names to the mass in [g] of that material
        removed via reprocessing.
    waste_streams : dict of str to dict
        Dictionary mapping material names to waste streams from reprocessing.

        ``key``
            Material name.
        ``value``
            Dictionary mapping waste stream names to
            :class:`saltproc.materialflow.Materialflow` objects representing
            waste streams.
    process_file : str
        Path to the `.json` file describing the fuel reprocessing components.

    Returns
    -------
    waste_streams : dict of str to dict
        Superset of the input parameter `waste_streams`. Dictionary has
        keys for feed streams that map to :class:`Materialflow` objects
        representing those material feed streams.
    """
    feeds = get_feeds(process_file)
    refill_mats = OrderedDict()
    # Get feed group for each material
    for mat, mat_feeds in feeds.items():
        # Get each feed in the feed group
        for feed_name, feed in mat_feeds.items():
            # Scale the feed composition so its mass equals the mass
            # removed from this material during reprocessing.
            # NOTE(review): with more than one feed per material, each feed
            # adds the full extracted mass — confirm that is intentional.
            scale = extracted_mass[mat] / feed.mass
            refill_mats[mat] = scale * feed
            waste_streams[mat]['feed_' + str(feed_name)] = refill_mats[mat]
            mats[mat] += refill_mats[mat]
            print('Refilled fresh material: %s %f g' %
                  (mat, refill_mats[mat].mass))
    return waste_streams
def get_feeds(process_file):
    """Parses ``feed`` objects from `.json` file describing processing system
    objects.

    ``feed`` objects describe material flows that replace nuclides needed to
    keep the reactor operating that were removed during reprocessing.

    Parameters
    ----------
    process_file : str
        Path to the `.json` file describing the fuel reprocessing components.

    Returns
    -------
    feeds : dict of str to dict
        Dictionary that maps material names to material flows.

        ``key``
            Name of burnable material.
        ``value``
            Dictionary mapping material flow names to
            :class:`saltproc.materialflow.Materialflow` objects representing
            material feeds.
    """
    with open(process_file) as f:
        process_spec = json.load(f)
    feeds = OrderedDict()
    for mat_name, mat_spec in process_spec.items():
        mat_feeds = OrderedDict()
        for feed_name, feed_data in mat_spec['feeds'].items():
            mat_feeds[feed_name] = Materialflow(comp=feed_data['comp'],
                                                density=feed_data['density'],
                                                volume=feed_data['volume'])
        feeds[mat_name] = mat_feeds
    return feeds