summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrew Butterfield <Andrew.Butterfield@scss.tcd.ie>2023-01-10 19:57:22 +0000
committerSebastian Huber <sebastian.huber@embedded-brains.de>2023-01-18 12:42:32 +0100
commit1283c1680a2042e880220f0d62770b97859ea03b (patch)
tree2575852bd3b236af73332e2356fb1a21e599a97d
parenttop-level sources (diff)
downloadrtems-central-1283c1680a2042e880220f0d62770b97859ea03b.tar.bz2
main Coconut sources
-rw-r--r--formal/promela/src/src/__init__.py0
-rw-r--r--formal/promela/src/src/library.coco94
-rw-r--r--formal/promela/src/src/refine_command.coco487
-rw-r--r--formal/promela/src/src/spin2test.coco104
-rw-r--r--formal/promela/src/src/syntax_ml.coco211
-rw-r--r--formal/promela/src/src/syntax_pml.coco396
-rw-r--r--formal/promela/src/src/syntax_yaml.coco182
-rw-r--r--formal/promela/src/src/testgen.coco624
8 files changed, 2098 insertions, 0 deletions
diff --git a/formal/promela/src/src/__init__.py b/formal/promela/src/src/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/formal/promela/src/src/__init__.py
diff --git a/formal/promela/src/src/library.coco b/formal/promela/src/src/library.coco
new file mode 100644
index 00000000..e212538d
--- /dev/null
+++ b/formal/promela/src/src/library.coco
@@ -0,0 +1,94 @@
+ ##############################################################################
+ # Library
+ #
+ # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+ #
+ # All rights reserved.
+ #
+ # Redistribution and use in source and binary forms, with or without
+ # modification, are permitted provided that the following conditions are
+ # met:
+ #
+ # * Redistributions of source code must retain the above copyright
+ # notice, this list of conditions and the following disclaimer.
+ #
+ # * Redistributions in binary form must reproduce the above
+ # copyright notice, this list of conditions and the following
+ # disclaimer in the documentation and/or other materials provided
+ # with the distribution.
+ #
+ # * Neither the name of the copyright holders nor the names of its
+ # contributors may be used to endorse or promote products derived
+ # from this software without specific prior written permission.
+ #
+ # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ##############################################################################
+
+import fileinput
+import ply.yacc
+import sys
+
+#
+
+def input0 ():
+ input ('Continue? (ENTER/CTRL+C): ')
+
+def input_continue (msg):
+ print msg
+ input0 ()
+
+input_continue_logger = ply.yacc.PlyLogger (sys.stderr)
+
+def input_continue_warn (msg):
+ input_continue_logger.warning (msg)
+ input0 ()
+
+def input_continue_err (msg):
+ input_continue_logger.error (msg)
+ input0 ()
+
+#
+
+def isinst (ty, f_get, f_check, toks):
+ return isinstance (toks, ty) and all ([ f_check tok for tok in f_get toks ])
+def isinst_list (f_check, toks):
+ return isinst (list, (toks -> toks), f_check, toks)
+def isinst_dict (check1_check2, toks):
+ return isinst (dict, (toks -> toks.items ()), (toks -> check1_check2[0] (toks[0]) and check1_check2[1] (toks[1])), toks)
+def isinst_str (toks):
+ return isinstance (toks, str)
+
+#
+
+def mapi (f, l):
+ n = 0
+ for x in l:
+ yield f ((n, x))
+ n += 1
+
+def flatten (l): return (x for xs in l for x in xs)
+
+def update_dic (dic, pos, cmds):
+ dic[pos] = dic[pos] + cmds if pos in dic else cmds
+
+def fileinput_input (path):
+ try:
+ with fileinput.input path as lines:
+ for line in lines:
+ yield line
+ except FileNotFoundError:
+ input_continue (f'File not found: {path}')
+ except IsADirectoryError:
+ input_continue (f'Is a directory: {path}')
+
+fileinput_input0 = ''.join <.. fileinput_input
diff --git a/formal/promela/src/src/refine_command.coco b/formal/promela/src/src/refine_command.coco
new file mode 100644
index 00000000..e87e7be2
--- /dev/null
+++ b/formal/promela/src/src/refine_command.coco
@@ -0,0 +1,487 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Refines Annotated SPIN output to test program code"""
+
+# Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import itertools
+import logging
+
+logger = logging.getLogger (__name__)
+
+class command:
+
+ def __init__(self, ref_dict, outputLOG = False, annoteComments = True, outputSWITCH = False):
+ self.ref_dict = ref_dict
+ self.ref_dict_keys = ref_dict.keys()
+
+ self.inSTRUCT = False # True when in STRUCT
+ self.inSEQ = False # True when in SEQ
+ self.seqForSEQ = "" # Holds space-separated sequence output string when in SEQ
+ self.inName = "" # Set to SEQ/STRUCT name when inside
+
+ # We assume the target test code language is C/C++ by default
+ self.EOLC = "//"
+ self.CMTBEGIN = "/*"
+ self.CMTEND = "*/"
+ # The following should all be in refinement file
+ self.SEGNAMEPFX = "TestSegment{}"
+ self.SEGARG = "Context* ctx"
+ self.SEGDECL = "static void {}( {} )" # segname, segarg,
+ self.SEGBEGIN = " {"
+ self.SEGEND = "}"
+ self.TESTLOGGER = 'T_log(T_NORMAL,"{}");'
+
+ self.testName = "Un-named Test"
+ self.defCode = [] # list of strings
+ self.declCode = [] # list of strings
+ self.testCodes = [] # list (indexed by pid) of list of strings
+ self.procIds = set() # set of process ids in use
+ self.currentId = 0 # currently running process
+ self.switchNo = 0
+
+ self.outputLOG = outputLOG
+ self.annoteComments = annoteComments
+ self.outputSWITCH = outputSWITCH
+
+ match def setComments(self, 'C'):
+ logger.debug("Set LANGUAGE to C\n")
+ self.EOLC = "//"
+ self.CMTBEGIN = "/*"
+ self.CMTEND = "*/"
+
+ addpattern def setComments(self, 'Python'):
+ logger.debug("Set LANGUAGE to Python\n")
+ self.EOLC = "#"
+ self.CMTBEGIN = '"""\n'
+ self.CMTEND = '\n"""'
+
+ # catch-all for setComments
+ addpattern def setComments(self, lang):
+ logger.debug("Unknown LANGUAGE "+lang+", set to C\n")
+
+ def setupSegmentCode(self):
+ if 'SEGNAMEPFX' in self.ref_dict_keys:
+ self.SEGNAMEPFX = self.ref_dict['SEGNAMEPFX']
+ else:
+ logger.debug("SEGNAMEPFX not defined, using "+self.SEGNAMEPFX)
+ if 'SEGARG' in self.ref_dict_keys:
+ self.SEGARG = self.ref_dict['SEGARG']
+ else:
+ logger.debug("SEGARG not defined, using "+self.SEGARG)
+ if 'SEGDECL' in self.ref_dict_keys:
+ self.SEGDECL = self.ref_dict['SEGDECL']
+ else:
+ logger.debug("SEGDECL not defined, using "+self.SEGDECL)
+ if 'SEGBEGIN' in self.ref_dict_keys:
+ self.SEGBEGIN = self.ref_dict['SEGBEGIN']
+ else:
+ logger.debug("SEGBEGIN not defined, using "+self.SEGBEGIN)
+ if 'SEGEND' in self.ref_dict_keys:
+ self.SEGEND = self.ref_dict['SEGEND']
+ else:
+ logger.debug("SEGEND not defined, using "+self.SEGEND)
+ logger.debug("SEGNAMEPFX is '"+self.SEGNAMEPFX+"'")
+ logger.debug("SEGARG is '"+self.SEGARG+"'")
+ logger.debug("SEGDECL is '"+self.SEGDECL+"'")
+ logger.debug("SEGBEGIN is '"+self.SEGBEGIN+"'")
+ logger.debug("SEGEND is '"+self.SEGEND+"'")
+
+ def setupLanguage(self):
+ if 'LANGUAGE' in self.ref_dict_keys:
+ language = self.ref_dict['LANGUAGE']
+ self.setComments(language)
+ self.setupSegmentCode()
+ else:
+ pass # assume default: C
+
+ def collectPIds(self, ln):
+ if len(ln) > 0:
+ self.procIds.add(int(ln[0]))
+
+ def addCode(self, id,codelines):
+ # logger.debug("addCode lines are {} {}".format(type(codelines),codelines))
+ self.testCodes = self.testCodes + [(int(id), codelines)]
+
+ def addCode_int (self, pid, f_codelines0, f_codelines1, value):
+ try:
+ value = int value
+ except:
+ value = None
+ if value is not None:
+ if value == 0:
+ self.addCode (pid, [f_codelines0 (())])
+ else:
+ self.addCode (pid, [f_codelines1 value])
+
+ def switchIfRequired(self, ln):
+ if len(ln) > 0:
+ pid = ln[0]
+ i = int(pid)
+ if i == self.currentId:
+ pass
+ else: # proc self.currentId stops here, and proc i resumes
+ self.switchNo =self.switchNo+1
+ if 'SUSPEND' not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" SUSPEND: no refinement entry found"])
+ else:
+ code = self.ref_dict['SUSPEND']
+ self.addCode(self.currentId,[code.format(self.switchNo,self.currentId,i)])
+ if 'WAKEUP' not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" WAKEUP: no refinement entry found"])
+ else:
+ code = self.ref_dict['WAKEUP']
+ self.addCode(i,[code.format(self.switchNo,i,self.currentId)])
+ self.currentId = i
+
+ def logSPINLine(self, ln):
+ if len(ln) > 1:
+ pid = int(ln[0])
+ str = ' '.join(ln)
+ if ln[1] in ['NAME','DEF']:
+ self.defCode = self.defCode + [self.EOLC+" @@@ {}".format(str)]
+ elif ln[1] in ['DECL','DCLARRAY'] :
+ self.declCode = self.declCode + [self.EOLC+" @@@ {}".format(str)]
+ elif ln[1] == 'LOG' :
+ pass
+ else:
+ self.addCode(pid,['T_log(T_NORMAL,"@@@ {}");'.format(str)])
+
+ # Promela Annotations
+ #
+ # Format: `@@@ <pid> <KEYWORD> <thing1> ... <thingN>` where N >= 0
+ #
+ # Things:
+ # <pid> Promela Process Id of proctype generating annotation
+ # <word> chunk of text containing no white space
+ # <name> Promela variable/structure/constant identifier
+ # <type> Promela type identifier
+ # <tid> OS Task/Thread/Process Id alias
+ # [ <thing> ] An optional <thing>
+ # ( <thing1> | <thing2>) A choice of <thing1> or <thing2>
+ # _ unused argument (within containers)
+ #
+ # KEYWORDS and "things"
+ # LOG <word1> ... <wordN>
+ # NAME <name>
+ # INIT
+ # DEF <name> <value>
+ # DECL <type> <name> [<value>]
+ # DCLARRAY <type> <name> <value>
+ # TASK <name>
+ # SIGNAL <name> <value>
+ # WAIT <name> <value>
+ # STATE tid <name>
+ # SCALAR (<name>|_) [<index>] <value>
+ # PTR <name> <value>
+ # STRUCT <name>
+ # SEQ <name>
+ # END <name>
+ # CALL <name> <value1> ... <valueN>
+
+ # Refinement Mechanisms
+ # Direct Output - no lookup
+ # Keyword Refinement - lookup (the) <KEYWORD>
+ # Name Refinement - lookup (first) name
+ # The same name may appear in different contexts
+ # We add '_XXX' suffixes to lookup less frequent uses
+ # _DCL - A variable declaration
+ # _PTR - The pointer value itself
+ # _FSCALAR - A scalar that is a struct field
+ # _FPTR - A pointer that is a struct field
+ # Type Refinement - lookup (first) type
+ # Typed-Name Refinement - lookup type-name
+
+ # LOG <word1> ... <wordN>
+ # Direct Output
+ match def refineSPINLine(self, [pid,'LOG']+rest):
+ if self.outputLOG:
+ ln = ' '.join(rest)
+ self.addCode(pid,["T_log(T_NORMAL,{});".format(ln)])
+ else:
+ pass
+
+ # NAME <name>
+ # Keyword Refinement (NAME)
+ addpattern def refineSPINLine(self, [pid,'NAME',name]):
+ if 'NAME' not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" CANNOT REFINE 'NAME'"])
+ else:
+ self.addCode(pid,self.ref_dict["NAME"].rsplit('\n'))
+
+ # INIT
+ # Keyword Refinement (INIT)
+ addpattern def refineSPINLine(self, [pid,'INIT']):
+ if 'INIT' not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" CANNOT REFINE 'INIT'"])
+ else:
+ self.addCode(pid,self.ref_dict["INIT"].rsplit('\n'))
+
+ # TASK <name>
+ # Name Refinement (<name>)
+ addpattern def refineSPINLine(self, [pid,'TASK',task_name]):
+ if task_name not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" CANNOT REFINE TASK {}".format(task_name)])
+ else:
+ self.addCode(pid,self.ref_dict[task_name].rsplit('\n'))
+
+ # SIGNAL <value>
+ # Keyword Refinement (SIGNAL)
+ addpattern def refineSPINLine(self, [pid,'SIGNAL',value]):
+ if 'SIGNAL' not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" CANNOT REFINE SIGNAL {}".format(value)])
+ else:
+ self.addCode(pid,self.ref_dict['SIGNAL'].format(value).rsplit('\n'))
+
+ # WAIT <value>
+ # Keyword Refinement (WAIT)
+ addpattern def refineSPINLine(self, [pid,'WAIT',value]):
+ if 'WAIT' not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" CANNOT REFINE WAIT {}".format(value)])
+ else:
+ self.addCode(pid,self.ref_dict['WAIT'].format(value).rsplit('\n'))
+
+ # DEF <name> <value>
+ # Direct Output
+ addpattern def refineSPINLine(self, [pid,'DEF',name,value]):
+ self.defCode = self.defCode + [' '.join(['#define',name,value])]
+
+ # DECL <type> <name> [<value>]
+ # Name Refinement (<name>_DCL)
+ # add with 'static' to declCode if pid==0,
+ # add without 'static' using addCode, otherwise
+ addpattern def refineSPINLine(self, [pid,'DECL',typ,name]+rest):
+ logger.debug("Processing DECL {} {} {}".format(type,name,rest))
+ name_dcl = name + '_DCL'
+ if name_dcl not in self.ref_dict_keys:
+ decl = self.EOLC+" CANNOT REFINE Decl {}".format(name_dcl)
+ else:
+ decl = ' '.join([self.ref_dict[name_dcl],name])
+ if len(rest) > 0:
+ decl = decl + " = " + rest[0] + ";"
+ else:
+ decl = decl + ";"
+ if int(pid) == 0:
+ decl = 'static ' + decl
+ self.declCode = self.declCode + [decl]
+ else:
+ self.addCode(pid,[decl])
+
+ # DCLARRAY <type> <name> <value>
+ # Name Refinement (<name>_DCL)
+ # add with 'static' to declCode if pid==0,
+ # add without 'static' using addCode, otherwise
+ addpattern def refineSPINLine(self, [pid,'DCLARRAY',typ,name,value]):
+ logger.debug("Processing DECLARRAY {} {} {}".format(type,name,value))
+ name_dcl = name + '_DCL'
+ if name_dcl not in self.ref_dict_keys:
+ dclarray = [self.EOLC+" DCLARRAY: no refinement entry for '{}'".format(name_dcl)]
+ else:
+ code = self.ref_dict[name_dcl].format(name,value)
+ if int(pid) == 0:
+ code = 'static ' + code
+ dclarray = code.rsplit('\n')
+ if int(pid) == 0:
+ self.declCode = self.declCode + dclarray
+ else:
+ self.addCode(pid,dclarray)
+
+ # PTR <name> <value>
+ # Name (_PTR/_FPTR) refinement
+ addpattern def refineSPINLine(self, [pid,'PTR',name,value]):
+ i = int(pid)
+ if not self.inSTRUCT:
+ pname = name + '_PTR'
+ if pname not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" PTR: no refinement entry for '{}'".format(pname)])
+ else:
+ pcode = self.ref_dict[pname].rsplit('\n')
+ self.addCode_int (pid, (_ -> pcode[0]), (value -> pcode[1].format(value)), value)
+ else:
+ pname = name + '_FPTR'
+ if pname not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" PTR(field): no refinement for '{}'".format(pname)])
+ else:
+ pcode = self.ref_dict[pname].rsplit('\n')
+ self.addCode_int (pid, (_ -> pcode[0].format(self.inName)), (value -> pcode[1].format(self.inName,value)), value)
+
+ # CALL <name> <value0> .. <valueN>
+ # Name refinement
+ addpattern def refineSPINLine(self, [pid,'CALL',name]+rest):
+ logger.debug("Processing CALL {} {}".format(name,rest))
+ if name not in self.ref_dict_keys:
+ logger.debug("CALL name not found")
+ self.addCode(pid,[self.EOLC+" CALL: no refinement entry for '{}'".format(name)])
+ else:
+ code = self.ref_dict[name]
+ logger.debug("CALL code: {}".format(code))
+ case len(rest):
+ match 0: callcode = code.rsplit('\n')
+ match 1: callcode = (code.format(rest[0])).rsplit('\n')
+ match 2: callcode = (code.format(rest[0],rest[1])).rsplit('\n')
+ match 3: callcode = (code.format(rest[0],rest[1],rest[2])).rsplit('\n')
+ match 4: callcode = (code.format(rest[0],rest[1]
+ ,rest[2],rest[3])).rsplit('\n')
+ match 5: callcode = (code.format(rest[0],rest[1]
+ ,rest[2],rest[3],rest[4])).rsplit('\n')
+ match 6: callcode = (code.format(rest[0],rest[1]
+ ,rest[2],rest[3],rest[4],rest[5])).rsplit('\n')
+ else:
+ callcode = [self.EOLC+" CALL: can't handle > 6 arguments"]
+ self.addCode(pid,callcode)
+ logger.debug("CALL complete")
+
+ # STRUCT <name>
+ addpattern def refineSPINLine(self, [pid,'STRUCT',name]):
+ self.inSTRUCT = True # should check not already inside anything!
+ self.inName = name
+
+ # SEQ <name>
+ addpattern def refineSPINLine(self, [pid,'SEQ',name]):
+ self.inSEQ = True # should check not already inside anything!
+ self.seqForSEQ = ""
+ self.inName = name
+
+ # END <name>
+ addpattern def refineSPINLine(self, [pid,'END',name]):
+ if self.inSTRUCT:
+ self.inSTRUCT = False
+ if self.inSEQ:
+ seqName = name + "_SEQ"
+ if seqName not in self.ref_dict_keys:
+ self.addCode(pid,["SEQ END: no refinement for ".format(seqName)])
+ else:
+ codelines = self.ref_dict[seqName].rsplit('\n')
+ for code in codelines:
+ self.addCode(pid,[code.format(self.seqForSEQ)])
+ self.inSEQ = False
+ self.seqForSEQ = ""
+ self.inName = ""
+
+ # SCALAR _ <value>
+ addpattern def refineSPINLine(self, [pid,'SCALAR','_',value]):
+ # should only be used inside SEQ
+ self.seqForSEQ = self.seqForSEQ + " " + value
+
+ # SCALAR <name/field> <value>
+ # Name (<name>/_FSCALAR) Refinement
+ addpattern def refineSPINLine(self, [pid,'SCALAR',name,value]):
+ # should not be used inside SEQ
+ if not self.inSTRUCT:
+ if name not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" SCALAR: no refinement entry for '{}'".format(name)])
+ else:
+ code = self.ref_dict[name]
+ self.addCode(pid,(code.format(value)).rsplit('\n'))
+ else:
+ field = name + "_FSCALAR"
+ if field not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" SCALAR(field): no refinement entry for '{}'".format(field)])
+ else:
+ code = self.ref_dict[field]
+ self.addCode(pid,[code.format(self.inName,value)])
+
+ # SCALAR <name> <index> <value>
+ # Name (<name>/_FSCALAR) Refinement
+ addpattern def refineSPINLine(self, [pid,'SCALAR',name,index,value]):
+ # should not be used inside SEQ
+ if not self.inSTRUCT:
+ if name not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" SCALAR-3: no refinement entry for '{}'".format(name)])
+ else:
+ code = self.ref_dict[name]
+ self.addCode(pid,code.format(index,value).rsplit('\n'))
+ else:
+ field = name + "_FSCALAR"
+ if field not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" SCALAR(field): no refinement entry for '{}'".format(field)])
+ else:
+ code = self.ref_dict[field]
+ self.addCode(pid,[code.format(self.inName,value)])
+
+ # STATE <tid> <name>
+ # Name Refinement
+ addpattern def refineSPINLine(self, [pid,'STATE',tid,state]):
+ if state not in self.ref_dict_keys:
+ self.addCode(pid,[self.EOLC+" STATE: no refinement entry for '{}'".format(state)])
+ else:
+ code = self.ref_dict[state]
+ self.addCode(pid,[code.format(tid)])
+
+ # catch-all for refineSPINLine
+ addpattern def refineSPINLine(self, [pid,hd]+rest):
+ dunno = self.EOLC+" DON'T KNOW HOW TO REFINE: "+pid+" '"+str(hd)
+ self.addCode(pid, [dunno])
+
+ # catch-all for refineSPINLine
+ addpattern def refineSPINLine(self, [pid]):
+ dunno = self.EOLC+" DON'T KNOW HOW TO REFINE: "+pid
+ self.addCode(pid, [dunno])
+
+ # catch-all for refineSPINLine
+ addpattern def refineSPINLine(self, []):
+ pass
+
+ def refineSPINLine_main(self, ln):
+ self.collectPIds(ln)
+ if self.outputSWITCH:
+ self.switchIfRequired(ln)
+ else:
+ pass
+ if self.annoteComments:
+ self.logSPINLine(ln)
+ else:
+ pass
+ self.refineSPINLine(ln)
+
+ def test_body (self):
+ yield "\n"
+ yield self.EOLC+" ===============================================\n\n"
+
+
+
+ for line in self.defCode:
+ yield line+"\n"
+
+
+ for line in self.declCode:
+ yield line+"\n"
+
+ # def testseg_name (segno): return "TestSegment{}".format(segno)
+ def get_id (id_codes): return id_codes[0]
+
+ for sno, lines in itertools.groupby (sorted (self.testCodes, key = get_id), get_id):
+ yield "\n"
+ yield self.EOLC+" ===== TEST CODE SEGMENT {} =====\n\n".format(sno)
+ testseg_name = self.SEGNAMEPFX.format(sno)
+ testseg = (self.SEGDECL).format(testseg_name,self.SEGARG)
+ yield "{}{}\n".format(testseg, self.SEGBEGIN)
+
+ for lines0 in lines:
+ for line in lines0[1]:
+ yield " "+line+"\n"
+
+ yield self.SEGEND+"\n"
+
+ yield "\n"
+ yield self.EOLC+" ===============================================\n\n"
diff --git a/formal/promela/src/src/spin2test.coco b/formal/promela/src/src/spin2test.coco
new file mode 100644
index 00000000..e1accc4c
--- /dev/null
+++ b/formal/promela/src/src/spin2test.coco
@@ -0,0 +1,104 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Converts Annotated SPIN counter-examples to program test code"""
+
+# Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions
+# are met:
+# 1. Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+import src.refine_command as refine
+
+#
+
+import logging
+import sys
+import yaml
+
+#
+
+refine.logger.setLevel (logging.DEBUG)
+refine.logger.addHandler (logging.StreamHandler (sys.stderr))
+
+def words(string): return string.rsplit(' ')
+
+def main (testNumber, dir0, fileRoot):
+ "\n\tSpin2Test (Coconut/Python)\n\n" |> refine.logger.debug
+ refine.logger.debug("Test Number {}\n".format(testNumber))
+
+ if int(testNumber) < 0:
+ num = ""
+ else:
+ num = "-{}".format(testNumber)
+
+ spinFile = dir0 + fileRoot + num + ".spn"
+ refFile = dir0 + fileRoot + "-rfn.yml"
+ preFile = dir0 + fileRoot + "-pre.h"
+ postFile = dir0 + fileRoot + "-post.h"
+ runFile = dir0 + fileRoot + "-run.h"
+ testFile = dir0 + "tr-" + fileRoot + num + ".c"
+
+ summaryFormat = "!{} --({}`)-> [{};_;{};{}] >> {}\n"
+ refine.logger.debug(summaryFormat.format( spinFile,refFile
+ , preFile,postFile
+ , runFile,testFile ))
+
+ print summaryFormat
+
+ annote_lines = []
+ with open(spinFile) as spinfile:
+ debug_lines = ''
+ for line in spinfile:
+ if line[0:4] == "@@@ ":
+ debug_lines = debug_lines + line
+ annote_lines = annote_lines + [line[4:][:-1]]
+ refine.logger.debug (debug_lines)
+
+ annote_bundle = fmap (words,annote_lines)
+
+ refine.logger.debug("Annotation Bundle:\n {}".format(annote_bundle))
+
+ with open(refFile) as reffile:
+ ref_dict = yaml.safe_load(reffile.read())
+ refine.logger.debug("\nREFINE DUMP")
+ refine.logger.debug(yaml.dump(ref_dict))
+
+ cmd = refine.command(ref_dict)
+
+ cmd.setupLanguage()
+ for ln in annote_bundle:
+ cmd.refineSPINLine_main(ln)
+
+ refine.logger.debug("\nP-Ids in use: {}\n ".format(cmd.procIds))
+ refine.logger.debug("\n\tRefinement Complete; No of processes = {}".format(len(cmd.procIds)))
+
+ with open(testFile,'w') as tstfile:
+
+ with open(preFile) as prefile:
+ tstfile.write(prefile.read())
+
+ for line in cmd.test_body():
+ tstfile.write(line)
+
+ with open(postFile) as postfile:
+ tstfile.write(postfile.read())
+
+ with open(runFile) as runfile:
+ tstfile.write(runfile.read().format(testNumber))
diff --git a/formal/promela/src/src/syntax_ml.coco b/formal/promela/src/src/syntax_ml.coco
new file mode 100644
index 00000000..2199ef97
--- /dev/null
+++ b/formal/promela/src/src/syntax_ml.coco
@@ -0,0 +1,211 @@
+ ##############################################################################
+ # Syntax ML
+ #
+ # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+ #
+ # All rights reserved.
+ #
+ # Redistribution and use in source and binary forms, with or without
+ # modification, are permitted provided that the following conditions are
+ # met:
+ #
+ # * Redistributions of source code must retain the above copyright
+ # notice, this list of conditions and the following disclaimer.
+ #
+ # * Redistributions in binary form must reproduce the above
+ # copyright notice, this list of conditions and the following
+ # disclaimer in the documentation and/or other materials provided
+ # with the distribution.
+ #
+ # * Neither the name of the copyright holders nor the names of its
+ # contributors may be used to endorse or promote products derived
+ # from this software without specific prior written permission.
+ #
+ # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ##############################################################################
+
+from src.library import *
+from src import syntax_pml
+
+#
+
+from parsec import generate, many, many1, optional, regex, re, string # explicit listing for mypy
+from parsec import * # mainly for importing operators
+
+##
+# parse
+
+tok_open = r'\<open>'
+tok_close = r'\<close>'
+tok_open0 = '\\' + tok_open
+tok_close0 = '\\' + tok_close
+
+ant_open = r'\<'
+ant_open1 = r'\<^'
+ant_close = r'>'
+
+spaces = regex (r'\s*', re.MULTILINE)
+spaces1 = regex (r'\s+', re.MULTILINE)
+ident00 = regex (r"[._'a-zA-Z0-9]+")
+ident0 = regex (r"[\[\]._'a-zA-Z0-9]+")
+ident = regex (r"[_'a-zA-Z][_'a-zA-Z0-9]*")
+filename = regex (r"[./_'a-zA-Z0-9]+")
+
+#
+
+def not_string (s, matched):
+ @Parser
+ def not_string_parser (text, index = 0):
+ len_s = len s
+ if matched <= 0:
+ return Value.failure (index, 'a strictly positive number of characters')
+ elif matched <= len_s and text[index:index + len_s] != s:
+ return Value.success (index + matched, text[index:index + matched])
+ else:
+ return Value.failure (index, 'a different string')
+ return not_string_parser
+
+def enclose_flat (delim_beg, delim_end):
+ @generate
+ def enclose_flat0 ():
+ cts = yield spaces >> string delim_beg >> many (not_string (delim_end, 1)) << string delim_end
+ return ''.join cts
+ return enclose_flat0
+
+def enclose_nested0 (delim_beg, delim_end):
+ @generate
+ def enclose_nested00 ():
+ @generate
+ def enclose_nested1 ():
+ cts1, cts2, cts3 = yield enclose_nested00
+ return ''.join ([cts1, cts2, cts3])
+
+ cts1 = yield string delim_beg
+ cts2 = yield many (enclose_nested1 ^ not_string (delim_end, 1))
+ cts3 = yield string delim_end
+ return (cts1, ''.join cts2, cts3)
+
+ @generate
+ def enclose_nested1 ():
+ _, cts2, _ = yield enclose_nested00
+ return cts2
+
+ return enclose_nested1
+
+spaces_cmt = many (spaces1 ^ enclose_nested0 ('(*', '*)'))
+
+def enclose_nested (delim_beg, delim_end):
+ return spaces_cmt >> enclose_nested0 (delim_beg, delim_end)
+
+def quoted (scan):
+ @generate
+ def quoted0 ():
+ delim = r'"'
+ name = yield spaces_cmt >> string delim >> scan << string delim
+ return name
+ return quoted0
+
+quoted_ident0 = quoted ident0
+
+@generate
+def antiquotation ():
+ name = yield spaces_cmt >> string ant_open >> ident0 << string ant_close
+ return ant_open + name + ant_close
+
+@generate
+def antiquotation0 ():
+ name = yield spaces_cmt >> string ant_open1 >> ident0 << string ant_close
+ return name
+
+@generate
+def command ():
+ name = yield spaces_cmt >> ident0
+ arg = yield many (enclose_nested (tok_open, tok_close) ^ quoted_ident0 ^ quoted filename ^ antiquotation)
+ return (name, arg)
+
+commands = many command
+
+def parse_commands (cmt):
+ match (pos_line, cmt0) in syntax_pml.parse_comment_annot cmt:
+ toks = commands.parse cmt0
+ if len toks >= 2 and toks[1][0].isupper () and all ([args == [] for _, args in toks]): # TODO: at the time of writing, this parsing recognition is still approximate but seems to already cover enough execution cases
+ return (pos_line, True, [tok for tok, _ in toks])
+ else:
+ return (pos_line, False, [(cmd[0], cmd[1]) for cmd in commands.parse cmt0])
+ else:
+ return None
+
+#
+
+data token_ident (ident)
+data token_antiquotation (cmd, ty, name)
+
+@generate
+def directive ():
+ @generate
+ def p_ident ():
+ res = yield spaces >> ident
+ return (token_ident res)
+ typ = spaces >> syntax_pml.typ
+ @generate
+ def p_antiquotation_single ():
+ cmd = yield antiquotation0
+ arg = yield enclose_nested0 (tok_open, tok_close)
+ return (token_antiquotation cmd None arg)
+ @generate
+ def p_antiquotation_general ():
+ cmd = yield spaces >> string ("@{") >> ((spaces >> ident00) ^ quoted_ident0)
+ opt = yield optional (spaces >> string ("[") >> (typ ^ quoted typ) << spaces << string ("]"))
+ arg = yield optional (spaces >> enclose_nested0 (tok_open, tok_close)) << spaces << string ("}")
+ return (token_antiquotation cmd opt arg)
+
+ arg = yield syntax_pml.directive (many (p_antiquotation_general ^ p_antiquotation_single ^ p_ident))
+ return arg
+
+directives = optional directive
+
+def parse_directive (line):
+ match (delim, [token_ident (var1)] + toks) in directives.parse line if delim == syntax_pml.delim_refine:
+ match ([token_antiquotation ('promela', ty, var2)] + toks) in toks:
+ var2 = var2 if var2 else var1
+ else:
+ ty = None
+ var2 = var1
+ if toks != []:
+ input_continue(f'Remaining parsed tokens: {toks}')
+ return (var1, ty if ty else (None, None), var2)
+ else:
+ return None
+
+##
+# unparse
+
+unparse_cartouche = name -> tok_open + name + tok_close
+
+unparse_lambda = (s_fun, s_args) -> ' '.join ([s_fun] + s_args)
+
+unparse_lambda_cartouche = (s_fun, s_args) -> unparse_lambda (s_fun, list (map (unparse_cartouche, s_args)))
+
+unparse_annotation_cmd = (cmd, args) -> syntax_pml.unparse_annotation_cmd (unparse_lambda_cartouche (cmd, args))
+
+def unparse_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds):
+ yield
+
+##
+# check
+
+def check_annotation0 (unparse_annotation, parse_commands, strip_find_annot, group_max, cmt_enclose, cmds_generic, subst_bloc, cmds):
+ input_continue ('Check on ML syntax not implemented')
+
+def check_annotation (*args):
+ check_annotation0 <*| (unparse_annotation, parse_commands) + args
diff --git a/formal/promela/src/src/syntax_pml.coco b/formal/promela/src/src/syntax_pml.coco
new file mode 100644
index 00000000..98360fbd
--- /dev/null
+++ b/formal/promela/src/src/syntax_pml.coco
@@ -0,0 +1,396 @@
+ ##############################################################################
+ # Syntax PML
+ #
+ # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+ #
+ # All rights reserved.
+ #
+ # Redistribution and use in source and binary forms, with or without
+ # modification, are permitted provided that the following conditions are
+ # met:
+ #
+ # * Redistributions of source code must retain the above copyright
+ # notice, this list of conditions and the following disclaimer.
+ #
+ # * Redistributions in binary form must reproduce the above
+ # copyright notice, this list of conditions and the following
+ # disclaimer in the documentation and/or other materials provided
+ # with the distribution.
+ #
+ # * Neither the name of the copyright holders nor the names of its
+ # contributors may be used to endorse or promote products derived
+ # from this software without specific prior written permission.
+ #
+ # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ##############################################################################
+
+from src.library import *
+from src.modules.comment_filter.comment_filter import language
+from src.modules.comment_filter.comment_filter import rfc
+
+#
+
+from itertools import chain, groupby, islice, zip_longest
+import os
+from parsec import generate, many, many1, optional, regex, re, string # explicit listing for mypy
+from parsec import * # mainly for importing operators
+
+#
+
+group_nth = (xs, n) -> zip_longest (* [islice xs i None n for i in range n])
+
+#
+
+delim_comment = language.Lang (line_comment = ['//', '@@'],
+ comment_bookends = language.c.comment_bookends,
+ nested_comments = language.c.nested_comments)
+delim_annot = '@'
+delim_refine = 'refine'
+
+##
+# annot
+
+command_default = binding -> binding
+command_at = binding -> binding + '_at'
+command_file_at = binding -> binding + '_file_at'
+command_file = binding -> binding + '_file'
+
+binding_refine_uniform = 'refine_uniform'
+
+def find_not_annot (cmds, binding):
+ return list (filter ((cmd_args -> (cmd -> cmd != command_default binding and cmd != command_at binding and cmd != command_file_at binding and cmd != command_file binding) (cmd_args[0])), cmds))
+
+discard_refine = cmds -> find_not_annot (mapi ((i_cmd_arg -> (i_cmd_arg[1][0], (i_cmd_arg[0], i_cmd_arg[1][1]))), cmds), binding_refine_uniform)
+
+def find_annot0 ((annots, dict_top_elem), binding):
+ mk_cmd0 = (pos3, key, args) -> ((pos3, key), args)
+
+ def filter_annot0 (annots, binding):
+ for annot in annots:
+ for pos2, cmds in annot.items ():
+ cmds = [ mk_cmd0 (pos3, None, args) for (pos3, (cmd, args)) in cmds if cmd == command_default binding ]
+ if cmds:
+ yield (pos2, cmds)
+
+ def filter_annot (annots, binding):
+ for _, cmds in filter_annot0 annots binding:
+ for ((pos3, _), args) in cmds:
+ yield (pos3, args)
+
+ dic0 = { pos2 : cmds for pos2, cmds in filter_annot0 annots binding }
+
+ def update_annot (pos2, cmd):
+ update_dic (dic0, pos2, [cmd])
+
+ def update_annot_key (args, f_cmd):
+ if args:
+ key = args[0][1]
+ if key in dict_top_elem:
+ update_annot (dict_top_elem[key], f_cmd (key, args[1:]))
+ else:
+ input_continue (f'Key error: {key} not in {dict_top_elem}')
+ else:
+ input_continue ('No arguments provided')
+
+ for pos3, args in filter_annot (annots, command_at binding):
+ update_annot_key (args, mk_cmd0 $ pos3)
+
+ def mk_cmd (pos3, key, args):
+ return mk_cmd0 (pos3, key, list (map ((dir_path -> (path -> (os.path.dirname path, fileinput_input0 path)) (os.path.abspath (dir_path[0] + '/' + dir_path[1]))), args)))
+
+ for pos3, args in filter_annot (annots, command_file_at binding):
+ update_annot_key (args, mk_cmd $ pos3)
+
+ for pos3, args in filter_annot (annots, command_file binding):
+ update_annot ((pos3[0], pos3[1]), mk_cmd pos3 None args)
+
+ return dic0
+
+def find_annot (annots, binding):
+ return {pos : list (map ((pos_cmds -> pos_cmds[1]), cmds)) for pos, cmds in find_annot0 (annots, binding).items ()}
+
+find_annot_bool = (annots, binding) -> (annots, binding) |*> find_annot |> bool
+find_annot_list = (annots, binding) -> (annots, binding) |*> find_annot |> .values () |> flatten
+find_annot_list1 = (annots, binding) -> (annots, binding) |*> find_annot_list |> map $ (map $ (dir_arg -> dir_arg[1]))
+find_annot_list2 = (annots, binding) -> (annots, binding) |*> find_annot_list1 |> flatten |> list
+
+##
+# parse
+
+data arg_pml
+data arg_pml_file
+data arg_annotation
+data arg_annotation_file
+
+data cmt_multi_begin
+data cmt_multi_begin_code
+data cmt_multi_body
+data cmt_multi_end
+data cmt_single
+data cmt_single_code
+
+data src_code (code)
+data src_comment
+data src_annotation
+data src_directive (var_c, ty, var_pml)
+
+rfc.clear_line = line -> line
+
+def parse_file (lines):
+ parse_comment_state = rfc.State ()
+ for line in fileinput_input (lines[1]) if lines[0] == arg_pml_file () else lines[1].splitlines True:
+ parse_comment_state.line = line
+ yield rfc.parse_line (delim_comment, parse_comment_state, code_only = False, keep_tokens = True)
+
+#
+
+split_cmt_bookends = (cmt, f) -> (map ((multi -> f (( cmt[: len (multi[0]) ]
+ , cmt[ len (multi[0]) :]))
+ if cmt.startswith (multi[0]) else None),
+ delim_comment.comment_bookends)
+ |> filter $ (x -> x != None)
+ |> flatten)
+
+def parse_comment1 (lines):
+ line_cpt = 1
+ def gen_single (line_cpt, line):
+ ret_single = l -> (cmt_single (), l |> map $ (x -> (line_cpt, x)) |> list)
+ for code, cmt2, cmt3, cmt4 in group_nth (line, 4):
+ if cmt2:
+ l1 = len ([ cmt for cmt in delim_comment.line_comment if cmt2.startswith cmt ][0])
+ l2 = - len (rfc.get_linesep cmt2)
+ yield ret_single ([ code
+ , cmt2 [: l1 ]
+ , cmt2 [ l1 : l2 ] if l2 != 0 else cmt2 [l1:]
+ , cmt2 [ l2 :] if l2 != 0 else '' ])
+ elif cmt3 and cmt4:
+ yield ret_single (split_cmt_bookends (cmt3, (c3 -> [code, c3[0], c3[1], cmt4])))
+ else:
+ yield (cmt_single_code (), [(line_cpt, code)])
+
+ def gen_multi_end (line_cpt, line):
+ return (cmt_multi_end (), ([(line_cpt, line[0])] if line[0] != '' else []) + [(line_cpt, line[1])])
+
+ for is_multi, lines in groupby (parse_file lines, key = (x -> len (x[1].multi_end_stack) >= 1)):
+ if is_multi:
+ line, _ = lines.__next__ ()
+ def gen_multi_begin (line):
+ nonlocal line_cpt
+ yield from gen_single (line_cpt, line[2:-5])
+ yield (cmt_multi_begin_code (), [(line_cpt, line[-5])])
+ for line0 in split_cmt_bookends (line[-3], (x -> x)):
+ yield (cmt_multi_begin (), [(line_cpt, line0)])
+ line_cpt += 1
+ yield from gen_multi_begin line
+ for multi_body, lines in groupby (lines, key = (x -> len (x[0]) == 1)):
+ if multi_body:
+ for line, _ in lines:
+ yield (cmt_multi_body (), [(line_cpt, line[0])])
+ line_cpt += 1
+ else:
+ for line, _ in lines:
+ yield gen_multi_end (line_cpt, line)
+ yield from gen_multi_begin line
+ else:
+ line, _ = lines.__next__ ()
+ yield gen_multi_end (line_cpt, line)
+ yield from gen_single (line_cpt, line[2:])
+ line_cpt += 1
+ for line, _ in lines:
+ yield from gen_single (line_cpt, line[2:])
+ line_cpt += 1
+
+def parse_comment0 (lines):
+ remove_pos = map $ (x -> x[1])
+ flat_list = list <.. flatten
+ flat_rm = flat_list <.. remove_pos
+
+ for is_single, lines in groupby (parse_comment1 lines, key = (x -> x[0] == cmt_single () or x[0] == cmt_single_code ())):
+ if is_single:
+ yield remove_pos lines
+ else:
+ lines0 = groupby (lines, key = (x -> x[0] == cmt_multi_end ()))
+ cmd1, line1 = lines0.__next__ ()
+ if cmd1:
+ line_head = flat_rm line1
+ else:
+ line1 = list line1 # WARNING: force one step of iteration before another __next__ operation
+ _, line2 = lines0.__next__ ()
+ line_head = flat_rm line1 + flat_rm line2
+ lines0 = map ((x -> remove_pos (x[1])), lines0)
+ yield [line_head] + [flat_list xs + flat_list (lines0.__next__ ()) for xs in lines0]
+
+def parse_comment (lines):
+ case lines:
+ match (arg_annotation (), annot):
+ yield (src_annotation (), annot)
+ match (arg_annotation_file (), annot):
+ yield (src_annotation (), fileinput_input0 annot)
+ else:
+ for xs in flatten (parse_comment0 lines):
+ if xs[0][1]:
+ yield (src_code (xs[0]))
+ if xs[1:]:
+ yield (src_comment (), (xs[1], xs[2:-1] if xs[2][1] else [], xs[-1]))
+
+def parse_comment_annot (cmt):
+ case cmt:
+ match (src_comment (), ((pos_line, _), cmt, _)):
+ cmt0 = map ((x -> x[1]), cmt) |> reduce $ (+) if cmt else ''
+ if cmt0 and cmt0.startswith delim_annot:
+ return (pos_line, cmt0.split (delim_annot, 1) [1])
+ match (src_annotation (), cmt0):
+ return (1, cmt0)
+ return None
+
+def parse_annotations_directives (parse_directive, strip, dic):
+ def parse_line (lines):
+ for line in lines |> strip |> .split ('\n'):
+ cmd = parse_directive line
+ yield (src_directive <*| cmd if cmd else src_code line)
+
+ return { pos2 : [ (pos3_arg, list <| flatten ([ parse_line lines for _, lines in cmds ]) )
+ for pos3_arg, cmds in cmds0 ]
+ for pos2, cmds0 in dic.items () }
+
+def parse_annotations (parse_commands, hash_cmd_generic, lines, pos_file, pos_dir):
+ annot = {}
+ for cmt in parse_comment lines:
+ match (pos_line, cmds_generic, cmds) in parse_commands cmt:
+ update_dic (annot,
+ (pos_file, pos_line),
+ [(hash_cmd_generic, list <| map ((arg -> (pos_dir, arg)), cmds))]
+ if cmds_generic else
+ [(cmd0, list <| map ((arg -> (pos_dir, arg)), cmd1)) for cmd0, cmd1 in cmds])
+
+ return { (pos_file, pos_line) : list <| mapi ((i_cmd -> ((pos_file, pos_line, i_cmd[0]), i_cmd[1])), cmds)
+ for (pos_file, pos_line), cmds in annot.items () }
+
+def make_annotations (cmds_generic0, cmds0, pos_file, pos_line, pos_dir):
+ cmds = [(hash_cmd_generic, list <| map ((arg -> (pos_dir, arg)), cmds0))] if cmds_generic0 else [(cmd0, list <| map ((arg -> (pos_dir, arg)), cmd1)) for cmd0, cmd1 in cmds0]
+ return { (pos_file, pos_line) : list <| mapi ((i_cmd -> ((pos_file, pos_line, i_cmd[0]), i_cmd[1])), cmds) }
+
+#
+
+spaces = regex (r'\s*', re.MULTILINE)
+ident = regex (r"[_'a-zA-Z][_'a-zA-Z0-9]*")
+
+@generate
+def typ ():
+ @generate
+ def arity ():
+ arit = yield (spaces >> string (r'[') >> spaces >> regex (r"[0-9]+") << spaces << string (r']'))
+ return (int arit)
+ ty = yield ident
+ arity = yield (optional arity)
+ return (ty, arity)
+
+def directive (parse_arg):
+ @generate
+ def directive0 ():
+ name = yield spaces >> string (r'#') >> spaces >> ident
+ arg = yield parse_arg
+ return (name, arg)
+ return directive0
+
+@generate
+def directive_refine ():
+ @generate
+ def cmd_args ():
+ cmd = yield spaces >> ident
+ args = yield spaces >> regex (r'.+')
+ return (cmd, args)
+ delim, cmd_args = yield directive cmd_args
+ if delim == delim_refine:
+ return cmd_args
+ else:
+ return None
+
+directives_refine = optional directive_refine
+
+##
+# unparse
+
+unparse_string = s -> (s.replace ('\\', r'\\')
+ .replace ('"', r'\"')
+ |> (s -> '"' + s + r'\n"'))
+
+unparse_annotation_cmd = toks -> delim_comment.line_comment[0] + delim_annot + ' ' + toks
+
+def unparse_source_directive (unparse_directive, lines):
+ lines_last = []
+ for line in lines:
+ case line:
+ match src_directive (cvar, _, pvar):
+ lines0 = unparse_directive ((cvar, pvar))
+ if lines0[0]:
+ for line0 in lines0[0]:
+ yield line0
+ else:
+ input_continue ('Generating an empty line')
+ lines_last = lines_last + lines0[1]
+ match src_code (line):
+ yield line
+ for line in lines_last:
+ yield line
+
+def map_source0 (parse_comment, map_annotation, parse_commands, only_content, subst_lines, lines):
+ def map_annotation0 (command_ty, get_command):
+ yield from map_annotation ((src_annotation (), False), False, [], [(command_ty, get_command (()).split ('\n'))])
+ def map_source_annotation (command_ty, get_command, map_comment, cmt_enclose, cmt):
+ nonlocal subst_lines
+ match (pos_cmt, cmds_generic, cmds) in parse_commands cmt:
+ exec_loop = True
+ subst_bloc = []
+ while exec_loop:
+ match [(pos_line, subst_bloc)] + subst_lines0 in subst_lines:
+ if pos_line <= pos_cmt:
+ subst_lines = subst_lines0
+ if pos_line != pos_cmt:
+ subst_bloc = []
+ if pos_line >= pos_cmt:
+ exec_loop = False
+ else:
+ exec_loop = False
+ yield from map_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds)
+ else:
+ if only_content:
+ yield from map_annotation0 (command_ty, get_command)
+ else:
+ yield from map_comment ()
+ inline = cmt -> '\n' not in cmt
+ for src in parse_comment lines:
+ case src:
+ match (src_comment (), (cmt_begin, cmt, cmt_end)):
+ def map_comment ():
+ for _, toks in chain ((cmt_begin,), cmt, (cmt_end,)):
+ yield toks
+ yield from map_source_annotation ( 'comment'
+ , (_ -> ''.join (map ((pos_line -> pos_line[1]), [cmt_begin] + cmt + [cmt_end])))
+ , map_comment
+ , (src_annotation (), all <| map (pos_line -> inline (pos_line[1]), cmt))
+ if only_content else
+ (src_comment (), (cmt_begin, cmt_end))
+ , src )
+ match (src_annotation (), cmt):
+ def map_comment ():
+ yield cmt
+ yield from map_source_annotation ('annotation', (_ -> cmt), map_comment, (src_annotation (), inline cmt), src)
+ match (src_code ((_, code))):
+ if only_content and len code > 1 and code[-1] == '\n':
+ yield from map_annotation0 ('code', _ -> code[:-1])
+ else:
+ yield code
+
+def map_source (*args):
+ yield from map_source0 <*| (parse_comment,) + args
diff --git a/formal/promela/src/src/syntax_yaml.coco b/formal/promela/src/src/syntax_yaml.coco
new file mode 100644
index 00000000..5ebeb7e2
--- /dev/null
+++ b/formal/promela/src/src/syntax_yaml.coco
@@ -0,0 +1,182 @@
+ ##############################################################################
+ # Syntax YAML
+ #
+ # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+ #
+ # All rights reserved.
+ #
+ # Redistribution and use in source and binary forms, with or without
+ # modification, are permitted provided that the following conditions are
+ # met:
+ #
+ # * Redistributions of source code must retain the above copyright
+ # notice, this list of conditions and the following disclaimer.
+ #
+ # * Redistributions in binary form must reproduce the above
+ # copyright notice, this list of conditions and the following
+ # disclaimer in the documentation and/or other materials provided
+ # with the distribution.
+ #
+ # * Neither the name of the copyright holders nor the names of its
+ # contributors may be used to endorse or promote products derived
+ # from this software without specific prior written permission.
+ #
+ # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ##############################################################################
+
+from src.library import *
+from src import syntax_pml
+from src.syntax_pml import src_code, src_comment, src_annotation, src_directive, arg_pml, arg_annotation
+
+#
+
+import json
+import yaml
+
+##
+# parse
+
+def safe_load (toks):
+ try:
+ dic = yaml.safe_load (toks)
+ except yaml.parser.ParserError:
+ input_continue (f'Parser error: {toks}')
+ dic = None
+ except yaml.scanner.ScannerError:
+ dic = None
+ return dic
+
+#
+
+def parse_commands (cmt):
+ match (pos_line, cmt0) in syntax_pml.parse_comment_annot cmt:
+ toks = safe_load cmt0
+ if isinst_str toks:
+ return (pos_line, True, toks.split (' ') |> filter $ (x -> x.strip ()) |> map $ (x -> x.strip ()) |> list)
+ elif isinst_list (isinst_str, toks):
+ return (pos_line, True, toks)
+ elif isinst_list (isinst_dict $ ((isinst_str, isinst_list $ (toks -> isinst_str toks or isinst_list (isinst_str, toks)))), toks):
+ def parse (toks):
+ for tok in toks:
+ for cmd, args in tok.items ():
+ yield (cmd, ['\n'.join(arg) if isinstance (arg, list) else arg for arg in args])
+ return (pos_line, False, [tok for tok in parse toks])
+ else:
+ return None
+ else:
+ return None
+
+def parse_directive (line):
+ match (var_c, arg) in syntax_pml.directives_refine.parse line:
+ match [[ty_name, ty_arity], var_pml] in safe_load arg if isinst_list (isinst_str, [ty_name, var_pml]) and (ty_arity is None or isinstance (ty_arity, int)):
+ return (var_c, (ty_name, int (ty_arity) if ty_arity else None), var_pml)
+ else:
+ return None
+ else:
+ return None
+
+##
+# unparse
+
+unparse_annotation_cmd = (cmd, args) -> syntax_pml.unparse_annotation_cmd (json.dumps ([{cmd : args}]))
+
+unparse_commands_inline = json.dumps # Note: Not expecting newlines in the output
+
+def unparse_command_refine (cmd):
+ def unparse_source_directive (lines):
+ for line in lines:
+ case line:
+ match src_code (code):
+ yield code
+ match src_directive (var_c, ty, var_pml):
+ yield (f'#refine {var_c} ' + unparse_commands_inline ([ty, var_pml]))
+ return (syntax_pml.command_at syntax_pml.binding_refine_uniform if cmd[0] else syntax_pml.binding_refine_uniform,
+ ([cmd[0]] if cmd[0] else []) + ['\n'.join (unparse_source_directive (cmd[1]))])
+
+def unparse_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds):
+ case cmt_enclose:
+ match (src_comment (), (cmt_begin, cmt_end)):
+ inline = cmt_begin[0] == cmt_end[0]
+ cmt_begin = cmt_begin[1] + syntax_pml.delim_annot + (' ' if inline else '\n')
+ cmt_end = cmt_end[1]
+ match (src_annotation (), inline):
+ cmt_begin = None
+ cmt_end = None
+
+ if cmt_begin:
+ yield cmt_begin
+ if cmds_generic:
+ yield unparse_commands_inline cmds
+ else:
+ def unparse_commands (subst_bloc, cmds):
+ case (subst_bloc, cmds):
+ match ([(pos1, subst_cmd)] + subst_bloc0, [(cmd, (pos2, args))] + cmds0):
+ if pos1 < pos2:
+ subst_bloc = subst_bloc0
+ yield unparse_command_refine subst_cmd
+ else:
+ cmds = cmds0
+ yield (cmd, args)
+ yield from unparse_commands (subst_bloc, cmds)
+ match ([_] + _, []):
+ for _, subst_cmd in subst_bloc:
+ yield unparse_command_refine subst_cmd
+ match ([], [_] + _):
+ for cmd, (_, args) in cmds:
+ yield (cmd, args)
+ def split (arg):
+ arg0 = arg.split ('\n')
+ return arg if len arg0 <= 1 else arg0
+ data0 = [{cmd: [split arg for arg in args]} for cmd, args in list (unparse_commands (subst_bloc, syntax_pml.discard_refine cmds))]
+ yield unparse_commands_inline data0 if inline else yaml.dump data0
+ if cmt_end:
+ yield cmt_end
+
+##
+# check
+
+def check_annotation0 (unparse_annotation, parse_commands, strip_find_annot, group_max, cmt_enclose, cmds_generic, subst_bloc, cmds):
+ def unparse_parse (cmt_enclose):
+ case cmt_enclose:
+ match (src_comment (), _):
+ arg_ty = arg_pml ()
+ match (src_annotation (), _):
+ arg_ty = arg_annotation ()
+ else:
+ input_continue ('Expecting a comment or an annotation')
+ cmds00 = list <| map (parse_commands, syntax_pml.parse_comment ((arg_ty, ''.join (unparse_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds)))))
+ return None if any <| map ((cmd -> cmd is None), cmds00) else cmds00
+
+ cmds00 = unparse_parse cmt_enclose
+ match (None, (src_comment (), _)) in (cmds00, cmt_enclose):
+ # possibly in the situation where we were trying to parse nested C comments delimiters /* */, so we can try to parse again the content but this time without the outer /* */ delimiters
+ cmds00 = unparse_parse ((src_annotation (), False))
+ case cmds00:
+ match [(_, True, cmds0)] if cmds_generic == True and cmds == cmds0:
+ pass
+ match [(_, False, cmds0)] if cmds_generic == False and syntax_pml.discard_refine cmds == syntax_pml.discard_refine cmds0:
+ pos_file = 0
+ pos_line = 0
+
+ case group_max (syntax_pml.parse_annotations_directives $ parse_directive <*| strip_find_annot ([syntax_pml.make_annotations (False, cmds0, pos_file, pos_line, '')])):
+ match [(pos_file0, [(pos_line0, subst_bloc0)])] if pos_file == pos_file0 and pos_line == pos_line0 and subst_bloc == subst_bloc0:
+ pass
+ match [] if subst_bloc == []:
+ pass
+ else:
+ input_continue ('Expecting similar substitution annotation bloc')
+ else:
+ input_continue ('Expecting similar annotations')
+
+def check_annotation (*args):
+ check_annotation0 <*| (unparse_annotation, parse_commands) + args
diff --git a/formal/promela/src/src/testgen.coco b/formal/promela/src/src/testgen.coco
new file mode 100644
index 00000000..2a8a3397
--- /dev/null
+++ b/formal/promela/src/src/testgen.coco
@@ -0,0 +1,624 @@
+ ##############################################################################
+ # TestGen
+ #
+ # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie)
+ #
+ # All rights reserved.
+ #
+ # Redistribution and use in source and binary forms, with or without
+ # modification, are permitted provided that the following conditions are
+ # met:
+ #
+ # * Redistributions of source code must retain the above copyright
+ # notice, this list of conditions and the following disclaimer.
+ #
+ # * Redistributions in binary form must reproduce the above
+ # copyright notice, this list of conditions and the following
+ # disclaimer in the documentation and/or other materials provided
+ # with the distribution.
+ #
+ # * Neither the name of the copyright holders nor the names of its
+ # contributors may be used to endorse or promote products derived
+ # from this software without specific prior written permission.
+ #
+ # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ ##############################################################################
+
+from src.library import *
+from src import refine_command
+from src import syntax_ml
+from src.syntax_ml import token_ident, token_antiquotation
+from src import syntax_pml
+from src.syntax_pml import arg_pml, arg_pml_file, arg_annotation, arg_annotation_file, src_directive, binding_refine_uniform, find_annot, find_annot0, find_annot_bool, find_annot_list, find_annot_list1, find_annot_list2
+from src import syntax_yaml
+from src.modules.promela_yacc.promela import ast
+from src.modules.promela_yacc.promela import lex
+from src.modules.promela_yacc.promela import yacc
+
+#
+
+import copy
+import errno
+from itertools import groupby
+import os
+import random
+import re
+import shutil
+import subprocess
+import sys
+import tempfile
+import traceback
+
+##
+# parse
+
+def parse_refine_generic (l_refine_generic):
+ toks = ''.join (l_refine_generic)
+ dic = syntax_yaml.safe_load (toks) if toks else {}
+ if not syntax_yaml.isinst_dict ((syntax_yaml.isinst_str, syntax_yaml.isinst_str), dic):
+ input_continue (f'Type error: {dic}')
+ dic = {}
+ return dic
+
+##
+# 'assert' analyses
+
+def tl (l):
+ if l:
+ _, *l = l
+ return l
+ else:
+ return []
+
+def get_forest_ass (node):
+ if type node is ast.Assert:
+ return [ (node, ast.Assert (ast.Unary ('!', node.expr), pos = node.pos)) ]
+ else:
+ def get_forest_ass00 (cons_pair, dest_pair, node, cons_node):
+ ts_pred = []
+ ts_succ = tl node
+ ts_res = []
+ for tt0 in node:
+ t_t = dest_pair tt0
+ ts_pred0 = list (reversed ts_pred)
+ ts_res = [ list (map ((msg_t_ass -> (cons_pair ((msg_t_ass[0], t_t[1])), cons_node (ts_pred0 + [cons_pair ((msg_t_ass[1], t_t[1]))] + ts_succ))), get_forest_ass (t_t[0]))) ] + ts_res
+ ts_pred = [cons_pair ((t_t[0], t_t[1]))] + ts_pred
+ ts_succ = tl ts_succ
+ return list (flatten (reversed ts_res))
+ def get_forest_ass0 (node, cons_node):
+ return get_forest_ass00 ((t_t -> t_t[0]), (t_t -> (t_t, None)), node, cons_node)
+ def get_forest_ass0_proc_inl (cons_node):
+ return list (map ((msg_t_ass -> ((node.name, msg_t_ass[0]), msg_t_ass[1])), get_forest_ass0 ([node.body], cons_node <.. (node -> node[0]))))
+
+ if type node is ast.Sequence:
+ return get_forest_ass0 (node, (node0 -> ast.Sequence (node0, context = node.context, is_option = node.is_option)))
+ elif type node is ast.Options:
+ return get_forest_ass0 (node.options, (node0 -> ast.Options (node.type, node0)))
+ elif type node is ast.Program:
+ return get_forest_ass00 ((t_t -> t_t), (t_t -> t_t), node, ast.Program)
+ elif isinstance (node, ast.Proctype) and not node.disable_negation:
+ return get_forest_ass0_proc_inl ((node0 -> ast.Proctype (node.name, node0, node.args, node.active, node.d_proc, node.priority, node.provided)) if type node is ast.Proctype else (node0 -> ast.Init (node.name, node0, node.args, node.active, node.d_proc, node.priority, node.provided)))
+ elif type node is ast.InlineDef and not node.disable_negation:
+ return get_forest_ass0_proc_inl (node0 -> ast.InlineDef (node.name, node.decl, node0))
+ else:
+ return []
+
+##
+# main
+
+def fold_prog (args):
+ node_file = 0
+ promela_parser = yacc.Parser (ast, lex.Lexer ())
+ for arg_ty, arg in args:
+ parsed = []
+ try:
+ case arg_ty:
+ match arg_pml ():
+ parsed = promela_parser.parse (arg, None)
+ match arg_pml_file ():
+ parsed = promela_parser.parse (None, arg)
+ except Exception:
+ input_continue_err (traceback.format_exc ())
+ for node in parsed:
+ yield (node_file, node)
+ node_file += 1
+
+def iter_files (iter_source, dict_refine_uniform0, sys_args):
+ arg_cpt = 0
+ for lines in sys_args:
+ match [(pos_file, subst_lines)] + xs in dict_refine_uniform0 if pos_file == arg_cpt:
+ dict_refine_uniform0 = xs
+ else:
+ subst_lines = []
+ iter_source (arg_cpt, subst_lines, lines)
+ arg_cpt += 1
+
+def write_unlink (f):
+ tmp = tempfile.NamedTemporaryFile (delete = False)
+ try:
+ f tmp
+ finally:
+ os.unlink tmp.name
+
+def subprocess_exec (print_stdout, print_stderr, exec_name, exec_args):
+ exec_cmd = [exec_name] + exec_args
+ try:
+ with subprocess.Popen (exec_cmd,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ universal_newlines = True) as proc:
+ stdout, stderr = proc.communicate ()
+ returncode = proc.returncode
+ except Exception as e:
+ stdout = ''
+ stderr = traceback.format_exc () + f'{exec_name}: ' + ('command not found' if isinstance (e, OSError) and e.errno == errno.ENOENT else 'Subprocess error')
+ if print_stderr:
+ returncode = 1
+ else:
+ input_continue_err stderr
+ returncode = 0
+
+ print ('+', ' '.join (exec_cmd))
+ p_stdout = print_stdout and stdout
+ p_stderr = print_stderr and stderr
+ if p_stdout:
+ print (stdout.strip ())
+ if p_stdout and p_stderr:
+ print ('+')
+ if p_stderr:
+ print (stderr.strip ())
+
+ if returncode != 0:
+ input_continue_warn (f'return code = {returncode}')
+ return stdout
+
+def mkdir (dir0):
+ try:
+ os.makedirs (dir0, exist_ok = True)
+ except FileExistsError:
+ input_continue (f'File exists and is not a directory: {dir0}')
+ except NotADirectoryError:
+ input_continue (f'Not a directory for an ancestor: {dir0}')
+
+def copy_i (src, dst):
+ if os.path.exists dst:
+ src_cts = fileinput_input0 src
+ dst_cts = fileinput_input0 dst
+ print ('Content of source file:')
+ print src_cts
+ if src_cts == dst_cts:
+ print (f'Destination file exists with the same content: {dst}')
+ else:
+ print ('Content of destination file:')
+ print dst_cts
+ input_continue (f'Destination file exists with a different content: {dst}')
+ else:
+ # note: in a concurrent setting, the type and content of dst could have been changed meanwhile
+ try:
+ shutil.copy (src, dst)
+ except NotADirectoryError:
+ input_continue (f'Not a directory: {os.path.dirname dst}\n(parent of {os.path.basename dst})')
+ except FileNotFoundError:
+ input_continue (f'Either not a file: {src}\n or not a directory: {os.path.dirname dst}\n(parent of {os.path.basename dst})')
+ except IsADirectoryError:
+ input_continue (f'Is a directory: {src}')
+
+def partition_pml_ast (f_ty, promelas):
+ promelas_ty, promelas_no_ty = [], []
+ for pos_file, (promela, pos_line) in promelas:
+ (promelas_ty if f_ty promela else promelas_no_ty).append((pos_file, (promela, pos_line)))
+
+ return (ast.Program (map ((promela -> promela[1]), promelas_no_ty)).__str__ ().encode (), promelas_no_ty, promelas_ty)
+
+def split_arg (split_arg0, l):
+ for fic in l:
+ match [args] :: _ in split_arg0 fic if args:
+ for arg in args:
+ yield arg
+ else:
+ yield fic
+
+export_yml_ext = (dir0, fic) -> dir0 + fic + '.yml' if fic else dir0
+
+class read_eval_print:
+ def __init__ (self, seed, argv, input_output_yaml):
+ random.seed seed
+ self.hash_cmd_uniform = random.random ().__str__ ()
+ self.hash_cmd_generic = random.random ().__str__ ()
+ self.cwd = os.getcwd ()
+ self.parse_commands = syntax_yaml.parse_commands if input_output_yaml else syntax_ml.parse_commands
+ self.parse_directive = syntax_yaml.parse_directive if input_output_yaml else syntax_ml.parse_directive
+ self.unparse_annotation_cmd = syntax_yaml.unparse_annotation_cmd if input_output_yaml else syntax_ml.unparse_annotation_cmd
+
+ options_pml = ['-p', '--promela']
+ options0 = ['-a', '--annotation'] + options_pml
+ options_annotation_file = ['-af', '--annotation_file']
+ options = options_annotation_file + options0
+ def map_arg (l):
+ case l:
+ match [option, cts] + xs if option in options0 or option in options_annotation_file:
+ yield ( arg_pml () if option in options_pml else
+ arg_annotation () if option in options0 else
+ arg_annotation_file ()
+ , cts)
+ yield from map_arg xs
+ match [fic] + xs:
+ if fic.find ('-') == 0:
+ if fic in options:
+ input_continue (f'Missing parameter: {fic} {xs}')
+ else:
+ input_continue (f'Unknown option: {fic} {xs}')
+ elif os.path.isfile fic:
+ yield (arg_pml_file (), fic)
+ else:
+ input_continue (f'Expecting an existing file: {fic}')
+ yield from map_arg xs
+ def split_arg0 (fic):
+ for opt in options:
+ match [''] + args in fic.split opt:
+ arg = opt.join args
+ yield [opt, arg] if arg else None
+ self.argv = list (map_arg (list (split_arg (split_arg0, argv))))
+
+ ##
+ # printf insertion
+
+ def insert_printf_node (self, node, node_args, node_pos, no_atomic, print_first):
+ printf = ast.Printf <| ast.Node ()
+ dict_print_ty = { 'int' : '%d' }
+ print_default = ('%d', '0', 1)
+ proc_args = list (map ((x -> ((dict_print_ty[x[1][0]], x[1][1] if x[1][1] else 1) if x[1][0] in dict_print_ty else (None, print_default[2]), x[0])), node_args))
+ printf.s = self.unparse_annotation_cmd (self.hash_cmd_uniform, list (map (str, [node_pos[0], node_pos[1]])) + list (map ((x -> map ((_ -> x[0][0]), range (x[0][1])) if x[0][0] else [print_default[0]]), proc_args) |> flatten)) |> syntax_pml.unparse_string
+
+ if proc_args:
+ printf.args = ast.Sequence (map ((x -> (map ((index -> ast.VarRef (x[1], index = index)), range (x[0][1])) if x[0][1] > 1 else [ast.VarRef (x[1])]) if x[0][0] else [ast.VarRef (print_default[1])]), proc_args) |> flatten)
+ atomic = 'atomic'
+
+ def insert_printf_node0 ():
+ sequence = ast.Sequence ([printf] + node.body if print_first else node.body + [printf])
+ if not no_atomic:
+ sequence.context = atomic
+ node.body = sequence
+ match [seq] in node.body:
+ if type seq is ast.Sequence:
+ if not no_atomic and seq.context != atomic:
+ seq.context = atomic
+ if print_first:
+ seq.insert 0 printf
+ else:
+ seq.append printf
+ else:
+ insert_printf_node0 ()
+ else:
+ insert_printf_node0 ()
+ return (node.name, proc_args)
+
+ ##
+ #
+
+ def init_top_elems (self, programs, sys_args, l_check_unparse_parse = False):
+ annots = [ syntax_pml.parse_annotations
+ (self.parse_commands,
+ self.hash_cmd_generic,
+ (arg_ty, arg_cts),
+ pos_file,
+ os.path.dirname (os.path.abspath arg_cts) if arg_ty == arg_pml_file () or arg_ty == arg_annotation_file () else self.cwd)
+ for (pos_file, (arg_ty, arg_cts)) in mapi (args -> args, sys_args) ]
+ dict_top_elem = {}
+
+ for (node_file, (node, node_line)) in programs:
+ if isinstance (node, ast.Proctype) or type node is ast.InlineDef or type node is ast.LTL:
+ dict_top_elem[node.name] = (node_file, node_line)
+
+ def check_binding (binding):
+ dic = find_annot ((annots, dict_top_elem), binding)
+ return (pos -> pos in dic)
+
+ binding_no_printf = 'no_printf'
+ check_no_printf = check_binding binding_no_printf
+ binding_no_atomic = 'no_atomic'
+ check_no_atomic = check_binding binding_no_atomic
+ binding_print_first = 'print_first'
+ check_print_first = check_binding binding_print_first
+ binding_disable_negation = 'disable_negation'
+ check_disable_negation = check_binding binding_disable_negation
+
+ l_refine_uniform_strip = find_annot_bool ((annots, dict_top_elem), 'refine_uniform_strip')
+ def strip (s):
+ if l_refine_uniform_strip:
+ l = s.split ('\n')
+ while l and l[0].strip () == '':
+ l = l[1:]
+ return '\n'.join (l).rstrip ()
+ else:
+ return s
+
+ dict_refine_uniform0 = syntax_pml.parse_annotations_directives (self.parse_directive, strip, find_annot0 ((annots, dict_top_elem), binding_refine_uniform))
+ dict_refine_uniform = { pos2 : cmds |> map $ (pos_cmds -> pos_cmds[1]) |> flatten |> list
+ for pos2, cmds in dict_refine_uniform0.items () }
+ l_refine_uniform_force = find_annot_bool ((annots, dict_top_elem), 'refine_uniform_force')
+ l_check_unparse_parse = l_check_unparse_parse or find_annot_bool ((annots, dict_top_elem), 'check_unparse_parse')
+ l_export_input_yaml = find_annot_list2 ((annots, dict_top_elem), 'export_input_yaml')
+
+ if l_check_unparse_parse or l_export_input_yaml:
+ def group_max (dict_refine_uniform0):
+ def group (f_next, xs):
+ for key, xs in groupby (xs, key = (val -> val[0][0])):
+ yield (key, list <| f_next (map ((val -> (val[0][1:], val[1])), xs)))
+ return list (sorted (dict_refine_uniform0.values ()
+ |> flatten
+ |> map $ (pos4_lines -> (pos4_lines[0][0], (pos4_lines[0][1], pos4_lines[1]))),
+ key = (pos_lines -> pos_lines[0]))
+ |> group $ (group $ (map $ (val -> (val[0][0], val[1])))))
+ dict_refine_uniform0 = group_max dict_refine_uniform0
+ if l_check_unparse_parse:
+ def check_source (arg_cpt, subst_lines, lines):
+ def check_annotation (*args):
+ args0 = ((annots -> (strip, find_annot0 ((annots, dict_top_elem), binding_refine_uniform))), group_max) + args
+ syntax_ml.check_annotation <*| args0
+ syntax_yaml.check_annotation <*| args0
+ yield
+ consume <| syntax_pml.map_source (check_annotation, self.parse_commands, False, subst_lines, lines)
+ iter_files (check_source, dict_refine_uniform0, sys_args)
+ else:
+ dict_refine_uniform0 = None
+
+ procs_args = {}
+ for (node_file, (node, node_line)) in programs:
+ type_inst_proctype = isinstance (node, ast.Proctype)
+ type_is_ltl = type node is ast.LTL
+ if type_inst_proctype or type node is ast.InlineDef or type_is_ltl:
+ node_pos = (node_file, node_line)
+
+ if check_no_printf node_pos:
+ del dict_top_elem[node.name]
+ elif not type_is_ltl:
+ node.disable_negation = check_disable_negation node_pos
+ node_in_dict_refine = node_pos in dict_refine_uniform
+ if node_in_dict_refine or l_refine_uniform_force:
+ node_args = {}
+ def add_dic (node_args, name, ty_arit):
+ if name in node_args:
+ input_continue (f'Duplicated parameters: {name}')
+ node_args[name] = ty_arit
+ return node_args
+
+ if type_inst_proctype:
+ for arg in node.args if node.args else []:
+ node_args = add_dic (node_args, arg[0].name, (arg[0].type, None))
+ else:
+ for name in node.decl if node.decl else []:
+ node_args = add_dic (node_args, name, (None, None))
+
+ node_args0 = { name : False for name in node_args.keys () }
+
+ if node_in_dict_refine:
+ for src in dict_refine_uniform[node_pos]:
+ match src_directive (_, ty_arit, name) in src:
+ if name in node_args:
+ def redef_direc (msg):
+ if name not in node_args0 or node_args0[name]:
+ input_continue ('Redefinition of directive' + msg)
+ msg0 = f': Overwriting the old type ({node_args[name]}) for {name} with {ty_arit}'
+ if node_args[name] != (None, None):
+ if ty_arit != (None, None) and node_args[name] != ty_arit:
+ input_continue ('Type error' + msg0)
+ overwrite = True
+ else:
+ redef_direc ('')
+ overwrite = False
+ elif ty_arit != (None, None):
+ redef_direc (msg0)
+ overwrite = True
+ else:
+ redef_direc ('')
+ overwrite = False
+ else:
+ overwrite = True
+ node_args0[name] = True
+ if overwrite:
+ node_args[name] = ty_arit
+
+ if node_pos in procs_args:
+ input_continue (f'Duplicated positions: {node_pos}')
+ procs_args[node_pos] = self.insert_printf_node (node, node_args.items (), node_pos, check_no_atomic node_pos, check_print_first node_pos)
+ return (programs, (annots, dict_top_elem), procs_args, dict_refine_uniform0, dict_refine_uniform, l_refine_uniform_force, l_check_unparse_parse, l_export_input_yaml)
+
+ ##
+ #
+
+ def write_promela (self, (promelas, annots, procs_args, dict_refine_uniform0, dict_refine_uniform, l_refine_uniform_force, l_check_unparse_parse, l_export_input_yaml)):
+ def exec_input (print_stdout, print_stderr, binding, args):
+ for cmd in find_annot_list1 (annots, binding):
+ cmd = list cmd
+ if cmd:
+ yield subprocess_exec (print_stdout, print_stderr, cmd[0], cmd[1:] + args)
+ else:
+ input_continue (f'Empty command: {binding}')
+
+ promelas_no_ltl_str, promelas_no_ltl, promelas_ltl = partition_pml_ast ((promela -> type promela is ast.LTL), promelas)
+ l_refine_uniform_log = find_annot_bool (annots, 'refine_uniform_log')
+ l_enclose = find_annot_list1 (annots, 'enclose') |> map $ (args -> (args[0], args[1])) |> list
+ l_enclose_uniform = find_annot_list1 (annots, 'enclose_uniform') |> map $ (args -> (args[0], args[1])) |> list
+ l_export_dir = find_annot_list (annots, 'export_dir') |> flatten |> map $ (args -> args[0] + '/' + args[1]) |> list
+ l_export_pml = find_annot_list2 (annots, 'export_pml')
+ l_export_trail = find_annot_list2 (annots, 'export_trail')
+ l_export_code = find_annot_list2 (annots, 'export_code')
+ l_refine_generic = find_annot_list2 (annots, 'refine_generic')
+ dic_refine_generic = parse_refine_generic l_refine_generic
+ l_export_trail_num = find_annot_list2 (annots, 'export_trail_num')
+
+ if l_export_dir:
+ def copy_cat0 (ltl_dirname):
+ def copy_cat (src, dir1, dst):
+ src.close ()
+ for dir0 in l_export_dir:
+ dir0 = dir0 + '/' + ltl_dirname + ('/' + '/'.join (dir1) if dir1 else '')
+ mkdir dir0
+ copy_i (src.name, dir0 + '/' + ''.join (dst))
+ return copy_cat
+ else:
+ def copy_cat0 (ltl_dirname):
+ def copy_cat (src, dir1, dst):
+ src.close ()
+ subprocess_exec (True, True, 'cat', [src.name])
+ return copy_cat
+
+ if l_export_input_yaml:
+ l_export_input_only_content = find_annot_bool (annots, 'export_input_only_content')
+ for dir_yaml in l_export_input_yaml:
+ copy_cat = copy_cat0 (dir_yaml)
+ def write_source (arg_cpt, subst_lines, lines):
+ def write_unlink0 (fic):
+ for toks in syntax_pml.map_source (syntax_yaml.unparse_annotation, self.parse_commands, l_export_input_only_content, subst_lines, lines):
+ fic.write (toks.encode ())
+ copy_cat (fic, [], [export_yml_ext ('', str arg_cpt)])
+ write_unlink (write_unlink0)
+ iter_files (write_source, dict_refine_uniform0, self.argv)
+
+ pml_name = 'pan.pml'
+ if l_export_trail_num:
+ add_num = (num, l) -> list <| map (fic -> fic + num, l)
+ trails = list <| map ((num -> (pml_name + num + '.trail', add_num (num, l_export_trail), add_num (num, l_export_code))), l_export_trail_num)
+ model_checker_exec = 'model_checker_exec_all'
+ else:
+ trails = [(pml_name + '.trail', l_export_trail, l_export_code)]
+ model_checker_exec = 'model_checker_exec_one'
+
+ with tempfile.TemporaryDirectory () as dir_test:
+ def write_promela0 (ltl_cpt, write_promela_fic, get_dirname, ltl_pos, ltl_name):
+ def write_unlink0 (ltl_dirname0, fic_pml):
+ ltl_dirname = get_dirname ltl_dirname0
+ copy_cat = copy_cat0 ltl_dirname
+ def copy_cat_or_close (src, dir1, dst):
+ if dst:
+ copy_cat (src, dir1, dst)
+ else:
+ src.close ()
+ dir_test_ltl = dir_test + '/' + ltl_dirname
+ mkdir dir_test_ltl
+ write_promela_fic fic_pml
+ copy_cat_or_close (fic_pml, [], l_export_pml)
+ os.symlink (fic_pml.name, dir_test_ltl + '/' + pml_name)
+ os.chdir dir_test_ltl
+
+ consume <| exec_input (True, True, 'model_checker_verifier', [pml_name])
+ consume <| exec_input (True, True, 'model_checker_compile', [])
+ pattern_vectorsz = 'VECTORSZ'
+ warn_lines = [ line for lines in exec_input (True, True, model_checker_exec, []) for line in lines.splitlines () if pattern_vectorsz in line ]
+ if warn_lines:
+ for line in warn_lines:
+ print line
+ input_continue_warn ('A trail file has been generated, but it might be not yet exploitable. To obtain a possibly different trail file with more information, one can try increase "'+ pattern_vectorsz +'" (using e.g. "gcc -D'+ pattern_vectorsz +'=4096" or higher values).')
+
+ for trail_name, l_export_trail, l_export_code in trails:
+ if os.path.exists trail_name:
+ trail = self.unparse_annotation_cmd ('add_cmd', [self.hash_cmd_uniform]) + '\n' + ''.join (exec_input (False, True, 'model_checker_trail', [trail_name, pml_name]))
+ if l_export_trail:
+ def write_unlink01 (fic_trail):
+ fic_trail.write (trail.encode ())
+ copy_cat (fic_trail, [], l_export_trail)
+ write_unlink write_unlink01
+
+ l_refine_uniform = dict_refine_uniform or l_refine_uniform_force
+ if l_refine_uniform or l_refine_generic:
+ _, annots_trail, _, _, _, _, _, _ = self.init_top_elems ([], [(arg_pml (), trail)], l_check_unparse_parse)
+ def write_c (refine_line, l_refine_other, dir0):
+ def write_unlink00 (fic_c):
+ def write_nl (lines):
+ for line in lines:
+ if line:
+ fic_c.write (line.encode ())
+ fic_c.write ('\n'.encode ())
+ def write_nl_dic (name, dic, expand_directive):
+ if l_refine_uniform_log:
+ write_nl ([self.unparse_annotation_cmd ('promela_term', [name])])
+ write_nl (syntax_pml.unparse_source_directive (expand_directive, dic))
+ if l_refine_uniform_log:
+ write_nl ([''])
+ write_nl (map (x -> x[0], l_enclose))
+ refine_line (find_annot_list1 $ annots_trail, write_nl, write_nl_dic)
+ write_nl (map (x -> x[1], reversed l_enclose))
+ copy_cat_or_close (fic_c, [dir0] if l_refine_other else [], l_export_code)
+ write_unlink write_unlink00
+
+ if l_refine_uniform:
+ def refine_line (find_annot_l, write_nl, write_nl_dic):
+ write_nl (map (x -> x[0], l_enclose_uniform))
+ for spin_atomic in find_annot_l self.hash_cmd_uniform:
+ pos_proc = (int (spin_atomic[0]), int (spin_atomic[1]))
+ c_print = dict_refine_uniform[pos_proc] if pos_proc in dict_refine_uniform else []
+ proc_name, proc_args = procs_args[pos_proc]
+ spin_atomic = spin_atomic[2:]
+ dict_args = {}
+ for (ty, alias_pml) in proc_args:
+ dict_args[alias_pml] = (ty[0], ty[1], spin_atomic[: ty[1] ])
+ spin_atomic = spin_atomic[ ty[1] :]
+ # note: from this point, spin_atomic should have been fully consumed
+ def expand_directive (cvar_pvar):
+ # note: cvar_pvar[1] is in dict_args
+ ty_val = dict_args[cvar_pvar[1]]
+ if ty_val[0] == None:
+ input_continue (f'Not serializable type: printing the content of {cvar_pvar[1]} with a default value')
+ pos = 'pos'
+ concat_str = '_'
+ def def_name (nb):
+ return '{}{}{}'.format (cvar_pvar[0], concat_str, nb)
+ def define_undef (pat, var, *args):
+ return (('#define ' + pat).format (var, *args), '#undef ' + var)
+
+ if ty_val[1] == 1:
+ lines = [define_undef ('{} {}', cvar_pvar[0], ty_val[2][0])]
+ else:
+ lines = [define_undef ('{} ""', def_name 0)] + list (map ((ind_val -> define_undef ('{} {} " {}"', def_name (ind_val[0] + 1), def_name (ind_val[0]), ind_val[1])), zip (range (ty_val[1]), ty_val[2]))) + [define_undef ('{}({}) {}{} ## {}', cvar_pvar[0], pos, cvar_pvar[0], concat_str, pos)]
+ lines = list (zip (* lines))
+ return (list (lines[0]), list (lines[1]))
+ write_nl_dic (proc_name, c_print, expand_directive)
+ if ltl_pos in dict_refine_uniform:
+ write_nl_dic (ltl_name, dict_refine_uniform[ltl_pos], (_ -> ([], [])))
+ write_nl (map (x -> x[1], reversed l_enclose_uniform))
+ write_c (refine_line, l_refine_generic, 'uniform')
+
+ if l_refine_generic:
+ def refine_line (find_annot_l, write_nl, write_nl_dic):
+ cmd = refine_command.command dic_refine_generic
+ for spin_atomic in find_annot_l self.hash_cmd_generic:
+ cmd.refineSPINLine_main (list spin_atomic)
+ write_nl (''.join (cmd.test_body ()).splitlines ()) # LTL support for the 'generic' style is not yet supported. The writing would have to be performed inside this instruction.
+ write_c (refine_line, l_refine_uniform, 'generic')
+ else:
+ input_continue (f'No trail file for {fic_pml.name} ({dir_test_ltl}/{pml_name})')
+ write_unlink (write_unlink0 $ (str ltl_cpt))
+ return ltl_cpt + 1
+
+ ltl_cpt = 0
+ for ltl_pos_file, (ltl, ltl_pos_line) in promelas_ltl:
+ def write_promela_fic (fic_pml):
+ fic_pml.write promelas_no_ltl_str
+ fic_pml.write (ast.LTL (ast.Unary ('!', ltl.formula), name = ltl.name).__str__ ().encode ())
+ ltl_cpt = write_promela0 (ltl_cpt, write_promela_fic, (ltl_dirname0 -> 'ltl' + '_' + (ltl.name if ltl.name else ltl_dirname0)), (ltl_pos_file, ltl_pos_line), ltl.name if ltl.name else ltl.formula.__str__ ())
+
+ promelas_ass = get_forest_ass (ast.Program (map ((promela -> (promela[1][0], (promela[0], promela[1][1]))), promelas_no_ltl)))
+ dic_dirname = {}
+ for ((ltl_name, _), _), _ in promelas_ass:
+ dic_dirname[ltl_name] = dic_dirname[ltl_name] + 1 if ltl_name in dic_dirname else 0
+ for ((ltl_name, ltl), (ltl_pos_file, _)), promelas_ass0 in promelas_ass:
+ def write_promela_fic (fic_pml):
+ fic_pml.write (promelas_ass0.__str__ ().encode ())
+ ltl_cpt = write_promela0 (ltl_cpt, write_promela_fic, (ltl_dirname0 -> 'assert' + '_' + ('' if dic_dirname[ltl_name] == 0 else ltl_dirname0 + '_') + ltl_name), (ltl_pos_file, ltl.pos), ltl.expr.__str__ ())
+ os.chdir self.cwd
+
+def main (*args):
+ st = read_eval_print (*args)
+ st.write_promela (st.init_top_elems (list (fold_prog st.argv), st.argv))