"""
This module contains the pre-processing functions of BuckPy.
"""
import time
import numpy as np
import pandas as pd
from scipy.stats import lognorm
import pysubsea as ss
[docs]
def calc_lognorm_hoos(type_elt, length_elt, hoos_mean, hoos_std, length_ref, rcm_charac):
"""
Compute the parameters of the horizontal out-of-straightness (HOOS) lognormal distribution
for different types of elements (e.g., Straight, Bend, Sleeper, RCM). This function takes into
account the scaling factor of the HOOS distribution. For RCM, the HOOS factor is not a factor
but the critical buckling force.
Parameters
----------
type_elt : str
Type of the element.
length_elt : float
Length of the element.
hoos_mean : float
Mean of the HOOS distribution.
hoos_std : float
Standard deviation of the HOOS distribution.
length_ref : float
Reference length.
rcm_charac : float
Characteristic buckling force for the Residual Curvature Method (RCM).
Returns
-------
x_range : numpy.ndarray
An array of values representing the range of the friction factor distribution
between probabilities of exceedance between 0.01% and 99.99%.
cdf_range : numpy.ndarray
An array of cumulative density function (CDF) values corresponding to `x_range`.
Notes
-----
This function computes the parameters of a lognormal distribution for different types of
elements such as Straight, Bend, Sleeper, and RCM (Residual Curvature Method). It
calculates the cumulative density function (CDF) for the generated range of values
based on the HOOS distribution parameters.
"""
# Extract the type of element (e.g., Straight, Bend, Sleeper, RCM)
type_elt_split = type_elt.split(" ")[0]
# Compute the ratio of the reference length to the element length
n = length_ref / length_elt
if type_elt_split == "Straight" or type_elt_split == "Bend":
# Calculate parameters for straight or bend elements
shape_hoos = np.sqrt(np.log(1 + hoos_std**2 / hoos_mean**2))
scale_hoos = np.log(hoos_mean**2 / (np.sqrt(hoos_mean**2 + hoos_std**2)))
# Define the range of the HOOS distribution
hoos_lower = 0.0
hoos_upper = 20.0
x = np.linspace(hoos_lower, hoos_upper, 200000)
# Calculate the cumulative density function (CDF) considering the scaling factor
cdf = 1-(1-lognorm.cdf(x, shape_hoos, 0.0, np.exp(scale_hoos)))**(1/n)
# Generate a range of CDF values
cdf_range = np.arange(0.0, 1.0, 0.0001)
# Interpolate to get the corresponding values of the distribution
x_range = np.interp(cdf_range, cdf, x)
elif type_elt_split == "Sleeper":
# Calculate parameters for sleeper elements
shape_hoos = np.sqrt(np.log(1 + hoos_std**2 / hoos_mean**2))
scale_hoos = np.log(hoos_mean**2 / (np.sqrt(hoos_mean**2 + hoos_std**2)))
# Calculate the lower and upper bounds of the distribution for sleeper elements
hoos_lower = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.0001)
hoos_upper = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.9999)
# Generate a range of values within the distribution
x_range = np.linspace(hoos_lower, hoos_upper, 10000)
# Compute the cumulative density function (CDF) for the generated range
cdf_range = lognorm.cdf(x_range, shape_hoos, 0.0, np.exp(scale_hoos))
elif type_elt_split == "RCM":
# Calculate parameters for RCM elements
shape_hoos = np.sqrt(np.log(1 + hoos_std**2 / hoos_mean**2))
scale_hoos = np.log(hoos_mean**2 / (np.sqrt(hoos_mean**2 + hoos_std**2)))
scale_hoos = scale_hoos + np.log(rcm_charac)
# Calculate the lower and upper bounds of the distribution for RCM elements
hoos_lower = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.0001)
hoos_upper = lognorm(shape_hoos, 0.0, np.exp(scale_hoos)).ppf(0.9999)
# Generate a range of values within the distribution
x_range = np.linspace(hoos_lower, hoos_upper, 10000)
# Compute the cumulative density function (CDF) for the generated range
cdf_range = lognorm.cdf(x_range, shape_hoos, 0.0, np.exp(scale_hoos))
return x_range, cdf_range
[docs]
class PreProcessor:
"""
Class to handle the pre-processing of scenario data for BuckPy simulations. This class reads
scenario data from an Excel file, extracts and processes route, pipe, operating, and soil data,
and calculates scenario data. It also converts the scenario data and end boundary conditions
to NumPy arrays for Monte Carlo simulations and processes post-processing data.
The class includes methods for calculating expanded KP values, creating element arrays,
interpolating distributions, and handling various preprocessing tasks.
"""
def __init__(self, work_dir, file_name, pipeline, scenario, bl_verbose):
"""
Method to initialize the PreProcessor class with the necessary parameters and attributes.
Parameters
----------
work_dir : str
Directory where the Excel file is located.
file_name : str
Name of the Excel file.
pipeline : str
Identifier of the pipeline.
scenario : int
Identifier of the scenario.
bl_verbose : bool
True if intermediate printouts are required (False by default).
Returns
-------
None
"""
# Initialize attributes for data storage
self.work_dir = work_dir
self.file_name = file_name
self.pipeline = pipeline
self.scenario = scenario
self.bl_verbose = bl_verbose
# Initialize attributes for storing dataframes and arrays
self.scen_df = None
self.route_df = None
self.route_ends_df = None
self.mitigation_df = None
self.soil_zoning_df = None
self.pipe_df = None
self.soil_df = None
self.oper_df = None
self.pp_df = None
# Initialize attributes for storing NumPy arrays used in Monte Carlo simulations
self.scen_np = None
self.dist_np = None
self.ends_np = None
[docs]
def run(self):
    """
    Read the input workbook and assemble all pre-processed scenario data.

    The method takes no arguments: it uses the `work_dir`, `file_name`, `pipeline`,
    `scenario` and `bl_verbose` attributes stored on the instance.

    Returns
    -------
    scen_np : numpy.ndarray
        NumPy array containing the scenario data for Monte Carlo simulations.
    dist_np : numpy.ndarray
        NumPy array containing the distribution data for Monte Carlo simulations.
    ends_np : numpy.ndarray
        NumPy array containing the end boundary conditions for Monte Carlo simulations.
    scen_df : pandas.DataFrame
        DataFrame containing the scenario data for deterministic simulations.
    route_df : pandas.DataFrame
        DataFrame containing the segmented route data, restricted to the output columns.
    pp_df : pandas.DataFrame
        DataFrame containing the post-processing data for the scenario.

    Notes
    -----
    The sheets "Scenario", "Route", "Mitigation", "Soil Zoning", "Pipe", "Soils",
    "Operating" and "Post-Processing" are read from the workbook, filtered on the
    pipeline and on the layout, mitigation and loadcase sets of the selected scenario,
    and then handed to the dedicated ``calc_*`` methods.
    The time taken is printed if `bl_verbose` is True.
    """
    # Announce the start of the assembly of the main dataframe
    if self.bl_verbose:
        print("1. Assembly of the main dataframe")

    # Starting time of the pre-processing module
    start_time = time.time()

    # Load every sheet of the input workbook in one go
    workbook = pd.read_excel(rf"{self.work_dir}/{self.file_name}", sheet_name=None)
    self.scen_df = workbook["Scenario"]
    self.route_df = workbook["Route"]
    self.mitigation_df = workbook["Mitigation"]
    self.soil_zoning_df = workbook["Soil Zoning"]
    self.pipe_df = workbook["Pipe"]
    self.soil_df = workbook["Soils"]
    self.oper_df = workbook["Operating"]
    self.pp_df = workbook["Post-Processing"]

    kp_cols = ["KP From", "KP To"]
    mitigation_cols = ["Sleeper Height", "RCM Buckling Force"]

    # Keep only the requested pipeline and scenario
    scen_mask = (
        (self.scen_df["Pipeline"] == self.pipeline) &
        (self.scen_df["Scenario"] == self.scenario)
    )
    self.scen_df = self.scen_df.loc[scen_mask].copy()

    # Sets selected by the scenario
    layout = self.scen_df["Layout Set"].values[0]
    mitigation = self.scen_df["Mitigation Set"].values[0]
    loadcase = self.scen_df["Loadcase Set"].values[0]

    # Route rows of the selected layout
    route_mask = (
        (self.route_df["Pipeline"] == self.pipeline) &
        (self.route_df["Layout Set"] == layout)
    )
    self.route_df = self.route_df.loc[route_mask].copy()
    # Mitigation-driven columns must exist on route rows before segmentation
    for col in mitigation_cols:
        if col not in self.route_df.columns:
            self.route_df[col] = np.nan
    self.route_df[kp_cols] = self.route_df[kp_cols].astype(float)

    # Mitigation rows of the selected mitigation set
    mitigation_mask = (
        (self.mitigation_df["Pipeline"] == self.pipeline) &
        (self.mitigation_df["Mitigation Set"] == mitigation)
    )
    self.mitigation_df = self.mitigation_df.loc[mitigation_mask].copy()
    mitigation_numeric_cols = kp_cols + mitigation_cols
    self.mitigation_df[mitigation_numeric_cols] = (
        self.mitigation_df[mitigation_numeric_cols].astype(float)
    )

    # Soil zoning rows of the selected layout
    zoning_mask = (
        (self.soil_zoning_df["Pipeline"] == self.pipeline) &
        (self.soil_zoning_df["Route Layout"] == layout)
    )
    self.soil_zoning_df = self.soil_zoning_df.loc[zoning_mask].copy()
    self.soil_zoning_df[kp_cols] = self.soil_zoning_df[kp_cols].astype(float)

    # Segment the route with the mitigation and soil zoning data
    self.calc_route_data()

    # Pipe data and pipe properties
    self.pipe_df = self.pipe_df.loc[self.pipe_df["Pipeline"] == self.pipeline].copy()
    self.calc_pipe_data()

    # Soil data and friction factor distributions
    self.soil_df = self.soil_df.loc[self.soil_df["Pipeline"] == self.pipeline].copy()
    self.calc_soil_data()

    # Operating profiles of the selected loadcase set
    oper_mask = (
        (self.oper_df["Pipeline"] == self.pipeline) &
        (self.oper_df["Loadcase Set"] == loadcase)
    )
    self.oper_df = self.oper_df.loc[oper_mask].copy()
    self.calc_oper_data()

    # Scenario data and the NumPy arrays used in the Monte Carlo simulations
    self.calc_scenario_data()
    self.calc_monte_carlo_data()

    # Post-processing rows of the pipeline, layout and mitigation set; a missing
    # mitigation set on the scenario matches rows whose mitigation set is missing too
    pp_mask = (
        (self.pp_df["Pipeline"] == self.pipeline) &
        (self.pp_df["Layout Set"] == layout)
    )
    if pd.isna(mitigation):
        pp_mask = pp_mask & self.pp_df["Mitigation Set"].isna()
    else:
        pp_mask = pp_mask & (self.pp_df["Mitigation Set"] == mitigation)
    self.pp_df = self.pp_df.loc[pp_mask].copy()
    self.calc_pp_data()

    # Mitigation-driven columns must exist on route rows after segmentation
    for col in mitigation_cols:
        if col not in self.route_df.columns:
            self.route_df[col] = np.nan

    # A segment carrying a sleeper height or an RCM buckling force has no bend radius
    for col in mitigation_cols:
        self.route_df.loc[~self.route_df[col].isna(), "Bend Radius"] = np.nan

    # Columns of the route data output
    output_cols = [
        "Pipeline", "Layout Set", "Pipe Set", "Friction Set", "Route Type", "Point ID From",
        "Point ID To", "KP From", "KP To", "Bend Radius", "Sleeper Height",
        "RCM Buckling Force", "HOOS Mean", "HOOS STD", "HOOS Reference Length",
        "Residual Buckle Force Hydrotest", "Residual Buckle Length Hydrotest",
        "Residual Buckle Force Operation", "Residual Buckle Length Operation",
        "Reaction Installation", "Reaction Hydrotest", "Reaction Operation"
    ]
    self.route_df = self.route_df[output_cols].copy()

    # Report the time taken to create the main dataframe
    if self.bl_verbose:
        print(f" Time taken to create main dataframe: {time.time() - start_time:.1f}s")

    return self.scen_np, self.dist_np, self.ends_np, self.scen_df, self.route_df, self.pp_df
[docs]
def calc_route_data(self):
"""
Extract and process route data for calculations.
Parameters
----------
route_df : pandas.DataFrame
DataFrame containing route data.
mitigation_df : pandas.DataFrame
DataFrame containing mitigation data.
soil_zoning_df : pandas.DataFrame
DataFrame containing soil zoning data.
Returns
-------
route_df : pandas.DataFrame
DataFrame containing route data and calculated route data.
route_ends_df : pandas.DataFrame
DataFrame containing end boundary conditions.
Notes
-----
This function extracts route ends and route data based on layout,
mitigation, and soil_zoning. It selects specific columns for route ends data.
Route Type is converted from string tofloat for numerical representation. Route ends
data is converted to a NumPy array for efficient processing.
"""
# Extract route ends based on layout
self.route_ends_df = self.route_df.iloc[[0, -1]]
# Select specific columns for route ends data
self.route_ends_df = self.route_ends_df[[
"Route Type",
"KP From",
"KP To",
"Reaction Installation",
"Reaction Hydrotest",
"Reaction Operation"
]]
# Convert "Route Type" from string to float for numerical representation
self.route_ends_df.loc[self.route_ends_df["Route Type"] == "Spool", "Route Type"] = 1
self.route_ends_df.loc[self.route_ends_df["Route Type"] == "Fixed", "Route Type"] = 2
self.route_ends_df["Route Type"] = self.route_ends_df["Route Type"].astype(float)
# Extract route data based on layout
self.route_df = self.route_df.iloc[1:-1].copy()
# Combine rows from route and mitigation, then sort by KP From
self.apply_route_mitigation()
# Extract soil zoning data based on soil_zoning
self.apply_route_soil_zoning()
[docs]
def calc_pipe_data(self):
    """
    Add the derived cross-section properties to the pipe data.

    Returns
    -------
    None
        The columns below are added to `pipe_df` in place.

    Notes
    -----
    Added columns, computed from "OD", "WT", "E", "sw Hydrotest" and "sw Operation":
    inner diameter ("ID"), cross-sectional area ("As"), inner area ("Ai"),
    moment of inertia ("I"), hydrotest characteristic buckling force ("SChar HT")
    and operation characteristic buckling force ("SChar OP").
    """
    pipes = self.pipe_df
    outer = pipes["OD"]
    quarter_pi = np.pi / 4.0

    # Inner diameter: outer diameter minus twice the wall thickness
    pipes["ID"] = outer - 2.0 * pipes["WT"]
    inner = pipes["ID"]

    # Cross-sectional area, inner area and moment of inertia
    pipes["As"] = quarter_pi * (outer ** 2 - inner ** 2)
    pipes["Ai"] = quarter_pi * inner ** 2
    pipes["I"] = np.pi / 64.0 * (outer ** 4 - inner ** 4)

    # Part of the characteristic buckling force shared by both conditions
    stiffness_term = (
        2.26 * (pipes["E"] * pipes["As"]) ** 0.25 * (pipes["E"] * pipes["I"]) ** 0.25
    )
    # Characteristic buckling force for hydrotest and operation
    for schar_col, weight_col in (("SChar HT", "sw Hydrotest"), ("SChar OP", "sw Operation")):
        pipes[schar_col] = stiffness_term * pipes[weight_col] ** 0.5
[docs]
def calc_soil_data(self):
    """
    Add the axial and lateral friction factor distributions to the soil data.

    Returns
    -------
    None
        The mean, standard deviation, value array and CDF array columns of each
        friction type are added to `soil_df` in place.

    Notes
    -----
    The same steps are applied to the axial, lateral hydrotest and lateral operation
    friction factors. The distributions come from
    ``ss.LBSoilDistributions(...).friction_distribution_parameters()``: its first two
    entries are stored as mean and standard deviation, its last two entries as the
    value array and the CDF array (one array per soil row).
    """
    # (LE, BE, HE, fit bounds, mean, STD, value array, CDF array) column names
    friction_columns = (
        (
            "Axial LE", "Axial BE", "Axial HE", "Axial Fit Bounds",
            "Axial Mean", "Axial STD", "muax Array", "muax CDF Array",
        ),
        (
            "Lateral Hydrotest LE", "Lateral Hydrotest BE", "Lateral Hydrotest HE",
            "Lateral Hydrotest Fit Bounds",
            "Lateral Hydrotest Mean", "Lateral Hydrotest STD",
            "mul HT Array", "mul HT CDF Array",
        ),
        (
            "Lateral Operation LE", "Lateral Operation BE", "Lateral Operation HE",
            "Lateral Operation Fit Bounds",
            "Lateral Operation Mean", "Lateral Operation STD",
            "mul OP Array", "mul OP CDF Array",
        ),
    )

    for le, be, he, fit, mean_col, std_col, array_col, cdf_col in friction_columns:
        # Fit the lognormal distribution of this friction factor
        params = ss.LBSoilDistributions(
            friction_factor_le=self.soil_df[le],
            friction_factor_be=self.soil_df[be],
            friction_factor_he=self.soil_df[he],
            friction_factor_fit_type=self.soil_df[fit]
        ).friction_distribution_parameters()

        self.soil_df[mean_col], self.soil_df[std_col] = params[:2]

        # atleast_2d keeps one array per soil row even when a single row is processed
        values = np.asarray(params[-2])
        cdf_values = np.asarray(params[-1])
        self.soil_df[array_col] = list(np.atleast_2d(values))
        self.soil_df[cdf_col] = list(np.atleast_2d(cdf_values))
[docs]
def calc_oper_data(self):
    """
    Mesh the route and reduce the operating profiles to element mid-point values.

    Returns
    -------
    None
        `oper_df` is replaced in place by the per-element operating data.

    Notes
    -----
    The KP points of the route are expanded into a mesh, the operating profiles are
    interpolated on that mesh by ``interpolate_oper_profile_on_kp`` (presumably
    updating `oper_df`; confirm against that method), and the rows beyond the end of
    the route are discarded. Each pair of consecutive rows is then replaced by its
    mean, and the "Length" column holds the KP difference of the pair.
    """
    # Start KP of each route row: the previous "KP To", with 0.0 for the first row
    # and an extra "End" row carrying the last "KP To"
    route_kp = self.route_df[["Point ID From", "KP To"]].reset_index(drop=True).copy()
    terminal_row = pd.DataFrame({"Point ID From": "End", "KP To": np.nan}, index=[99999])
    route_kp = pd.concat([route_kp, terminal_row], ignore_index=True)
    route_kp["KP To"] = route_kp["KP To"].shift().fillna(0.0)

    # Expand the KP points with 1000-spaced points and build the element KPs
    mesh_df = self.build_oper_kp_mesh_from_route(route_kp)
    elem_kp = self.build_oper_element_kp_array(mesh_df)

    # Interpolate the RLT, pressure and temperature using KP and operating profile
    self.interpolate_oper_profile_on_kp(elem_kp)

    # Discard the points located beyond the end of the route
    last_kp = self.route_ends_df["KP To"].iloc[-1]
    self.oper_df = self.oper_df.loc[self.oper_df["KP"] <= last_kp].copy()

    # Mean and range of every pair of consecutive rows
    pairs = self.oper_df.rolling(2)
    pair_mean = pairs.mean()
    pair_range = pairs.max() - pairs.min()

    # The element length is the KP range of the pair
    pair_mean["Length"] = pair_range["KP"]

    # The first row has no predecessor and is dropped with the other NaN rows
    self.oper_df = pair_mean.reset_index(drop=True).dropna().copy()
[docs]
def calc_scenario_data(self):
    """
    Assemble the per-element scenario data from route, pipe, operating and soil data.

    Returns
    -------
    None
        `scen_df` is replaced in place by the per-element scenario DataFrame.

    Notes
    -----
    The operating data is matched to the route segment starting at or before each KP,
    then merged with the pipe data ("Pipe Set") and the soil data ("Friction Set").
    The HOOS distributions, the fully restrained forces, the sleeper buckling forces
    and the section counters are computed per element. The result keeps a fixed subset
    of columns, encodes "Route Type" as a float (Straight 1, Bend 2, Sleeper 3, RCM 4),
    replaces missing values with 0 and carries the scenario parameters on every row.
    """
    # Match each operating point to the route segment starting at or before its KP
    merged = pd.merge_asof(
        left=self.oper_df,
        right=self.route_df,
        left_on="KP",
        right_on="KP From",
        direction="backward",
    )
    # Attach the pipe data based on Pipe Set
    merged = pd.merge(
        left=merged,
        right=self.pipe_df,
        left_on="Pipe Set",
        right_on="Pipe Set"
    )
    # Attach the soil data based on Friction Set
    merged = pd.merge(
        left=merged,
        right=self.soil_df,
        left_on="Friction Set",
        right_on="Friction Set"
    )

    # HOOS distribution (value array and CDF array) of each element
    def _hoos_distribution(row):
        return calc_lognorm_hoos(
            row["Route Type"],
            row["Length"],
            row["HOOS Mean"],
            row["HOOS STD"],
            row["HOOS Reference Length"],
            row.get("RCM Buckling Force", np.nan),
        )

    hoos_pairs = merged.apply(_hoos_distribution, axis=1).apply(np.array)
    merged["HOOS X Array"], merged["HOOS CDF Array"] = zip(*hoos_pairs)

    # Shared terms of the fully restrained forces
    e_mod = merged["E"]
    alpha = merged["Alpha"]
    area_steel = merged["As"]
    area_inner = merged["Ai"]
    rlt = merged["RLT"]
    poisson_term = 1 - 2 * merged["Poisson"]
    d_temp_ht = merged["Temperature Hydrotest"] - merged["Temperature Installation"]
    d_temp_op = merged["Temperature Operation"] - merged["Temperature Installation"]
    d_pres_ht = merged["Pressure Hydrotest"] - merged["Pressure Installation"]
    d_pres_op = merged["Pressure Operation"] - merged["Pressure Installation"]

    # Fully restrained forces for hydrotest and operation
    merged["FRF HT"] = (
        rlt +
        e_mod * alpha * area_steel * d_temp_ht +
        poisson_term * d_pres_ht * area_inner
    )
    merged["FRF OP"] = (
        rlt +
        e_mod * alpha * area_steel * d_temp_op +
        poisson_term * d_pres_op * area_inner
    )
    # Pressure part (uses the operating pressure itself) and temperature part
    merged["FRF OP Pressure"] = (
        rlt +
        poisson_term * merged["Pressure Operation"] * area_inner
    )
    merged["FRF OP Temperature"] = e_mod * area_steel * alpha * d_temp_op

    # Hydrotest and operation buckling forces on sleepers (Sv); NaN without a height
    sleeper_height = merged.get("Sleeper Height", pd.Series(np.nan, index=merged.index))
    bending_stiffness = e_mod * merged["I"]
    merged["Sv HT"] = 4.0 * np.sqrt(bending_stiffness * merged["sw Hydrotest"] / sleeper_height)
    merged["Sv OP"] = 4.0 * np.sqrt(bending_stiffness * merged["sw Operation"] / sleeper_height)

    # Position inside the route segment and index of the reference-length section
    merged["KP Section"] = merged["KP"] - merged["KP From"]
    merged["Reference Section"] = (
        merged["KP Section"] / merged["HOOS Reference Length"]
    ).apply(np.floor)

    # A new section starts when the route type or the reference section changes
    new_section = (
        (merged["Route Type"] != merged["Route Type"].shift()) |
        (merged["Reference Section"] != merged["Reference Section"].shift())
    )
    merged["Section Count"] = 0.0
    merged.loc[new_section, "Section Count"] = 1.0
    merged["Section Count"] = merged["Section Count"].cumsum()

    # The RCM buckling force column must exist before the column selection
    if "RCM Buckling Force" not in merged.columns:
        merged["RCM Buckling Force"] = np.nan

    # Keep the relevant columns, in the order expected downstream
    kept_columns = [
        "KP", "Length", "Route Type", "KP From", "KP To", "Point ID From", "Point ID To",
        "Bend Radius", "muax Array", "muax CDF Array",
        "mul HT Array", "mul HT CDF Array", "mul OP Array", "mul OP CDF Array",
        "HOOS X Array", "HOOS CDF Array", "sw Installation", "sw Hydrotest", "sw Operation",
        "SChar HT", "SChar OP", "Sv HT", "Sv OP", "RCM Buckling Force", "RLT", "FRF HT",
        "FRF OP Pressure", "FRF OP Temperature", "FRF OP", "Residual Buckle Length Hydrotest",
        "Residual Buckle Force Hydrotest", "Residual Buckle Length Operation",
        "Residual Buckle Force Operation", "Section Count", "KP Section", "Reference Section",
        "Axial Mean", "Lateral Hydrotest Mean", "Lateral Operation Mean", "HOOS Mean"
    ]
    short_names = {
        "sw Installation": "sw IN",
        "sw Hydrotest": "sw HT",
        "sw Operation": "sw OP",
        "Residual Buckle Length Hydrotest": "buckleLength HT",
        "Residual Buckle Force Hydrotest": "buckleEAF HT",
        "Residual Buckle Length Operation": "buckleLength OP",
        "Residual Buckle Force Operation": "buckleEAF OP"
    }
    merged = merged[kept_columns].rename(columns=short_names)

    # Convert route type strings to their numerical representation
    route_type_codes = (("Straight", 1), ("Bend", 2), ("Sleeper", 3), ("RCM", 4))
    for label, code in route_type_codes:
        merged.loc[merged["Route Type"] == label, "Route Type"] = code
    merged["Route Type"] = merged["Route Type"].astype(float)

    # Fill missing values with 0
    merged = merged.fillna(0)

    # Carry the scenario parameters on every row
    scenario_keys = (
        "Pipeline", "Scenario", "Layout Set", "Simulations",
        "Friction Sampling", "Char. Friction Prob."
    )
    for key in scenario_keys:
        merged[key] = self.scen_df[key].values[0]

    self.scen_df = merged.copy()
[docs]
def calc_pp_data(self):
    """
    Normalise the post-processing data set stored in `pp_df`.

    The rows are expected to be already filtered on pipeline, layout set and
    mitigation set by the caller; no filtering is done here.

    Returns
    -------
    None
        `pp_df` is replaced in place by the normalised DataFrame.

    Notes
    -----
    The index is reset, the columns are renamed to 'pp_set', 'KP_from', 'KP_to' and
    'description', and only these columns plus 'Characteristic VAS Probability' are
    kept. 'pp_set' is converted to int64 and the KP columns to float64.
    """
    # Reset index and rename columns
    renamed = self.pp_df.reset_index(drop=True).rename(columns={
        'Post-Processing Set': 'pp_set',
        'KP From': 'KP_from',
        'KP To': 'KP_to',
        'Post-Processing Description': 'description'
    })

    # Select relevant columns; the explicit copy avoids assigning on a slice of the
    # renamed frame (chained-assignment warning)
    selected = renamed[
        ['pp_set', 'KP_from', 'KP_to', 'description', 'Characteristic VAS Probability']
    ].copy()

    # Convert columns to appropriate numeric types
    self.pp_df = selected.astype({
        'pp_set': np.int64,
        'KP_from': np.float64,
        'KP_to': np.float64,
    })
[docs]
def calc_monte_carlo_data(self):
    """
    Convert the scenario data and end boundary conditions data to NumPy arrays for
    Monte Carlo simulations.

    Returns
    -------
    None
        `dist_np`, `scen_np` and `ends_np` are set in place on the instance:

        - dist_np : array with the probabilistic distributions, indexed as
          (distribution, element, point).
        - scen_np : 2D array with scenario properties (rows) along the route mesh
          (columns).
        - ends_np : 2D array with end properties (rows) for the ends (columns).

    Notes
    -----
    All distribution arrays must have the same length to be stacked into `dist_np`.

    The arrays have the following row layout (index : meaning):

    scen_np:
    - 0 : KP
    - 1 : LENGTH
    - 2 : ROUTE_TYPE
    - 3 : BEND_RADIUS
    - 4 : SW_INST
    - 5 : SW_HT
    - 6 : SW_OP
    - 7 : SCHAR_HT
    - 8 : SCHAR_OP
    - 9 : SV_HT
    - 10 : SV_OP
    - 11 : CBF_RCM
    - 12 : RLT
    - 13 : FRF_HT
    - 14 : FRF_P_OP
    - 15 : FRF_T_OP
    - 16 : FRF_OP
    - 17 : L_BUCKLE_HT
    - 18 : EAF_BUCKLE_HT
    - 19 : L_BUCKLE_OP
    - 20 : EAF_BUCKLE_OP
    - 21 : SECTION_ID
    - 22 : SECTION_KP
    - 23 : SECTION_REF
    - 24 : MUAX_MEAN
    - 25 : MULAT_HT_MEAN
    - 26 : MULAT_OP_MEAN
    - 27 : HOOS_MEAN

    dist_np:
    - 0 : MUAX_ARRAY
    - 1 : MUAX_CDF_ARRAY
    - 2 : MULAT_ARRAY_HT
    - 3 : MULAT_CDF_ARRAY_HT
    - 4 : MULAT_ARRAY_OP
    - 5 : MULAT_CDF_ARRAY_OP
    - 6 : HOOS_ARRAY
    - 7 : HOOS_CDF_ARRAY

    ends_np:
    - 0 : ROUTE_TYPE
    - 1 : KP_FROM
    - 2 : KP_TO
    - 3 : REAC_INST
    - 4 : REAC_HT
    - 5 : REAC_OP
    """
    # Columns holding one distribution array per element, in dist_np row order
    dist_columns = [
        "muax Array",
        "muax CDF Array",
        "mul HT Array",
        "mul HT CDF Array",
        "mul OP Array",
        "mul OP CDF Array",
        "HOOS X Array",
        "HOOS CDF Array"
    ]

    # Stack the distribution arrays; iterating the column is positional, so the
    # result does not depend on the index labels of scen_df
    self.dist_np = np.array(
        [list(self.scen_df[label]) for label in dist_columns],
        dtype="float64"
    )

    # Identification columns that are not part of the scenario property array
    # NOTE(review): "Layout Set" is not dropped here, so if scen_df carries it, it ends
    # up as an extra trailing row after the 28 rows documented above - confirm intended.
    non_property_columns = [
        "Pipeline", "Scenario", "Simulations", "Friction Sampling", "Char. Friction Prob.",
        "KP From", "KP To", "Point ID From", "Point ID To"
    ]

    # Convert scenario properties to numpy array (properties as rows)
    self.scen_np = (
        self.scen_df.drop(non_property_columns + dist_columns, axis=1)
        .to_numpy()
        .transpose()
    )

    # Convert end properties to numpy array (properties as rows)
    self.ends_np = self.route_ends_df.to_numpy().transpose()
[docs]
def apply_route_mitigation(self):
    """
    Split the route rows at the mitigation KP ranges and apply the mitigation data.

    Every route row is cut into the parts located before, inside and after each
    overlapping mitigation row. The parts inside a mitigation take all the mitigation
    columns except "KP From" and "KP To", which are given by the overlap.

    Returns
    -------
    None
        `route_df` is replaced in place by the segmented rows, sorted by "KP From"
        with a fresh index. If no segment is produced (for instance when the route has
        no rows), `route_df` becomes an empty frame with its original columns.

    Notes
    -----
    The part before a mitigation ends at the mitigation "Point ID From"; the part after
    a mitigation starts at the mitigation "Point ID To".
    """
    rows = []
    for _, r in self.route_df.iterrows():
        # Route segment start and end KP and point IDs
        seg_start = r["KP From"]
        seg_end = r["KP To"]
        seg_from_point = r["Point ID From"]

        # Mitigation rows that overlap this route segment
        overlaps = self.mitigation_df[
            (self.mitigation_df["KP To"] > seg_start) &
            (self.mitigation_df["KP From"] < seg_end)
        ].sort_values("KP From")

        for _, m in overlaps.iterrows():
            # Overlapping KP range between the remaining route part and the mitigation
            m_from = max(seg_start, m["KP From"])
            m_to = min(seg_end, m["KP To"])
            if m_to <= m_from:
                # Already covered by a previous mitigation
                continue

            # Part before mitigation
            if m_from > seg_start:
                pre = r.copy()
                pre["KP From"] = seg_start
                pre["KP To"] = m_from
                pre["Point ID From"] = seg_from_point
                pre["Point ID To"] = m["Point ID From"]
                rows.append(pre)

            # Mitigation part (override key fields from mitigation)
            mid = r.copy()
            mid["KP From"] = m_from
            mid["KP To"] = m_to
            # Copy every mitigation column except the KP boundaries, which are determined
            # by the overlap with the route segment.
            for col in m.index:
                if col not in {"KP From", "KP To"}:
                    mid[col] = m[col]
            rows.append(mid)

            # The rest of the segment starts where this mitigation ends
            seg_start = m_to
            seg_from_point = m["Point ID To"]

        # Part after last mitigation (the whole segment when nothing overlaps)
        if seg_start < seg_end:
            post = r.copy()
            post["KP From"] = seg_start
            post["KP To"] = seg_end
            post["Point ID From"] = seg_from_point
            rows.append(post)

    if not rows:
        # Nothing to sort: an empty list would give a frame without a "KP From" column
        self.route_df = self.route_df.iloc[0:0].reset_index(drop=True)
        return

    # Stable sort keeps the build order of rows sharing the same "KP From"
    self.route_df = (
        pd.DataFrame(rows)
        .sort_values("KP From", kind="mergesort")
        .reset_index(drop=True)
    )
[docs]
def apply_route_soil_zoning(self):
    """
    Split the route rows at the soil zone KP ranges and assign their friction set.

    The first row of `soil_zoning_df` gives the base friction set. Every other row is a
    zone: route rows are cut into the parts located before, inside and after each
    overlapping zone, and the points created by a cut are named "Soil Change".

    Returns
    -------
    None
        `route_df` is replaced in place by the segmented rows, sorted by "KP From"
        with a fresh index. If the route has no rows, `route_df` becomes an empty
        frame with its original columns.

    Notes
    -----
    Inside a route row, the part located after a zone keeps the friction set of that
    zone, while each new route row starts again from the base friction set.
    NOTE(review): with gaps between zones this gives different friction sets on both
    sides of a route point - confirm the zones are expected to be contiguous.
    """
    # Copy the route and soil zoning dataframes
    route = self.route_df.copy()
    zones_all = self.soil_zoning_df.copy()
    # First row is the base friction; the remaining rows are the zones to apply
    zones = zones_all.iloc[1:].copy()
    base_friction = zones_all.iloc[0]["Friction Set"]

    rows = []
    for _, r in route.iterrows():
        original_start = r["KP From"]
        original_end = r["KP To"]
        seg_start = r["KP From"]
        seg_end = r["KP To"]
        current_friction = base_friction

        # Zones overlapping this route segment
        overlaps = zones[
            (zones["KP To"] > seg_start) &
            (zones["KP From"] < seg_end)
        ].sort_values("KP From")

        # No overlap: keep whole segment with current/base friction
        if overlaps.empty:
            row = r.copy()
            row["Friction Set"] = current_friction
            rows.append(row)
            continue

        for _, z in overlaps.iterrows():
            z_from = max(seg_start, z["KP From"])
            z_to = min(seg_end, z["KP To"])
            if z_to <= z_from:
                # Already covered by a previous zone
                continue

            # Before zone: keep previous friction
            if z_from > seg_start:
                pre = r.copy()
                pre["KP From"] = seg_start
                pre["KP To"] = z_from
                pre["Friction Set"] = current_friction
                pre["Point ID From"] = (
                    r["Point ID From"] if seg_start == original_start else "Soil Change"
                )
                pre["Point ID To"] = "Soil Change"
                rows.append(pre)

            # Inside zone: apply zone friction; the original point IDs are kept only
            # where the part boundary coincides with the route row boundary
            mid = r.copy()
            mid["KP From"] = z_from
            mid["KP To"] = z_to
            mid["Friction Set"] = z["Friction Set"]
            mid["Point ID From"] = (
                r["Point ID From"] if z_from == original_start else "Soil Change"
            )
            mid["Point ID To"] = (
                r["Point ID To"] if z_to == original_end else "Soil Change"
            )
            rows.append(mid)

            seg_start = z_to
            current_friction = z["Friction Set"]

        # Tail after last overlapping zone
        if seg_start < seg_end:
            post = r.copy()
            post["KP From"] = seg_start
            post["KP To"] = seg_end
            post["Friction Set"] = current_friction
            post["Point ID From"] = (
                r["Point ID From"] if seg_start == original_start else "Soil Change"
            )
            post["Point ID To"] = r["Point ID To"]
            rows.append(post)

    if not rows:
        # Nothing to sort: an empty list would give a frame without a "KP From" column
        self.route_df = route.iloc[0:0].reset_index(drop=True)
        return

    # Stable sort keeps the build order of rows sharing the same "KP From"
    self.route_df = (
        pd.DataFrame(rows)
        .sort_values("KP From", kind="mergesort")
        .reset_index(drop=True)
    )
[docs]
def build_oper_kp_mesh_from_route(self, route_df):
    """
    Build the KP mesh of the operating profile from the route KP points.

    The "KP To" column of `route_df` is renamed to "KP From" and used as the list of
    mesh points. Extra points are inserted at every multiple of 1000 from 1000 up to
    the largest multiple of 1000 that does not exceed the maximum KP. Consecutive points
    are then paired into segments, and each segment is divided into the smallest
    number of equal elements that are at most 100 long.

    Parameters
    ----------
    route_df : pandas Dataframe
        Dataframe containing the route data. It must provide a "KP To" column.
        NOTE(review): presumably it also carries "Point ID From" and no pre-existing
        "KP From" column (the rename would otherwise create a duplicate column name);
        confirm against the caller.

    Returns
    -------
    route_df : pandas Dataframe
        Dataframe with one row per mesh segment, containing the "KP From" and "KP To"
        values, the segment "Length", the number of elements "Elem No." and the element
        size "Elem Size". The last mesh point has no following point and is dropped,
        together with any other row that still contains missing values.
    """
    # The incoming "KP To" values become the start points of the mesh segments
    route_df = route_df.rename(columns={"KP To": "KP From"})

    # Intermediate points at every multiple of 1000, from 1000 to the largest
    # multiple of 1000 that does not exceed the maximum KP
    max_kp = np.floor(route_df["KP From"].max() / 1000.0) * 1000.0
    kp_array = np.arange(1000, max_kp + 1.0, 1000)
    expand_df = pd.DataFrame(
        {"Point ID From": [np.nan] * len(kp_array), "KP From": kp_array}
    )

    # The sort must be stable: when a route point lies exactly on a multiple of 1000,
    # the route row (concatenated first) has to stay ahead of the inserted row so that
    # drop_duplicates keeps the route row and its own point ID. With an unstable sort
    # the inserted row could win and then inherit the previous point's ID via ffill.
    route_df = (
        pd.concat([route_df, expand_df], ignore_index=True)
        .sort_values(by="KP From", kind="mergesort")
        .drop_duplicates("KP From")
        .reset_index(drop=True)
        .ffill()
    )

    # Pair each point with the next one; the last point has no successor and is dropped.
    # An explicit copy is taken so the column assignments below act on an independent frame.
    route_df["KP To"] = route_df["KP From"].shift(-1)
    route_df = route_df.dropna().copy()
    route_df["Length"] = route_df["KP To"] - route_df["KP From"]

    # Smallest number of equal elements per segment with an element size of at most 100
    route_df["Elem No."] = np.ceil(route_df["Length"] / 100.0)
    route_df["Elem Size"] = route_df["Length"] / route_df["Elem No."]
    return route_df
[docs]
def build_oper_element_kp_array(self, route_df):
    """
    Build the array of element node KPs from the meshed route segments.

    Every row of `route_df` is divided into "Elem No." equal elements between its
    "KP From" and "KP To" values; the nodes of all rows are merged into one array.

    Parameters
    ----------
    route_df : pandas Dataframe
        Dataframe providing the "KP From", "KP To" and "Elem No." columns.

    Returns
    -------
    elem_array : numpy Array
        Sorted array of the unique node KP values, without NaN values.
    """
    # One evenly spaced set of nodes per segment (number of nodes = elements + 1)
    segment_nodes = [
        np.linspace(kp_start, kp_end, int(n_elem + 1.0))
        for kp_start, kp_end, n_elem in zip(
            route_df["KP From"], route_df["KP To"], route_df["Elem No."]
        )
    ]

    # np.concatenate rejects an empty list, so an empty route yields an empty array
    if segment_nodes:
        all_nodes = np.concatenate(segment_nodes).astype(float)
    else:
        all_nodes = np.array([], dtype=float)

    # Discard NaN values, then sort and remove the nodes shared by adjacent segments
    valid_nodes = all_nodes[~np.isnan(all_nodes)]
    return np.unique(valid_nodes)
[docs]
def interpolate_oper_profile_on_kp(self, elem_array):
    """
    Resample the operating profile onto the given KP values.

    The pressure, temperature and RLT columns of `self.oper_df` are linearly
    interpolated against its "KP" column at the KP values of `elem_array`. The
    resulting dataframe replaces `self.oper_df`; nothing is returned.

    Parameters
    ----------
    elem_array : numpy Array
        Array containing the KP values for interpolation.

    Notes
    -----
    `np.interp` expects the "KP" column of `self.oper_df` to be increasing; this is
    not checked here.
    """
    # Operating profile as currently stored, used as the interpolation support
    profile = self.oper_df
    kp_known = profile["KP"]

    # Quantities resampled onto the new KP values, in output column order
    quantities = (
        "Pressure Installation",
        "Pressure Hydrotest",
        "Pressure Operation",
        "Temperature Installation",
        "Temperature Hydrotest",
        "Temperature Operation",
        "RLT",
    )

    # Start from the target KP values and append one interpolated column per quantity
    resampled = pd.DataFrame({"KP": elem_array})
    kp_target = resampled["KP"]
    interpolated = {
        name: np.interp(kp_target, kp_known, profile[name]) for name in quantities
    }

    # assign returns a new dataframe, which becomes the stored operating profile
    self.oper_df = resampled.assign(**interpolated)