Source code for buckpy.buckpy_solver

"""
This module contains the solver functions of BuckPy.
"""

import time
from multiprocess import Process, Queue, cpu_count
import numpy as np
import pandas as pd
from numba import jit
from .buckpy_variables import *

[docs] def exec_buckpy(fric_sampling, np_distr, np_scen, np_ends, n_sim, bl_verbose = False): """ Execute buckpy methodology Parameters ---------- fric_sampling : int Switch to select the sampling method of the lateral friction: 0 for 'per Element' or 1 'per OOS Reference Length' np_distr : numpy.ndarray 3-D array describing the stochastic distributions: - dim 1: properties - dim 2: elements - dim 3: values np_scen : numpy.ndarray Array containing the pipeline desing data on assessed mesh. np_ends : numpy.ndarray Array containing information about the boundary conditions at the ends of the pipeline. n_sim : int Number of Monte-Carlo simulations to be run. Returns ------- df_pp_plot : DataFrame Definition on assessed mesh of CBF and EAF in different conditions for case to plot. df_VAP_plot : DataFrame Definition of virtual anchor points for case to plot. df_pp_buckle_prop : DataFrame Results for all buckles triggered. Other Parameters ---------------- bl_verbose : boolean, optional True if intermediate printouts are required (False by default). """ # Starting time of the solver module start_time = time.time() # Run deterministic case if bl_verbose: print("2. Run the deterministic case") np_prob_var_det = np.empty(0) np_case_det = preprocess_case(np_prob_var_det, np_scen, np_ends, bl_det = True) np_pp_buckle_det, np_case_det_updated, vap_list_det = solve_case(np_scen, np_case_det) # To be discussed: This block has been replaced to enable a like to like comparison with # Buckfast. # # Print-out force profiles for graphical display # # (Deterministic case if it buckles, or otherwise first random case that buckles) # np_pp_plot = np.empty((9, np_scen[KP].size)) # vap_list = [] # df_VAP_plot = pd.DataFrame(vap_list, columns = ['ielt VAP', 'KP VAP', 'ESF VAP']) # n_buckle = np_pp_buckle_det.shape[0] # if n_buckle > 0: # Deterministic case buckled # for i_buckle in range(n_buckle): # np_pp_buckle_det[i_buckle, SIM_PP] = 1 # np_pp_plot[P_KP] = np_scen[KP] # np_pp_plot[P_CBF_HT] = np_case_det[CBF_HT] # np_pp_plot[P_CBF_OP] = np_case_det[CBF_OP] # np_pp_plot[P_EAF_INST] = np_case_det[EAF_INST] # np_pp_plot[P_EAF_HT] = np_case_det_updated[EAF_HT] # np_pp_plot[P_EAF_P_OP] = np_case_det_updated[EAF_P_OP] # np_pp_plot[P_EAF_OP] = np_case_det_updated[EAF_OP] # np_pp_plot[P_EAF_OP_UNBUCK] = np_case_det[EAF_OP_UNBUCK] # df_VAP_plot = pd.DataFrame(vap_list_det, columns = ['ielt VAP', 'KP VAP', 'ESF VAP']) # print(' The deterministic case has buckled') # if bl_verbose: # print(f' Time taken to run the deterministic case: {time.time()-start_time:.1f}s') # # If there is no buckling on deterministic case then for result display # # generate a random case until buckling is triggered (max iteration<n_sim) # else: # for i_sim in range(n_sim): # np_prob_var = sample_randomness_case(fric_sampling, np_distr, np_scen) # np_case = preprocess_case(np_prob_var, np_scen, np_ends, bl_det = False) # np_pp_buckle, np_case_updated, vap_list = solve_case(np_scen, np_case) # n_buckle = np_pp_buckle.shape[0] # if n_buckle > 0: # Found random case buckling # for i_buckle in range(n_buckle): # np_pp_buckle[i_buckle, SIM_PP] = i_sim # np_pp_plot[P_KP] = np_scen[KP] # np_pp_plot[P_CBF_HT] = np_case[CBF_HT] # np_pp_plot[P_CBF_OP] = np_case[CBF_OP] # np_pp_plot[P_EAF_INST] = np_case[EAF_INST] # np_pp_plot[P_EAF_HT] = np_case_updated[EAF_HT] # np_pp_plot[P_EAF_P_OP] = np_case_updated[EAF_P_OP] # np_pp_plot[P_EAF_OP] = np_case_updated[EAF_OP] # np_pp_plot[P_EAF_OP_UNBUCK] = np_case[EAF_OP_UNBUCK] # # # df_VAP_plot = pd.DataFrame(vap_list, columns = ['ielt VAP', 'KP VAP', 'ESF VAP']) # print(f' The random case {i_sim} has buckled') # break # Print-out force profiles for graphical display (deterministic simulation only) np_pp_plot = np.empty((9, np_scen[KP].size)) vap_list = [] df_VAP_plot = pd.DataFrame(vap_list, columns = ['ielt VAP', 'KP VAP', 'ESF VAP']) n_buckle = np_pp_buckle_det.shape[0] for i_buckle in range(n_buckle): np_pp_buckle_det[i_buckle, SIM_PP] = 1 np_pp_plot[P_KP] = np_scen[KP] np_pp_plot[P_CBF_HT] = np_case_det[CBF_HT] np_pp_plot[P_CBF_OP] = np_case_det[CBF_OP] np_pp_plot[P_EAF_INST] = np_case_det[EAF_INST] np_pp_plot[P_EAF_HT] = np_case_det_updated[EAF_HT] np_pp_plot[P_EAF_P_OP] = np_case_det_updated[EAF_P_OP] np_pp_plot[P_EAF_OP] = np_case_det_updated[EAF_OP] np_pp_plot[P_EAF_OP_UNBUCK] = np_case_det[EAF_OP_UNBUCK] df_VAP_plot = pd.DataFrame(vap_list_det, columns = ['ielt VAP', 'KP VAP', 'ESF VAP']) print(' The deterministic case has buckled') if bl_verbose: print(f' Time taken to run the deterministic case: {time.time()-start_time:.1f}s') # Run Monte-Carlo loop if bl_verbose: print("3. Run the Monte-Carlo loop") start_time = time.time() if n_sim <= 10000: N_WORKER = 1 else: N_WORKER = cpu_count() list_buckle_prop = run_monte_carlo(N_WORKER, n_sim, np_distr, np_scen, np_ends, fric_sampling, bl_verbose = bl_verbose) if bl_verbose: print(f' Time taken to extract {len(list_buckle_prop)}' f' simulations: {time.time()-start_time:.1f}s') # Post-process 'df_pp_buckle_prop' and 'df_pp_buckle_prop' column_list = ['isim', 'KP', 'route_type', 'muax', 'mulat_op', 'HOOS', 'CBF_op', 'VAS_op'] df_pp_buckle_prop = pd.DataFrame(np.concatenate(list_buckle_prop), columns = column_list) column_list = ['KP', 'CBF_ht', 'CBF_op', 'EAF_inst', 'EAF_ht', 'EAF_p_op','EAF_op', 'beta2', 'EAF_op_unbuck'] df_pp_plot = pd.DataFrame(np.transpose(np_pp_plot), columns = column_list) return df_pp_plot, df_VAP_plot, df_pp_buckle_prop
@jit(nopython=True) def sample_randomness_case(fric_sampling, np_distr, np_scen): """ Samples the stochastic variables (HOOS and friction factors). Parameters ---------- fric_sampling : int Switch to select the sampling method of the lateral friction: 0 for 'per Element' or 1 'per OOS Reference Length' np_distr : numpy.ndarray 3-D array describing the stochastic distributions: Dim 1 = Properties, Dim 2 = Elements and Dim 3 = values np_scen : numpy.ndarray Array containing the pipeline desing data on assessed mesh. Returns ------- np_prob_var : numpy.ndarray Definition of probabilistic variables on assessed mesh. Rows correspond to properties: MUAX = 0, MULAT_HT = 1, MULAT_OP = 2, HOOS = 3 Notes ----- The returned `np_prob_var` has values by columns corresponding to mesh elements over 4 rows corresponding to properties defined above. | MUAX: Axial friction coefficient | MULAT_HT: Lateral friction coefficient for Hydrotest condition | MULAT_OP: Lateral friction coefficient for Operation condition | HOOS: Soil property indicator """ # Number of elements in the assessed mesh n_elts = np_scen[KP].size # Initialize the array to store probabilistic variables np_prob_var = np.empty((4, n_elts)) # Sample axial friction coefficient muax_rand = np.random.rand() # Sample lateral friction coefficient if fric_sampling == 0: # Sample per element mul_rand = np.random.rand(n_elts) elif fric_sampling == 1: # Sample per OOS Reference Length mul_rand = np.full(n_elts, np.random.rand()) for i in range(2, int(np.amax(np_scen[SECTION_ID])) + 1): mul_rand[np_scen[SECTION_ID] == i] = np.random.rand() # Sample HOOS factor hoos_rand = np.random.rand(n_elts) # Interpolate sampled values based on CDF arrays to get probabilistic variables for i in range(n_elts): np_prob_var[MUAX, i] = np.interp( muax_rand, np_distr[MUAX_CDF_ARRAY][i], np_distr[MUAX_ARRAY][i]) np_prob_var[MULAT_HT, i] = np.interp( mul_rand[i], np_distr[MULAT_CDF_ARRAY_HT][i], np_distr[MULAT_ARRAY_HT][i]) np_prob_var[MULAT_OP, i] = np.interp( mul_rand[i], np_distr[MULAT_CDF_ARRAY_OP][i], np_distr[MULAT_ARRAY_OP][i]) np_prob_var[HOOS, i] = np.interp( hoos_rand[i], np_distr[HOOS_CDF_ARRAY][i], np_distr[HOOS_ARRAY][i]) return np_prob_var
[docs] def preprocess_case(np_prob_var, np_scen, np_ends, bl_det = False): """ Preprocess case based on scenario definition and probabilistic variables. Parameters ---------- np_prob_var : numpy.ndarray Probabilistic variables for the case (shape: 4 x n_elements). np_scen : numpy.ndarray Scenario data array (rows = properties, columns = elements). np_ends : numpy.ndarray End boundary condition array (rows = properties, columns = ends). bl_det : bool, optional If True, use mean values; if False, use sampled values. Default is False. Returns ------- np_case : numpy.ndarray Case array (21 x n_elements) with rows: - 0 : MUAX - 1 : MULAT_HT - 2 : MULAT_OP - 3 : HOOS - 4 : CBF_HT - 5 : CBF_OP - 6 : LFF_INST - 7 : LFF_HT - 8 : LFF_OP - 9 : RFF_INST - 10 : RFF_HT - 11 : RFF_OP - 12 : EAF_INST - 13 : EAF_HT - 14 : EAF_P_OP - 15 : EAF_OP - 16 : EAF_OP_UNBUCK - 17 : LDELTAF_HT - 18 : RDELTAF_HT - 19 : LDELTAF_OP - 20 : RDELTAF_OP Notes ----- Computes CBF and frictional forces based on route type and boundary conditions, and builds effective axial force (EAF) profiles for installation, hydrotest, and operation. """ # Extracting the number of elements from the scenario array n_elts = np_scen[KP].size # Creating an empty array to store processed case-specific data np_case = np.empty((21, np_scen.shape[1])) if bl_det: # Alocating mean values to probabilistic variables np_case[MUAX] = np_scen[MUAX_MEAN] np_case[MULAT_HT] = np_scen[MULAT_HT_MEAN] np_case[MULAT_OP] = np_scen[MULAT_OP_MEAN] np_case[HOOS] = np.where(np_scen[ROUTE_TYPE] < 4, np_scen[HOOS_MEAN], np_scen[CBF_RCM]) else: # Copying probabilistic variables to the case array np_case[MUAX] = np_prob_var[MUAX] np_case[MULAT_HT] = np_prob_var[MULAT_HT] np_case[MULAT_OP] = np_prob_var[MULAT_OP] np_case[HOOS] = np_prob_var[HOOS] # Calculating buckling forces based on route types for i in range(n_elts): if np_scen[ROUTE_TYPE, i] == 1: # Straight section np_case[CBF_HT, i] = np_case[HOOS, i] * np_scen[SCHAR_HT, i] \ * np_case[MULAT_HT, i]**0.5 np_case[CBF_OP, i] = np_case[HOOS, i] * np_scen[SCHAR_OP, i] \ * np_case[MULAT_OP, i]**0.5 elif np_scen[ROUTE_TYPE, i] == 2: # Curve section np_case[CBF_HT, i] = np_case[HOOS, i] * np_case[MULAT_HT, i] \ * np_scen[SW_HT, i] * np_scen[BEND_RADIUS, i] np_case[CBF_OP, i] = np_case[HOOS, i] * np_case[MULAT_OP, i] \ * np_scen[SW_OP, i] * np_scen[BEND_RADIUS, i] elif np_scen[ROUTE_TYPE, i] == 3: # Sleeper np_case[CBF_HT, i] = np_case[HOOS, i] * np_scen[SV_HT, i] np_case[CBF_OP, i] = np_case[HOOS, i] * np_scen[SV_OP, i] elif np_scen[ROUTE_TYPE, i] == 4: # RCM np_case[CBF_HT, i] = np_case[HOOS, i] np_case[CBF_OP, i] = np_case[HOOS, i] # Calculate friction forces accounting for boundary conditions np_case[LFF_INST], np_case[RFF_INST], junk1, junk2 = calc_fric_forces( -np_case[MUAX] * np_scen[SW_INST] * np_scen[LENGTH], np_ends[REAC_INST, 0], np_ends[REAC_INST, 1]) np_case[LFF_HT], np_case[RFF_HT], np_case[LDELTAF_HT], np_case[RDELTAF_HT] = calc_fric_forces( np_case[MUAX] * np_scen[SW_HT] * np_scen[LENGTH], np_ends[REAC_HT, 0], np_ends[REAC_HT, 1]) np_case[LFF_OP], np_case[RFF_OP], np_case[LDELTAF_OP], np_case[RDELTAF_OP] = calc_fric_forces( np_case[MUAX] * np_scen[SW_OP] * np_scen[LENGTH], np_ends[REAC_OP, 0], np_ends[REAC_OP, 1]) # Adjust friction forces based on boundary conditions if np_ends[ROUTE_TYPE_BC, 0] == 2: # Fixed BC at "left" end side np_case[LFF_INST] = np.full(n_elts, -9.0E+09) np_case[LFF_HT] = np.full(n_elts, 9.0E+09) np_case[LFF_OP] = np.full(n_elts, 9.0E+09) if np_ends[ROUTE_TYPE_BC, 1] == 2: # Fixed BC at "right" end side np_case[RFF_INST] = np.full(n_elts, -9.0E+09) np_case[RFF_HT] = np.full(n_elts, 9.0E+09) np_case[RFF_OP] = np.full(n_elts, 9.0E+09) # Build up frictional effective forces np_case[EAF_INST] = np.maximum( np.maximum(np_case[LFF_INST], np_case[RFF_INST]), np_scen[RLT]) np_case[EAF_HT] = np.minimum( np.minimum(np_case[LFF_HT], np_case[RFF_HT]), np_scen[FRF_HT]) np_case[EAF_P_OP] = np.minimum( np.minimum(np_case[LFF_OP], np_case[RFF_OP]), np_scen[FRF_P_OP]) np_case[EAF_OP] = np.minimum( np.minimum(np_case[LFF_OP], np_case[RFF_OP]), np_scen[FRF_OP]) np_case[EAF_OP_UNBUCK] = np.minimum( np.minimum(np_case[LFF_OP], np_case[RFF_OP]), np_scen[FRF_OP]) return np_case
[docs] def calc_fric_forces(np_array, restr_left, restr_right): """ Accumulate friction forces from each end of the array. The calculated values are at the center of each element, accounting for the average increase between the element and its surrounding elements. Parameters ---------- np_array : numpy.ndarray Array containing the local friction force for each element without accounting for surrounding values. restr_left : float Constant force to be added at the left of the force profile. restr_right : float Constant force to be added at the right of the force profile. Returns ------- np_cumsum_left: numpy.ndarray Cumulated friction force from the left. np_cumsum_right: numpy.ndarray Cumulated friction force from the right. np_rolling_mean_left: numpy.ndarray Average friction force increment from the left. np_rolling_mean_right: numpy.ndarray Average friction force increment from the right. """ # Cumulate the effective axial friction forces ret = np.cumsum(np_array, dtype=float) # Calculate the average increase between each element and its surrounding elements ret[2:] = ret[2:] - ret[:-2] # Compute the rolling mean for the left and right ends np_rolling_mean_left = np.append(restr_left + np_array[0] / 2.0, ret[1:] / 2.0) np_rolling_mean_right = np.append(ret[1:] / 2.0, restr_right + np_array[-1] / 2.0) # Compute the cumulative effective axial forces from the left and right np_cumsum_left = np.cumsum(np_rolling_mean_left) np_cumsum_right = np.cumsum(np_rolling_mean_right[::-1])[::-1] return np_cumsum_left, np_cumsum_right, np_rolling_mean_left, np_rolling_mean_right
@jit(nopython=True) def solve_case(np_scen, np_case): """ Calculate effective force profiles during hydrotest and operation and identify buckle properties. Parameters ---------- np_scen : numpy.ndarray Scenario array (rows = properties, columns = elements). np_case : numpy.ndarray Case array (rows = properties, columns = elements). Returns ------- np_pp_buckle : numpy.ndarray 2D array (n_buckles x 8) with columns: - 0 : SIM_PP - 1 : KP_PP - 2 : ROUTE_TYPE_PP - 3 : MUAX_PP - 4 : MULAT_OP_PP - 5 : HOOS_PP - 6 : CBF_OP_PP - 7 : VAS_PP np_case : numpy.ndarray Updated case array (21 x n_elements). vap_list : list[list] [element_index, KP, ESF_op] for virtual anchor points (sorted by KP). Notes ----- Buckle identification is performed in stages (HT, OP pressure-only, OP pressure+temperature), updating EAF profiles after each buckle is found. """ # Step 1: Get order and location of potential buckles from HT step np_beta = (np_case[CBF_HT] - np_case[EAF_INST]) / (np_scen[FRF_HT] - np_scen[RLT]) np_beta = np.around(np_beta, decimals = 6) sorted_beta_array = np.argsort(np_beta, kind = "mergesort") # Step 2: Compare local driving force during HT with CBF to identify actual buckles, # in the order of the potential buckles np_buckle_id = np.empty(0) for i in sorted_beta_array: if np_case[EAF_HT, i] >= np_case[CBF_HT, i]: np_buckle_id = np.append(np_buckle_id, i) np_ff_buckle = calc_friction_force_from_buckle(np_scen, np_case, i, "HT") np_case[EAF_HT] = np.minimum(np_ff_buckle, np_case[EAF_HT]) # Step 3a: Set up the EAF OP Pressure profile with the buckle locations from HT np_ff_buckle = calc_friction_force_from_buckle(np_scen, np_case, i, "OP") np_case[EAF_P_OP] = np.minimum(np_ff_buckle, np_case[EAF_P_OP]) # Step 5a: Set up the EAF OP profile with the buckle locations from HT np_case[EAF_OP] = np.minimum(np_ff_buckle, np_case[EAF_OP]) # Step 3b: Identify the location and order of additional potential buckles # during OP (Pressure only) np_beta = (np_case[CBF_OP] - np_case[EAF_INST]) / (np_scen[FRF_P_OP] - np_scen[RLT]) np_beta = np.around(np_beta, decimals = 6) sorted_beta_array = np.argsort(np_beta, kind = "mergesort") # Step 4: Compare the local driving force with the CBF during OP (pressure only) to identify # actual buckles, taking into account the actual buckles from HT for i in sorted_beta_array: if np_case[EAF_P_OP, i] >= np_case[CBF_OP, i]: np_buckle_id = np.append(np_buckle_id, i) np_ff_buckle = calc_friction_force_from_buckle(np_scen, np_case, i, "OP") np_case[EAF_P_OP] = np.minimum(np_ff_buckle, np_case[EAF_P_OP]) # Step 5a: Set up the EAF OP profile with the buckle locations from OP pressure only np_case[EAF_OP]=np.minimum(np_ff_buckle, np_case[EAF_OP]) # Step 5b: Calculate Beta2 to identify the order and location of the potential buckles from the # temperature in operation # TODO: Explain in the manual that this calculation assumes linear temperature variations. np_beta = ( np_case[CBF_OP] - np_case[EAF_INST] - ( np_scen[FRF_P_OP] - np_scen[RLT])) / np_scen[FRF_T_OP] np_beta = np.around(np_beta, decimals = 6) sorted_beta_array = np.argsort(np_beta, kind = "mergesort") # Step 6: Compare the local driving force with the CBF (pressure + temperature) to find # actual buckles for i in sorted_beta_array: if np_case[EAF_OP, i] >= np_case[CBF_OP, i]: np_buckle_id = np.append(np_buckle_id, i) np_ff_buckle = calc_friction_force_from_buckle(np_scen, np_case, i, "OP") np_case[EAF_OP] = np.minimum(np_ff_buckle, np_case[EAF_OP]) # Extract the number of elements from the scenario array n_elts = np_scen[KP].size # Drop buckle elements duplicated between hydrotest and operation steps # [can happen if the CBF < Buckle residual force] np_buckle_id = np.unique(np_buckle_id) # Initiate post-processing outputs array np_pp_buckle = np.empty((np_buckle_id.size, 8)) vap_list = [] if len(np_buckle_id) > 0: # At least one buckle detected for i_buckle in range(len(np_buckle_id)): ielt_buckle = np.sort(np_buckle_id.astype(np.int64))[i_buckle] # Identify possible VAP (surrounding buckling susceptibility areas) np_ff_buckle = calc_friction_force_from_buckle(np_scen, np_case, ielt_buckle, "OP") np_possible_vap_id = np.where( np.isclose(np_case[EAF_OP], np_ff_buckle.astype(np.float64)) == False)[0] # Add first and last elements to possible VAP for fixed end cases if not (0 in np_possible_vap_id): np_possible_vap_id = np.append(np_possible_vap_id, 0) if not (n_elts in np_possible_vap_id): np_possible_vap_id = np.append(np_possible_vap_id, n_elts) np_possible_vap_id=np.sort(np_possible_vap_id) # For each buckle, look for surrounding VAP and calculate KP if ielt_buckle <= np.min(np_possible_vap_id): ivap_left = ielt_buckle else: np_distance_left = np.absolute( np_possible_vap_id[np_possible_vap_id < ielt_buckle]-ielt_buckle) ivap_left = min( ielt_buckle, np_possible_vap_id[ np_possible_vap_id < ielt_buckle][np_distance_left.argmin()] + 1) if ielt_buckle >= np.max(np_possible_vap_id): ivap_right = ielt_buckle else: np_distance_right = np.absolute( np_possible_vap_id[np_possible_vap_id > ielt_buckle]-ielt_buckle) ivap_right = max( ielt_buckle, np_possible_vap_id[ np_possible_vap_id > ielt_buckle][np_distance_right.argmin()] - 1) if ivap_left < ivap_right: # Buckle detected # TODO: add switch to calculate KP of actual VAP instead of centroid of elements # TODO: ivap_left and ivap_right where VAP are located if ivap_left == 0: kp_vap_left = np_scen[KP][ivap_left] - np_scen[LENGTH][ivap_left] else: kp_vap_left = np_scen[KP][ivap_left] - np_scen[LENGTH][ivap_left] if ivap_right == (n_elts - 1): kp_vap_right = np_scen[KP][ivap_right] + np_scen[LENGTH][ivap_right] else: kp_vap_right = np_scen[KP][ivap_right] + np_scen[LENGTH][ivap_right] # Add new VAP to list if not already identified temp_list = [ivap_left, kp_vap_left, np_case[EAF_OP][ivap_left]] if temp_list not in vap_list: vap_list.append(temp_list) temp_list = [ivap_right, kp_vap_right, np_case[EAF_OP][ivap_right]] if temp_list not in vap_list: vap_list.append(temp_list) else: # not really triggered buckle kp_vap_right = np_scen[KP, ielt_buckle] kp_vap_left = kp_vap_right # Fill up the array of outputs for post-processing np_pp_buckle[i_buckle, KP_PP] = np_scen[KP, ielt_buckle] np_pp_buckle[i_buckle, ROUTE_TYPE_PP] = np_scen[ROUTE_TYPE, ielt_buckle] np_pp_buckle[i_buckle, MUAX_PP] = np_case[MUAX, ielt_buckle] np_pp_buckle[i_buckle, MULAT_OP_PP] = np_case[MULAT_OP, ielt_buckle] np_pp_buckle[i_buckle, HOOS_PP] = np_case[HOOS, ielt_buckle] np_pp_buckle[i_buckle, CBF_OP_PP] = np_case[CBF_OP, ielt_buckle] np_pp_buckle[i_buckle, VAS_PP] = kp_vap_right - kp_vap_left # End of condition on presence of buckle vap_list = sorted(vap_list, key = lambda x: x[1]) return np_pp_buckle, np_case, vap_list @jit(nopython=True) def calc_friction_force_from_buckle(np_scen, np_case, ielt_buckle, phase_str): """ Calculate the effective axial frictional forces for each element in a scenario. Parameters ---------- np_scen : numpy.ndarray 2D array containing scenario-specific data, where rows represent different parameters and columns represent elements. np_case : numpy.ndarray 2D array containing case-specific data, where rows represent different parameters and columns represent elements. ielt_buckle : int Index of the buckle element. phase_str : str Phase label indicating whether the calculation is for "HT" (hydrotest) or "OP" (operational) phase. Returns ------- np_ff_buckle : numpy.ndarray 1D array containing the computed friction forces for each element in the scenario. """ # Extract the number of elements from the scenario array n_elts = np_scen[KP].size if phase_str == "HT": cbf = np_case[CBF_HT, ielt_buckle] residual_buckle_force = np_scen[EAF_BUCKLE_HT, ielt_buckle] residual_buckle_length = np_scen[L_BUCKLE_HT, ielt_buckle] i_sw = SW_HT i_ldeltaf = LDELTAF_HT i_rdeltaf = RDELTAF_HT elif phase_str == "OP": cbf = np_case[CBF_OP, ielt_buckle] residual_buckle_force = np_scen[EAF_BUCKLE_OP, ielt_buckle] residual_buckle_length = np_scen[L_BUCKLE_OP, ielt_buckle] i_sw = SW_OP i_ldeltaf = LDELTAF_OP i_rdeltaf = RDELTAF_OP else: print(f'Unrecognised phase label {phase_str}') # The residual buckle force should be capped by the minimum CBF residual_buckle_force = min(residual_buckle_force, cbf) # Find the index of elements closer to "left" side of buckle kp_flat_left = np_scen[KP, ielt_buckle] - residual_buckle_length / 2.0 np_distance_left = np.absolute(np_scen[KP] - kp_flat_left) ielt_left = np_distance_left.argmin() if np_scen[KP,ielt_left] > kp_flat_left: ielt_left = max(0, ielt_left-1) # Find the index of elements closer to "right" side of buckle kp_flat_right = np_scen[KP, ielt_buckle] + residual_buckle_length / 2.0 np_distance_right = np.absolute(np_scen[KP] - kp_flat_right) ielt_right = np_distance_right.argmin() if np_scen[KP, ielt_right] < kp_flat_right: ielt_right = min(ielt_right + 1, n_elts - 1) # Calculate the incremental effective axial frictional forces eaf_left_centroid = residual_buckle_force + np_case[MUAX, ielt_left] \ * np_scen[i_sw, ielt_left] * (kp_flat_left - np_scen[KP, ielt_left]) eaf_right_centroid = residual_buckle_force + np_case[MUAX, ielt_right] \ * np_scen[i_sw, ielt_right] * (np_scen[KP, ielt_right] - kp_flat_right) # Compute rolling means for the left and right sections around the buckle np_rolling_mean_left = np.append(np.zeros(ielt_right+1-1), eaf_right_centroid) np_rolling_mean_left = np.append(np_rolling_mean_left, np_case[i_ldeltaf][(ielt_right+1):]) np_rolling_mean_right = np.append(np_case[i_rdeltaf][:ielt_left], eaf_left_centroid) np_rolling_mean_right = np.append(np_rolling_mean_right, np.zeros(n_elts - (ielt_left+1))) # Compute left and right cumulative sums of the effective axial forces for each element np_lff_buckle = np.cumsum(np_rolling_mean_left) np_rff_buckle = np.cumsum(np_rolling_mean_right[::-1])[::-1] # Determine the maximum effective axial frictional force for each element np_ff_buckle = np.maximum(np.maximum(np_lff_buckle,residual_buckle_force), np_rff_buckle) return np_ff_buckle
[docs] def run_monte_carlo(n_worker, n_sim, np_distr, np_scen, np_ends, fric_sampling, bl_verbose = False): """ Set to run the Monte-Carlo simulations using multiple workers and gather the properties of the buckles that have triggered. Parameters ---------- n_worker : int Number of worker processes spawned in the multiprocess. n_sim : int Number of Monte-Carlo simulations to be run. np_distr: NumPy array 3-D numpy array containing the stochastic distributions (dim 1: properties, dim 2: elements, dim 3: values). np_scen : NumPy array Numpy array containing the design data along the pipeline route (mesh) that remains constant among deterministic and Monte-Carlo simulations (except the stochastic distributions). np_ends : NumPy array Returns a numpy array with the definition of the tie-ins at the pipeline ends. fric_sampling : int Switch to select the sampling method of the soil lateral friction factor (0 for 'per Element' or 1 'per OOS Reference Length'). Returns ---------- list_buckle_prop : List[np.ndarray] Array containing for each simulation the properties of the buckle(s) that has(ve) triggered. Other Parameters ---------------- bl_verbose : boolean, optional True if intermediate printouts are required (False by default). """ mp_output = Queue() processes = [] if bl_verbose: start_time = time.time() # Distribute simulations across worker processes for i_worker in range(n_worker): if i_worker < (n_worker - 1): processes.append(Process(target=exec_worker_stack, args=(int(i_worker*int(n_sim/n_worker)), int((i_worker+1)*int(n_sim/n_worker)), np_distr, np_scen, np_ends, fric_sampling, mp_output))) else: processes.append(Process(target=exec_worker_stack, args=(int(i_worker*int(n_sim/n_worker)), int(n_sim), np_distr, np_scen, np_ends, fric_sampling, mp_output))) # Start worker processes for p in processes: p.start() if bl_verbose: print(f' Time taken to start the queue and {n_worker} worker process: {time.time() - start_time:.1f}s') # Wait for output from worker processes while mp_output.qsize() == 0: time.sleep(1) buckle_prop = [] # Collect buckle properties from the output queue if bl_verbose: n_sim_display_list = [10**i for i in range(1+round(np.log10(n_sim)))] while mp_output: try: buckle_prop.append(mp_output.get(timeout=10)) except: break if bl_verbose: for n_sim_display in n_sim_display_list: if len(buckle_prop) >= n_sim_display: print(f' Time taken to solve {n_sim_display:.0f} / {n_sim:.0f} simulations: {time.time() - start_time:.1f}s') n_sim_display_list.remove(n_sim_display) return buckle_prop
[docs] def exec_worker_stack(n_sim_start, n_sim_end, np_distr, np_scen, np_ends, fric_sampling, mp_output): """ Run a stack of random simulations using a single worker and add the results to the post-processing queue mp_output. Parameters ---------- n_sim_start : int Number of the first simulation to be executed by the current worker. n_sim_end : int Number of the last simulation to be executed by the current worker (i.e., this simulation number is not executed in this call to the function). np_distr: NumPy array 3-D numpy array containing the stochastic distributions (dim 1: properties, dim 2: elements, dim 3: values). np_scen : NumPy array Numpy array containing the design data along the pipeline route (mesh) that remains constant among deterministic and Monte-Carlo simulations (except the stochastic distributions). np_ends : NumPy array Returns a numpy array with the definition of the tie-ins at the pipeline ends. fric_sampling : int Switch to select the sampling method of the soil lateral friction factor (0 for 'per Element' or 1 'per OOS Reference Length'). mp_output: Queue Results are added to this Queue for subsequent post-processing. """ # Iterate over the range of simulations assigned to the worker for i_sim in range(n_sim_start, n_sim_end): # Sample randomness for the current simulation np_prob_var = sample_randomness_case(fric_sampling, np_distr, np_scen) # Preprocess the simulation case np_case = preprocess_case(np_prob_var, np_scen, np_ends, bl_det = False) # Solve the simulation case to find potential buckle properties np_pp_buckle, junk1, junk2 = solve_case(np_scen, np_case) # Get the number of buckles triggered in the simulation n_buckle = np_pp_buckle.shape[0] # If buckles triggered, adjust the simulation number of the buckle and add to the queue if n_buckle > 0: for i_buckle in range(n_buckle): np_pp_buckle[i_buckle, SIM_PP] = i_sim mp_output.put(np_pp_buckle)