Source code for climlab.process.time_dependent_process

from builtins import str, range
import numpy as np
import copy
from .process import Process
from climlab.utils import walk, ProcNameWarning, constants as const
from climlab.utils.attrdict import AttrDict
import warnings


[docs] def couple(proclist, name='Parent'): # Union of the two state dictionaries new_state = AttrDict() new_input = AttrDict() all_input = {} all_diagnotics_list = [] timestep = np.timedelta64(const.seconds_per_day * 365 * 1000000, 's') # very long! for proc in proclist: timestep = np.minimum(timestep, proc.timestep) for key in proc.state: new_state[key] = proc.state[key] all_diagnotics_list += list(proc.diagnostics.keys()) for key in proc.input: all_input[key] = proc.input[key] for key in all_input: if (key not in new_state) and (key not in all_diagnotics_list): # This quantity is still a necessary input for the parent process new_input[key] = all_input[key] # The newly created parent process has the minimum timestep coupled = TimeDependentProcess(state=new_state, timestep=timestep, name=name) current_time = proclist[0].current_time # need to synchronize time across processes coupled.current_time = current_time warnings.filterwarnings("error", category=ProcNameWarning) # Warning becomes error try: for proc in proclist: coupled.add_subprocess(proc.name, proc) except ProcNameWarning: raise ValueError('Coupled subprocesses must have unique names.') for key in new_input: coupled.add_input(key, new_input[key]) warnings.resetwarnings() return coupled
[docs] class TimeDependentProcess(Process): r"""A generic parent class for all time-dependent processes. ``TimeDependentProcess`` is a child of the :class:`~climlab.process.process.Process` class and therefore inherits all those attributes. **Initialization parameters** An instance of ``TimeDependentProcess`` is initialized with the following arguments *(for detailed information see Object attributes below)*: :param float timestep: specifies the timestep of the object (in seconds or numpy.timedelta64) [default: 1 day] :param str time_type: how time-dependent-process should be computed [default: 'explicit'] :param bool topdown: whether geneterate *process_types* in regular or in reverse order [default: True] **Object attributes** Additional to the parent class :class:`~climlab.process.process.Process` following object attributes are generated during initialization: :ivar bool has_process_type_list: information whether attribute *process_types* (which is needed for :func:`compute` and build in :func:`_build_process_type_list`) exists or not. Attribute is set to ``'False'`` during initialization. :ivar bool topdown: information whether the list *process_types* (which contains all processes and sub-processes) should be generated in regular or in reverse order. See :func:`_build_process_type_list`. :ivar dict timeave: a time averaged collection of all states and diagnostic processes over the timeperiod that :func:`integrate_years` has been called for last. :ivar dict tendencies: computed difference in a timestep for each state. See :func:`compute` for details. :ivar str time_type: how time-dependent-process should be computed. Possible values are: ``'explicit'``, ``'implicit'``, ``'diagnostic'``, ``'adjustment'``. :ivar dict time: a collection of all time-related attributes of the process. The dictionary contains following items: * ``'timestep'``: The model timestep as np.timedelta64 object * ``'steps'``: counter how many steps have been integrated in total * ``'initial_time'``: The initial time when the Process is first created as np.datetime64 object [default: Jan. 1 1970] * ``'current_time'``: The current model time as np.datetime64 object (should be equal to initial_time + steps*timestep) * ``'active_now'``: Boolean that indicates whether the Process is currently active (used for asynchronous coupling) """ def __str__(self): str1 = super(TimeDependentProcess, self).__str__() str1 += 'Current time is {}'.format(self.current_time) return str1 def __init__(self, time_type='explicit', timestep=const.seconds_per_day, initial_time=np.datetime64('1970-01-01T00:00'), topdown=True, **kwargs): # Create the state dataset self.tendencies = {} super(TimeDependentProcess, self).__init__(**kwargs) for name, var in self.state.items(): self.tendencies[name] = var * 0. self.timeave = {} self.time = {'initial_time': initial_time, 'current_time': initial_time, 'steps': 0, 'active_now': True,} self.timestep = timestep self.time_type = time_type self.topdown = topdown self.has_process_type_list = False def __add__(self, other): newparent = couple([self,other]) return newparent @property def timestep(self): r"""The amount of time over which :func:`step_forward` is integrating in unit seconds. :getter: Returns the object timestep which is stored in ``self.time['timestep']``. :setter: Sets the timestep to the given input. :type: float """ return self.time['timestep'] @timestep.setter def timestep(self, value): # Convert to timedelta64 in seconds if necessary if type(value) is not np.timedelta64: # assume the value is in seconds value = np.timedelta64(int(value), 's') self.time['timestep'] = value self.param['timestep'] = value @property def timestep_in_seconds(self): """Return a float value representing the timestep in units of seconds""" return self.timestep / np.timedelta64(1, 's') @property def current_time(self): return self.time['current_time'] @current_time.setter def current_time(self, value): self.time['current_time'] = value for name, proc in self.subprocess.items(): proc.current_time = value # Synchronize time across subprocesses @property def elapsed_time(self): return self.current_time - self.time['initial_time']
[docs] def set_state(self, name, value): super(TimeDependentProcess, self).set_state(name,value) # Make sure that the new state variable is added to the tendencies dict self.tendencies[name] = value * 0.
[docs] def compute(self): r"""Computes the tendencies for all state variables given current state and specified input. The function first computes all diagnostic processes. They don't produce any tendencies directly but they may affect the other processes (such as change in solar distribution). Subsequently, all tendencies and diagnostics for all explicit processes are computed. Tendencies due to implicit and adjustment processes need to be calculated from a state that is already adjusted after explicit alteration. For that reason the explicit tendencies are applied to the states temporarily. Now all tendencies from implicit processes are calculated by matrix inversions and similar to the explicit tendencies, the implicit ones are applied to the states temporarily. Subsequently, all instantaneous adjustments are computed. Then the changes that were made to the states from explicit and implicit processes are removed again as this :class:`~climlab.process.time_dependent_process.TimeDependentProcess.compute()` function is supposed to calculate only tendencies and not apply them to the states. Finally, all calculated tendencies from all processes are collected for each state, summed up and stored in the dictionary ``self.tendencies``, which is an attribute of the time-dependent-process object, for which the :class:`~climlab.process.time_dependent_process.TimeDependentProcess.compute()` method has been called. **Object attributes** During method execution following object attributes are modified: :ivar dict tendencies: dictionary that holds tendencies for all states is calculated for current timestep through adding up tendencies from explicit, implicit and adjustment processes. :ivar dict diagnostics: process diagnostic dictionary is updated by diagnostic dictionaries of subprocesses after computation of tendencies. """ # First reset tendencies to zero -- recomputing them is the point of this method for varname in self.tendencies: self.tendencies[varname] *= 0. # Also reset all diagnostics to zero -- they will be accumulated from subprocesses for diagname, diag in self.diagnostics.items(): diag *= 0. if not self.has_process_type_list: self._build_process_type_list() tendencies = {} ignored = self._compute_type('diagnostic') tendencies['explicit'] = self._compute_type('explicit') # Tendencies due to implicit and adjustment processes need to be # calculated from a state that is already adjusted after explicit stuff # So apply the tendencies temporarily and then remove them again for name, var in self.state.items(): var += tendencies['explicit'][name] * self.timestep_in_seconds # Now compute all implicit processes -- matrix inversions tendencies['implicit'] = self._compute_type('implicit') # Same deal ... temporarily apply tendencies from implicit step for name, var in self.state.items(): var += tendencies['implicit'][name] * self.timestep_in_seconds # Finally compute all instantaneous adjustments -- expressed as explicit forward step tendencies['adjustment'] = self._compute_type('adjustment') # Now remove the changes from the model state for name, var in self.state.items(): var -= ( (tendencies['implicit'][name] + tendencies['explicit'][name]) * self.timestep_in_seconds) # Sum up all subprocess tendencies for proctype in ['explicit', 'implicit', 'adjustment']: for varname, tend in tendencies[proctype].items(): self.tendencies[varname] += tend # Finally compute my own tendencies, if any self_tend = self._compute() # Adjustment processes _compute method returns absolute adjustment # Needs to be converted to rate of change if self.time_type == 'adjustment': for varname, adj in self_tend.items(): self_tend[varname] /= self.timestep_in_seconds for varname, tend in self_tend.items(): self.tendencies[varname] += tend # Now accumulate additive diagnostics from subprocesses for procname, proc in self.subprocess.items(): for diagname, value in proc.diagnostics.items(): if self.__dict__[diagname].shape == value.shape: self.__dict__[diagname] += value return self.tendencies
def _compute_type(self, proctype): r"""Computes tendencies due to all subprocesses of given type ``'proctype'``. Also pass all diagnostics up to parent process.""" tendencies = {} for varname in self.state: tendencies[varname] = 0. * self.state[varname] for proc in self.process_types[proctype]: # Asynchronous coupling # if subprocess has longer timestep than parent # We compute subprocess tendencies once # and apply the same tendency at each substep step_ratio = int(proc.timestep / self.timestep) # Does the number of parent steps divide evenly by the ratio? # If so, it's time to do a subprocess step. if self.time['steps'] % step_ratio == 0: proc.time['active_now'] = True tenddict = proc.compute() else: # proc.tendencies is unchanged from last subprocess timestep if we didn't recompute it above proc.time['active_now'] = False tenddict = proc.tendencies for name, tend in tenddict.items(): tendencies[name] += tend return tendencies def _compute(self): """Where the tendencies are actually computed... Needs to be implemented for each daughter class Returns a dictionary with same keys as self.state""" tendencies = {} for name, value in self.state.items(): tendencies[name] = value * 0. return tendencies def _build_process_type_list(self): r"""Generates lists of processes organized by process type. Following object attributes are generated or updated: :ivar dict process_types: a dictionary with entries: ``'diagnostic'``, ``'explicit'``, ``'implicit'`` and ``'adjustment'`` which point to a list of processes according to the process types. The ``process_types`` dictionary is created while walking through the processes with :func:`~climlab.utils.walk.walk_processes` CHANGING THIS TO REFER ONLY TO THE CURRENT LEVEL IN SUBPROCESS TREE """ self.process_types = {'diagnostic': [], 'explicit': [], 'implicit': [], 'adjustment': []} for name, proc in self.subprocess.items(): self.process_types[proc.time_type].append(proc) self.has_process_type_list = True
[docs] def step_forward(self): r"""Updates state variables with computed tendencies. Calls the :func:`compute` method to get current tendencies for all process states. Multiplied with the timestep and added up to the state variables is updating all model states. :Example: :: >>> import climlab >>> model = climlab.EBM() >>> # checking time step counter >>> model.time['steps'] 0 >>> # stepping the model forward >>> model.step_forward() >>> # step counter increased >>> model.time['steps'] 1 """ tenddict = self.compute() # Total tendency is applied as an explicit forward timestep # (already accounting properly for order of operations in compute() ) for varname, tend in tenddict.items(): self.state[varname] += tend * self.timestep_in_seconds # Update all time counters for this and all subprocesses in the tree self.current_time += self.timestep # setter method synchronizes subprocesses for name, proc, level in walk.walk_processes(self, ignoreFlag=True): if proc.time['active_now']: proc.time['steps'] += 1
# proc._update_time()
[docs] def compute_diagnostics(self, num_iter=3): r"""Compute all tendencies and diagnostics, but don't update model state. By default it will call compute() 3 times to make sure all subprocess coupling is accounted for. The number of iterations can be changed with the input argument. """ for n in range(num_iter): ignored = self.compute()
[docs] def integrate_years(self, years=1.0, verbose=True): r"""Integrates the model by a given number of years. :param float years: integration time for the model in years [default: 1.0] :param bool verbose: information whether model time details should be printed [default: True] It calls :func:`step_forward` repetitively and calculates a time averaged value over the integrated period for every model state and all diagnostics processes. :Example: :: >>> import climlab >>> model = climlab.EBM() >>> model.global_mean_temperature() Field(11.997968598413685) >>> model.integrate_years(2.) Integrating for 180 steps, 730.4844 days, or 2.0 years. Total elapsed time is 2.0 years. >>> model.global_mean_temperature() Field(13.531055349437258) """ numsteps = int(np.timedelta64(int(const.seconds_per_year), 's') / self.timestep * years) if verbose: print("Integrating for {} steps or {} years.".format(numsteps, years)) # begin time loop for count in range(numsteps): # Compute the timestep self.step_forward() if count == 0: # on first step only... # This implements a generic time-averaging feature # using the list of model state variables self.timeave = self.state.copy() # add any new diagnostics to the timeave dictionary self.timeave.update(self.diagnostics) # reset all values to zero for varname, value in self.timeave.items(): # moves on to the next varname if value is None # this preserves NoneType diagnostics if value is None: continue self.timeave[varname] = 0*value # adding up all values for each timestep for varname in list(self.timeave.keys()): try: self.timeave[varname] += self.state[varname] except: try: self.timeave[varname] += self.diagnostics[varname] except: pass # calculating mean values through dividing the sum by number of steps for varname, value in self.timeave.items(): if value is None: continue self.timeave[varname] /= numsteps if verbose: print("Total elapsed time is {} or {:.4f} years.".format(self.elapsed_time, self.elapsed_time/np.timedelta64(1, 's') / const.seconds_per_year))
[docs] def integrate_days(self, days=1.0, verbose=True): r"""Integrates the model forward for a specified number of days. It convertes the given number of days into years and calls :func:`integrate_years`. :param float days: integration time for the model in days [default: 1.0] :param bool verbose: information whether model time details should be printed [default: True] :Example: :: >>> import climlab >>> model = climlab.EBM() >>> model.global_mean_temperature() Field(11.997968598413685) >>> model.integrate_days(80.) Integrating for 19 steps, 80.0 days, or 0.219032740466 years. Total elapsed time is 0.211111111111 years. >>> model.global_mean_temperature() Field(11.873680783355553) """ years = days / const.days_per_year self.integrate_years(years=years, verbose=verbose)
[docs] def integrate_converge(self, crit=1e-4, verbose=True): r"""Integrates the model until model states are converging. :param crit: exit criteria for difference of iterated solutions [default: 0.0001] :type crit: float :param bool verbose: information whether total elapsed time should be printed [default: True] :Example: :: >>> import climlab >>> model = climlab.EBM() >>> model.global_mean_temperature() Field(11.997968598413685) >>> model.integrate_converge() Total elapsed time is 10.0 years. >>> model.global_mean_temperature() Field(14.288155406577301) """ # implemented by m-kreuzer for varname, value in self.state.items(): value_old = copy.deepcopy(value) self.integrate_years(1,verbose=False) while np.max(np.abs(value_old-value)) > crit : value_old = copy.deepcopy(value) self.integrate_years(1,verbose=False) if verbose == True: print("Total elapsed time is {} or {:.4f} years.".format(self.elapsed_time, self.elapsed_time/np.timedelta64(1, 's') / const.seconds_per_year))