river-sim/main.py
2024-12-02 10:18:14 +08:00

207 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import struct
import pandas as pd
import geopandas as gpd
from datetime import datetime, timedelta
import json
from datetime import datetime, timedelta
import numpy as np
from shapely.geometry import mapping
from color import create_color_gradient_mapping, get_color_from_value1
x_min = 10
x_max = 3500
a = 0.25
n = 19
val = (np.linspace((x_min**a), (x_max**a), n)) ** (1 / a)
hours = 168
# 生成颜色映射
color_mapping = create_color_gradient_mapping(val)
def convert_coordinates(coord_list):
result = []
for coord in coord_list:
result.extend([coord[0], coord[1], 0])
return result
def read_binary_file(file_path):
df = pd.DataFrame()
with open(file_path, "rb") as file:
index = 0
# 读取前8个字节表示数据列数
column_bytes = file.read(8)
if not column_bytes:
return # 文件结束
column_count = struct.unpack("<d", column_bytes)[0]
column_count_int = int(column_count)
print(f"Column Count: {column_count_int}")
# 读取接下来的8个字节表示日期
date_bytes = file.read(8)
date = struct.unpack("<d", date_bytes)[0]
print(f"Date: {date}")
while True:
# print("index", index)
if index > hours:
break
time_bytes = file.read(8)
time = struct.unpack("<d", time_bytes)[0]
# print(f"time: {time}")
# 读取数据矩阵
data_size = column_count_int * 8 # 每个数据点8个字节
data_bytes = file.read(data_size)
data_matrix = struct.unpack(f"<{column_count_int}d", data_bytes)
data_matrix = [data_matrix]
new_df = pd.DataFrame(data_matrix)
df = pd.concat([df, new_df], ignore_index=True)
# print(f"Data Matrix: {data_matrix}")
index = index + 1
return df
def read_shp(shpfile_path, level=4):
river_data = gpd.read_file(shpfile_path)
return river_data[river_data["Type"] == level]
def get_color(runoff, min_runoff=0, max_runoff=10):
"""根据径流量获取颜色,从蓝到红的渐变"""
normalized = (runoff - min_runoff) / (max_runoff - min_runoff)
red = int(normalized * 255)
blue = int((1 - normalized) * 255)
return [red, 0, blue, 255]
def format_time(dt):
"""格式化时间为ISO字符串"""
return dt.strftime("%Y-%m-%dT%H:%M:%SZ")
def generate_river_czml(river_df, runoff_df, level=4, start_time=None):
"""
生成河网径流时序变化的CZML文件
Parameters:
-----------
river_df : GeoDataFrame
河网数据包含geometry列(LINESTRING)
runoff_df : DataFrame
径流数据shape为(时间数, 河段数)
start_time : datetime, optional
起始时间,默认为'2010-07-20 00:00:00'
Returns:
--------
list : CZML文档
"""
if start_time is None:
start_time = datetime(2010, 7, 20)
# 创建CZML文档
czml = [
{
"id": "document",
"name": "River Flow Visualization",
"version": "1.0",
"clock": {
"interval": f"{format_time(start_time)}/{format_time(start_time + timedelta(hours=hours))}",
"currentTime": format_time(start_time),
"multiplier": 3600, # 1小时/秒
"range": "LOOP_STOP",
"step": "SYSTEM_CLOCK_MULTIPLIER",
},
}
]
print(river_df.index)
# 为每条河段创建一个entity
for idx in river_df.index:
river_id = river_df.loc[idx, "Index"]
print("river_index", river_id)
coords = mapping(river_df.loc[idx, "geometry"])["coordinates"]
# new_coords = [coord for point in coords for coord in point]
# print(new_coords)
new_coords = []
for coord in coords:
new_coords.extend([[round(x, 5), round(y, 5), 0] for x, y in coord])
# 获取该河段的所有时间点的径流数据
river_runoff = runoff_df.iloc[:, idx - 1].values
max_runoff = max(river_runoff)
# # 创建时间序列的颜色数据
color_property = {"epoch": format_time(start_time), "rgba": []}
for hour, runoff in enumerate(river_runoff):
time_offset = hour * 3600 # 转换为秒
value = round(runoff / 86400, 3)
# print("value", value)
color = get_color_from_value1(abs(value), color_mapping)
color_property["rgba"].extend([time_offset] + list(color))
# 创建河段实体
entity = {
"id": f"{river_id}",
"name": f"River Segment {idx}",
"polyline": {
"positions": {
"cartographicDegrees": [
coord for point in new_coords for coord in point
]
},
"material": {"solidColor": {"color": color_property}},
"width": level * 2,
"clampToGround": True, # 贴地显示
"classificationType": "BOTH", # 同时在地形和3D切片上显示
"zIndex": 1, # 控制叠加顺序
"distanceDisplayCondition": {
"nearFar": [0, 1000000] # 显示距离范围(米)
},
"heightReference": "CLAMP_TO_GROUND", # 确保贴地
},
}
czml.append(entity)
return czml
def save_czml(czml_data, filename):
"""保存CZML数据到文件"""
with open(filename, "w") as f:
json.dump(czml_data, f, indent=2)
# 使用示例
"""
# 生成CZML数据
czml_data = generate_river_czml(river_df, runoff_df)
# 保存到文件
save_czml(czml_data, 'river_flow.czml')
"""
if __name__ == "__main__":
level = 4
# 读取文件
river_df = read_shp("lz.in/1.geojson", level)
runoff_df = read_binary_file("lz.out/lz.rivqdown.dat")
czml_data = generate_river_czml(river_df, runoff_df, level)
save_czml(czml_data, f"river_flow_{level}.czml")
print(f"CZML文件已生成river_flow_{level}.czml")