# VTScada-HistoricalTools/main.py


import csv
import toml
import sys
import os
import requests
import pyodbc
import subprocess
import time
from typing import List, Optional, Tuple
from datetime import datetime, timezone, timedelta
# LIMITATIONS - VTScada returns at most 10,000 rows per query - recommend a minimum 10-second interval when querying a full day of values
# ----------------------
# Classes
# ----------------------
# HistoricalTag
# ----------------------
class HistoricalTag:
    def __init__(self,
                 row: int,
                 tag_type: str,
                 name_source: str,
                 name_dest: str,
                 scale_factor: float,
                 interval: int,
                 precision: int,
                 deadband: float):
        self.row = row
        self.tag_type = tag_type
        self.name_source = name_source
        self.name_dest = name_dest
        self.scale_factor = scale_factor
        self.interval = interval
        self.precision = precision
        self.deadband = deadband

    def __repr__(self):
        return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision}, {self.deadband})"
# ----------------------
# Functions
# ----------------------
# clearscada_generate_historical_ids()
# ----------------------
# Generates a list of historical IDs for found historic files
def clearscada_generate_historical_ids(historic_files: str):
    ids = []
    for directory in os.listdir(historic_files):
        if os.fsdecode(directory).startswith("Historic "):
            ids.append(int(directory[9:15]))
    # Relies on the module-level output_path loaded from setup.toml
    output_file = os.path.join(output_path, "CS_HistoricIDs.CSV")
    with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        for hist_id in ids:
            if hist_id is not None:
                csv_writer.writerow([str(hist_id)])
# clearscada_query()
# ----------------------
# Query ClearSCADA raw historical files using the ClearSCADA command line tool to create
# csv data from the raw data files, then process and merge the data into VTScada formats
def clearscada_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
    dir_path = output_path + str(start_time.year) + "\\"
    create_directory(dir_path)
    current_start_time = start_time
    current_end_time = end_time
    start_week = weeks_since_date(current_start_time.timestamp())
    end_week = weeks_since_date(current_end_time.timestamp())
    historic_directories = []
    tags = []
    # Build the list of Historic file directories that exist for the requested tags
    # (directory format is "Historic ID", with the ID padded with leading zeroes)
    for tag in historical_tags:
        # For ClearSCADA, the tag source is the ID code
        padded_id = f'{int(tag.name_source):06}'
        # Check that the directory exists and, if so, add it to the list
        path = os.path.join(historic_files, "Historic " + padded_id)
        if os.path.exists(path):
            historic_directories.append(path)
            tags.append(tag)
    zipped_directories = zip(historic_directories, tags)
    # For each found historic directory, execute the ClearSCADA HISDUMP command to convert HRD files to CSV
    tag_mappings = []
    for (path, tag) in zipped_directories:
        # print(path, tag.name_dest)
        command = os.path.join(install_location, "SCXCMD.exe")
        for file in os.listdir(path):
            if os.fsdecode(file).endswith(".HRD"):
                week_number = int(file[2:8])
                if start_week <= week_number <= end_week:
                    argument = os.path.join(path, file)
                    subprocess.run([command, "HISDUMP", argument])
        # Process each directory of CSVs into a list of values that can be pruned
        values = []
        output_file = ""
        for file in os.listdir(path):
            if os.fsdecode(file).endswith(".csv"):
                csv_file = os.path.join(path, file)
                values.extend(clearscada_read_file(csv_file))
        # Deadband and scaling are applied later in prepare_file_for_tag; what is pruned
        # here is excess sample frequency
        if len(values) > 0:
            values = postprocess_values(values)
            output_file = prepare_file_for_tag(
                tag, values, dir_path, current_end_time, True)
            tag_mappings.append((output_file, tag.name_dest))
    write_tagmapping_to_file(
        dir_path + "TagMapping.csv", tag_mappings)
    # main_directory = os.fsencode(historic_files)
# clearscada_read_file()
# ----------------------
# Read in a ClearSCADA CSV file converted from HRD into a list of timestamps and values
def clearscada_read_file(file_path: str) -> List[Tuple[float, float]]:
    values = []
    with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',')
        next(csv_reader)  # Skip the header row
        for line in csv_reader:
            # Only keep samples whose quality column is "Good"
            if line[2] == "Good":
                timestamp = datetime.timestamp(
                    datetime.strptime(line[0], "%d/%m/%Y %H:%M:%S"))
                value = float(line[1])
                values.append((timestamp, value))
    return values
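# A converted HRD CSV data row is expected to look like the following (inferred from the
# parsing above: "DD/MM/YYYY HH:MM:SS" timestamp, value, then quality):
#   01/02/2023 13:45:10,42.7,Good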
# compress_and_scale_real()
# ----------------------
# Compress and scale a list of (timestamp, value) pairs by applying:
# -- Deadband (only keeping values which change by the required amount)
# -- Precision (decimal places, cleaning up excess data from floating points)
# -- Scaling factor (applies the scaling factor to the value before assigning the precision)
def compress_and_scale_real(values: List[Tuple[float, Optional[float]]], deadband: float, scale_factor: float, precision: int) -> List[Tuple[float, float]]:
    compressed_values = []
    working_value = None
    for value_pair in values:
        timestamp, value = value_pair
        if value is None:
            continue
        # Keep the first good value, then only values that move more than the deadband
        if working_value is None or abs(value - working_value) > deadband:
            working_value = value
            scaled_value = round(value * scale_factor, precision)
            compressed_values.append((timestamp, scaled_value))
    return compressed_values
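# Illustrative example: with deadband=0.5, scale_factor=10 and precision=1, the input
# [(t0, 1.00), (t1, 1.20), (t2, 1.81)] compresses to [(t0, 10.0), (t2, 18.1)], because
# the change from 1.00 to 1.20 is within the deadband and is dropped.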
# compress_boolean()
# ----------------------
# Compress a set of timestamp and boolean values down to transitions. For booleans, only
# transitions are kept, on the assumption that the query interval is fast enough to
# capture every transition.
def compress_boolean(values: List[Tuple[float, Optional[float]]]) -> List[Tuple[float, float]]:
    compressed_values = []
    working_value = None
    for value_pair in values:
        timestamp, value = value_pair
        if value is None:
            continue
        # Keep the first good value, then only values that differ from the last kept value
        if working_value is None or value != working_value:
            working_value = value
            compressed_values.append((timestamp, value))
    return compressed_values
# create_directory()
# ----------------------
# Create a directory if it doesn't exist
def create_directory(path):
    if not os.path.exists(path):
        os.makedirs(path)
# postprocess_values()
# ----------------------
# Prune a list of values, dropping samples whose timestamps fall within 50 seconds of the
# last kept timestamp. Values are assumed to already have been compressed.
def postprocess_values(values: List[Tuple[float, float]]) -> List[Tuple[float, float]]:
    last_time = time.time()
    processed_values = []
    for (timestamp, value) in values:
        gap = abs(last_time - timestamp)
        if gap > 50:
            processed_values.append((timestamp, value))
            last_time = timestamp
    return processed_values
# prepare_file_for_tag()
# ----------------------
# Compress the tag's values, write them to a per-tag CSV file in dir_path and return the
# file name (empty string if there was nothing to write)
def prepare_file_for_tag(tag: HistoricalTag, values: List[Tuple[float, Optional[float]]], dir_path: str, current_end_time: datetime, append=False) -> str:
    if values is None:
        print("No values found")
        return ""
    else:
        output_file = ""
        if tag.tag_type == "real" or tag.tag_type == "integer":
            compressed_values = compress_and_scale_real(
                values, tag.deadband, tag.scale_factor, tag.precision)
        else:
            compressed_values = compress_boolean(values)
        if len(compressed_values) != 0:
            output_file = tag.name_source.replace('\\', '_') + "_" + str(current_end_time.year) + str(
                current_end_time.month) + str(current_end_time.day) + ".csv"
            full_output_file = dir_path + output_file
            write_values_to_file(full_output_file, compressed_values, append)
        return output_file
# print_text()
# ----------------------
# Print formatting a text line for debugging and such
def print_text(text: str):
    print(r'-------------------------------------------------------------------------------------------------------')
    print(text)
    print(r'-------------------------------------------------------------------------------------------------------')
# read_tags()
# ----------------------
# Read in the list of tags, setting the mapping and compression parameters each tag needs
# for the queries that follow
def read_tags(file_path: str) -> List[HistoricalTag]:
    historical_tags = []
    with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',')
        next(csv_reader)  # Skip the header row
        for row, line in enumerate(csv_reader):
            name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line
            tag = HistoricalTag(row=row + 1, tag_type=tag_type, name_source=name_source, name_dest=name_dest,
                                scale_factor=float(scale_factor), interval=int(interval), precision=int(precision), deadband=float(deadband))
            historical_tags.append(tag)
    return historical_tags
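# The tags CSV is expected to have a header row followed by one row per tag, in the column
# order unpacked above (header text and sample values here are illustrative only):
#   name_source,name_dest,tag_type,scale_factor,interval,precision,deadband
#   Station1\FlowRate,Station1_FlowRate,real,1.0,10,2,0.5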
# vtscada_tag_query()
# ----------------------
# Given a HistoricalTag structure, query the tag's values from the start time to the end time
def vtscada_tag_query(historical_tag: HistoricalTag, ft_start_time: str, ft_end_time: str) -> List[Tuple[float, Optional[float]]]:
    # Query the average only for real values (Analog in VTScada)
    if historical_tag.tag_type == "real":
        value_string = ":Value:Average"
    # Otherwise, query the value at the start of the interval
    else:
        value_string = ":Value:ValueAtStart"
    query = "SELECT Timestamp, '" + historical_tag.name_source + value_string + "' FROM History_" + \
        str(historical_tag.interval) + "s" + " WHERE Timestamp BETWEEN " + \
        ft_start_time + " AND " + ft_end_time
    url = "http://" + server + ":" + realm_port + \
        "/" + realm_name + "/REST/SQLQuery?query=" + query
    # print_text(url)
    response = requests.get(url, auth=(application_user, application_pass))
    returned = response.json()
    return returned['results']['values']
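# Example of the request this builds (illustrative tag name and times; server, realm_port
# and realm_name come from setup.toml):
#   SELECT Timestamp, 'Station1\FlowRate:Value:Average' FROM History_10s
#       WHERE Timestamp BETWEEN '2023-01-01 00:00:00+00:00' AND '2023-01-02 00:00:00+00:00'
# sent as http://<server>:<realm_port>/<realm_name>/REST/SQLQuery?query=<the query above>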
# vtscada_query()
# ----------------------
# Given the set of HistoricalTags and a start and end time, query the data of those tags from the
# REST interface
def vtscada_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
    current_start_time = start_time
    current_end_time = start_time + timedelta(days=1)
    # Query one day at a time to stay under the per-request row limit
    while current_start_time < end_time:
        print("Querying data for: " + str(current_start_time.year) + " " +
              str(current_start_time.month) + " " + str(current_start_time.day))
        dir_path = output_path + str(start_time.year) + "\\"
        create_directory(dir_path)
        # Build quoted UTC timestamp literals for the SQL query
        ft_start_time = "'" + \
            str(current_start_time.astimezone(timezone.utc)) + "'"
        ft_end_time = "'" + \
            str(current_end_time.astimezone(timezone.utc)) + "'"
        tag_mappings = []
        for tag in historical_tags:
            values = vtscada_tag_query(tag, ft_start_time, ft_end_time)
            output_file = prepare_file_for_tag(
                tag, values, dir_path, current_end_time)
            if output_file != "":
                tag_mappings.append((output_file, tag.name_dest))
        write_tagmapping_to_file(
            dir_path + "TagMapping.csv", tag_mappings)
        current_start_time += timedelta(days=1)
        current_end_time += timedelta(days=1)
# write_tagmapping_to_file()
# ----------------------
# Append rows to the TagMapping.csv file, which maps each data CSV file to the destination
# tag name it belongs to
def write_tagmapping_to_file(output_file: str, tag_mappings: List[Tuple[str, str]]):
    with open(output_file, mode='a', newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        for mapping in tag_mappings:
            csv_writer.writerow(mapping)
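# Example TagMapping.csv row (illustrative names; note that prepare_file_for_tag does not
# zero-pad the month and day in the file name):
#   Station1_FlowRate_202311.csv,Station1_FlowRate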
# write_values_to_file()
# ----------------------
# Given a full path name of a file and list of timestamp, value pairs, write the values to a
# CSV file with each pair on its own row.
def write_values_to_file(output_file: str, values: List[Tuple[float, Optional[float]]], append=False):
    if append:
        csv_mode = 'a'
    else:
        csv_mode = 'w'
    with open(output_file, mode=csv_mode, newline='', encoding='utf-8') as csvfile:
        csv_writer = csv.writer(csvfile)
        for value_pair in values:
            timestamp, value = value_pair
            if value is not None:
                # Timestamps are written as UTC with millisecond precision
                utc_dt = datetime.utcfromtimestamp(timestamp)
                formatted_timestamp = utc_dt.strftime(
                    '%Y-%m-%d %H:%M:%S.%f')[:-3]
                csv_writer.writerow([formatted_timestamp, value])
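# Example output row (illustrative value): 2023-01-01 13:45:10.000,42.7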
# weeks_since_date()
# ----------------------
# Returns the number of weeks since the given timestamp, or defaults to December 25th, 1600
def weeks_since_date(timestamp, date=(1600, 12, 25)):
    dt = datetime.utcfromtimestamp(timestamp)
    start_date = datetime(*date)
    delta = dt - start_date
    weeks = delta.days // 7
    return weeks
# ----------------------
# Main Section
# ----------------------
print(r' _ _ _____ _____ _______ ____ _____ _____ _____ _ _______ ____ ____ _ _____ ')
print(r'| | | |_ _|/ ____|__ __/ __ \| __ \|_ _/ ____| /\ | | |__ __/ __ \ / __ \| | / ____|')
print(r'| |__| | | | | (___ | | | | | | |__) | | || | / \ | | | | | | | | | | | | | (___ ')
print(r'| __ | | | \___ \ | | | | | | _ / | || | / /\ \ | | | | | | | | | | | | \___ \ ')
print(r'| | | |_| |_ ____) | | | | |__| | | \ \ _| || |____ / ____ \| |____ | | | |__| | |__| | |____ ____) |')
print(r'|_| |_|_____|_____/ |_| \____/|_| \_\_____\_____/_/ \_\______| |_| \____/ \____/|______|_____/ ')
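# Sketch of the expected setup.toml layout, inferred from the keys read below; all values
# shown are placeholders:
#   [system]
#   tags_path = "tags.csv"
#   output_path = "C:\\Output\\"
#   system_timezone = "America/Edmonton"
#   [user]
#   application_user = "rest_user"
#   application_pass = "rest_password"
#   [vtscada]
#   server_name = "localhost"
#   realm_port = "8888"
#   realm_name = "MyApplication"
#   [clearscada]
#   historic_files = "C:\\ClearSCADA\\Database\\HisFiles"
#   install_location = "C:\\Program Files (x86)\\Schneider Electric\\ClearSCADA"
#   delete_processed = false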
config = toml.load("setup.toml")
tags_path = config['system']['tags_path']
output_path = config['system']['output_path']
system_timezone = config['system']['system_timezone']
application_user = config['user']['application_user']
application_pass = config['user']['application_pass']
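# Usage (sketch): python main.py <QueryType> <start YYYY-MM-DD> <end YYYY-MM-DD>
#   e.g. python main.py VTScada 2023-01-01 2023-02-01
# where QueryType is one of VTScada, AVEVA (not implemented) or ClearSCADA.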
if len(sys.argv) == 4:
    query_type = sys.argv[1]
    year, month, day = map(int, str(sys.argv[2]).split("-"))
    start_time = datetime(year, month, day, 0, 0, 0)
    year, month, day = map(int, str(sys.argv[3]).split("-"))
    end_time = datetime(year, month, day, 0, 0, 0)
    print("")
    historical_tags = read_tags(tags_path)
    for tag in historical_tags:
        print(tag)
    if query_type == "VTScada":
        print_text('VTScada Data Query')
        server = config['vtscada']['server_name']
        realm_port = config['vtscada']['realm_port']
        realm_name = config['vtscada']['realm_name']
        vtscada_query(historical_tags, start_time, end_time)
    elif query_type == "AVEVA":
        print_text('AVEVA Historian - Not Implemented')
    elif query_type == "ClearSCADA":
        print_text('ClearSCADA - Query Raw Historic Files')
        historic_files = config['clearscada']['historic_files']
        install_location = config['clearscada']['install_location']
        delete_processed = config['clearscada']['delete_processed']
        clearscada_generate_historical_ids(historic_files)
        clearscada_query(historical_tags, start_time, end_time)
else:
    print("Invalid arguments!")