import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString,MultiLineString
import re
from collections import Counter
import numpy as np
class Streets_adj():
    '''
    A class used to represent and manipulate street geometries.

    This class provides methods to round the coordinates of street geometries and to add a
    column marking every street as a possible route.

    Attributes
    ----------
    gdf : GeoDataFrame
        A GeoDataFrame containing the street geometries.

    Methods
    -------
    round_streets(decimals=3):
        Rounds the coordinates of street geometries (3 decimal places by default).
    add_bool_column():
        Adds the column 'Moegliche_Route' set to 1 for every street.
    '''

    def __init__(self, path):
        '''
        Initializes the Streets_adj class with a GeoDataFrame of street geometries.

        Parameters
        ----------
        path : str
            The file path passed to ``geopandas.read_file`` to load the street geometries.
        '''
        self.gdf = gpd.read_file(path)

    def round_streets(self, decimals=3):
        '''
        Rounds the coordinates of all street geometries.

        MultiLineString geometries are first reduced to their first component LineString,
        then every coordinate of every geometry is rounded.

        Parameters
        ----------
        decimals : int, optional
            Number of decimal places to round to. Defaults to 3.

        Notes
        -----
        - If any MultiLineString geometries are found, only their first component is kept
          and a warning is printed once.
        - Coordinates of any dimension are supported (an existing Z value is rounded too).
        - Missing geometries (None) are left untouched.
        - An empty GeoDataFrame is handled without error.
        '''
        def first_linestring(geometry):
            '''Return the first component of a MultiLineString, any other geometry unchanged.'''
            if isinstance(geometry, MultiLineString):
                return LineString(geometry.geoms[0].coords)
            return geometry

        def round_coordinates(line):
            '''Return a new LineString whose coordinates are rounded to `decimals` places.'''
            if line is None:
                return line
            # Round every ordinate so that 2D and 3D coordinates are both supported.
            rounded_coords = [tuple(round(value, decimals) for value in coord) for coord in line.coords]
            return LineString(rounded_coords)

        streets = self.gdf
        geometries = streets['geometry']
        # Remember whether a MultiLineString was present before it is replaced below.
        multi_found = any(isinstance(geom, MultiLineString) for geom in geometries)
        streets['geometry'] = geometries.apply(lambda geom: round_coordinates(first_linestring(geom)))
        if multi_found:
            print('At least one street geometry is a MultiLineString! Continuing with the first LineString as the street. Check the street geometry if necessary.')
        self.gdf = streets

    def add_bool_column(self):
        '''
        Adds the column 'Moegliche_Route' (= possible route) to the GeoDataFrame.

        The column is set to the value 1 for all rows.
        '''
        self.gdf['Moegliche_Route'] = 1
class Buildings_adj():
    '''
    A class to represent and manipulate building geometries and attributes.

    This class provides methods for adding load profiles, filtering buildings, adding power attributes,
    classifying buildings by age, and merging building data with custom aggregations. It also allows for
    the integration of custom heat demand data based on building characteristics.

    Attributes
    ----------
    gdf : GeoDataFrame
        A GeoDataFrame containing building geometries and associated attributes.
    heat_att : str
        The attribute name for heat data in the GeoDataFrame.

    Methods
    -------
    add_Vlh_Loadprofile(excel_data):
        Adds full load hours (Vlh) and load profiles to the buildings based on external Excel data.
    drop_unwanted():
        Removes buildings that do not have a load profile.
    add_power(default_vlh=1600):
        Adds a power attribute to the buildings based on the heat attribute and full load hours.
    extract_year(date_str):
        Extracts the year from a date string.
    add_BAK(bins, labels, right=True):
        Classifies buildings into age groups based on the provided bins and labels.
    add_LANUV_age_and_type():
        Extracts building type and age from the 'GEBAEUDETY' attribute and adds them as new columns.
    merge_buildings():
        Merges building geometries and attributes, performing custom aggregations.
    add_custom_heat_demand(wg_data, nwg_data):
        Adds custom heat demand data to the existing GeoDataFrame based on building characteristics.
    add_connect_option():
        Adds the column 'Anschluss' set to 1 for every building.
    rename_and_order_columns():
        Renames and orders the columns of the buildings GeoDataFrame.
    '''

    def __init__(self, path, heat_att):
        '''
        Initializes the Buildings_adj class with a GeoDataFrame of building geometries and attributes.

        Parameters
        ----------
        path : str
            The file path passed to ``geopandas.read_file`` to load the building geometries.
        heat_att : str
            The attribute name for heat data.
        '''
        self.gdf = gpd.read_file(path)
        self.heat_att = heat_att

    def add_Vlh_Loadprofile(self, excel_data):
        '''
        Adds full load hours and load profiles to the buildings.

        The last four characters of the buildings' 'citygml_fu' attribute are matched against the
        'Funktion' column of `excel_data` (compared as strings, left join) to obtain the load
        profile ('Lastprofil') and the full load hours (Volllaststunden, 'Vlh').

        Parameters
        ----------
        excel_data : DataFrame
            A DataFrame containing the load profile data with 'Funktion', 'Lastprofil', and 'Vlh' columns.
            The frame is not modified.

        Notes
        -----
        The buildings must already have a 'type' column: buildings whose looked-up load profile is
        'EFH' but whose 'type' is not 'EFH' are re-labelled 'MFH'.
        '''
        # Work on a copy of the lookup columns so the caller's frame keeps its original dtypes.
        lookup = excel_data[['Funktion', 'Lastprofil', 'Vlh']].copy()
        lookup['Funktion'] = lookup['Funktion'].astype(str)

        buildings = self.gdf.assign(GFK_last_four=self.gdf['citygml_fu'].str[-4:])
        buildings = buildings.merge(lookup, left_on='GFK_last_four', right_on='Funktion', how='left')

        # Remove the temporary key and the lookup key. pandas suffixes the lookup key with '_y'
        # when the buildings already carry their own 'Funktion' column.
        funktion_col = 'Funktion' if 'Funktion' in buildings.columns else 'Funktion_y'
        buildings = buildings.drop(columns=['GFK_last_four', funktion_col])

        # EFH and MFH share the same ALKIS-function 1010
        buildings['Lastprofil'] = np.where(
            (buildings['Lastprofil'] == 'EFH') & (buildings['type'] != 'EFH'),
            'MFH',
            buildings['Lastprofil']
        )
        self.gdf = buildings

    def drop_unwanted(self):
        '''
        Drops buildings that do not have a load profile.

        All rows whose 'Lastprofil' is missing are removed. The result is stored as an independent
        copy so that later column assignments do not trigger chained-assignment warnings.
        '''
        has_profile = self.gdf['Lastprofil'].notna()
        self.gdf = self.gdf[has_profile].copy()

    def add_power(self, default_vlh=1600):
        '''
        Adds the thermal power column 'power_th' to the buildings.

        The power is the heat attribute (`self.heat_att`) divided by the full load hours ('Vlh').

        Parameters
        ----------
        default_vlh : int or float, optional
            Full load hours used for buildings whose 'Vlh' is 0. Defaults to 1600.

        Notes
        -----
        A missing 'Vlh' (NaN) is not replaced and results in a missing 'power_th'.
        '''
        buildings = self.gdf
        # Avoid a division by zero: a Vlh of exactly 0 is replaced by the default.
        full_load_hours = buildings['Vlh'].where(buildings['Vlh'] != 0, default_vlh)
        buildings['power_th'] = buildings[self.heat_att] / full_load_hours
        self.gdf = buildings

    @staticmethod
    def extract_year(date_str):
        '''
        Extracts the year from a date string.

        Parameters
        ----------
        date_str : str or object
            A date representation; the first run of four digits in its string form is
            taken as the year (e.g. '2012-05-03T00:00:00' -> 2012).

        Returns
        -------
        int or None
            The year, or None if the value is None or contains no four-digit sequence.
        '''
        if date_str is None:
            return None
        match = re.search(r'\d{4}', str(date_str))
        return int(match.group()) if match else None

    def add_BAK(self, bins, labels, right=True):
        '''
        Adds the building age classification column 'BAK' (Baualtersklasse).

        The year is extracted from the 'validFrom' attribute and classified with ``pandas.cut``
        using the provided bins and labels. The result is stored as strings.

        Parameters
        ----------
        bins : list of int
            The bin edges for classifying buildings by age.
        labels : list of str
            The labels for the age bins.
        right : bool, optional
            Whether the bins include their right edge. Defaults to True, i.e. the intervals are
            (1800, 1900], (1900, 1950], ... Pass False for [1800, 1900), [1900, 1950), ...,
            which is what labels such as "1800-1899" describe.

        Notes
        -----
        Buildings whose year is missing or falls outside the bins get the string form of a
        missing category.

        Examples
        --------
        >>> bins = [1800, 1900, 1950, 2000, 2024]
        >>> labels = ["1800-1899", "1900-1949", "1950-1999", "2000-2024"]
        '''
        # Convert the validFrom attribute to a numeric year (unparsable values become NaN)
        years = pd.to_numeric(self.gdf['validFrom'].apply(self.extract_year), errors='coerce')
        # Classify buildings into age groups (Baualtersklassen BAK)
        age_class = pd.cut(years, bins=bins, labels=labels, right=right)
        self.gdf['BAK'] = age_class.astype(str)

    def add_LANUV_age_and_type(self):
        '''
        Extracts the building type and age from the 'GEBAEUDETY' attribute.

        The value is split at the first underscore: the part before it is stored in the new column
        'type', the remainder in 'age_LANUV' (missing if there is no underscore). Afterwards 'type'
        is set to 'NWG' (Nichtwohngebaeude) for every building whose 'WG_NWG' equals 'NWG'.
        '''
        # Split only once and force exactly two result columns, whatever the number of underscores.
        parts = self.gdf['GEBAEUDETY'].str.split('_', n=1, expand=True).reindex(columns=[0, 1])
        self.gdf['type'] = parts[0]
        self.gdf['age_LANUV'] = parts[1]
        # set type = NWG (Nichtwohngebaeude) for all NWG
        self.gdf.loc[self.gdf['WG_NWG'] == 'NWG', 'type'] = 'NWG'

    def merge_buildings(self):
        '''
        Merges building geometries and attributes, performing custom aggregations.

        The building geometries are dissolved by the 'Flurstueck', 'citygml_fu', 'Fortschrei' and
        'type' attributes. Per group, 'Fest_ID' and 'Nutzung' take the first value, 'NF', 'RW', 'WW'
        and 'RW_WW' are summed, the specific values 'RW_spez', 'WW_spez' and 'RW_WW_spez' are
        averaged weighted by 'NF', and 'age_LANUV' becomes the most common value (or a
        comma-separated list of the tied values). Columns not listed here are not carried over.

        Notes
        -----
        The weights are looked up by index label, so the index of the GeoDataFrame is expected to
        be unique.
        '''
        def mode_or_string(x):
            '''
            Aggregates by returning the most common value or a comma-separated string if there are ties.

            Parameters
            ----------
            x : Series
                The series to aggregate.

            Returns
            -------
            str
                The aggregated value.
            '''
            counts = Counter(x)
            max_count = max(counts.values())
            max_list = [val for val, count in counts.items() if count == max_count]
            if len(max_list) == 1:
                return str(max_list[0])
            try:
                sorted_list = sorted(max_list)
            except TypeError:
                # Values that cannot be compared with each other (e.g. None and str)
                # are ordered by their string form instead.
                sorted_list = sorted(max_list, key=str)
            return ', '.join(map(str, sorted_list))

        def weighted_average(s, weights):
            '''
            Calculates the weighted average.

            Parameters
            ----------
            s : Series
                Series of values (e.g., RW_spez).
            weights : Series
                Series of weights (e.g., NF).

            Returns
            -------
            float
                The weighted average, or NaN if the weights sum to zero.
            '''
            total_weight = weights.sum()
            if total_weight == 0:
                return np.nan
            return (s * weights).sum() / total_weight

        net_floor_area = self.gdf['NF']

        def nf_weighted(values):
            '''Average `values` weighted by the net floor area of the same rows.'''
            return weighted_average(values, net_floor_area.loc[values.index])

        grouped_gdf = self.gdf.dissolve(
            by=['Flurstueck', 'citygml_fu', 'Fortschrei', 'type'],
            as_index=False,
            aggfunc={
                'Fest_ID': 'first',
                'Nutzung': 'first',
                'NF': 'sum',
                'RW_spez': nf_weighted,
                'RW': 'sum',
                'WW_spez': nf_weighted,
                'WW': 'sum',
                'RW_WW_spez': nf_weighted,
                'RW_WW': 'sum',
                'age_LANUV': mode_or_string,
            })
        self.gdf = grouped_gdf

    def add_custom_heat_demand(self, wg_data, nwg_data):
        '''
        Adds custom heat demand data to the existing GeoDataFrame based on building characteristics from two datasets.

        The GeoDataFrame is left-joined with `wg_data` on the building age class ('BAK') and with
        `nwg_data` on the last four characters of 'citygml_fu'. The resulting specific heat demand
        is stored in 'Spez_Waermebedarf' and the total heat demand in 'Waermebedarf'.

        Parameters
        ----------
        wg_data : pd.DataFrame
            A DataFrame containing building-specific heat demand data. It must include the following columns:
            - 'Baualtersklasse': Building age class.
            - 'Waerme_MFH kWh/m²·a': Specific heat demand for multi-family houses (MFH) in kWh/m²·a.
            - 'Waerme_EFH kWh/m²·a': Specific heat demand for single-family houses (EFH) in kWh/m²·a.
        nwg_data : pd.DataFrame
            A DataFrame containing additional building data. It must include the following columns:
            - 'Funktion': Function classification of the building (compared as string).
            - 'WVBRpEBF': Specific heat demand value based on the building function.
            The frame is not modified.

        Notes
        -----
        The method assumes that 'Lastprofil' in the GeoDataFrame specifies the building type as either 'MFH' or 'EFH'.
        Buildings not classified as 'MFH' or 'EFH' will receive their heat demand value from the 'WVBRpEBF' column in
        `nwg_data`. If 'WVBRpEBF' is not available, the resulting demand will be NaN.
        The total heat demand is calculated by multiplying the net floor area ('NF') by the assigned specific heat demand.
        '''
        buildings = self.gdf

        # merge with wg_data
        wg_columns = ['Baualtersklasse', 'Waerme_MFH kWh/m²·a', 'Waerme_EFH kWh/m²·a']
        merge1 = buildings.merge(
            wg_data[wg_columns],
            left_on='BAK',
            right_on='Baualtersklasse',
            how='left'
        )

        # merge with nwg_data; the key is compared as string because it is matched
        # against a slice of the 'citygml_fu' string
        nwg_lookup = nwg_data[['Funktion', 'WVBRpEBF']].copy()
        nwg_lookup['Funktion'] = nwg_lookup['Funktion'].astype(str)
        merge1['GFK_last_four'] = merge1['citygml_fu'].str[-4:]
        merged_df = merge1.merge(
            nwg_lookup,
            left_on='GFK_last_four',
            right_on='Funktion',
            how='left'
        )

        # Conditional assignment of specific heat demands
        merged_df['Spez_Waermebedarf'] = np.where(
            merged_df['Lastprofil'] == 'MFH',
            merged_df['Waerme_MFH kWh/m²·a'],
            np.where(
                merged_df['Lastprofil'] == 'EFH',
                merged_df['Waerme_EFH kWh/m²·a'],
                merged_df['WVBRpEBF']
            )
        )

        # Delete the helper columns; names that are absent (e.g. because pandas had to
        # suffix a clashing column) are skipped instead of aborting the whole clean-up.
        helper_columns = wg_columns + ['Funktion', 'WVBRpEBF', 'GFK_last_four']
        merged_df = merged_df.drop(columns=helper_columns, errors='ignore')

        # calculate 'Waermebedarf'
        merged_df['Waermebedarf'] = merged_df['NF'] * merged_df['Spez_Waermebedarf']
        self.gdf = merged_df

    def add_connect_option(self):
        '''
        Adds the column 'Anschluss' to the GeoDataFrame, set to the value 1 for all rows.
        '''
        self.gdf['Anschluss'] = 1

    def rename_and_order_columns(self):
        '''
        Renames and orders the columns of the buildings GeoDataFrame.

        A new GeoDataFrame (same CRS) is built that contains only the columns listed below, in
        that order and under their new names. All source columns, including 'new_ID', must
        already exist. 'Alter_Flurstueck' is the first four characters of 'validFrom'.
        '''
        old_df = self.gdf
        buildings = pd.DataFrame({
            'new_ID': old_df['new_ID'],
            'Anschluss': old_df['Anschluss'],
            'Funktion': old_df['citygml_fu'],
            'Nutzung': old_df['Nutzung'],
            'typ': old_df['type'],
            'NF [m²]': old_df['NF'],
            'RW_spez [kWh/a*m²]': old_df['RW_spez'],
            'RW [kWh/a]': old_df['RW'],
            'WW_spez [kWh/a*m²]': old_df['WW_spez'],
            'WW [kWh/a]': old_df['WW'],
            'RW_WW_spez [kWh/a*m²]': old_df['RW_WW_spez'],
            'RW_WW [kWh/a]': old_df['RW_WW'],
            'Leistung_th [kW]': old_df['power_th'],
            'Vlh [h]': old_df['Vlh'],
            'Lastprofil': old_df['Lastprofil'],
            'Alter_LANUV': old_df['age_LANUV'],
            'Alter_Flurstueck': old_df['validFrom'].str[:4],
            'BAK nach Flurstueck': old_df['BAK'],
            'Spez_WB [kWh/a*m²]': old_df['Spez_Waermebedarf'],
            'WB [kWh/a]': old_df['Waermebedarf'],
            'geometry': old_df['geometry']
        })
        # change into geodataframe
        buildings = gpd.GeoDataFrame(buildings, geometry='geometry', crs=old_df.crs)
        self.gdf = buildings
class Parcels_adj:
    '''
    Container for parcel geometries.

    The class only loads the parcels from a file and exposes them as a GeoDataFrame.

    Attributes
    ----------
    gdf : GeoDataFrame
        The parcel geometries read from the given file.

    Methods
    -------
    __init__(path):
        Reads the parcel geometries and stores them in `gdf`.
    '''

    def __init__(self, path):
        '''
        Reads the parcel geometries from `path` and stores them in `self.gdf`.

        Parameters
        ----------
        path : str
            The file path handed to ``geopandas.read_file``.
        '''
        parcels = gpd.read_file(path)
        self.gdf = parcels
def spatial_join(shape1, shape2, attributes):
    '''
    Performs a spatial join to add attributes from shape2 to the best fitting feature in shape1.

    For each feature in `shape1`, the intersecting feature of `shape2` with the largest
    intersection area is determined and the requested attributes of that single match are
    written to `shape1`. Features of `shape1` without any intersecting partner receive missing
    values for the requested attributes.

    Parameters
    ----------
    shape1 : GeoDataFrame
        The GeoDataFrame to which attributes will be added.
    shape2 : GeoDataFrame
        The GeoDataFrame from which attributes will be sourced.
    attributes : list of str
        List of attribute names to be transferred from `shape2` to `shape1`.

    Returns
    -------
    GeoDataFrame
        The updated `shape1` GeoDataFrame with the specified attributes added.

    Notes
    -----
    If columns named 'index_left' or 'index_right' exist in either `shape1` or `shape2`,
    they will be removed to avoid conflicts during the spatial join.

    Unless such a column had to be removed from `shape1` (which creates a new frame), the
    attributes are written into the passed `shape1` object itself.

    If the join result has no column named like a requested attribute (pandas suffixes columns
    that exist in both inputs), the column `{attribute}_left` is used instead and a message
    is printed.

    Geometries are looked up by index label, so both inputs are expected to have a unique index.

    Examples
    --------
    >>> shape1 = gpd.read_file("path/to/shape1.shp")
    >>> shape2 = gpd.read_file("path/to/shape2.shp")
    >>> attributes = ["attr1", "attr2"]
    >>> updated_shape1 = spatial_join(shape1, shape2, attributes)
    '''
    def drop_join_index_columns(frame, frame_name):
        '''Return `frame` without 'index_left' / 'index_right', reporting each removal.'''
        for column in ('index_left', 'index_right'):
            if column in frame.columns:
                frame = frame.drop(columns=[column])
                print(f'{column} was removed from {frame_name} to execute the spatial join')
        return frame

    # Leftover bookkeeping columns of a previous join would clash with the ones sjoin creates.
    shape1 = drop_join_index_columns(shape1, 'shape1')
    shape2 = drop_join_index_columns(shape2, 'shape2')

    # Perform the spatial join: one row per intersecting (shape1, shape2) pair
    joined = gpd.sjoin(shape1, shape2, how='inner', predicate='intersects')

    # Compute the intersection area of every pair. The join result carries index *labels*
    # (its own index for shape1, 'index_right' for shape2), hence the label-based lookup.
    left_geometries = shape1.geometry.loc[joined.index]
    right_geometries = shape2.geometry.loc[joined['index_right'].to_numpy()]
    # align=False pairs the two series row by row instead of matching their indexes.
    joined['intersection_area'] = left_geometries.intersection(right_geometries, align=False).area.to_numpy()

    # Sort by intersection area and keep the complete row of the largest match per shape1 feature
    sorted_joined = joined.sort_values(by='intersection_area', ascending=False)
    best_match = sorted_joined[~sorted_joined.index.duplicated(keep='first')]

    # Transfer the attributes (aligned on the index of shape1)
    for attr in attributes:
        if attr in best_match.columns:
            shape1[attr] = best_match[attr]
        else:
            # NOTE(review): the '_left' column holds the value shape1 already had, not the one
            # from shape2 ('_right'); kept as is - confirm that this is intended.
            shape1[attr] = best_match[attr + '_left']
            print(f'{attr} got updated during spatial join')
    return shape1