# VTScada-HistoricalTools/main.py

import csv
import sys
from datetime import datetime, timezone
from typing import List, Union
from urllib.parse import quote

import requests
import toml
# ----------------------
# Classes
# ----------------------
class HistoricalTag:
    """Settings for one historian tag, as listed in the tag CSV file.

    Attributes (all stored exactly as passed in):
        row: Position of the tag in the tag list (read_tags passes a 1-based data-row index).
        tag_type: "real" selects the averaged query; "real" and "integer" are
            deadband-compressed and scaled, any other type is change-compressed.
        name_source: Tag name used in the history query and in the output file name.
        name_dest: Destination tag name (stored only; not read elsewhere in the visible code).
        scale_factor: Multiplier applied to numeric values before rounding.
        interval: Sampling interval in seconds; selects the History_<interval>s table.
        precision: Number of decimal places kept after scaling.
        deadband: Minimum change in the raw value needed to record a new sample.
    """

    def __init__(self,
                 row: int,
                 tag_type: str,
                 name_source: str,
                 name_dest: str,
                 scale_factor: float,
                 interval: int,
                 precision: int,
                 deadband: float):
        # Identification of the tag.
        self.row, self.tag_type = row, tag_type
        self.name_source, self.name_dest = name_source, name_dest
        # Post-processing parameters.
        self.scale_factor, self.interval = scale_factor, interval
        self.precision, self.deadband = precision, deadband

    def __repr__(self):
        # Same field order as the constructor, comma separated inside parentheses.
        shown = (self.row, self.tag_type, self.name_source, self.name_dest,
                 self.scale_factor, self.interval, self.precision, self.deadband)
        return "(" + ", ".join(map(format, shown)) + ")"
# ----------------------
# Functions
# ----------------------
def compress_and_scale_real(values: List[Union[int, float, None]], deadband: float, scale_factor: float, precision: int) -> List[Union[int, float, None]]:
    """Deadband-compress a numeric series, then scale and round what is kept.

    Args:
        values: Iterable of (timestamp, value) pairs; pairs whose value is None are dropped.
        deadband: A sample is kept only when its raw value differs from the last
            kept raw value by more than this amount. The first non-None sample
            is always kept.
        scale_factor: Multiplier applied to every kept value.
        precision: Number of decimal places passed to round().

    Returns:
        A list of (timestamp, scaled_value) tuples in input order.
    """
    kept = []
    reference = None  # last raw (unscaled) value that was emitted
    for timestamp, sample in values:
        if sample is None:
            continue
        # The deadband is judged on raw values, before scaling is applied.
        if reference is not None and not abs(sample - reference) > deadband:
            continue
        reference = sample
        kept.append((timestamp, round(sample * scale_factor, precision)))
    return kept
def compress_boolean(values: List[Union[int, float, None]]) -> List[Union[int, float, None]]:
    """Keep only the samples at which the value changes.

    Args:
        values: Iterable of (timestamp, value) pairs; pairs whose value is None are dropped.

    Returns:
        A list of (timestamp, value) tuples: the first non-None sample, then
        every sample whose value differs from the previously kept one.
    """
    changes = []
    previous = None  # value of the most recently kept sample
    for timestamp, state in values:
        if state is None:
            continue
        if previous is not None and state == previous:
            continue  # unchanged since the last kept sample
        previous = state
        changes.append((timestamp, state))
    return changes
def print_text(text: str):
    """Print *text* framed by a dashed rule above and below it."""
    rule = r'-------------------------------------------------------------------------------------------------------'
    for line in (rule, text, rule):
        print(line)
def map_tags():
    """Placeholder for the tag-mapping step; currently it only prints a progress line."""
    progress_line = "-- Mapping Tags --"
    print(progress_line)
def query_vtscada_tag(historical_tag: HistoricalTag, ft_start_time: str, ft_end_time: str,
                      timeout: Union[float, None] = None) -> List[Union[int, float, None]]:
    """Fetch the history of one tag through the VTScada REST SQLQuery endpoint.

    Args:
        historical_tag: Tag to query; its name_source, tag_type and interval
            determine the column and the History_<interval>s table.
        ft_start_time: Start of the window as SQL text, already wrapped in
            single quotes (as produced by query_vtscada).
        ft_end_time: End of the window, in the same form as ft_start_time.
        timeout: Optional timeout in seconds handed to requests; None (the
            default) waits indefinitely.

    Returns:
        The 'values' entry of the 'results' object in the JSON reply. Callers
        in this file unpack each element as a (timestamp, value) pair.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        KeyError: If the reply has no results/values entry.
    """
    # Query average only for real values (Analog in VTScada);
    # otherwise, query the value at the start of the interval.
    if historical_tag.tag_type == "real":
        value_string = ":Value:Average"
    else:
        value_string = ":Value:ValueAtStart"

    query = (
        f"SELECT Timestamp, '{historical_tag.name_source}{value_string}' "
        f"FROM History_{historical_tag.interval}s "
        f"WHERE Timestamp BETWEEN {ft_start_time} AND {ft_end_time}"
    )

    # f-string formatting also accepts a numeric realm_port from the TOML file.
    endpoint = f"http://{server}:{realm_port}/{realm_name}/REST/SQLQuery?query="

    # Log the readable form of the request, exactly as before.
    print_text(endpoint + query)

    # Percent-encode the SQL before sending it: a literal '+' (present in the
    # "+00:00" UTC offset of the timestamps) can be decoded as a space inside a
    # query string, and '&', '#' or '%' in a tag name would otherwise cut the
    # query short or corrupt it.
    url = endpoint + quote(query, safe="")

    response = requests.get(url, auth=(application_user, application_pass), timeout=timeout)
    # Surface authentication/server failures as a clear HTTP error instead of
    # a confusing JSON or KeyError failure further down.
    response.raise_for_status()

    returned = response.json()
    return returned['results']['values']
def query_vtscada(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
    """Query every tag over one time window and write one CSV file per tag.

    Args:
        historical_tags: Tags to export.
        start_time: Start of the window; converted to UTC with astimezone().
        end_time: End of the window; converted the same way.

    For each tag the source name is printed and the history is fetched.
    "real" and "integer" tags are deadband-compressed and scaled, every other
    type is change-compressed. The result is written to
    output_path + <source name with backslashes replaced by underscores> + ".csv".
    A tag whose query returns None is reported and skipped.
    """
    # Render both bounds once: UTC, wrapped in single quotes for the SQL text.
    utc_start = start_time.astimezone(timezone.utc)
    utc_end = end_time.astimezone(timezone.utc)
    ft_start_time = "'" + str(utc_start) + "'"
    ft_end_time = "'" + str(utc_end) + "'"

    numeric_types = ("real", "integer")

    for tag in historical_tags:
        print(tag.name_source)

        values = query_vtscada_tag(tag, ft_start_time, ft_end_time)
        if values is None:
            print("No values found")
            continue

        if tag.tag_type in numeric_types:
            compressed_values = compress_and_scale_real(
                values, tag.deadband, tag.scale_factor, tag.precision)
        else:
            compressed_values = compress_boolean(values)

        # Backslashes in the tag name would otherwise act as path separators.
        file_stem = tag.name_source.replace('\\', '_')
        output_file = output_path + file_stem + ".csv"
        write_values_to_file(output_file, compressed_values)
def write_values_to_file(output_file: str, values: List[Union[int, float, None]]):
    """Write (timestamp, value) pairs to *output_file* as CSV, one pair per row.

    The file is created or overwritten, encoded as UTF-8. Pairs whose value
    (second element) is None are left out.
    """
    with open(output_file, mode='w', newline='', encoding='utf-8') as handle:
        rows_with_data = (pair for pair in values if pair[1] is not None)
        csv.writer(handle).writerows(rows_with_data)
# read_tags()
# ----------------------
# Read in the list of tags and set the mapping parameters for each tag
def read_tags(file_path: str) -> List[HistoricalTag]:
    """Load the tag list from a CSV file.

    The first line is treated as a header and skipped. Every following row
    must hold seven columns, in this order:
    name_source, name_dest, tag_type, scale_factor, interval, precision, deadband.
    Blank lines and rows made only of empty cells are ignored.

    Args:
        file_path: Path of the CSV file (UTF-8, an optional BOM is accepted).

    Returns:
        One HistoricalTag per data row, in file order. Each tag's ``row`` is its
        1-based position among the rows after the header, skipped rows included.

    Raises:
        ValueError: If a row does not have exactly seven columns, or a numeric
            column cannot be converted.
    """
    expected_columns = 7
    historical_tags = []

    # newline='' lets the csv module handle line endings itself, as its
    # documentation recommends.
    with open(file_path, mode='r', newline='', encoding='utf-8-sig') as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',')
        next(csv_reader, None)  # Skip the header row; an empty file yields no tags

        for row, line in enumerate(csv_reader, start=1):
            # A trailing blank line or a spreadsheet row of empty cells is not a tag.
            if not any(field.strip() for field in line):
                continue

            if len(line) != expected_columns:
                raise ValueError(
                    f"{file_path}: data row {row}: expected {expected_columns} columns, got {len(line)}")

            name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line
            historical_tags.append(HistoricalTag(
                row=row,
                tag_type=tag_type,
                name_source=name_source,
                name_dest=name_dest,
                scale_factor=float(scale_factor),
                interval=int(interval),
                precision=int(precision),
                deadband=float(deadband)))

    return historical_tags
# ----------------------
# Main Section
# ----------------------
# ASCII-art title banner, emitted row by row at start-up.
print(
    r' _ _ _____ _____ _______ ____ _____ _____ _____ _ _______ ____ ____ _ _____ ',
    r'| | | |_ _|/ ____|__ __/ __ \| __ \|_ _/ ____| /\ | | |__ __/ __ \ / __ \| | / ____|',
    r'| |__| | | | | (___ | | | | | | |__) | | || | / \ | | | | | | | | | | | | | (___ ',
    r'| __ | | | \___ \ | | | | | | _ / | || | / /\ \ | | | | | | | | | | | | \___ \ ',
    r'| | | |_| |_ ____) | | | | |__| | | \ \ _| || |____ / ____ \| |____ | | | |__| | |__| | |____ ____) |',
    r'|_| |_|_____|_____/ |_| \____/|_| \_\_____\_____/_/ \_\______| |_| \____/ \____/|______|_____/ ',
    sep='\n',
)
# Settings are kept as module-level names because the query functions in this
# file read server, realm_port, realm_name, the credentials and output_path
# as globals.
config = toml.load("setup.toml")

tags_path = config['system']['tags_path']
output_path = config['system']['output_path']
system_timezone = config['system']['system_timezone']
application_user = config['user']['application_user']
application_pass = config['user']['application_pass']
server = config['vtscada']['server_name']
realm_port = config['vtscada']['realm_port']
realm_name = config['vtscada']['realm_name']

# TODO: system_timezone is loaded but not applied yet:
# timezone = pytz.timezone(system_timezone)

# Exactly one command-line argument is accepted: the historian to query.
_QUERY_TYPES = ("VTScada", "AVEVA", "ClearSCADA")
query_type = sys.argv[1] if len(sys.argv) == 2 else None

if query_type not in _QUERY_TYPES:
    # Reject a missing, surplus or unknown argument before any work is done,
    # so a typo is reported instead of being ignored or crashing later.
    print("Invalid arguments!")
else:
    print("")

    historical_tags = read_tags(tags_path)

    # Fixed query window. These datetimes are naive; query_vtscada converts
    # them with astimezone(), which interprets naive values as local time.
    start_time = datetime(2023, 5, 1, 9, 0, 0)
    end_time = datetime(2023, 5, 1, 23, 59, 59)

    for tag in historical_tags:
        print(tag)

    if query_type == "VTScada":
        print_text('VTScada Data Query')
        query_vtscada(historical_tags, start_time, end_time)
    elif query_type == "AVEVA":
        print_text('AVEVA Historian - Not Implemented')
    elif query_type == "ClearSCADA":
        print_text('ClearSCADA - Not Implemented')