import csv import toml import sys import os import requests import pyodbc import subprocess import time from typing import List, Union from datetime import datetime, timezone, timedelta # LIMITATIONS - 10k rows per VTScada query - reccommend limit to minimum 10 second intervals for daily values # ---------------------- # Classes # ---------------------- # HistoricalTag # ---------------------- class HistoricalTag: def __init__(self, row: int, tag_type: str, name_source: str, name_dest: str, scale_factor: float, interval: int, precision: int, deadband: float): self.row = row self.tag_type = tag_type self.name_source = name_source self.name_dest = name_dest self.scale_factor = scale_factor self.interval = interval self.precision = precision self.deadband = deadband def __repr__(self): return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision}, {self.deadband})" # ---------------------- # AVEVA (Wonderware) Historian Functions # ---------------------- def aveva_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime): print("Querying data for: " + str(start_time.year) + " " + str(start_time.month) + " " + str(start_time.day)) dir_path = output_path + str(start_time.year) + "\\" create_directory(dir_path) ft_start_time = "'" + str(start_time.astimezone(timezone.utc)) + "'" ft_end_time = "'" + str(end_time.astimezone(timezone.utc)) + "'" init_string = "driver={SQLOLEDB}; server=" + server + "; database=" + \ database_name + "; UID=" + application_user + "; PWD=" + application_pass + ";" print(init_string) # connection = pyodbc.connect(init_string) for tag in historical_tags: if tag.tag_type == "real" or tag.tag_type == "integer": retrieval_mode = "'Average'" else: retrieval_mode = "'Cyclic'" query = "SELECT * FROM OpenQuery(INSQL, 'SELECT DateTime, '" + tag.name_source + "' FROM WideHistory WHERE DateTime >= " + \ ft_start_time + " AND DateTime <= " + ft_end_time + " AND wwRetrievalMode = " + retrieval_mode + \ " AND wwResolution = " + str(tag.interval) + "')" print(query) # ---------------------- # ClearSCADA Functions # ---------------------- # clearscada_generate_historical_ids() # ---------------------- # Generates a list of historical IDs for found historic files def clearscada_generate_historical_ids(historic_files: str): ids = [] for directory in os.listdir(historic_files): if os.fsdecode(directory).startswith("Historic "): ids.append(int(directory[9:15])) output_file = os.path.join(output_path, "CS_HistoricIDs.CSV") with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile: csv_writer = csv.writer(csvfile) for id in ids: if id is not None: csv_writer.writerow([str(id)]) # clearscada_process() # ---------------------- # Process ClearSCADA raw historical files using the ClearSCADA command line tool to create # csv data from the raw data files, then process and merge the data into VTScada formats def clearscada_process(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime): dir_path = output_path + str(start_time.year) + "\\" create_directory(dir_path) current_start_time = start_time current_end_time = end_time start_week = weeks_since_date(current_start_time.timestamp()) end_week = weeks_since_date(current_end_time.timestamp()) historic_directories = [] tags = [] # Get a list of all directories of Historic files (format is Historic ID with ID padded with leading zeroes) needed which exist for tag in historical_tags: # For ClearSCADA, the tag source is the ID code padded_id = f'{int(tag.name_source):06}' # Check that directory exists and if so, add it to a list path = os.path.join(historic_files, "Historic " + padded_id) if os.path.exists(path): historic_directories.append(path) tags.append(tag) zipped_directories = zip(historic_directories, tags) # For each found historic directory execute the ClearSCADA CSV command print("Found: " + str(len(zipped_directories)) + " historic directories matching tags") tag_mappings = [] files = [] for (path, tag) in zipped_directories: # print(path, tag.name_dest) command = os.path.join(install_location, "SCXCMD.exe") for file in os.listdir(path): if os.fsdecode(file).endswith(".HRD"): week_number = int(file[2:8]) if week_number >= start_week and week_number <= end_week: files.append(file) argument = os.path.join(path, file) subprocess.run([command, "HISDUMP", argument]) # Process each directory of CSVs first into a list of values that can be pruned values = [] output_file = "" for file in os.listdir(path): if os.fsdecode(file).endswith(".csv"): csv_file = os.path.join(path, file) values.extend(clearscada_read_file(csv_file)) files.append(file) # Values will have had their deadband and scaling processed, but remaining is excess frequency if len(values) > 0: values = postprocess_values(values) output_file = prepare_file_for_tag( tag, values, dir_path, current_end_time, True) tag_mappings.append((output_file, tag.name_dest)) # Delete files as they are processed for file in files: if file.endswith(".csv") or (delete_processed and file.endswith(".HRD")): try: os.remove(file) except OSError as e: print("Error: %s - %s." % (e.filename, e.strerror)) write_tagmapping_to_file( dir_path + "TagMapping.csv", tag_mappings) # clearscada_read_file() # ---------------------- # Read in a ClearSCADA CSV file converted from HRD into a list of timestamps and values def clearscada_read_file(file_path: str) -> List[Union[int, float, None]]: values = [] with open(file_path, mode='r', encoding='utf-8-sig') as csvfile: csv_reader = csv.reader(csvfile, delimiter=',') next(csv_reader) # Skip the header row for row, line in enumerate(csv_reader): if line[2] == "Good": timestamp = datetime.timestamp( datetime.strptime(line[0], "%d/%m/%Y %H:%M:%S")) value = float(line[1]) values.append((timestamp, value)) return values # ---------------------- # VTScada Functions # ---------------------- # vtscada_tag_query() # ---------------------- # Given a HistoricalTag structure, query the tag's values from the start time to the end time def vtscada_tag_query(historical_tag: HistoricalTag, ft_start_time: datetime, ft_end_time: datetime) -> List[Union[int, float, None]]: # Query average only for real values (Analog in VTScada) if historical_tag.tag_type == "real": value_string = ":Value:Average" # Otherwise, query the value at the start of the interval else: value_string = ":Value:ValueAtStart" query = "SELECT Timestamp, '" + historical_tag.name_source + value_string + "' FROM History_" + \ str(historical_tag.interval) + "s" + " WHERE Timestamp BETWEEN " + \ ft_start_time + " AND " + ft_end_time url = "http://" + server + ":" + realm_port + \ "/" + realm_name + "/REST/SQLQuery?query=" + query # print_text(url) response = requests.get(url, auth=(application_user, application_pass)) returned = response.json() return returned['results']['values'] # vtscada_query() # ---------------------- # Given the set of HistoricalTags and a start and end time, query the data of those tags from the # REST interface def vtscada_query(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime): current_start_time = start_time current_end_time = start_time + timedelta(days=1) while current_start_time < end_time: print("Querying data for: " + str(current_start_time.year) + " " + str(current_start_time.month) + " " + str(current_start_time.day)) dir_path = output_path + str(start_time.year) + "\\" create_directory(dir_path) ft_start_time = "'" + \ str(current_start_time.astimezone(timezone.utc)) + "'" ft_end_time = "'" + \ str(current_end_time.astimezone(timezone.utc)) + "'" tag_mappings = [] for tag in historical_tags: values = vtscada_tag_query(tag, ft_start_time, ft_end_time) output_file = prepare_file_for_tag( tag, values, dir_path, current_end_time) if output_file != "": tag_mappings.append((output_file, tag.name_dest)) write_tagmapping_to_file( dir_path + "TagMapping.csv", tag_mappings) current_start_time += timedelta(days=1) current_end_time += timedelta(days=1) # ---------------------- # Common Functions # ---------------------- # compress_and_scale_real() # ---------------------- # -- Deadband (only keeping values which change by the required amount) # -- Precision (decimal places, cleaning up excess data from floating points) # -- Scaling factor (applies the scaling factor to the value before assigning the precision) def compress_and_scale_real(values: List[Union[int, float, None]], deadband: float, scale_factor: float, precision: int) -> List[Union[int, float, None]]: compressed_values = [] working_value = None for value_pair in values: timestamp, value = value_pair if value is None: continue if working_value is None or abs(value - working_value) > deadband: working_value = value scaled_value = round(value * scale_factor, precision) compressed_values.append((timestamp, scaled_value)) return compressed_values # compress_boolean() # ---------------------- # Compress a set of timestamp and boolean values to transitions. For booleans, transitions are # kept and the assumption is # the interval will be fast enough to keep all transitions. def compress_boolean(values: List[Union[int, float, None]]) -> List[Union[int, float, None]]: compressed_values = [] working_value = None for value_pair in values: timestamp, value = value_pair if value is None: continue if working_value is None or value != working_value: working_value = value compressed_values.append((timestamp, value)) return compressed_values # create_directory() # ---------------------- # Create a directory if it doesn't exist def create_directory(path): if not os.path.exists(path): os.makedirs(path) # postprocess_values() # ---------------------- # Process a list of values assumed and clean up timestamps which are within the interval of the last # timestamp. Values are assumed to already have been compressed def postprocess_values(values: List[Union[int, float, None]]): last_time = time.time() processed_values = [] for (timestamp, value) in values: timedelta = abs(last_time - timestamp) if timedelta > 50: processed_values.append((timestamp, value)) last_time = timestamp last_time = timestamp return processed_values # prepare_file_for_tag() # ---------------------- # Helper function to call the correct compressing and processing functions for a given tag and getting it written to # file def prepare_file_for_tag(tag: HistoricalTag, values: List[Union[int, float, None]], dir_path: str, current_end_time: datetime, append=False) -> str: if values is None: print("No values found") return "" else: output_file = "" if tag.tag_type == "real" or tag.tag_type == "integer": compressed_values = compress_and_scale_real( values, tag.deadband, tag.scale_factor, tag.precision) else: compressed_values = compress_boolean(values) if len(compressed_values) != 0: output_file = tag.name_source.replace('\\', '_') + "_" + str(current_end_time.year) + str( current_end_time.month) + str(current_end_time.day) + ".csv" full_output_file = dir_path + output_file write_values_to_file(full_output_file, compressed_values, True) return output_file # print_text() # ---------------------- # Print formatting a text line for debugging and such def print_text(text: str): print(r'-------------------------------------------------------------------------------------------------------') print(text) print(r'-------------------------------------------------------------------------------------------------------') # read_tags() # ---------------------- # Read in the list of tags and set the mapping parameters for each tag and construct the groupings required for the # query def read_tags(file_path: str) -> List[HistoricalTag]: historical_tags = [] with open(file_path, mode='r', encoding='utf-8-sig') as csvfile: csv_reader = csv.reader(csvfile, delimiter=',') next(csv_reader) # Skip the header row for row, line in enumerate(csv_reader): name_source, name_dest, tag_type, scale_factor, interval, precision, deadband = line tag = HistoricalTag(row=row+1, tag_type=tag_type, name_source=name_source, name_dest=name_dest, scale_factor=float(scale_factor), interval=int(interval), precision=int(precision), deadband=float(deadband)) historical_tags.append(tag) return historical_tags # write_tagmappings_to_file() # ---------------------- # Create a new TagMapping.CSV file which contains the mapping of all tag names and files which # contain their CSV data def write_tagmapping_to_file(output_file: str, tag_mappings: List[str]): with open(output_file, mode='a', newline='', encoding='utf-8') as csvfile: csv_writer = csv.writer(csvfile) for mapping in tag_mappings: csv_writer.writerow(mapping) # write_values_to_file() # ---------------------- # Given a full path name of a file and list of timestamp, value pairs, write the values to a # CSV file with each pair on its own row. def write_values_to_file(output_file: str, values: List[Union[int, float, None]], append=False): if append: csv_mode = 'a' else: csv_mode = 'w' with open(output_file, mode=csv_mode, newline='', encoding='utf-8') as csvfile: csv_writer = csv.writer(csvfile) for value_pair in values: timestamp, value = value_pair if value is not None: utc_dt = datetime.utcfromtimestamp(timestamp) formatted_timestamp = utc_dt.strftime( '%Y-%m-%d %H:%M:%S.%f')[:-3] csv_writer.writerow([formatted_timestamp, value]) # weeks_since_date() # ---------------------- # Returns the number of weeks since the given timestamp, or defaults to January 1, 1601 # It looks like ClearSCADA documentation before the GeoSCADA rebrand erroneously used # the number of weeks since December 25, 1600, which caused being off by 1. def weeks_since_date(timestamp, date=(1601, 1, 1)): dt = datetime.utcfromtimestamp(timestamp) start_date = datetime(*date) delta = dt - start_date weeks = delta.days // 7 return weeks # ---------------------- # Main Section # ---------------------- print(r' _ _ _____ _____ _______ ____ _____ _____ _____ _ _______ ____ ____ _ _____ ') print(r'| | | |_ _|/ ____|__ __/ __ \| __ \|_ _/ ____| /\ | | |__ __/ __ \ / __ \| | / ____|') print(r'| |__| | | | | (___ | | | | | | |__) | | || | / \ | | | | | | | | | | | | | (___ ') print(r'| __ | | | \___ \ | | | | | | _ / | || | / /\ \ | | | | | | | | | | | | \___ \ ') print(r'| | | |_| |_ ____) | | | | |__| | | \ \ _| || |____ / ____ \| |____ | | | |__| | |__| | |____ ____) |') print(r'|_| |_|_____|_____/ |_| \____/|_| \_\_____\_____/_/ \_\______| |_| \____/ \____/|______|_____/ ') config = toml.load("setup.toml") tags_path = config['system']['tags_path'] output_path = config['system']['output_path'] system_timezone = config['system']['system_timezone'] application_user = config['user']['application_user'] application_pass = config['user']['application_pass'] if len(sys.argv) == 4: query_type = sys.argv[1] year, month, day = map(int, str(sys.argv[2]).split("-")) start_time = datetime(year, month, day, 0, 0, 0) year, month, day = map(int, str(sys.argv[3]).split("-")) end_time = datetime(year, month, day, 0, 0, 0) print("") historical_tags = read_tags(tags_path) for tag in historical_tags: print(tag) if query_type == "VTScada": print_text('VTScada Data Query') server = config['vtscada']['server_name'] realm_port = config['vtscada']['realm_port'] realm_name = config['vtscada']['realm_name'] vtscada_query(historical_tags, start_time, end_time) elif query_type == "AVEVA": server = config['aveva']['server_name'] database_name = config['aveva']['database_name'] aveva_query(historical_tags, start_time, end_time) elif query_type == "ClearSCADA": print_text('ClearSCADA - Query Raw Historic Files') historic_files = config['clearscada']['historic_files'] install_location = config['clearscada']['install_location'] delete_processed = config['clearscada']['delete_processed'] clearscada_generate_historical_ids(historic_files) clearscada_process(historical_tags, start_time, end_time) else: print("Invalid arguments!")