VTScada-HistoricalTools/main.py

571 lines
20 KiB
Python
Raw Normal View History

import csv
import toml
import sys
import os
import requests
2023-06-14 11:21:58 -06:00
import pyodbc
import subprocess
import time
from typing import List, Union
from datetime import datetime, timezone, timedelta
# LIMITATIONS - 10k rows per VTScada query - reccommend limit to minimum 10 second intervals for daily values
# ----------------------
# Classes
# ----------------------
# HistoricalTag
# ----------------------
class HistoricalTag:
def __init__(self,
row: int,
tag_type: str,
name_source: str,
name_dest: str,
scale_factor: float,
interval: int,
precision: int,
deadband: float):
self.row = row
self.tag_type = tag_type
self.name_source = name_source
self.name_dest = name_dest
self.scale_factor = scale_factor
self.interval = interval
self.precision = precision
self.deadband = deadband
def __repr__(self):
return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision}, {self.deadband})"
# ----------------------
# AVEVA (Wonderware) Historian Functions
# ----------------------
# aveva_query()
# ----------------------
#
def aveva_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
current_start_time = start_time
current_end_time = start_time
tag_mappings = []
dir_path = output_path + str(start_time.year) + "\\"
while current_end_time < end_time:
# Wonderware starts to buckle around 3 month queries, query each timeline 3 months at a time
if current_end_time + timedelta(days=92) > end_time:
current_end_time = end_time
else:
current_end_time = current_end_time + timedelta(days=92)
tag_mappings.append(aveva_query_date(historical_tags, current_start_time, current_end_time))
current_start_time = current_start_time + timedelta(days=92)
write_tagmapping_to_file(dir_path + "TagMapping.csv", tag_mappings)
# aveva_query_date()
# ----------------------
#
def aveva_query_date(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
print("Querying data for: " + str(start_time.year) + " " +
str(start_time.month) + " " + str(start_time.day))
dir_path = output_path + str(start_time.year) + "\\"
create_directory(dir_path)
init_string = f'DRIVER={{ODBC Driver 18 for SQL Server}};SERVER={server};' + \
f'DATABASE={database_name};UID={application_user};PWD={application_pass};' + \
f'ENCRYPT=no'
print(init_string)
connection = pyodbc.connect(init_string)
tag_mappings = []
for tag in historical_tags:
if tag.tag_type == "real" or tag.tag_type == "integer":
retrieval_mode = '"Average"'
else:
retrieval_mode = '"Cyclic"'
ft_start_time = '"' + str(start_time.astimezone(timezone.utc)) + '"'
ft_end_time = '"' + str(end_time.astimezone(timezone.utc)) + '"'
ft_tag_str = '[' + str(tag.name_source) + ']'
query = f"SELECT * FROM OpenQuery(INSQL, 'SELECT DateTime, {ft_tag_str} FROM WideHistory WHERE DateTime >=" + \
f"{ft_start_time} AND DateTime <= {ft_end_time} AND wwRetrievalMode = {retrieval_mode}" + \
f"AND wwResolution = {str(tag.interval * 1000)}')"
cursor = connection.cursor()
print(query)
cursor.execute(query)
raw_values = cursor.fetchall()
values = []
for row in raw_values:
values.append((datetime.timestamp(row[0]), row[1]))
output_file = prepare_file_for_tag(tag, values, dir_path, end_time)
if output_file != "":
tag_mappings.append((output_file, tag.name_dest))
return tag_mappings
# ----------------------
# ClearSCADA Functions
# ----------------------
2023-06-14 11:21:58 -06:00
# clearscada_generate_historical_ids()
# ----------------------
# Generates a list of historical IDs for found historic files
def clearscada_generate_historical_ids(historic_files: str):
ids = []
for directory in os.listdir(historic_files):
if os.fsdecode(directory).startswith("Historic "):
ids.append(int(directory[9:15]))
output_file = os.path.join(output_path, "CS_HistoricIDs.CSV")
with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile:
csv_writer = csv.writer(csvfile)
for id in ids:
if id is not None:
csv_writer.writerow([str(id)])
# clearscada_process()
2023-06-14 11:21:58 -06:00
# ----------------------
# Process ClearSCADA raw historical files using the ClearSCADA command line tool to create
2023-06-14 11:21:58 -06:00
# csv data from the raw data files, then process and merge the data into VTScada formats
def clearscada_process(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
2023-06-14 11:21:58 -06:00
dir_path = output_path + str(start_time.year) + "\\"
create_directory(dir_path)
current_start_time = start_time
current_end_time = end_time
start_week = weeks_since_date(current_start_time.timestamp())
end_week = weeks_since_date(current_end_time.timestamp())
historic_directories = []
tags = []
# Get a list of all directories of Historic files (format is Historic ID with ID padded with leading zeroes) needed which exist
for tag in historical_tags:
# For ClearSCADA, the tag source is the ID code
padded_id = f'{int(tag.name_source):06}'
# Check that directory exists and if so, add it to a list
path = os.path.join(historic_files, "Historic " + padded_id)
if os.path.exists(path):
historic_directories.append(path)
tags.append(tag)
zipped_directories = zip(historic_directories, tags)
# For each found historic directory execute the ClearSCADA CSV command
print("Found: " + str(len(zipped_directories)) +
" historic directories matching tags")
2023-06-14 11:21:58 -06:00
tag_mappings = []
files = []
2023-06-14 11:21:58 -06:00
for (path, tag) in zipped_directories:
# print(path, tag.name_dest)
command = os.path.join(install_location, "SCXCMD.exe")
for file in os.listdir(path):
if os.fsdecode(file).endswith(".HRD"):
week_number = int(file[2:8])
if week_number >= start_week and week_number <= end_week:
files.append(file)
2023-06-14 11:21:58 -06:00
argument = os.path.join(path, file)
subprocess.run([command, "HISDUMP", argument])
# Process each directory of CSVs first into a list of values that can be pruned
values = []
output_file = ""
for file in os.listdir(path):
if os.fsdecode(file).endswith(".csv"):
csv_file = os.path.join(path, file)
values.extend(clearscada_read_file(csv_file))
files.append(file)
2023-06-14 11:21:58 -06:00
# Values will have had their deadband and scaling processed, but remaining is excess frequency
if len(values) > 0:
values = postprocess_values(values)
output_file = prepare_file_for_tag(
tag, values, dir_path, current_end_time, True)
tag_mappings.append((output_file, tag.name_dest))
# Delete files as they are processed
for file in files:
if file.endswith(".csv") or (delete_processed and file.endswith(".HRD")):
try:
os.remove(file)
except OSError as e:
print("Error: %s - %s." % (e.filename, e.strerror))
2023-06-14 11:21:58 -06:00
write_tagmapping_to_file(
dir_path + "TagMapping.csv", tag_mappings)
# clearscada_read_file()
# ----------------------
# Read in a ClearSCADA CSV file converted from HRD into a list of timestamps and values
2023-06-14 11:21:58 -06:00
def clearscada_read_file(file_path: str) -> List[Union[int, float, None]]:
values = []
with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',')
next(csv_reader) # Skip the header row
for row, line in enumerate(csv_reader):
if line[2] == "Good":
timestamp = datetime.timestamp(
datetime.strptime(line[0], "%d/%m/%Y %H:%M:%S"))
value = float(line[1])
values.append((timestamp, value))
return values
# ----------------------
# VTScada Functions
# ----------------------
# vtscada_tag_query()
# ----------------------
# Given a HistoricalTag structure, query the tag's values from the start time to the end time
def vtscada_tag_query(historical_tag: HistoricalTag, ft_start_time: datetime, ft_end_time: datetime) -> List[Union[int, float, None]]:
# Query average only for real values (Analog in VTScada)
if historical_tag.tag_type == "real":
value_string = ":Value:Average"
# Otherwise, query the value at the start of the interval
else:
value_string = ":Value:ValueAtStart"
query = "SELECT Timestamp, '" + historical_tag.name_source + value_string + "' FROM History_" + \
str(historical_tag.interval) + "s" + " WHERE Timestamp BETWEEN " + \
ft_start_time + " AND " + ft_end_time
url = "http://" + server + ":" + realm_port + \
"/" + realm_name + "/REST/SQLQuery?query=" + query
# print_text(url)
response = requests.get(url, auth=(application_user, application_pass))
returned = response.json()
return returned['results']['values']
# vtscada_query()
# ----------------------
# Given the set of HistoricalTags and a start and end time, query the data of those tags from the
# REST interface
def vtscada_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
current_start_time = start_time
current_end_time = start_time + timedelta(days=1)
while current_start_time < end_time:
print("Querying data for: " + str(current_start_time.year) + " " +
str(current_start_time.month) + " " + str(current_start_time.day))
dir_path = output_path + str(start_time.year) + "\\"
create_directory(dir_path)
ft_start_time = "'" + \
str(current_start_time.astimezone(timezone.utc)) + "'"
ft_end_time = "'" + \
str(current_end_time.astimezone(timezone.utc)) + "'"
tag_mappings = []
for tag in historical_tags:
values = vtscada_tag_query(tag, ft_start_time, ft_end_time)
output_file = prepare_file_for_tag(
tag, values, dir_path, current_end_time)
if output_file != "":
tag_mappings.append((output_file, tag.name_dest))
write_tagmapping_to_file(
dir_path + "TagMapping.csv", tag_mappings)
current_start_time += timedelta(days=1)
current_end_time += timedelta(days=1)
# ----------------------
# Common Functions
# ----------------------
# compress_and_scale_real()
# ----------------------
2023-06-14 11:21:58 -06:00
# -- Deadband (only keeping values which change by the required amount)
# -- Precision (decimal places, cleaning up excess data from floating points)
# -- Scaling factor (applies the scaling factor to the value before assigning the precision)
def compress_and_scale_real(values: List[Union[int, float, None]], deadband: float, scale_factor: float, precision: int) -> List[Union[int, float, None]]:
compressed_values = []
working_value = None
for value_pair in values:
timestamp, value = value_pair
if value is None:
continue
if working_value is None or abs(value - working_value) > deadband:
working_value = value
scaled_value = round(value * scale_factor, precision)
compressed_values.append((timestamp, scaled_value))
return compressed_values
# compress_boolean()
# ----------------------
2023-06-14 11:21:58 -06:00
# Compress a set of timestamp and boolean values to transitions. For booleans, transitions are
# kept and the assumption is
# the interval will be fast enough to keep all transitions.
def compress_boolean(values: List[Union[int, float, None]]) -> List[Union[int, float, None]]:
compressed_values = []
working_value = None
for value_pair in values:
timestamp, value = value_pair
if value is None:
continue
if working_value is None or value != working_value:
working_value = value
compressed_values.append((timestamp, value))
return compressed_values
# create_directory()
# ----------------------
2023-06-14 11:21:58 -06:00
# Create a directory if it doesn't exist
def create_directory(path):
if not os.path.exists(path):
os.makedirs(path)
2023-06-14 11:21:58 -06:00
# postprocess_values()
# ----------------------
# Process a list of values assumed and clean up timestamps which are within the interval of the last
# timestamp
2023-06-14 11:21:58 -06:00
def postprocess_values(values: List[Union[int, float, None]]):
last_time = time.time()
processed_values = []
for (timestamp, value) in values:
timedelta = abs(last_time - timestamp)
if timedelta > 50:
processed_values.append((timestamp, value))
last_time = timestamp
last_time = timestamp
return processed_values
# prepare_file_for_tag()
# ----------------------
# Helper function to call the correct compressing and processing functions for a given tag and getting it written to
# file
2023-06-14 11:21:58 -06:00
def prepare_file_for_tag(tag: HistoricalTag, values: List[Union[int, float, None]], dir_path: str, current_end_time: datetime, append=False) -> str:
if values is None:
print("No values found")
return ""
else:
output_file = ""
if tag.tag_type == "real" or tag.tag_type == "integer":
compressed_values = compress_and_scale_real(
values, tag.deadband, tag.scale_factor, tag.precision)
else:
compressed_values = compress_boolean(values)
if len(compressed_values) != 0:
output_file = tag.name_source.replace('\\', '_') + "_" + str(current_end_time.year) + str(
current_end_time.month) + str(current_end_time.day) + ".csv"
full_output_file = dir_path + output_file
write_values_to_file(full_output_file, compressed_values, True)
return output_file
# print_text()
# ----------------------
2023-06-14 11:21:58 -06:00
# Print formatting a text line for debugging and such
def print_text(text: str):
print(r'-------------------------------------------------------------------------------------------------------')
print(text)
print(r'-------------------------------------------------------------------------------------------------------')
2023-06-14 11:21:58 -06:00
# read_tags()
# ----------------------
2023-06-14 11:21:58 -06:00
# Read in the list of tags and set the mapping parameters for each tag and construct the groupings required for the
# query
def read_tags(file_path: str) -> List[HistoricalTag]:
historical_tags = []
with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
csv_reader = csv.reader(csvfile, delimiter=',')
next(csv_reader) # Skip the header row
for row, line in enumerate(csv_reader):
name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line
tag = HistoricalTag(row=row+1, tag_type=tag_type, name_source=name_source, name_dest=name_dest,
scale_factor=float(scale_factor), interval=int(interval), precision=int(precision), deadband=float(deadband))
historical_tags.append(tag)
return historical_tags
2023-06-14 11:21:58 -06:00
# write_tagmappings_to_file()
# ----------------------
# Create a new TagMapping.CSV file which contains the mapping of all tag names and files which
# contain their CSV data
2023-06-14 11:21:58 -06:00
def write_tagmapping_to_file(output_file: str, tag_mappings: List[str]):
with open(output_file, mode='a', newline='', encoding='utf-8') as csvfile:
csv_writer = csv.writer(csvfile)
2023-06-14 11:21:58 -06:00
for mapping in tag_mappings:
csv_writer.writerow(mapping)
# write_values_to_file()
# ----------------------
2023-06-14 11:21:58 -06:00
# Given a full path name of a file and list of timestamp, value pairs, write the values to a
# CSV file with each pair on its own row.
2023-06-14 11:21:58 -06:00
def write_values_to_file(output_file: str, values: List[Union[int, float, None]], append=False):
if append:
csv_mode = 'a'
else:
csv_mode = 'w'
with open(output_file, mode=csv_mode, newline='', encoding='utf-8') as csvfile:
csv_writer = csv.writer(csvfile)
for value_pair in values:
timestamp, value = value_pair
if value is not None:
utc_dt = datetime.utcfromtimestamp(timestamp)
formatted_timestamp = utc_dt.strftime(
'%Y-%m-%d %H:%M:%S.%f')[:-3]
csv_writer.writerow([formatted_timestamp, value])
2023-06-14 11:21:58 -06:00
# weeks_since_date()
# ----------------------
# Returns the number of weeks since the given timestamp, or defaults to January 1, 1601
# It looks like ClearSCADA documentation before the GeoSCADA rebrand erroneously used
# the number of weeks since December 25, 1600, which caused being off by 1.
def weeks_since_date(timestamp, date=(1601, 1, 1)):
2023-06-14 11:21:58 -06:00
dt = datetime.utcfromtimestamp(timestamp)
start_date = datetime(*date)
delta = dt - start_date
weeks = delta.days // 7
return weeks
# ----------------------
# Main Section
# ----------------------
print(r' _ _ _____ _____ _______ ____ _____ _____ _____ _ _______ ____ ____ _ _____ ')
print(r'| | | |_ _|/ ____|__ __/ __ \| __ \|_ _/ ____| /\ | | |__ __/ __ \ / __ \| | / ____|')
print(r'| |__| | | | | (___ | | | | | | |__) | | || | / \ | | | | | | | | | | | | | (___ ')
print(r'| __ | | | \___ \ | | | | | | _ / | || | / /\ \ | | | | | | | | | | | | \___ \ ')
print(r'| | | |_| |_ ____) | | | | |__| | | \ \ _| || |____ / ____ \| |____ | | | |__| | |__| | |____ ____) |')
print(r'|_| |_|_____|_____/ |_| \____/|_| \_\_____\_____/_/ \_\______| |_| \____/ \____/|______|_____/ ')
config = toml.load("setup.toml")
tags_path = config['system']['tags_path']
output_path = config['system']['output_path']
system_timezone = config['system']['system_timezone']
application_user = config['user']['application_user']
application_pass = config['user']['application_pass']
if len(sys.argv) == 4:
query_type = sys.argv[1]
year, month, day = map(int, str(sys.argv[2]).split("-"))
start_time = datetime(year, month, day, 0, 0, 0)
year, month, day = map(int, str(sys.argv[3]).split("-"))
end_time = datetime(year, month, day, 0, 0, 0)
print("")
historical_tags = read_tags(tags_path)
for tag in historical_tags:
print(tag)
if query_type == "VTScada":
print_text('VTScada Data Query')
2023-06-14 11:21:58 -06:00
server = config['vtscada']['server_name']
realm_port = config['vtscada']['realm_port']
realm_name = config['vtscada']['realm_name']
vtscada_query(historical_tags, start_time, end_time)
elif query_type == "AVEVA":
server = config['aveva']['server_name']
database_name = config['aveva']['database_name']
aveva_query(historical_tags, start_time, end_time)
elif query_type == "ClearSCADA":
2023-06-14 11:21:58 -06:00
print_text('ClearSCADA - Query Raw Historic Files')
historic_files = config['clearscada']['historic_files']
install_location = config['clearscada']['install_location']
delete_processed = config['clearscada']['delete_processed']
clearscada_generate_historical_ids(historic_files)
clearscada_process(historical_tags, start_time, end_time)
2023-06-14 11:21:58 -06:00
else:
print("Invalid arguments!")