Source code for buckpy.buckpy_preprocessing_current

"""
This module contains the pre-processing functions of BuckPy.
"""
import time
import numpy as np
import pandas as pd
from scipy.stats import lognorm
import pysubsea as ss

[docs] def calc_lognorm_hoos(type_elt, length_elt, hoos_mean, hoos_std, length_ref, rcm_charac): """ Compute the parameters of the horizontal out-of-straightness (HOOS) lognormal distribution for different types of elements (e.g., Straight, Bend, Sleeper, RCM). This function takes into account the scaling factor of the HOOS distribution. For RCM, the HOOS factor is not a factor but the critical buckling force. Parameters ---------- type_elt : str Type of the element. length_elt : float Length of the element. hoos_mean : float Mean of the HOOS distribution. hoos_std : float Standard deviation of the HOOS distribution. length_ref : float Reference length. rcm_charac : float Characteristic buckling force for the Residual Curvature Method (RCM). Returns ------- x_range : numpy.ndarray An array of values representing the range of the friction factor distribution between probabilities of exceedance between 0.01% and 99.99%. cdf_range : numpy.ndarray An array of cumulative density function (CDF) values corresponding to `x_range`. Notes ----- This function computes the parameters of a lognormal distribution for different types of elements such as Straight, Bend, Sleeper, and RCM (Residual Curvature Method). It calculates the cumulative density function (CDF) for the generated range of values based on the HOOS distribution parameters. """ # Extract the type of element (e.g., Straight, Bend, Sleeper, RCM) type_elt_split = type_elt.split(" ")[0] # Compute the ratio of the reference length to the element length n = length_ref / length_elt if type_elt_split == "Straight" or type_elt_split == "Bend": # Calculate parameters for straight or bend elements shape_hoos = np.sqrt(np.log(1 + hoos_std**2 / hoos_mean**2)) scale_hoos = np.log(hoos_mean**2 / (np.sqrt(hoos_mean**2 + hoos_std**2))) # Define the range of the HOOS distribution hoos_lower = 0.0 hoos_upper = 20.0 x = np.linspace(hoos_lower, hoos_upper, 200000) # Calculate the cumulative density function (CDF) considering the scaling factor cdf = 1-(1-lognorm.cdf(x, shape_hoos, 0.0, np.exp(scale_hoos)))**(1/n) # Generate a range of CDF values cdf_range = np.arange(0.0, 1.0, 0.0001) # Interpolate to get the corresponding values of the distribution x_range = np.interp(cdf_range, cdf, x) elif type_elt_split == "Sleeper": # Calculate parameters for sleeper elements shape_hoos = np.sqrt(np.log(1 + hoos_std**2 / hoos_mean**2)) scale_hoos = np.log(hoos_mean**2 / (np.sqrt(hoos_mean**2 + hoos_std**2))) # Calculate the lower and upper bounds of the distribution for sleeper elements hoos_lower = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.0001) hoos_upper = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.9999) # Generate a range of values within the distribution x_range = np.linspace(hoos_lower, hoos_upper, 10000) # Compute the cumulative density function (CDF) for the generated range cdf_range = lognorm.cdf(x_range, shape_hoos, 0.0, np.exp(scale_hoos)) elif type_elt_split == "RCM": # Calculate parameters for RCM elements shape_hoos = np.sqrt(np.log(1 + hoos_std**2 / hoos_mean**2)) scale_hoos = np.log(hoos_mean**2 / (np.sqrt(hoos_mean**2 + hoos_std**2))) scale_hoos = scale_hoos + np.log(rcm_charac) # Calculate the lower and upper bounds of the distribution for RCM elements hoos_lower = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.0001) hoos_upper = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.9999) # Generate a range of values within the distribution x_range = np.linspace(hoos_lower, hoos_upper, 10000) # Compute the cumulative density function (CDF) for the generated range cdf_range = lognorm.cdf(x_range, shape_hoos, 0.0, np.exp(scale_hoos)) return x_range, cdf_range
[docs] class PreProcessor: """ Class to handle the pre-processing of scenario data for BuckPy simulations. This class reads scenario data from an Excel file, extracts and processes route, pipe, operating, and soil data, and calculates scenario data. It also converts the scenario data and end boundary conditions to NumPy arrays for Monte Carlo simulations and processes post-processing data. The class includes methods for calculating expanded KP values, creating element arrays, interpolating distributions, and handling various preprocessing tasks. """ def __init__(self, work_dir, file_name, pipeline, scenario, bl_verbose): """ Method to initialize the PreProcessor class with the necessary parameters and attributes. Parameters ---------- work_dir : str Directory where the Excel file is located. file_name : str Name of the Excel file. pipeline : str Identifier of the pipeline. scenario : int Identifier of the scenario. bl_verbose : bool True if intermediate printouts are required (False by default). Returns ------- None """ # Initialize attributes for data storage self.work_dir = work_dir self.file_name = file_name self.pipeline = pipeline self.scenario = scenario self.bl_verbose = bl_verbose # Initialize attributes for storing dataframes and arrays self.scen_df = None self.route_df = None self.route_ends_df = None self.mitigation_df = None self.soil_zoning_df = None self.pipe_df = None self.soil_df = None self.oper_df = None self.pp_df = None # Initialize attributes for storing NumPy arrays used in Monte Carlo simulations self.scen_np = None self.dist_np = None self.ends_np = None
[docs] def run(self): """ Import scenario data from an Excel file and preprocess it. Parameters ---------- work_dir : str Directory where the Excel file is located. file_name : str Name of the Excel file. pipeline : str Identifier of the pipeline. scenario : int Identifier of the scenario. bl_verbose : bool, optional True if intermediate printouts are required. Returns ------- scen_np : numpy.ndarray NumPy array containing the scenario data for Monte Carlo simulations. dist_np : numpy.ndarray NumPy array containing the distribution data for Monte Carlo simulations. ends_np : numpy.ndarray NumPy array containing the end boundary conditions for Monte Carlo simulations. scen_df : pandas.DataFrame DataFrame containing the scenario data for deterministic simulations. pp_df : pandas.DataFrame DataFrame containing the post-processing data for the scenario. Notes ----- This function reads scenario data from an Excel file, extracts and processes route, pipe, operating, and soil data, and calculates scenario data. It also converts the scenario data and end boundary conditions to NumPy arrays for Monte Carlo simulations and processes post-processing data. The function prints out the time taken to create the main dataframe if bl_verbose is set to True. Other Parameters ---------------- bl_verbose : boolean, optional True if intermediate printouts are required (False by default). """ # Print out in the terminal that the assembly of the main dataframe has started if self.bl_verbose: print("1. Assembly of the main dataframe") # Starting time of the pre-processing module start_time = time.time() # Read data from the input Excel file sheets = pd.read_excel(rf"{self.work_dir}/{self.file_name}", sheet_name=None) self.scen_df = sheets["Scenario"] self.route_df = sheets["Route"] self.mitigation_df = sheets["Mitigation"] self.soil_zoning_df = sheets["Soil Zoning"] self.pipe_df = sheets["Pipe"] self.soil_df = sheets["Soils"] self.oper_df = sheets["Operating"] self.pp_df = sheets["Post-Processing"] # Filter scenario dataframe based on pipeline and scenario self.scen_df = self.scen_df.loc[ (self.scen_df["Pipeline"] == self.pipeline) & (self.scen_df["Scenario"] == self.scenario) ].copy() # Extract simulation parameters from the scenario dataframe layout = self.scen_df["Layout Set"].values[0] mitigation = self.scen_df["Mitigation Set"].values[0] loadcase = self.scen_df["Loadcase Set"].values[0] # Filter route data based on layout self.route_df = self.route_df.loc[ (self.route_df["Pipeline"] == self.pipeline) & (self.route_df["Layout Set"] == layout) ].copy() # Ensure mitigation-driven columns exist on route rows before segmentation for col in ["Sleeper Height", "RCM Buckling Force"]: if col not in self.route_df.columns: self.route_df[col] = np.nan self.route_df[["KP From", "KP To"]] = ( self.route_df[["KP From", "KP To"]].astype(float) ) # Filter mitigation data based on mitigation self.mitigation_df = self.mitigation_df.loc[ (self.mitigation_df["Pipeline"] == self.pipeline) & (self.mitigation_df["Mitigation Set"] == mitigation) ].copy() self.mitigation_df[["KP From", "KP To", "Sleeper Height", "RCM Buckling Force"]] = ( self.mitigation_df[["KP From", "KP To", "Sleeper Height", "RCM Buckling Force"]] .astype(float) ) # Filter soil zoning data based on soil zoning self.soil_zoning_df = self.soil_zoning_df.loc[ (self.soil_zoning_df["Pipeline"] == self.pipeline) & (self.soil_zoning_df["Route Layout"] == layout) ].copy() self.soil_zoning_df[["KP From", "KP To"]] = ( self.soil_zoning_df[["KP From", "KP To"]].astype(float) ) # Postprocess route data based on route, mitigation and soil zoning data self.calc_route_data() # Postprocess pipe data and calculate pipe properties self.pipe_df = self.pipe_df.loc[ (self.pipe_df["Pipeline"] == self.pipeline) ].copy() self.calc_pipe_data() # Postprocess soil data and calculate friction factor distributions self.soil_df = self.soil_df.loc[ (self.soil_df["Pipeline"] == self.pipeline) ].copy() self.calc_soil_data() # Postprocess operating data and calculate operating profiles and operating data self.oper_df = self.oper_df.loc[ (self.oper_df["Pipeline"] == self.pipeline) & (self.oper_df["Loadcase Set"] == loadcase) ].copy() self.calc_oper_data() # Postprocess scenario data self.calc_scenario_data() # Define the NumPy arrays used in the Monte Carlo Simulations self.calc_monte_carlo_data() # Process post-processing data based on pipeline, layout and mitigation mask = ( (self.pp_df["Pipeline"] == self.pipeline) & (self.pp_df["Layout Set"] == layout) ) if pd.isna(mitigation): mask &= self.pp_df["Mitigation Set"].isna() else: mask &= self.pp_df["Mitigation Set"] == mitigation self.pp_df = self.pp_df.loc[mask].copy() self.calc_pp_data() # Ensure mitigation-driven columns exist on route rows after segmentation if "Sleeper Height" not in self.route_df.columns: self.route_df["Sleeper Height"] = np.nan if "RCM Buckling Force" not in self.route_df.columns: self.route_df["RCM Buckling Force"] = np.nan # Set "Bend Radius" to NaN for rows where "Sleeper Height" or "RCM Buckling Force" are not NaN self.route_df.loc[~self.route_df["Sleeper Height"].isna(), "Bend Radius"] = np.nan self.route_df.loc[~self.route_df["RCM Buckling Force"].isna(), "Bend Radius"] = np.nan # Select specific columns for route data output cols = [ "Pipeline", "Layout Set", "Pipe Set", "Friction Set", "Route Type", "Point ID From", "Point ID To", "KP From", "KP To", "Bend Radius", "Sleeper Height", "RCM Buckling Force", "HOOS Mean", "HOOS STD", "HOOS Reference Length", "Residual Buckle Force Hydrotest", "Residual Buckle Length Hydrotest", "Residual Buckle Force Operation", "Residual Buckle Length Operation", "Reaction Installation", "Reaction Hydrotest", "Reaction Operation" ] self.route_df = self.route_df[cols].copy() # Print out in the terminal time taken to create main dataframe if self.bl_verbose: print(f" Time taken to create main dataframe: {time.time() - start_time:.1f}s") return self.scen_np, self.dist_np, self.ends_np, self.scen_df, self.route_df, self.pp_df
[docs] def calc_route_data(self): """ Extract and process route data for calculations. Parameters ---------- route_df : pandas.DataFrame DataFrame containing route data. mitigation_df : pandas.DataFrame DataFrame containing mitigation data. soil_zoning_df : pandas.DataFrame DataFrame containing soil zoning data. Returns ------- route_df : pandas.DataFrame DataFrame containing route data and calculated route data. route_ends_df : pandas.DataFrame DataFrame containing end boundary conditions. Notes ----- This function extracts route ends and route data based on layout, mitigation, and soil_zoning. It selects specific columns for route ends data. Route Type is converted from string tofloat for numerical representation. Route ends data is converted to a NumPy array for efficient processing. """ # Extract route ends based on layout self.route_ends_df = self.route_df.iloc[[0, -1]] # Select specific columns for route ends data self.route_ends_df = self.route_ends_df[[ "Route Type", "KP From", "KP To", "Reaction Installation", "Reaction Hydrotest", "Reaction Operation" ]] # Convert "Route Type" from string to float for numerical representation self.route_ends_df.loc[self.route_ends_df["Route Type"] == "Spool", "Route Type"] = 1 self.route_ends_df.loc[self.route_ends_df["Route Type"] == "Fixed", "Route Type"] = 2 self.route_ends_df["Route Type"] = self.route_ends_df["Route Type"].astype(float) # Extract route data based on layout self.route_df = self.route_df.iloc[1:-1].copy() # Combine rows from route and mitigation, then sort by KP From self.apply_route_mitigation() # Extract soil zoning data based on soil_zoning self.apply_route_soil_zoning()
[docs] def calc_pipe_data(self): """ Calculate properties of pipes. Parameters ---------- pipe_df : pandas.DataFrame DataFrame containing the pipe data. Returns ------- pipe_df : pandas.DataFrame DataFrame containing the pipe data and calculated pipe properties. Notes ----- This function computes the inner diameter (ID), cross-sectional area (As), inner area (Ai), moment of inertia (I), hydrotest characteristic buckling force (SChar HT), and operation characteristic buckling force (SChar OP) of the pipe. """ # Compute the inner diameter (ID) of the pipe self.pipe_df["ID"] = self.pipe_df["OD"] - 2.0 * self.pipe_df["WT"] # Compute the cross-sectional area (As) of the pipe self.pipe_df["As"] = np.pi / 4.0 * (self.pipe_df["OD"] ** 2 - self.pipe_df["ID"] ** 2) # Compute the inner area (Ai) of the pipe self.pipe_df["Ai"] = np.pi / 4.0 * self.pipe_df["ID"] ** 2 # Compute the moment of inertia (I) of the pipe self.pipe_df["I"] = np.pi / 64.0 * (self.pipe_df["OD"] ** 4 - self.pipe_df["ID"] ** 4) # Compute the hydrotest characteristic buckling force (SChar HT) of the pipe self.pipe_df["SChar HT"] = 2.26 * (self.pipe_df["E"] * self.pipe_df["As"]) ** 0.25 * (self.pipe_df["E"] * self.pipe_df["I"]) ** 0.25 * self.pipe_df["sw Hydrotest"] ** 0.5 # Compute the operation characteristic buckling force (SChar OP) of the pipe self.pipe_df["SChar OP"] = 2.26 * (self.pipe_df["E"] * self.pipe_df["As"]) ** 0.25 * (self.pipe_df["E"] * self.pipe_df["I"]) ** 0.25 * self.pipe_df["sw Operation"] ** 0.5
[docs] def calc_soil_data(self): """ Calculate soil data and axial and lateral friction factor distributions and assign them to DataFrame columns. Parameters ---------- soil_df : pandas.DataFrame DataFrame containing soil data. Returns ------- soil_df : pandas.DataFrame DataFrame containing soil data and calculated friction factor distributions. Notes ----- This function computes lognormal distributions for axial and lateral friction factors and assigns them to DataFrame columns. """ # Compute lognormal distributions for axial friction and assign to DataFrame result = ss.LBSoilDistributions( friction_factor_le=self.soil_df["Axial LE"], friction_factor_be=self.soil_df["Axial BE"], friction_factor_he=self.soil_df["Axial HE"], friction_factor_fit_type=self.soil_df["Axial Fit Bounds"] ).friction_distribution_parameters() self.soil_df["Axial Mean"], self.soil_df["Axial STD"] = result[:2] muax_array = np.asarray(result[-2]) muax_cdf = np.asarray(result[-1]) self.soil_df["muax Array"] = list(np.atleast_2d(muax_array)) self.soil_df["muax CDF Array"] = list(np.atleast_2d(muax_cdf)) # Compute lognormal distributions for lateral hydrotest friction and assign to DataFrame result = ss.LBSoilDistributions( friction_factor_le=self.soil_df["Lateral Hydrotest LE"], friction_factor_be=self.soil_df["Lateral Hydrotest BE"], friction_factor_he=self.soil_df["Lateral Hydrotest HE"], friction_factor_fit_type=self.soil_df["Lateral Hydrotest Fit Bounds"] ).friction_distribution_parameters() self.soil_df["Lateral Hydrotest Mean"], self.soil_df["Lateral Hydrotest STD"] = result[:2] mul_ht_array = np.asarray(result[-2]) mul_ht_cdf = np.asarray(result[-1]) self.soil_df["mul HT Array"] = list(np.atleast_2d(mul_ht_array)) self.soil_df["mul HT CDF Array"] = list(np.atleast_2d(mul_ht_cdf)) # Compute lognormal distributions for lateral operation friction and assign to DataFrame result = ss.LBSoilDistributions( friction_factor_le=self.soil_df["Lateral Operation LE"], friction_factor_be=self.soil_df["Lateral Operation BE"], friction_factor_he=self.soil_df["Lateral Operation HE"], friction_factor_fit_type=self.soil_df["Lateral Operation Fit Bounds"] ).friction_distribution_parameters() self.soil_df["Lateral Operation Mean"], self.soil_df["Lateral Operation STD"] = result[:2] mul_op_array = np.asarray(result[-2]) mul_op_cdf = np.asarray(result[-1]) self.soil_df["mul OP Array"] = list(np.atleast_2d(mul_op_array)) self.soil_df["mul OP CDF Array"] = list(np.atleast_2d(mul_op_cdf))
[docs] def calc_oper_data(self): """ Calculate operating data and process it. Parameters ---------- oper_df : pandas.DataFrame DataFrame containing the operating data. route_ends_df : pandas.DataFrame DataFrame containing the end boundary conditions. Returns ------- df : pandas.DataFrame DataFrame containing the operating data and calculated operating data. Notes ----- This function filters oper_df DataFrame based on loadcase, and "KP To". It calculates rolling mean and difference, assigns the "Length" column, resets the index, and drops rows with NaN values before returning the preprocessed DataFrame. """ # Select the "Point ID From" and "KP To" columns route_df_temp = self.route_df[["Point ID From", "KP To"]].reset_index(drop = True).copy() # Add the end row of route and the start KP end_row = pd.DataFrame({"Point ID From": "End", "KP To": np.nan}, index = [99999]) route_df_temp = pd.concat([route_df_temp, end_row], ignore_index = True) # Shift KP column 1 downwards and assign 0.0 to the first KP route_df_temp["KP To"] = route_df_temp["KP To"].shift().fillna(0.0) # Expand the KP array with 1000 intervals from 1000 to nearest maximum KP route_df_temp = self.build_oper_kp_mesh_from_route(route_df_temp) # Create the elements between each KP points elem_array_temp = self.build_oper_element_kp_array(route_df_temp) # Interpolate the RLT, pressure and temperature using KP and operating profile self.interpolate_oper_profile_on_kp(elem_array_temp) # Filter oper_df DataFrame based on loadcase and "KP To" self.oper_df = self.oper_df.loc[ self.oper_df["KP"] <= self.route_ends_df["KP To"].iloc[-1] ].copy() # Calculate the rolling mean of oper_df grouped by Loadcase Set df_rolling_mean = self.oper_df.rolling(2).mean() # Calculate the rolling difference of oper_df grouped by Loadcase Set df_rolling_difference = self.oper_df.rolling(2).max() - self.oper_df.rolling(2).min() # Assign the "Length" column in df_rolling_mean df_rolling_mean["Length"] = df_rolling_difference["KP"] # Reset the index of df_rolling_mean and drop the "level_2" index level df_rolling_mean = df_rolling_mean.reset_index(drop=True) # Drop rows with NaN values df_rolling_mean = df_rolling_mean.dropna() self.oper_df = df_rolling_mean.copy()
[docs] def calc_scenario_data(self): """ Calculate scenario data based on route, pipe, operating, and soil data. Parameters ---------- route_df : pandas.DataFrame DataFrame containing route data. pipe_df : pandas.DataFrame DataFrame containing pipe data. oper_df : pandas.DataFrame DataFrame containing operating data. soil_df : pandas.DataFrame DataFrame containing soil data. Returns ------- df: pandas.DataFrame DataFrame containing the calculated scenario data. Notes ----- This function merges route, pipe, operating, and soil data to compute various scenario parameters. It calculates various attributes such as lognormal distributions, buckling forces, and section counts. The resulting DataFrame includes a subset of calculated columns and is filled with 0 for missing values. """ # Merge operating data with route data using an asof merge to align KPs and route segments temp_df = pd.merge_asof( left=self.oper_df, right=self.route_df, left_on="KP", right_on="KP From", direction="backward", ) # Merge resulting DataFrame with pipe data based on Pipe Set temp_df = pd.merge( left=temp_df, right=self.pipe_df, left_on="Pipe Set", right_on="Pipe Set" ) # Merge resulting DataFrame with soil data based on Friction Set temp_df = pd.merge( left=temp_df, right=self.soil_df, left_on="Friction Set", right_on="Friction Set" ) # Compute lognormal distributions for soil properties and assign to DataFrame columns temp_df["HOOS X Array"], temp_df["HOOS CDF Array"] = zip( *temp_df.apply( lambda x: calc_lognorm_hoos( x["Route Type"], x["Length"], x["HOOS Mean"], x["HOOS STD"], x["HOOS Reference Length"], x.get("RCM Buckling Force", np.nan), ), axis=1 ).apply(np.array) ) # Compute various buckling forces based on calculated parameters temp_df["FRF HT"] = ( temp_df["RLT"] + temp_df["E"] * temp_df["Alpha"] * temp_df["As"] * (temp_df["Temperature Hydrotest"] - temp_df["Temperature Installation"]) + (1 - 2 * temp_df["Poisson"]) * (temp_df["Pressure Hydrotest"] - temp_df["Pressure Installation"]) * temp_df["Ai"] ) temp_df["FRF OP"] = ( temp_df["RLT"] + temp_df["E"] * temp_df["Alpha"] * temp_df["As"] * (temp_df["Temperature Operation"] - temp_df["Temperature Installation"]) + (1 - 2 * temp_df["Poisson"]) * (temp_df["Pressure Operation"] - temp_df["Pressure Installation"]) * temp_df["Ai"] ) temp_df["FRF OP Pressure"] = ( temp_df["RLT"] + (1 - 2 * temp_df["Poisson"]) * temp_df["Pressure Operation"] * temp_df["Ai"] ) temp_df["FRF OP Temperature"] = ( temp_df["E"] * temp_df["As"] * temp_df["Alpha"] * (temp_df["Temperature Operation"] - temp_df["Temperature Installation"]) ) # Calculate the hydrotest and operation buckling forces (Sv) sleeper_height = temp_df.get("Sleeper Height", pd.Series(np.nan, index=temp_df.index)) temp_df["Sv HT"] = 4.0 * np.sqrt(temp_df["E"] * temp_df["I"] * temp_df["sw Hydrotest"] / sleeper_height) temp_df["Sv OP"] = 4.0 * np.sqrt(temp_df["E"] * temp_df["I"] * temp_df["sw Operation"] / sleeper_height) # Calculate section-related parameters temp_df["KP Section"] = temp_df["KP"] - temp_df["KP From"] temp_df["Reference Section"] = (temp_df["KP Section"] / temp_df["HOOS Reference Length"]).apply(np.floor) temp_df["Section Count"] = 0.0 temp_df.loc[ (temp_df["Route Type"] != temp_df["Route Type"].shift()) | (temp_df["Reference Section"] != temp_df["Reference Section"].shift()), "Section Count" ] = 1.0 temp_df["Section Count"] = temp_df["Section Count"].cumsum() # Calculate the residual buckle length and force for hydrotest and operation if "RCM Buckling Force" not in temp_df.columns: temp_df["RCM Buckling Force"] = np.nan # Select relevant columns and rename them for clarity temp_df = temp_df[[ "KP", "Length", "Route Type", "KP From", "KP To", "Point ID From", "Point ID To", "Bend Radius", "muax Array", "muax CDF Array", "mul HT Array", "mul HT CDF Array", "mul OP Array", "mul OP CDF Array", "HOOS X Array", "HOOS CDF Array", "sw Installation", "sw Hydrotest", "sw Operation", "SChar HT", "SChar OP", "Sv HT", "Sv OP", "RCM Buckling Force", "RLT", "FRF HT", "FRF OP Pressure", "FRF OP Temperature", "FRF OP", "Residual Buckle Length Hydrotest", "Residual Buckle Force Hydrotest", "Residual Buckle Length Operation", "Residual Buckle Force Operation", "Section Count", "KP Section", "Reference Section", "Axial Mean", "Lateral Hydrotest Mean", "Lateral Operation Mean", "HOOS Mean" ]] temp_df = temp_df.rename(columns={ "sw Installation": "sw IN", "sw Hydrotest": "sw HT", "sw Operation": "sw OP", "Residual Buckle Length Hydrotest": "buckleLength HT", "Residual Buckle Force Hydrotest": "buckleEAF HT", "Residual Buckle Length Operation": "buckleLength OP", "Residual Buckle Force Operation": "buckleEAF OP" }) # Convert route type strings to numerical representation temp_df.loc[temp_df["Route Type"] == "Straight", "Route Type"] = 1 temp_df.loc[temp_df["Route Type"] == "Bend", "Route Type"] = 2 temp_df.loc[temp_df["Route Type"] == "Sleeper", "Route Type"] = 3 temp_df.loc[temp_df["Route Type"] == "RCM", "Route Type"] = 4 temp_df["Route Type"] = temp_df["Route Type"].astype(float) # Fill missing values with 0 temp_df = temp_df.fillna(0) # Add scenario parameters to the DataFrame temp_df["Pipeline"] = self.scen_df["Pipeline"].values[0] temp_df["Scenario"] = self.scen_df["Scenario"].values[0] temp_df["Layout Set"] = self.scen_df["Layout Set"].values[0] temp_df["Simulations"] = self.scen_df["Simulations"].values[0] temp_df["Friction Sampling"] = self.scen_df["Friction Sampling"].values[0] temp_df["Char. Friction Prob."] = self.scen_df["Char. Friction Prob."].values[0] self.scen_df = temp_df.copy()
[docs] def calc_pp_data(self): """ Calculate post-processing data set for a given layout set. Parameters ---------- df : pandas.DataFrame DataFrame containing post-processing data. np_array : numpy.ndarray NumPy array containing pipeline end boundary conditions. pipeline_id : str Identifier of the pipeline. layout_set : str Identifier of the layout set. Returns ------- df : pandas.DataFrame DataFrame containing calculated post-processing data. Notes ----- This function filters the DataFrame based on the layout set. It resets the index, renames columns, and selects relevant columns. Adjusts the last 'KP_to' value if it is smaller than the maximum value in np_array. Converts data types of columns to appropriate numeric types. """ # Reset index, rename columns, and select relevant columns self.pp_df = self.pp_df.reset_index(drop=True).rename(columns={ 'Post-Processing Set': 'pp_set', 'KP From': 'KP_from', 'KP To': 'KP_to', 'Post-Processing Description': 'description' }) self.pp_df = self.pp_df[ ['pp_set', 'KP_from', 'KP_to', 'description', 'Characteristic VAS Probability'] ] # Convert columns to appropriate numeric types self.pp_df['pp_set'] = self.pp_df['pp_set'].astype(np.int64) self.pp_df['KP_from'] = self.pp_df['KP_from'].astype(np.float64) self.pp_df['KP_to'] = self.pp_df['KP_to'].astype(np.float64)
[docs] def calc_monte_carlo_data(self): """ Convert the scenario data and end boundary conditions data to NumPy arrays for Monte Carlo simulations. Parameters ---------- scen_df : pandas.DataFrame DataFrame containing the scenario data. route_ends_df : pandas.DataFrame DataFrame containing the end boundary conditions data. Returns ------- dist_np : numpy.ndarray 2D array with probabilistic distributions (rows) along the route mesh (columns). scen_np : numpy.ndarray 2D array with scenario properties (rows) along the route mesh (columns). ends_np : numpy.ndarray 2D array with end properties (rows) for the ends. Notes ----- The arrays have the following row layout (index : meaning): scen_np: - 0 : KP - 1 : LENGTH - 2 : ROUTE_TYPE - 3 : BEND_RADIUS - 4 : SW_INST - 5 : SW_HT - 6 : SW_OP - 7 : SCHAR_HT - 8 : SCHAR_OP - 9 : SV_HT - 10 : SV_OP - 11 : CBF_RCM - 12 : RLT - 13 : FRF_HT - 14 : FRF_P_OP - 15 : FRF_T_OP - 16 : FRF_OP - 17 : L_BUCKLE_HT - 18 : EAF_BUCKLE_HT - 19 : L_BUCKLE_OP - 20 : EAF_BUCKLE_OP - 21 : SECTION_ID - 22 : SECTION_KP - 23 : SECTION_REF - 24 : MUAX_MEAN - 25 : MULAT_HT_MEAN - 26 : MULAT_OP_MEAN - 27 : HOOS_MEAN dist_np: - 0 : MUAX_ARRAY - 1 : MUAX_CDF_ARRAY - 2 : MULAT_ARRAY_HT - 3 : MULAT_CDF_ARRAY_HT - 4 : MULAT_ARRAY_OP - 5 : MULAT_CDF_ARRAY_OP - 6 : HOOS_ARRAY - 7 : HOOS_CDF_ARRAY ends_np: - 0 : ROUTE_TYPE - 1 : KP_FROM - 2 : KP_TO - 3 : REAC_INST - 4 : REAC_HT - 5 : REAC_OP """ # Create a list to store the distribution arrays and define their column labels dist_list = [] dist_list_columns = [ "muax Array", "muax CDF Array", "mul HT Array", "mul HT CDF Array", "mul OP Array", "mul OP CDF Array", "HOOS X Array", "HOOS CDF Array" ] # Loop through the distribution columns and convert each column to a list for list_label in dist_list_columns: dist_list_temp = [] for i in range(self.scen_df[list_label].size): dist_list_temp.append(self.scen_df[list_label][i]) dist_list.append(dist_list_temp) # Convert the list of distribution arrays to a NumPy array self.dist_np = np.array(dist_list, dtype="float64") # Add extra columns to remove dist_array_columns_drop = [ "Pipeline", "Scenario", "Simulations", "Friction Sampling", "Char. Friction Prob.", "KP From", "KP To", "Point ID From", "Point ID To" ] dist_array_columns_drop = np.append(dist_array_columns_drop, dist_list_columns) # Convert scenario properties to numpy array self.scen_np = self.scen_df.drop(dist_array_columns_drop, axis=1).to_numpy().transpose() # Convert end properties to numpy array self.ends_np = self.route_ends_df.to_numpy().transpose()
[docs] def apply_route_mitigation(self): """ Function to combine rows from route and mitigation, then sort by KP From. Parameters ---------- route_df : pandas Dataframe Dataframe containing the route data. mitigation_df : pandas Dataframe Dataframe containing the mitigation data. Returns ------- route_df : pandas Dataframe Dataframe containing the combined route and mitigation data, sorted by KP From. """ rows = [] for _, r in self.route_df.iterrows(): # Route segment start and end KP and point IDs seg_start = r["KP From"] seg_end = r["KP To"] seg_from_point = r["Point ID From"] # Mitigation rows that overlap this route segment overlaps = self.mitigation_df[ (self.mitigation_df["KP To"] > seg_start) & (self.mitigation_df["KP From"] < seg_end) ].sort_values("KP From") for _, m in overlaps.iterrows(): # Calculate the overlapping KP range between the route and mitigation m_from = max(seg_start, m["KP From"]) m_to = min(seg_end, m["KP To"]) if m_to <= m_from: continue # Part before mitigation if m_from > seg_start: pre = r.copy() pre["KP From"] = seg_start pre["KP To"] = m_from pre["Point ID From"] = seg_from_point pre["Point ID To"] = m["Point ID From"] rows.append(pre) # Mitigation part (override key fields from mitigation) mid = r.copy() mid["KP From"] = m_from mid["KP To"] = m_to # Copy every mitigation column except the KP boundaries, which are determined # by the overlap with the route segment. for col in m.index: if col not in {"KP From", "KP To"}: mid[col] = m[col] rows.append(mid) seg_start = m_to seg_from_point = m["Point ID To"] # Part after last mitigation if seg_start < seg_end: post = r.copy() post["KP From"] = seg_start post["KP To"] = seg_end post["Point ID From"] = seg_from_point rows.append(post) self.route_df = ( pd.DataFrame(rows) .sort_values("KP From", kind="mergesort") .reset_index(drop=True) )
[docs] def apply_route_soil_zoning(self): """ Function to combine rows from route and soil zoning, then sort by KP From. Parameters ---------- route_df : pandas Dataframe Dataframe containing the route data. soil_zoning_df : pandas Dataframe Dataframe containing the soil zoning data. Returns ------- route_df : pandas Dataframe Dataframe containing the combined route and soil zoning data, sorted by KP From. """ # Copy the route and soil zoning dataframes route = self.route_df.copy() zones_all = self.soil_zoning_df.copy() zones = zones_all.iloc[1:].copy() base_friction = zones_all.iloc[0]["Friction Set"] rows = [] for _, r in route.iterrows(): original_start = r["KP From"] original_end = r["KP To"] seg_start = r["KP From"] seg_end = r["KP To"] current_friction = base_friction # Zones overlapping this route segment overlaps = zones[ (zones["KP To"] > seg_start) & (zones["KP From"] < seg_end) ].sort_values("KP From") # No overlap: keep whole segment with current/base friction if overlaps.empty: row = r.copy() row["Friction Set"] = current_friction rows.append(row) continue for _, z in overlaps.iterrows(): z_from = max(seg_start, z["KP From"]) z_to = min(seg_end, z["KP To"]) if z_to <= z_from: continue # Before zone: keep previous friction if z_from > seg_start: pre = r.copy() pre["KP From"] = seg_start pre["KP To"] = z_from pre["Friction Set"] = current_friction pre["Point ID From"] = ( r["Point ID From"] if seg_start == original_start else "Soil Change" ) pre["Point ID To"] = "Soil Change" rows.append(pre) # Inside zone: apply zone friction mid = r.copy() mid["KP From"] = z_from mid["KP To"] = z_to mid["Friction Set"] = z["Friction Set"] mid["Point ID From"] = ( r["Point ID From"] if z_from == original_start else "Soil Change" ) mid["Point ID To"] = ( r["Point ID To"] if z_to == original_end else "Soil Change" ) rows.append(mid) seg_start = z_to current_friction = z["Friction Set"] # Tail after last overlapping zone if seg_start < seg_end: post = r.copy() post["KP From"] = seg_start post["KP To"] = seg_end post["Friction Set"] = current_friction post["Point ID From"] = ( r["Point ID From"] if seg_start == original_start else "Soil Change" ) post["Point ID To"] = r["Point ID To"] rows.append(post) self.route_df = ( pd.DataFrame(rows) .sort_values("KP From", kind="mergesort") .reset_index(drop=True) )
[docs] def build_oper_kp_mesh_from_route(self, route_df): """ Function to expand the KP array with 1000 intervals from 1000 to nearest maximum KP. Parameters ---------- route_df : pandas Dataframe Dataframe containing the route data. Returns ------- route_df : pandas Dataframe Dataframe containing the route data with expanded KP values, calculated lengths, element numbers, and element sizes. """ # Rename kp_col to "KP From" route_df = route_df.rename(columns = {"KP To": "KP From"}) # Expand the KP array with 1000 intervals from 1000 to nearest maximum KP max_kp = np.floor(route_df["KP From"].max() / 1000.0) * 1000.0 kp_array = np.arange(1000, max_kp + 1.0, 1000) # Create a dataframe for the expanded kp expand_df = pd.DataFrame({"Point ID From": [np.nan] * len(kp_array), "KP From": kp_array}) route_df = pd.concat( [route_df, expand_df], ignore_index = True ).sort_values(by = "KP From").drop_duplicates("KP From").reset_index(drop = True).ffill() # Calculate relative length between KP and KP To route_df["KP To"] = route_df["KP From"].shift(-1) route_df = route_df.dropna() route_df["Length"] = route_df["KP To"] - route_df["KP From"] # Calculate element number and element size route_df["Elem No."] = np.ceil(route_df["Length"] / 100.0) route_df["Elem Size"] = route_df["Length"] / route_df["Elem No."] return route_df
[docs] def build_oper_element_kp_array(self, route_df): """ Function to create element array based on KP, KP TO and element number. Parameters ---------- route_df : pandas Dataframe Dataframe containing the route data with expanded KP values, calculated lengths, element numbers, and element sizes. Returns ------- elem_array : numpy Array """ # Create the elements between each KP points elem_values = [] for _, x in route_df.iterrows(): elem_values.extend( np.linspace(x["KP From"], x["KP To"], int(x["Elem No."] + 1.0)) ) # Convert the list of element values to a NumPy array, remove duplicates and NaN values elem_array = np.array(elem_values, dtype=float) elem_array = np.unique(elem_array) elem_array = elem_array[~np.isnan(elem_array)] return elem_array
[docs] def interpolate_oper_profile_on_kp(self, elem_array): """ Function to interpolate the RLT, pressure and temperature using KP and operating profile. Parameters ---------- elem_array : numpy Array Array containing the KP values for interpolation. Returns ------- oper_df : pandas Dataframe Dataframe containing the interpolated RLT, pressure and temperature values based on KP and operating profile. """ # Define the columns to interpolate interp_columns = [ "Pressure Installation", "Pressure Hydrotest", "Pressure Operation", "Temperature Installation", "Temperature Hydrotest", "Temperature Operation", "RLT", ] # Create a dataframe for the interpolated values interp_df= pd.DataFrame({"KP": elem_array}) # Interpolate the RLT, pressure and temperature using KP and operating profile for column in interp_columns: interp_df[column] = np.interp(interp_df["KP"], self.oper_df["KP"], self.oper_df[column]) self.oper_df = interp_df.copy()