import csv
import sys
from datetime import timedelta, time, datetime
from enum import Enum, auto
#from strnum import StrEnum
from typing import List
from urllib.parse import quote

import pytz
import requests
import toml

# Seconds to wait for the VTScada REST endpoint before giving up, so that an
# unreachable server cannot hang the script indefinitely.
REQUEST_TIMEOUT_SECONDS = 30

# ----------------------
# Classes
# ----------------------

#class TagDataType(StrNum):
#    BOOLEAN = auto()
#    INTEGER = auto()
#    FLOAT = auto()
#    STRING = ()


class HistoricalTag:
    """One row of the tag-mapping CSV file.

    Attributes:
        row: 1-based position of the row in the tags file (header excluded).
        tag_type: Tag type string as read from the file.
        name_source: Tag name in the source system; used to build the query
            and the output file name.
        name_dest: Tag name in the destination system (stored only).
        scale_factor: Scale factor read from the file (stored only).
        interval: Number of seconds selecting the ``History_<interval>s``
            table that is queried.
        precision: Precision read from the file (stored only).
    """

    def __init__(self,
                 row: int,
                 tag_type: str,
                 name_source: str,
                 name_dest: str,
                 scale_factor: float,
                 interval: int,
                 precision: int):
        self.row = row
        self.tag_type = tag_type
        self.name_source = name_source
        self.name_dest = name_dest
        self.scale_factor = scale_factor
        self.interval = interval
        self.precision = precision

    def __repr__(self):
        return f"({self.row}, {self.tag_type}, {self.name_source}, {self.name_dest}, {self.scale_factor}, {self.interval}, {self.precision})"

# ----------------------
# Functions
# ----------------------


def _format_utc_timestamp(moment, timezone):
    """Return *moment* as a single-quoted UTC ``'YYYY-MM-DD HH:MM:SS'`` string.

    A naive datetime is interpreted as local time in *timezone* (a pytz
    timezone); an aware datetime is converted as-is.

    Raises:
        pytz.exceptions.AmbiguousTimeError / NonExistentTimeError: when a
            naive *moment* falls in a DST transition (``is_dst=None`` asks
            pytz to refuse to guess).
    """
    if moment.tzinfo is None:
        moment = timezone.localize(moment, is_dst=None)
    return "'" + moment.astimezone(pytz.utc).strftime('%Y-%m-%d %H:%M:%S') + "'"


def format_time_of_day(day, timezone, minute_offset):
    """Format local midnight of *day*, plus an offset, as a quoted UTC string.

    Args:
        day: Python date or datetime; only the date part is used.
        timezone: pytz timezone the local midnight is expressed in.
        minute_offset: Number of minutes to add to local midnight.

    Returns:
        The resulting instant converted to UTC, formatted as
        ``'YYYY-MM-DD HH:MM:SS'`` including the surrounding single quotes.
    """
    local_moment = datetime.combine(day, time()) + timedelta(minutes=minute_offset)
    return _format_utc_timestamp(local_moment, timezone)


def print_text(text: str):
    """Print *text* framed by two horizontal rules."""
    rule = r'-------------------------------------------------------------------------------------------------------'
    print(rule)
    print(text)
    print(rule)


def map_tags():
    """Placeholder for tag mapping; currently only prints a heading."""
    print("-- Mapping Tags --")


def query_vtscada(historical_tags: List[HistoricalTag], start_time: datetime, end_time: datetime):
    """Query VTScada for each tag's averaged history and write one CSV per tag.

    Reads the module-level settings ``timezone``, ``server``, ``realm_port``,
    ``realm_name``, ``application_user`` and ``application_pass``.

    Args:
        historical_tags: Tags to export.
        start_time: Start of the query window, local time in ``timezone``.
        end_time: End of the query window, local time in ``timezone``.

    Raises:
        requests.RequestException: on connection failure, timeout or a
            non-success HTTP status.
    """
    # Convert the full datetimes (not just their dates) so that an end time
    # such as 23:59:59 is honoured instead of being truncated to midnight.
    ft_start_time = _format_utc_timestamp(start_time, timezone)
    ft_end_time = _format_utc_timestamp(end_time, timezone)

    for tag in historical_tags:
        query = (f"SELECT Timestamp, '{tag.name_source}:Value:Average' "
                 f"FROM History_{tag.interval}s "
                 f"WHERE Timestamp BETWEEN {ft_start_time} AND {ft_end_time}")

        # Percent-encode the SQL text so characters such as '&', '#', '+' or
        # '%' inside a tag name cannot corrupt the query string.
        url = (f"http://{server}:{realm_port}/{realm_name}"
               f"/REST/SQLQuery?query={quote(query, safe='')}")

        print_text(url)

        response = requests.get(url,
                                auth=(application_user, application_pass),
                                timeout=REQUEST_TIMEOUT_SECONDS)
        # Surface HTTP errors directly rather than as a confusing JSON
        # decoding failure further down.
        response.raise_for_status()
        returned = response.json()

        values = returned['results']['values']

        # Backslashes in the tag path are not usable in a file name.
        output_file = tag.name_source.replace('\\', '_') + ".csv"
        print(output_file)

        with open(output_file, mode='w', newline='', encoding='utf-8') as csvfile:
            csv_writer = csv.writer(csvfile)

            # Rows whose value column is None carry no sample; skip them.
            csv_writer.writerows(
                value_pair for value_pair in values if value_pair[1] is not None)


def read_tags(file_path: str) -> List[HistoricalTag]:
    """Read the tag-mapping CSV and return one HistoricalTag per data row.

    The first row is treated as a header. Each data row must contain, in
    order: name_source, name_dest, tag_type, scale_factor, interval,
    precision. Completely blank rows are skipped.

    Args:
        file_path: Path to the CSV file (UTF-8, optional BOM).

    Returns:
        The parsed tags, in file order.

    Raises:
        ValueError: if a row has the wrong number of columns or a numeric
            column cannot be converted.
    """
    historical_tags = []

    with open(file_path, mode='r', encoding='utf-8-sig', newline='') as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',')

        # Skip the header row; the default keeps an empty file from raising.
        next(csv_reader, None)

        for row, line in enumerate(csv_reader, start=1):
            if not line:
                # csv.reader yields [] for blank lines.
                continue

            name_source, name_dest, tag_type, scale_factor, interval, precision = line

            historical_tags.append(HistoricalTag(
                row=row,
                tag_type=tag_type,
                name_source=name_source,
                name_dest=name_dest,
                scale_factor=float(scale_factor),
                interval=int(interval),
                precision=int(precision)))

    return historical_tags

# ----------------------
# Main Section
# ----------------------


print(r' _ _ _____ _____ _______ ____ _____ _____ _____ _ _______ ____ ____ _ _____ ')
print(r'| | | |_ _|/ ____|__ __/ __ \| __ \|_ _/ ____| /\ | | |__ __/ __ \ / __ \| | / ____|')
print(r'| |__| | | | | (___ | | | | | | |__) | | || | / \ | | | | | | | | | | | | | (___ ')
print(r'| __ | | | \___ \ | | | | | | _ / | || | / /\ \ | | | | | | | | | | | | \___ \ ')
print(r'| | | |_| |_ ____) | | | | |__| | | \ \ _| || |____ / ____ \| |____ | | | |__| | |__| | |____ ____) |')
print(r'|_| |_|_____|_____/ |_| \____/|_| \_\_____\_____/_/ \_\______| |_| \____/ \____/|______|_____/ ')

# Settings are kept at module level because query_vtscada() reads them as
# globals.
config = toml.load("setup.toml")

tags_path = config['system']['tags_path']
system_timezone = config['system']['system_timezone']
application_user = config['user']['application_user']
application_pass = config['user']['application_pass']
server = config['vtscada']['server_name']
realm_port = config['vtscada']['realm_port']
realm_name = config['vtscada']['realm_name']

timezone = pytz.timezone(system_timezone)

if len(sys.argv) == 2:
    query_type = sys.argv[1]

    print("")

    historical_tags = read_tags(tags_path)
    start_time = datetime(2023, 4, 30, 0, 0, 0)
    end_time = datetime(2023, 5, 2, 23, 59, 59)

    for tag in historical_tags:
        print(tag)

    if query_type == "VTScada":
        print_text('VTScada Data Query')
        query_vtscada(historical_tags, start_time, end_time)
    elif query_type == "AVEVA":
        print_text('AVEVA Historian - Not Implemented')
    elif query_type == "ClearSCADA":
        print_text('ClearSCADA - Not Implemented')
    else:
        # Unknown query type: report it instead of exiting silently.
        print("Invalid arguments!")
else:
    print("Invalid arguments!")