From 06a6d5aa8818a7c2ee67fc852ffa64cd5c5700e6 Mon Sep 17 00:00:00 2001
From: Michael Van Ryn
Date: Wed, 14 Jun 2023 11:21:58 -0600
Subject: [PATCH] Add ClearSCADA raw data functions.

---
 README.md      |   3 +-
 input/tags.csv |  32 +++---
 main.py        | 301 +++++++++++++++++++++++++++++++++++++++----------
 setup.toml     |   6 +
 4 files changed, 266 insertions(+), 76 deletions(-)

diff --git a/README.md b/README.md
index a16a629..f1ed71c 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,3 @@
-# StandardTemplate
+# VTScada-HistoricalTools
+Key Point: In the tags list file, the Source Name field is the unique identifier for the tag to query. In VTScada this can be something like ```temp\old_value1```. In ClearSCADA, it is the unique point ID, e.g. ```005152```. Leading zeroes may be omitted; the script pads the integer with zeroes to determine the correct path.

diff --git a/input/tags.csv b/input/tags.csv
index b72b59a..70c87aa 100644
--- a/input/tags.csv
+++ b/input/tags.csv
@@ -1,17 +1,17 @@
 Source Name,Destination Name,Data Type,Scale Factor,Interval (s),Precision,Deadband
-temp\old_value1,temp\new_value1,real,1,20,2,0
-temp\old_value2,temp\new_value2,integer,10,100,0,0
-temp\old_value3,temp\new_value3,real,1.5,100,2,0
-temp\old_value4,temp\new_value4,boolean,1,9,0,0
-temp\old_value5,temp\new_value5,real,1,20,2,0
-temp\old_value6,temp\new_value6,integer,10,100,0,0
-temp\old_value7,temp\new_value7,real,1.5,100,2,0
-temp\old_value8,temp\new_value8,boolean,1,9,0,0
-temp\old_value9,temp\new_value9,real,1,10,2,0
-temp\old_value10,temp\new_value10,integer,1,10,0,0
-temp\old_value11,temp\new_value11,real,1,10,4,0
-temp\old_value12,temp\new_value12,boolean,1,10,0,0
-temp\old_value13,temp\new_value13,real,1,12,4,0
-temp\old_value14,temp\new_value14,integer,1,13,0,0
-temp\old_value15,temp\new_value15,real,1,14,4,0
-temp\old_value16,temp\new_value16,boolean,1,15,0,0
+11959,temp\new_value1,real,1,20,2,0.01
+5153,temp\new_value2,integer,1,100,0,0
+5154,temp\new_value3,real,1,30,0,1
+5155,temp\new_value4,boolean,1,9,0,0
+5,temp\new_value5,real,1,20,2,0
+6,temp\new_value6,integer,10,100,0,0
+227,temp\new_value7,real,1.5,100,2,0
+8,temp\new_value8,boolean,1,9,0,0
+9,temp\new_value9,real,1,10,2,0
+10,temp\new_value10,integer,1,10,0,0
+011,temp\new_value11,real,1,10,4,0
+12,temp\new_value12,boolean,1,10,0,0
+1113,temp\new_value13,real,1,12,4,0
+14,temp\new_value14,integer,1,13,0,0
+1665,temp\new_value15,real,1,14,4,0
+16,temp\new_value16,boolean,1,15,0,0
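For illustration, a minimal sketch (not part of the patch) of how a Source Name point ID maps to a historic data directory, using the same six-digit padding that `clearscada_query()` applies below. The helper name `historic_dir` is hypothetical; the `HisFiles` path is the default from `setup.toml`:

```python
import os

def historic_dir(historic_files: str, name_source: str) -> str:
    # Pad the point ID to six digits: "5152" -> "Historic 005152"
    padded_id = f'{int(name_source):06}'
    return os.path.join(historic_files, "Historic " + padded_id)

# Prints ...\HisFiles\Historic 005152
print(historic_dir(r"C:\ProgramData\Schneider Electric\ClearSCADA\Database\HisFiles", "5152"))
```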
diff --git a/main.py b/main.py
index d887278..b881ed3 100644
--- a/main.py
+++ b/main.py
@@ -3,6 +3,9 @@ import toml
 import sys
 import os
 import requests
+import pyodbc
+import subprocess
+import time
 from typing import List, Union
 from datetime import datetime, timezone, timedelta
@@ -38,13 +41,127 @@ class HistoricalTag:

     def __repr__(self):
         return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision}, {self.deadband})"

-
 # ----------------------
 # Functions
 # ----------------------
+# clearscada_generate_historical_ids()
+# ----------------------
+# Generate a list of historical point IDs from the historic file directories found
+
+
+def clearscada_generate_historical_ids(historic_files: str):
+    ids = []
+
+    for directory in os.listdir(historic_files):
+        if os.fsdecode(directory).startswith("Historic "):
+            ids.append(int(directory[9:15]))
+
+    output_file = os.path.join(output_path, "CS_HistoricIDs.CSV")
+
+    with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile:
+        csv_writer = csv.writer(csvfile)
+
+        for id in ids:
+            if id is not None:
+                csv_writer.writerow([str(id)])
+
+# clearscada_query()
+# ----------------------
+# Query ClearSCADA raw historical files using the ClearSCADA command line tool to create
+# CSV data from the raw data files, then process and merge the data into VTScada formats
+
+
+def clearscada_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
+    dir_path = output_path + str(start_time.year) + "\\"
+    create_directory(dir_path)
+
+    current_start_time = start_time
+    current_end_time = end_time
+
+    start_week = weeks_since_date(current_start_time.timestamp())
+    end_week = weeks_since_date(current_end_time.timestamp())
+
+    historic_directories = []
+    tags = []
+
+    # Build a list of the Historic directories which exist for the requested tags
+    # (directory names are "Historic " plus the ID padded with leading zeroes)
+    for tag in historical_tags:
+        # For ClearSCADA, the tag source is the ID code
+        padded_id = f'{int(tag.name_source):06}'
+
+        # Check that the directory exists and, if so, add it to the list
+        path = os.path.join(historic_files, "Historic " + padded_id)
+        if os.path.exists(path):
+            historic_directories.append(path)
+            tags.append(tag)
+
+    zipped_directories = zip(historic_directories, tags)
+
+    # For each found historic directory, execute the ClearSCADA CSV dump command
+    tag_mappings = []
+
+    for (path, tag) in zipped_directories:
+        command = os.path.join(install_location, "SCXCMD.exe")
+
+        for file in os.listdir(path):
+            if os.fsdecode(file).endswith(".HRD"):
+                week_number = int(file[2:8])
+                if start_week <= week_number <= end_week:
+                    argument = os.path.join(path, file)
+                    subprocess.run([command, "HISDUMP", argument])
+
+        # Process each directory of CSVs first into a list of values that can be pruned
+        values = []
+        output_file = ""
+
+        for file in os.listdir(path):
+            if os.fsdecode(file).endswith(".csv"):
+                csv_file = os.path.join(path, file)
+
+                values.extend(clearscada_read_file(csv_file))
+
+        # Prune excess sample frequency here; deadband and scaling are applied later
+        # in prepare_file_for_tag()
+        if len(values) > 0:
+            values = postprocess_values(values)
+            output_file = prepare_file_for_tag(
+                tag, values, dir_path, current_end_time, True)
+            tag_mappings.append((output_file, tag.name_dest))
+
+    write_tagmapping_to_file(
+        dir_path + "TagMapping.csv", tag_mappings)
+
+
+# clearscada_read_file()
+# ----------------------
+# Read a ClearSCADA CSV file converted from HRD into a list of timestamp/value pairs
+
+
+def clearscada_read_file(file_path: str) -> List[Union[int, float, None]]:
+    values = []
+
+    with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
+        csv_reader = csv.reader(csvfile, delimiter=',')
+        next(csv_reader)  # Skip the header row
+
+        for row, line in enumerate(csv_reader):
+            # Keep only rows flagged with "Good" quality
+            if line[2] == "Good":
+                timestamp = datetime.timestamp(
+                    datetime.strptime(line[0], "%d/%m/%Y %H:%M:%S"))
+                value = float(line[1])
+                values.append((timestamp, value))
+
+    return values
+
+
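A minimal standalone sketch of the row layout `clearscada_read_file()` assumes for the dumped CSVs: a `dd/mm/yyyy` timestamp, then the value, then the quality string. The row itself is hypothetical; the actual column order should be verified against SCXCMD's HISDUMP output:

```python
from datetime import datetime

line = ["14/06/2023 11:21:58", "42.5", "Good"]  # hypothetical HISDUMP row

if line[2] == "Good":  # rows without "Good" quality are skipped
    timestamp = datetime.strptime(line[0], "%d/%m/%Y %H:%M:%S").timestamp()
    print((timestamp, float(line[1])))
```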
 # compress_and_scale_real()
 # ----------------------
+# Compress and scale a list of timestamp/value pairs by applying:
+# -- Deadband (only keeping values which change by the required amount)
+# -- Precision (decimal places, cleaning up excess data from floating points)
+# -- Scaling factor (applied to the value before assigning the precision)
+

 def compress_and_scale_real(values: List[Union[int, float, None]], deadband: float, scale_factor: float, precision: int) -> List[Union[int, float, None]]:
     compressed_values = []
     working_value = None
@@ -64,6 +181,9 @@ def compress_and_scale_real(values: List[Union[int, float, None]], deadband: flo

 # compress_boolean()
 # ----------------------
+# Compress a list of timestamp and boolean values down to transitions. Transitions are
+# kept on the assumption that the interval is fast enough to capture all of them.


 def compress_boolean(values: List[Union[int, float, None]]) -> List[Union[int, float, None]]:
@@ -84,14 +204,64 @@ def compress_boolean(values: List[Union[int, float, None]]) -> List[Union[int, f

 # create_directory()
 # ----------------------
+# Create a directory if it doesn't exist


 def create_directory(path):
     if not os.path.exists(path):
         os.makedirs(path)

+# postprocess_values()
+# ----------------------
+# Prune timestamps which fall within 50 seconds of the last kept timestamp. Values are
+# assumed to have already been read in as timestamp/value pairs
+
+
+def postprocess_values(values: List[Union[int, float, None]]):
+    # Start from "now"; any historical timestamp is far enough away to be kept
+    last_time = time.time()
+
+    processed_values = []
+
+    for (timestamp, value) in values:
+        delta = abs(last_time - timestamp)
+
+        # Keep only values more than 50 seconds from the last kept value
+        if delta > 50:
+            processed_values.append((timestamp, value))
+            last_time = timestamp
+
+    return processed_values
+
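The body of `compress_and_scale_real()` falls outside this hunk, so here is a minimal sketch of the steps its header describes: scale, round to the precision, and keep a value only when it moves by at least the deadband. The variable names mirror the patch, but the body is an assumption, not the shipped implementation:

```python
from typing import List, Tuple

def deadband_sketch(values: List[Tuple[float, float]], deadband: float,
                    scale_factor: float, precision: int) -> List[Tuple[float, float]]:
    compressed_values = []
    working_value = None  # last value kept

    for timestamp, value in values:
        # Scale first, then trim floating-point noise to the requested precision
        scaled = round(value * scale_factor, precision)

        # Keep the first value, then only changes of at least the deadband
        if working_value is None or abs(scaled - working_value) >= deadband:
            compressed_values.append((timestamp, scaled))
            working_value = scaled

    return compressed_values
```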
+# prepare_file_for_tag()
+# ----------------------
+# Compress the values for a tag, write them to a dated CSV file in dir_path, and return
+# the output file name
+
+
+def prepare_file_for_tag(tag: HistoricalTag, values: List[Union[int, float, None]], dir_path: str, current_end_time: datetime, append=False) -> str:
+    if values is None:
+        print("No values found")
+        return ""
+    else:
+        output_file = ""
+
+        if tag.tag_type == "real" or tag.tag_type == "integer":
+            compressed_values = compress_and_scale_real(
+                values, tag.deadband, tag.scale_factor, tag.precision)
+        else:
+            compressed_values = compress_boolean(values)
+
+        if len(compressed_values) != 0:
+            output_file = tag.name_source.replace('\\', '_') + "_" + str(current_end_time.year) + str(
+                current_end_time.month) + str(current_end_time.day) + ".csv"
+            full_output_file = dir_path + output_file
+            write_values_to_file(full_output_file, compressed_values, True)
+
+        return output_file
+
 # print_text()
 # ----------------------
+# Print a formatted text line for debugging


 def print_text(text: str):

     print(r'-------------------------------------------------------------------------------------------------------')
     print(text)
     print(r'-------------------------------------------------------------------------------------------------------')

-# query_vtscada_tag()
+# read_tags()
 # ----------------------
+# Read in the list of tags, set the mapping parameters for each tag, and construct the
+# groupings required for the query


-def query_vtscada_tag(historical_tag: HistoricalTag, ft_start_time: datetime, ft_end_time: datetime) -> List[Union[int, float, None]]:
+def read_tags(file_path: str) -> List[HistoricalTag]:
+    historical_tags = []
+
+    with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
+        csv_reader = csv.reader(csvfile, delimiter=',')
+        next(csv_reader)  # Skip the header row
+
+        for row, line in enumerate(csv_reader):
+            name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line
+            tag = HistoricalTag(row=row+1, tag_type=tag_type, name_source=name_source, name_dest=name_dest,
+                                scale_factor=float(scale_factor), interval=int(interval), precision=int(precision), deadband=float(deadband))
+            historical_tags.append(tag)
+
+    return historical_tags
+
+# vtscada_tag_query()
+# ----------------------
+# Given a HistoricalTag structure, query the tag's values from the start time to the end time
+
+
+def vtscada_tag_query(historical_tag: HistoricalTag, ft_start_time: datetime, ft_end_time: datetime) -> List[Union[int, float, None]]:
     # Query average only for real values (Analog in VTScada)
     if historical_tag.tag_type == "real":
         value_string = ":Value:Average"
@@ -125,11 +317,13 @@ def query_vtscada_tag(historical_tag: HistoricalTag, ft_start_time: datetime, ft

     return returned['results']['values']

-# query_vtscada()
+# vtscada_query()
 # ----------------------
+# Given the set of HistoricalTags and a start and end time, query the data for those tags
+# from the REST interface


-def query_vtscada(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
+def vtscada_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
     current_start_time = start_time
     current_end_time = start_time + timedelta(days=1)
@@ -147,7 +341,7 @@ def query_vtscada(historical_tags: List[HistoricalTag], start_time: datetime, en
     tag_mappings = []

     for tag in historical_tags:
-        values = query_vtscada_tag(tag, ft_start_time, ft_end_time)
+        values = vtscada_tag_query(tag, ft_start_time, ft_end_time)

         output_file = prepare_file_for_tag(
             tag, values, dir_path, current_end_time)
@@ -160,37 +354,34 @@ def query_vtscada(historical_tags: List[HistoricalTag], start_time: datetime, en
     current_start_time += timedelta(days=1)
     current_end_time += timedelta(days=1)

-# prepare_file_for_tag()
+
+# write_tagmapping_to_file()
 # ----------------------
+# Append the mapping of tag names to the files which contain their CSV data to the
+# TagMapping.csv file


-def prepare_file_for_tag(tag: HistoricalTag, values: List[Union[int, float, None]], dir_path: str, current_end_time: datetime) -> str:
-    if values is None:
-        print("No values found")
-        return ""
-    else:
-        output_file = ""
+def write_tagmapping_to_file(output_file: str, tag_mappings: List[str]):
+    with open(output_file, mode='a', newline='', encoding='utf-8') as csvfile:
+        csv_writer = csv.writer(csvfile)

-        if tag.tag_type == "real" or tag.tag_type == "integer":
-            compressed_values = compress_and_scale_real(
-                values, tag.deadband, tag.scale_factor, tag.precision)
-        else:
-            compressed_values = compress_boolean(values)
-
-        if len(compressed_values) != 0:
-            output_file = tag.name_source.replace('\\', '_') + "_" + str(current_end_time.year) + str(
-                current_end_time.month) + str(current_end_time.day) + ".csv"
-            full_output_file = dir_path + output_file
-            write_values_to_file(full_output_file, compressed_values)
-
-        return output_file
+        for mapping in tag_mappings:
+            csv_writer.writerow(mapping)

 # write_values_to_file()
 # ----------------------
+# Given the full path name of a file and a list of timestamp/value pairs, write the values
+# to a CSV file with each pair on its own row

-def write_values_to_file(output_file: str, values: List[Union[int, float, None]]):
-    with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile:
+def write_values_to_file(output_file: str, values: List[Union[int, float, None]], append=False):
+
+    # Append when merging multiple weekly ClearSCADA dumps into one output file
+    if append:
+        csv_mode = 'a'
+    else:
+        csv_mode = 'w'
+
+    with open(output_file, mode=csv_mode, newline='', encoding='utf-8') as csvfile:
         csv_writer = csv.writer(csvfile)

         for value_pair in values:
@@ -201,37 +392,21 @@ def write_values_to_file(output_file: str, values: List[Union[int, float, None]]
             '%Y-%m-%d %H:%M:%S.%f')[:-3]
         csv_writer.writerow([formatted_timestamp, value])

-# write_tagmappings_to_file()
+
+# weeks_since_date()
 # ----------------------
+# Returns the number of whole weeks between the reference date (default December 25th, 1600)
+# and the given timestamp

-def write_tagmapping_to_file(output_file: str, tag_mappings: List[str]):
-    with open(output_file, mode='a', newline='', encoding='utf-8') as csvfile:
-        csv_writer = csv.writer(csvfile)
+def weeks_since_date(timestamp, date=(1600, 12, 25)):
+    dt = datetime.utcfromtimestamp(timestamp)
+    start_date = datetime(*date)
+    delta = dt - start_date
+    weeks = delta.days // 7

-    for mapping in tag_mappings:
-        csv_writer.writerow(mapping)
+    return weeks

-# read_tags()
-# ----------------------
-# Read in the list of tags and set the mapping parameters for each tag and construct the groupings required for the
-# query

-def read_tags(file_path: str) -> List[HistoricalTag]:
-    historical_tags = []
-
-    with open(file_path, mode='r', encoding='utf-8-sig') as csvfile:
-        csv_reader = csv.reader(csvfile, delimiter=',')
-        next(csv_reader)  # Skip the header row
-
-        for row, line in enumerate(csv_reader):
-            name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line
-            tag = HistoricalTag(row=row+1, tag_type=tag_type, name_source=name_source, name_dest=name_dest,
-                                scale_factor=float(scale_factor), interval=int(interval), precision=int(precision), deadband=float(deadband))
-            historical_tags.append(tag)
-
-    return historical_tags
-
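Both `clearscada_query()` and the `.HRD` file selection hinge on this week number, so here is a quick standalone check of the arithmetic; the sample date is arbitrary:

```python
from datetime import datetime

# Reference date used by weeks_since_date() above
start_date = datetime(1600, 12, 25)
sample = datetime(2023, 6, 14)

# Whole weeks between the two dates; clearscada_query() compares this against
# the six-digit week number parsed from characters 2-8 of each .HRD file name
print((sample - start_date).days // 7)
```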
 # ----------------------
 # Main Section
 # ----------------------
@@ -252,10 +427,6 @@ system_timezone = config['system']['system_timezone']
 application_user = config['user']['application_user']
 application_pass = config['user']['application_pass']

-server = config['vtscada']['server_name']
-realm_port = config['vtscada']['realm_port']
-realm_name = config['vtscada']['realm_name']
-

 if len(sys.argv) == 4:

     query_type = sys.argv[1]
@@ -273,10 +444,22 @@ if len(sys.argv) == 4:

     if query_type == "VTScada":
         print_text('VTScada Data Query')
-        query_vtscada(historical_tags, start_time, end_time)
+
+        server = config['vtscada']['server_name']
+        realm_port = config['vtscada']['realm_port']
+        realm_name = config['vtscada']['realm_name']
+
+        vtscada_query(historical_tags, start_time, end_time)
     elif query_type == "AVEVA":
         print_text('AVEVA Historian - Not Implemented')
     elif query_type == "ClearSCADA":
-        print_text('ClearSCADA - Not Implemented')
+        print_text('ClearSCADA - Query Raw Historic Files')
+        historic_files = config['clearscada']['historic_files']
+        install_location = config['clearscada']['install_location']
+        delete_processed = config['clearscada']['delete_processed']
+
+        clearscada_generate_historical_ids(historic_files)
+        clearscada_query(historical_tags, start_time, end_time)
+
 else:
     print("Invalid arguments!")
"8888" realm_name = "RESTRealm" +[clearscada] +historic_files = "C:\\ProgramData\\Schneider Electric\\ClearSCADA\\Database\\HisFiles" +install_location = "C:\\Program Files (x86)\\Schneider Electric\\ClearSCADA" +# Set true to clear out the Historic files as they are processed +delete_processed = false + [user] application_user = "query" application_pass = "queryuser"