From 1abed2227ec8af5be90fd9290b412a891f3ed5bf Mon Sep 17 00:00:00 2001 From: Michael Van Ryn Date: Mon, 1 May 2023 15:02:42 -0600 Subject: [PATCH] Adding analog scaling, compression functions and boolean compression functions --- input/tags.csv | 10 ++-- main.py | 153 +++++++++++++++++++++++++++++++------------------ setup.toml | 3 +- 3 files changed, 103 insertions(+), 63 deletions(-) diff --git a/input/tags.csv b/input/tags.csv index a1e7abe..4830092 100644 --- a/input/tags.csv +++ b/input/tags.csv @@ -1,5 +1,5 @@ -Source Name,Destination Name,Data Type,Scale Factor,Interval (s),Precision -temp\old_value1,temp\new_value1,real,1,20,2 -temp\old_value2,temp\new_value2,integer,10,100,0 -temp\old_value3,temp\new_value3,real,1.5,100,2 -temp\old_value4,temp\new_value4,boolean,1,20,0 +Source Name,Destination Name,Data Type,Scale Factor,Interval (s),Precision,Deadband +temp\old_value1,temp\new_value1,real,1,20,2,0 +temp\old_value2,temp\new_value2,integer,10,100,0,0 +temp\old_value3,temp\new_value3,real,1.5,100,2,0 +temp\old_value4,temp\new_value4,boolean,1,9,0,0 diff --git a/main.py b/main.py index 1fbd698..5331271 100644 --- a/main.py +++ b/main.py @@ -2,20 +2,13 @@ import csv import toml import sys import requests -import pytz -from enum import Enum, auto -#from strnum import StrEnum -from typing import List -from datetime import timedelta, time, datetime +from typing import List, Union +from datetime import datetime, timezone # ---------------------- # Classes # ---------------------- -#class TagDataType(StrNum): -# BOOLEAN = auto() -# INTEGER = auto() -# FLOAT = auto() -# STRING = () + class HistoricalTag: def __init__(self, @@ -25,7 +18,8 @@ class HistoricalTag: name_dest: str, scale_factor: float, interval: int, - precision: int): + precision: int, + deadband: float): self.row = row self.tag_type = tag_type self.name_source = name_source @@ -33,75 +27,120 @@ class HistoricalTag: self.scale_factor = scale_factor self.interval = interval self.precision = precision + self.deadband = deadband def __repr__(self): - return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision})" + return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision}, {self.deadband})" # ---------------------- # Functions # ---------------------- -# format_time_of_day() -# ------------------ -# day - python datetime -# timezone - pytz timezone -# minute_offset - # of minutes to offset the -# ------------------ -# Converts timestamps like YYYY-MM-DDTHH:MM:ss.000000Z to -# timestamps in the form of YYYY-MM-DD HH:MM:ss +def compress_and_scale_real(values: List[Union[int, float, None]], deadband: float, scale_factor: float, precision: int) -> List[Union[int, float, None]]: + compressed_values = [] + working_value = None + + for value_pair in values: + timestamp, value = value_pair + + if value is None: + continue + + if working_value is None or abs(value - working_value) > deadband: + working_value = value + scaled_value = round(value * scale_factor, precision) + compressed_values.append((timestamp, scaled_value)) + + return compressed_values + + +def compress_boolean(values: List[Union[int, float, None]]) -> List[Union[int, float, None]]: + compressed_values = [] + working_value = None + + for value_pair in values: + timestamp, value = value_pair + + if value is None: + continue + + if working_value is None or value != working_value: + working_value = value + compressed_values.append((timestamp, value)) + + return compressed_values -def format_time_of_day(day, timezone, minute_offset): - format = '%Y-%m-%dT%H:%M:%S.%fZ' - midnight = timezone.localize(datetime.combine( - day, time()) + timedelta(minutes=minute_offset), is_dst=None) - midnight = midnight.astimezone(pytz.utc).strftime( - format).replace(".000000Z", "") - return "'" + midnight.replace("T", " ") + "'" def print_text(text: str): print(r'-------------------------------------------------------------------------------------------------------') print(text) print(r'-------------------------------------------------------------------------------------------------------') + def map_tags(): print("-- Mapping Tags --") + +def query_vtscada_tag(historical_tag: HistoricalTag, ft_start_time: datetime, ft_end_time: datetime) -> List[Union[int, float, None]]: + # Query average only for real values (Analog in VTScada) + if historical_tag.tag_type == "real": + value_string = ":Value:Average" + # Otherwise, query the value at the start of the interval + else: + value_string = ":Value:ValueAtStart" + + query = "SELECT Timestamp, '" + historical_tag.name_source + value_string + "' FROM History_" + \ + str(historical_tag.interval) + "s" + " WHERE Timestamp BETWEEN " + \ + ft_start_time + " AND " + ft_end_time + + url = "http://" + server + ":" + realm_port + \ + "/" + realm_name + "/REST/SQLQuery?query=" + query + + print_text(url) + + response = requests.get(url, auth=(application_user, application_pass)) + returned = response.json() + + return returned['results']['values'] + + def query_vtscada(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime): - - ft_start_time = format_time_of_day(start_time, timezone, 0) - - ft_end_time = format_time_of_day(end_time, timezone, 0) + ft_start_time = "'" + str(start_time.astimezone(timezone.utc)) + "'" + ft_end_time = "'" + str(end_time.astimezone(timezone.utc)) + "'" for tag in historical_tags: - query = "SELECT Timestamp, '" + tag.name_source + ":Value:Average' FROM History_" + str(tag.interval) + "s" + \ - " WHERE Timestamp BETWEEN " + ft_start_time + " AND " + ft_end_time - - url = "http://" + server + ":" + realm_port + \ - "/" + realm_name + "/REST/SQLQuery?query=" + query - - print_text(url) + print(tag.name_source) + values = query_vtscada_tag(tag, ft_start_time, ft_end_time) - response = requests.get(url, auth=(application_user, application_pass)) - returned = response.json() + if values is None: + print("No values found") + else: + if tag.tag_type == "real" or tag.tag_type == "integer": + compressed_values = compress_and_scale_real( + values, tag.deadband, tag.scale_factor, tag.precision) + else: + compressed_values = compress_boolean(values) - values = returned['results']['values'] + output_file = output_path + \ + tag.name_source.replace('\\', '_') + ".csv" + write_values_to_file(output_file, compressed_values) - output_file = tag.name_source.replace('\\','_') + ".csv" - print(output_file) - with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile: - csv_writer = csv.writer(csvfile) - - for value_pair in values: - if value_pair[1] is not None: - csv_writer.writerow(value_pair) +def write_values_to_file(output_file: str, values: List[Union[int, float, None]]): + with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile: + csv_writer = csv.writer(csvfile) + for value_pair in values: + if value_pair[1] is not None: + csv_writer.writerow(value_pair) # read_tags() # ---------------------- # Read in the list of tags and set the mapping parameters for each tag and construct the groupings required for the -# query +# query + + def read_tags(file_path: str) -> List[HistoricalTag]: historical_tags = [] @@ -110,9 +149,9 @@ def read_tags(file_path: str) -> List[HistoricalTag]: next(csv_reader) # Skip the header row for row, line in enumerate(csv_reader): - name_source, name_dest, tag_type, scale_factor, interval, precision = line + name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line tag = HistoricalTag(row=row+1, tag_type=tag_type, name_source=name_source, name_dest=name_dest, - scale_factor=float(scale_factor), interval=int(interval), precision=int(precision)) + scale_factor=float(scale_factor), interval=int(interval), precision=int(precision), deadband=float(deadband)) historical_tags.append(tag) return historical_tags @@ -121,6 +160,7 @@ def read_tags(file_path: str) -> List[HistoricalTag]: # Main Section # ---------------------- + print(r' _ _ _____ _____ _______ ____ _____ _____ _____ _ _______ ____ ____ _ _____ ') print(r'| | | |_ _|/ ____|__ __/ __ \| __ \|_ _/ ____| /\ | | |__ __/ __ \ / __ \| | / ____|') print(r'| |__| | | | | (___ | | | | | | |__) | | || | / \ | | | | | | | | | | | | | (___ ') @@ -131,7 +171,7 @@ print(r'|_| |_|_____|_____/ |_| \____/|_| \_\_____\_____/_/ \_\______| config = toml.load("setup.toml") tags_path = config['system']['tags_path'] -timezone = config['system']['system_timezone'] +output_path = config['system']['output_path'] system_timezone = config['system']['system_timezone'] application_user = config['user']['application_user'] application_pass = config['user']['application_pass'] @@ -140,7 +180,7 @@ server = config['vtscada']['server_name'] realm_port = config['vtscada']['realm_port'] realm_name = config['vtscada']['realm_name'] -timezone = pytz.timezone(system_timezone) +# timezone = pytz.timezone(system_timezone) if len(sys.argv) == 2: query_type = sys.argv[1] @@ -148,8 +188,8 @@ if len(sys.argv) == 2: print("") historical_tags = read_tags(tags_path) - start_time = datetime(2023, 4, 30, 0, 0, 0) - end_time = datetime(2023, 5, 2, 23, 59, 59) + start_time = datetime(2023, 5, 1, 9, 0, 0) + end_time = datetime(2023, 5, 1, 23, 59, 59) for tag in historical_tags: print(tag) @@ -163,4 +203,3 @@ if len(sys.argv) == 2: print_text('ClearSCADA - Not Implemented') else: print("Invalid arguments!") - diff --git a/setup.toml b/setup.toml index ecd9cac..124aa67 100644 --- a/setup.toml +++ b/setup.toml @@ -1,10 +1,11 @@ [system] tags_path = "input\\tags.csv" +output_path = "output\\" # Must be a valid Time Zone, ex: # Canada/Mountain # Canada/Pacific -system_timezone = "Canada/Saskatchewan" +system_timezone = "Canada/Mountain" [vtscada] server_name = "lb-vanryn"