########################################################### # # # This file contains various classes and methods # # needed to handle discrete event simulations of # # of binary repairable components in a binary # # monotone system # # # ########################################################### from random import expovariate, weibullvariate from math import gamma import numpy as np import matplotlib.pyplot as plt # Event codes FAILURE_EVENT = 100 REPAIR_EVENT = 200 SAMPLE_EVENT = 300 # State codes FUNCTIONING = 1 FAILED = 0 # Criticality codes CRITICAL = 1 NONCRITICAL = 0 # Run simulation up to maxTime + EPS to make sure that we get all events EPS = 0.1 class Distribution: def generate(self): pass def mean(self): pass class Exponential(Distribution): def __init__(self, rate): self.rate = rate # The rate parameter (inverse of expectation) def generate(self): return expovariate(self.rate) def mean(self): return 1 / self.rate class Weibull(Distribution): def __init__(self, alpha, beta): self.alpha = alpha # The shape parameter self.beta = beta # The scale parameter def generate(self): return weibullvariate(self.beta, self.alpha) def mean(self): return self.beta * gamma(1 + 1/self.alpha) ################################################### # # # Event class # # # ################################################### class Event: def __init__(self, eventTime, eventCode, eventHandler, eventData = None): self.eventTime = eventTime self.eventCode = eventCode self.eventHandler = eventHandler self.eventData = eventData self.next = None def push(self, event): if self.next == None: self.next = event elif event.eventTime < self.next.eventTime: event.next = self.next self.next = event else: self.next.push(event) def processEvent(self): self.eventHandler.processEvent(self) ################################################### # # # EventHandler class # # # ################################################### class EventHandler: def __init__(self, eventQueue): self.eventQueue = eventQueue def prepare(self): pass def processEvent(self, event): pass ################################################### # # # EventQueue class # # # ################################################### class EventQueue: def __init__(self, maxTime): self.first = None self.time = 0 self.maxTime = maxTime def reinit(self): self.first = None self.time = 0 def push(self, event): event.eventTime += self.time if self.first == None: self.first = event elif event.eventTime < self.first.eventTime: event.next = self.first self.first = event else: self.first.push(event) def pop(self): if self.first != None: current_event = self.first self.time = current_event.eventTime self.first = current_event.next current_event.processEvent() def processAllEvents(self): while (self.first != None) and (self.time < self.maxTime + EPS): self.pop() ################################################### # # # EventSystem class # # # ################################################### class EventSystem(EventHandler): def __init__(self, eventQueue, structureFunction, eventComponents, sampleInterval, calculateBirnbaum=False, calculateBarlowProschan=False): super().__init__(eventQueue) self.structureFunction = structureFunction self.eventComponents = eventComponents self.numSimulations = 0 index = 0 for comp in self.eventComponents: comp.setEventSystem(self, index) index += 1 self.numComponents = len(eventComponents) self.compStates = np.ones(self.numComponents) self.sampleInterval = sampleInterval self.numPoints = int(eventQueue.maxTime / sampleInterval) + 1 self.sampleTimes = np.linspace(0, eventQueue.maxTime, self.numPoints) self.systemStates = np.zeros(self.numPoints) self.meanSystemStates = np.zeros(self.numPoints) self.failures = 0 # Birnbaum criticality self.calculateBirnbaum = calculateBirnbaum if self.calculateBirnbaum: self.criticalityStates = [] self.meanCriticalityStates = [] for i in range(self.numComponents): self.criticalityStates.append(np.zeros(self.numPoints)) self.meanCriticalityStates.append(np.zeros(self.numPoints)) self.criticalTime = np.zeros(self.numComponents) self.noncriticalTime = np.zeros(self.numComponents) self.criticalityState = np.zeros(self.numComponents) # Barlow & Proschan criticality self.calculateBarlowProschan = calculateBarlowProschan if self.calculateBarlowProschan: self.barlowProschanCriticality = np.zeros(self.numComponents) for i in range(self.numComponents): self.barlowProschanCriticality[i] = 0 def criticalityFunction(self, index): tmpCompState = self.compStates[index] if tmpCompState == FUNCTIONING: phi_1 = self.state self.compStates[index] = FAILED phi_0 = self.structureFunction(self.compStates) else: phi_0 = self.state self.compStates[index] = FUNCTIONING phi_1 = self.structureFunction(self.compStates) self.compStates[index] = tmpCompState return phi_1 - phi_0 def prepare(self): self.numSimulations += 1 for comp in self.eventComponents: comp.prepare() self.state = self.structureFunction(self.compStates) self.systemStates[0] += self.state self.meanSystemStates[0] += self.state self.upTime = 0 self.downTime = 0 if self.calculateBirnbaum: for i in range(self.numComponents): self.criticalityState[i] = self.criticalityFunction(i) self.criticalityStates[i][0] += self.criticalityState[i] self.meanCriticalityStates[i][0] += self.criticalityState[i] self.criticalTime[i] = 0 self.noncriticalTime[i] = 0 self.pointCount = 1 self.lastEventTime = 0 if self.pointCount < self.numPoints: sampleEvent = Event(self.sampleInterval, SAMPLE_EVENT, self) self.eventQueue.push(sampleEvent) def processEvent(self, event): if event.eventCode == FAILURE_EVENT: self.processFailureEvent(event) elif event.eventCode == REPAIR_EVENT: self.processRepairEvent(event) elif event.eventCode == SAMPLE_EVENT: self.processSampleEvent(event) def processFailureEvent(self, event): index = int(event.eventData) self.compStates[index] = FAILED if self.state == FUNCTIONING: self.upTime += (event.eventTime - self.lastEventTime) self.state = self.structureFunction(self.compStates) if self.state == FAILED: self.failures += 1 if self.calculateBarlowProschan: self.barlowProschanCriticality[index] += 1 else: self.downTime += (event.eventTime - self.lastEventTime) if self.calculateBirnbaum: for i in range(self.numComponents): if self.criticalityState[i] == CRITICAL: self.criticalTime[i] += (event.eventTime - self.lastEventTime) else: self.noncriticalTime[i] += (event.eventTime - self.lastEventTime) self.criticalityState[i] = self.criticalityFunction(i) self.lastEventTime = event.eventTime def processRepairEvent(self, event): index = int(event.eventData) self.compStates[index] = FUNCTIONING if self.state == FAILED: self.downTime += (event.eventTime - self.lastEventTime) self.state = self.structureFunction(self.compStates) else: self.upTime += (event.eventTime - self.lastEventTime) if self.calculateBirnbaum: for i in range(self.numComponents): if self.criticalityState[i] == CRITICAL: self.criticalTime[i] += (event.eventTime - self.lastEventTime) else: self.noncriticalTime[i] += (event.eventTime - self.lastEventTime) self.criticalityState[i] = self.criticalityFunction(i) self.lastEventTime = event.eventTime def processSampleEvent(self, event): if self.state == FUNCTIONING: self.upTime += (event.eventTime - self.lastEventTime) else: self.downTime += (event.eventTime - self.lastEventTime) if self.calculateBirnbaum: for i in range(self.numComponents): if self.criticalityState[i] == CRITICAL: self.criticalTime[i] += (event.eventTime - self.lastEventTime) else: self.noncriticalTime[i] += (event.eventTime - self.lastEventTime) self.lastEventTime = event.eventTime currPoint = self.pointCount self.pointCount += 1 self.systemStates[currPoint] += self.state self.meanSystemStates[currPoint] += (self.upTime / self.lastEventTime) if self.calculateBirnbaum: for i in range(self.numComponents): self.criticalityStates[i][currPoint] += self.criticalityState[i] self.meanCriticalityStates[i][currPoint] += (self.criticalTime[i] / self.lastEventTime) if self.pointCount < self.numPoints: sampleEvent = Event(self.sampleInterval, SAMPLE_EVENT, self) self.eventQueue.push(sampleEvent) def calculateAverages(self): for i in range(self.numPoints): self.systemStates[i] /= self.numSimulations self.meanSystemStates[i] /= self.numSimulations if self.calculateBirnbaum: for i in range(self.numComponents): self.criticalityStates[i] /= self.numSimulations self.meanCriticalityStates[i] /= self.numSimulations ###### Various plot methods related to system states and mean system states ###### def plotSystemStates(self, file=None): plt.plot(self.sampleTimes, self.systemStates, label='Availability') plt.xlabel('Time') plt.ylabel('Avail.') plt.title("Availability as a function of time") plt.legend() if file != None: plt.savefig(file) plt.show() def plotMeanSystemStates(self, file=None): plt.plot(self.sampleTimes, self.meanSystemStates, label='Mean availability') plt.xlabel('Time') plt.ylabel('Avail.') plt.title("Mean availability as a function of time") plt.legend() if file != None: plt.savefig(file) plt.show() def plotComboSystemStates(self, file=None): plt.plot(self.sampleTimes, self.systemStates, label='Availability') plt.plot(self.sampleTimes, self.meanSystemStates, label='Mean availability') plt.xlabel('Time') plt.ylabel('Avail.') plt.title("Availability and mean availability as a function of time") plt.legend() if file != None: plt.savefig(file) plt.show() def plotBirnbaumImportance(self, file=None): if self.calculateBirnbaum: for i in range(self.numComponents): plt.plot(self.sampleTimes, self.criticalityStates[i], label='Comp ' + str(i+1)) plt.xlabel('Time') plt.ylabel('Importance') plt.title("Birnbaum importance as a function of time") plt.legend() if file != None: plt.savefig(file) plt.show() else: print("Birnbaum importance not calculated") def plotMeanBirnbaumImportance(self, file=None): if self.calculateBirnbaum: for i in range(self.numComponents): plt.plot(self.sampleTimes, self.meanCriticalityStates[i], label='Comp ' + str(i+1)) plt.xlabel('Time') plt.ylabel('Importance') plt.title("Mean Birnbaum importance as a function of time") plt.legend() if file != None: plt.savefig(file) plt.show() else: print("Mean Birnbaum importance not calculated") def plotBarlowProschanImportance(self, file=None): if self.calculateBarlowProschan: comp = [] bp_imp = [] for i in range(self.numComponents): comp.append(i+1) bp_imp.append(self.barlowProschanCriticality[i] / self.failures) plt.bar(comp, bp_imp, color ='gray', width = 0.4) plt.xlabel("Components") plt.ylabel("Importance") plt.title("Barlow-Proschan importance") if file != None: plt.savefig(file) plt.show() else: print("Barlow-Proschan importance not calculated") def printSystemStatistics(self): print("Up time:", self.upTime) print("Down time:", self.downTime) print("Mean availability:", self.upTime / self.lastEventTime) print("Asympt. Availability:", self.meanSystemStates[-1]) print("------------------------------------") print("Number of failures:", self.failures) if self.calculateBirnbaum: print("------------------------------------") for i in range(self.numComponents): print("Critical time", (i+1), ":", self.criticalTime[i]) print("Noncritical time", (i+1), ":", self.noncriticalTime[i]) print("Mean criticality", (i+1), ":", self.criticalTime[i] / self.lastEventTime) print("Asympt. criticality", (i+1), ":", self.criticalityStates[i][-1]) print("") if self.calculateBarlowProschan: print("------------------------------------") for i in range(self.numComponents): print("Barlow-Proschan importance", i, ":", self.barlowProschanCriticality[i] / self.failures) print("------------------------------------") ################################################### # # # EventComponent class # # # ################################################### class EventComponent(EventHandler): def __init__(self, eventQueue, failureDistribution, repairDistribution): super().__init__(eventQueue) self.eventSystem = None self.index = 0 self.failureDistribution = failureDistribution self.repairDistribution = repairDistribution self.state = FUNCTIONING def setEventSystem(self, eventSystem, index): self.eventSystem = eventSystem self.index = index def prepare(self): self.state = FUNCTIONING failureEvent = Event(self.failureDistribution.generate(), FAILURE_EVENT, self, self.index) self.eventQueue.push(failureEvent) def processEvent(self, event): if event.eventCode == FAILURE_EVENT: self.state = FAILED repairEvent = Event(self.repairDistribution.generate(), REPAIR_EVENT, self, self.index) self.eventQueue.push(repairEvent) elif event.eventCode == REPAIR_EVENT: self.state = FUNCTIONING failureEvent = Event(self.failureDistribution.generate(), FAILURE_EVENT, self, self.index) self.eventQueue.push(failureEvent) if self.eventSystem != None: self.eventSystem.processEvent(event) def coproduct(*states): prod = 1 for state in states: prod *= (1 - state) return 1 - prod