diff options
author | Andrew Butterfield <Andrew.Butterfield@scss.tcd.ie> | 2023-01-10 19:57:22 +0000 |
---|---|---|
committer | Sebastian Huber <sebastian.huber@embedded-brains.de> | 2023-01-18 12:42:32 +0100 |
commit | 1283c1680a2042e880220f0d62770b97859ea03b (patch) | |
tree | 2575852bd3b236af73332e2356fb1a21e599a97d | |
parent | top-level sources (diff) | |
download | rtems-central-1283c1680a2042e880220f0d62770b97859ea03b.tar.bz2 |
main Coconut sources
-rw-r--r-- | formal/promela/src/src/__init__.py | 0 | ||||
-rw-r--r-- | formal/promela/src/src/library.coco | 94 | ||||
-rw-r--r-- | formal/promela/src/src/refine_command.coco | 487 | ||||
-rw-r--r-- | formal/promela/src/src/spin2test.coco | 104 | ||||
-rw-r--r-- | formal/promela/src/src/syntax_ml.coco | 211 | ||||
-rw-r--r-- | formal/promela/src/src/syntax_pml.coco | 396 | ||||
-rw-r--r-- | formal/promela/src/src/syntax_yaml.coco | 182 | ||||
-rw-r--r-- | formal/promela/src/src/testgen.coco | 624 |
8 files changed, 2098 insertions, 0 deletions
diff --git a/formal/promela/src/src/__init__.py b/formal/promela/src/src/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/formal/promela/src/src/__init__.py diff --git a/formal/promela/src/src/library.coco b/formal/promela/src/src/library.coco new file mode 100644 index 00000000..e212538d --- /dev/null +++ b/formal/promela/src/src/library.coco @@ -0,0 +1,94 @@ + ############################################################################## + # Library + # + # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) + # + # All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are + # met: + # + # * Redistributions of source code must retain the above copyright + # notice, this list of conditions and the following disclaimer. + # + # * Redistributions in binary form must reproduce the above + # copyright notice, this list of conditions and the following + # disclaimer in the documentation and/or other materials provided + # with the distribution. + # + # * Neither the name of the copyright holders nor the names of its + # contributors may be used to endorse or promote products derived + # from this software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ############################################################################## + +import fileinput +import ply.yacc +import sys + +# + +def input0 (): + input ('Continue? (ENTER/CTRL+C): ') + +def input_continue (msg): + print msg + input0 () + +input_continue_logger = ply.yacc.PlyLogger (sys.stderr) + +def input_continue_warn (msg): + input_continue_logger.warning (msg) + input0 () + +def input_continue_err (msg): + input_continue_logger.error (msg) + input0 () + +# + +def isinst (ty, f_get, f_check, toks): + return isinstance (toks, ty) and all ([ f_check tok for tok in f_get toks ]) +def isinst_list (f_check, toks): + return isinst (list, (toks -> toks), f_check, toks) +def isinst_dict (check1_check2, toks): + return isinst (dict, (toks -> toks.items ()), (toks -> check1_check2[0] (toks[0]) and check1_check2[1] (toks[1])), toks) +def isinst_str (toks): + return isinstance (toks, str) + +# + +def mapi (f, l): + n = 0 + for x in l: + yield f ((n, x)) + n += 1 + +def flatten (l): return (x for xs in l for x in xs) + +def update_dic (dic, pos, cmds): + dic[pos] = dic[pos] + cmds if pos in dic else cmds + +def fileinput_input (path): + try: + with fileinput.input path as lines: + for line in lines: + yield line + except FileNotFoundError: + input_continue (f'File not found: {path}') + except IsADirectoryError: + input_continue (f'Is a directory: {path}') + +fileinput_input0 = ''.join <.. fileinput_input diff --git a/formal/promela/src/src/refine_command.coco b/formal/promela/src/src/refine_command.coco new file mode 100644 index 00000000..e87e7be2 --- /dev/null +++ b/formal/promela/src/src/refine_command.coco @@ -0,0 +1,487 @@ +# SPDX-License-Identifier: BSD-2-Clause +"""Refines Annotated SPIN output to test program code""" + +# Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import itertools +import logging + +logger = logging.getLogger (__name__) + +class command: + + def __init__(self, ref_dict, outputLOG = False, annoteComments = True, outputSWITCH = False): + self.ref_dict = ref_dict + self.ref_dict_keys = ref_dict.keys() + + self.inSTRUCT = False # True when in STRUCT + self.inSEQ = False # True when in SEQ + self.seqForSEQ = "" # Holds space-separated sequence output string when in SEQ + self.inName = "" # Set to SEQ/STRUCT name when inside + + # We assume the target test code language is C/C++ by default + self.EOLC = "//" + self.CMTBEGIN = "/*" + self.CMTEND = "*/" + # The following should all be in refinement file + self.SEGNAMEPFX = "TestSegment{}" + self.SEGARG = "Context* ctx" + self.SEGDECL = "static void {}( {} )" # segname, segarg, + self.SEGBEGIN = " {" + self.SEGEND = "}" + self.TESTLOGGER = 'T_log(T_NORMAL,"{}");' + + self.testName = "Un-named Test" + self.defCode = [] # list of strings + self.declCode = [] # list of strings + self.testCodes = [] # list (indexed by pid) of list of strings + self.procIds = set() # set of process ids in use + self.currentId = 0 # currently running process + self.switchNo = 0 + + self.outputLOG = outputLOG + self.annoteComments = annoteComments + self.outputSWITCH = outputSWITCH + + match def setComments(self, 'C'): + logger.debug("Set LANGUAGE to C\n") + self.EOLC = "//" + self.CMTBEGIN = "/*" + self.CMTEND = "*/" + + addpattern def setComments(self, 'Python'): + logger.debug("Set LANGUAGE to Python\n") + self.EOLC = "#" + self.CMTBEGIN = '"""\n' + self.CMTEND = '\n"""' + + # catch-all for setComments + addpattern def setComments(self, lang): + logger.debug("Unknown LANGUAGE "+lang+", set to C\n") + + def setupSegmentCode(self): + if 'SEGNAMEPFX' in self.ref_dict_keys: + self.SEGNAMEPFX = self.ref_dict['SEGNAMEPFX'] + else: + logger.debug("SEGNAMEPFX not defined, using "+self.SEGNAMEPFX) + if 'SEGARG' in self.ref_dict_keys: + self.SEGARG = self.ref_dict['SEGARG'] + else: + logger.debug("SEGARG not defined, using "+self.SEGARG) + if 'SEGDECL' in self.ref_dict_keys: + self.SEGDECL = self.ref_dict['SEGDECL'] + else: + logger.debug("SEGDECL not defined, using "+self.SEGDECL) + if 'SEGBEGIN' in self.ref_dict_keys: + self.SEGBEGIN = self.ref_dict['SEGBEGIN'] + else: + logger.debug("SEGBEGIN not defined, using "+self.SEGBEGIN) + if 'SEGEND' in self.ref_dict_keys: + self.SEGEND = self.ref_dict['SEGEND'] + else: + logger.debug("SEGEND not defined, using "+self.SEGEND) + logger.debug("SEGNAMEPFX is '"+self.SEGNAMEPFX+"'") + logger.debug("SEGARG is '"+self.SEGARG+"'") + logger.debug("SEGDECL is '"+self.SEGDECL+"'") + logger.debug("SEGBEGIN is '"+self.SEGBEGIN+"'") + logger.debug("SEGEND is '"+self.SEGEND+"'") + + def setupLanguage(self): + if 'LANGUAGE' in self.ref_dict_keys: + language = self.ref_dict['LANGUAGE'] + self.setComments(language) + self.setupSegmentCode() + else: + pass # assume default: C + + def collectPIds(self, ln): + if len(ln) > 0: + self.procIds.add(int(ln[0])) + + def addCode(self, id,codelines): + # logger.debug("addCode lines are {} {}".format(type(codelines),codelines)) + self.testCodes = self.testCodes + [(int(id), codelines)] + + def addCode_int (self, pid, f_codelines0, f_codelines1, value): + try: + value = int value + except: + value = None + if value is not None: + if value == 0: + self.addCode (pid, [f_codelines0 (())]) + else: + self.addCode (pid, [f_codelines1 value]) + + def switchIfRequired(self, ln): + if len(ln) > 0: + pid = ln[0] + i = int(pid) + if i == self.currentId: + pass + else: # proc self.currentId stops here, and proc i resumes + self.switchNo =self.switchNo+1 + if 'SUSPEND' not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" SUSPEND: no refinement entry found"]) + else: + code = self.ref_dict['SUSPEND'] + self.addCode(self.currentId,[code.format(self.switchNo,self.currentId,i)]) + if 'WAKEUP' not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" WAKEUP: no refinement entry found"]) + else: + code = self.ref_dict['WAKEUP'] + self.addCode(i,[code.format(self.switchNo,i,self.currentId)]) + self.currentId = i + + def logSPINLine(self, ln): + if len(ln) > 1: + pid = int(ln[0]) + str = ' '.join(ln) + if ln[1] in ['NAME','DEF']: + self.defCode = self.defCode + [self.EOLC+" @@@ {}".format(str)] + elif ln[1] in ['DECL','DCLARRAY'] : + self.declCode = self.declCode + [self.EOLC+" @@@ {}".format(str)] + elif ln[1] == 'LOG' : + pass + else: + self.addCode(pid,['T_log(T_NORMAL,"@@@ {}");'.format(str)]) + + # Promela Annotations + # + # Format: `@@@ <pid> <KEYWORD> <thing1> ... <thingN>` where N >= 0 + # + # Things: + # <pid> Promela Process Id of proctype generating annotation + # <word> chunk of text containing no white space + # <name> Promela variable/structure/constant identifier + # <type> Promela type identifier + # <tid> OS Task/Thread/Process Id alias + # [ <thing> ] An optional <thing> + # ( <thing1> | <thing2>) A choice of <thing1> or <thing2> + # _ unused argument (within containers) + # + # KEYWORDS and "things" + # LOG <word1> ... <wordN> + # NAME <name> + # INIT + # DEF <name> <value> + # DECL <type> <name> [<value>] + # DCLARRAY <type> <name> <value> + # TASK <name> + # SIGNAL <name> <value> + # WAIT <name> <value> + # STATE tid <name> + # SCALAR (<name>|_) [<index>] <value> + # PTR <name> <value> + # STRUCT <name> + # SEQ <name> + # END <name> + # CALL <name> <value1> ... <valueN> + + # Refinement Mechanisms + # Direct Output - no lookup + # Keyword Refinement - lookup (the) <KEYWORD> + # Name Refinement - lookup (first) name + # The same name may appear in different contexts + # We add '_XXX' suffixes to lookup less frequent uses + # _DCL - A variable declaration + # _PTR - The pointer value itself + # _FSCALAR - A scalar that is a struct field + # _FPTR - A pointer that is a struct field + # Type Refinement - lookup (first) type + # Typed-Name Refinement - lookup type-name + + # LOG <word1> ... <wordN> + # Direct Output + match def refineSPINLine(self, [pid,'LOG']+rest): + if self.outputLOG: + ln = ' '.join(rest) + self.addCode(pid,["T_log(T_NORMAL,{});".format(ln)]) + else: + pass + + # NAME <name> + # Keyword Refinement (NAME) + addpattern def refineSPINLine(self, [pid,'NAME',name]): + if 'NAME' not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" CANNOT REFINE 'NAME'"]) + else: + self.addCode(pid,self.ref_dict["NAME"].rsplit('\n')) + + # INIT + # Keyword Refinement (INIT) + addpattern def refineSPINLine(self, [pid,'INIT']): + if 'INIT' not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" CANNOT REFINE 'INIT'"]) + else: + self.addCode(pid,self.ref_dict["INIT"].rsplit('\n')) + + # TASK <name> + # Name Refinement (<name>) + addpattern def refineSPINLine(self, [pid,'TASK',task_name]): + if task_name not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" CANNOT REFINE TASK {}".format(task_name)]) + else: + self.addCode(pid,self.ref_dict[task_name].rsplit('\n')) + + # SIGNAL <value> + # Keyword Refinement (SIGNAL) + addpattern def refineSPINLine(self, [pid,'SIGNAL',value]): + if 'SIGNAL' not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" CANNOT REFINE SIGNAL {}".format(value)]) + else: + self.addCode(pid,self.ref_dict['SIGNAL'].format(value).rsplit('\n')) + + # WAIT <value> + # Keyword Refinement (WAIT) + addpattern def refineSPINLine(self, [pid,'WAIT',value]): + if 'WAIT' not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" CANNOT REFINE WAIT {}".format(value)]) + else: + self.addCode(pid,self.ref_dict['WAIT'].format(value).rsplit('\n')) + + # DEF <name> <value> + # Direct Output + addpattern def refineSPINLine(self, [pid,'DEF',name,value]): + self.defCode = self.defCode + [' '.join(['#define',name,value])] + + # DECL <type> <name> [<value>] + # Name Refinement (<name>_DCL) + # add with 'static' to declCode if pid==0, + # add without 'static' using addCode, otherwise + addpattern def refineSPINLine(self, [pid,'DECL',typ,name]+rest): + logger.debug("Processing DECL {} {} {}".format(type,name,rest)) + name_dcl = name + '_DCL' + if name_dcl not in self.ref_dict_keys: + decl = self.EOLC+" CANNOT REFINE Decl {}".format(name_dcl) + else: + decl = ' '.join([self.ref_dict[name_dcl],name]) + if len(rest) > 0: + decl = decl + " = " + rest[0] + ";" + else: + decl = decl + ";" + if int(pid) == 0: + decl = 'static ' + decl + self.declCode = self.declCode + [decl] + else: + self.addCode(pid,[decl]) + + # DCLARRAY <type> <name> <value> + # Name Refinement (<name>_DCL) + # add with 'static' to declCode if pid==0, + # add without 'static' using addCode, otherwise + addpattern def refineSPINLine(self, [pid,'DCLARRAY',typ,name,value]): + logger.debug("Processing DECLARRAY {} {} {}".format(type,name,value)) + name_dcl = name + '_DCL' + if name_dcl not in self.ref_dict_keys: + dclarray = [self.EOLC+" DCLARRAY: no refinement entry for '{}'".format(name_dcl)] + else: + code = self.ref_dict[name_dcl].format(name,value) + if int(pid) == 0: + code = 'static ' + code + dclarray = code.rsplit('\n') + if int(pid) == 0: + self.declCode = self.declCode + dclarray + else: + self.addCode(pid,dclarray) + + # PTR <name> <value> + # Name (_PTR/_FPTR) refinement + addpattern def refineSPINLine(self, [pid,'PTR',name,value]): + i = int(pid) + if not self.inSTRUCT: + pname = name + '_PTR' + if pname not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" PTR: no refinement entry for '{}'".format(pname)]) + else: + pcode = self.ref_dict[pname].rsplit('\n') + self.addCode_int (pid, (_ -> pcode[0]), (value -> pcode[1].format(value)), value) + else: + pname = name + '_FPTR' + if pname not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" PTR(field): no refinement for '{}'".format(pname)]) + else: + pcode = self.ref_dict[pname].rsplit('\n') + self.addCode_int (pid, (_ -> pcode[0].format(self.inName)), (value -> pcode[1].format(self.inName,value)), value) + + # CALL <name> <value0> .. <valueN> + # Name refinement + addpattern def refineSPINLine(self, [pid,'CALL',name]+rest): + logger.debug("Processing CALL {} {}".format(name,rest)) + if name not in self.ref_dict_keys: + logger.debug("CALL name not found") + self.addCode(pid,[self.EOLC+" CALL: no refinement entry for '{}'".format(name)]) + else: + code = self.ref_dict[name] + logger.debug("CALL code: {}".format(code)) + case len(rest): + match 0: callcode = code.rsplit('\n') + match 1: callcode = (code.format(rest[0])).rsplit('\n') + match 2: callcode = (code.format(rest[0],rest[1])).rsplit('\n') + match 3: callcode = (code.format(rest[0],rest[1],rest[2])).rsplit('\n') + match 4: callcode = (code.format(rest[0],rest[1] + ,rest[2],rest[3])).rsplit('\n') + match 5: callcode = (code.format(rest[0],rest[1] + ,rest[2],rest[3],rest[4])).rsplit('\n') + match 6: callcode = (code.format(rest[0],rest[1] + ,rest[2],rest[3],rest[4],rest[5])).rsplit('\n') + else: + callcode = [self.EOLC+" CALL: can't handle > 6 arguments"] + self.addCode(pid,callcode) + logger.debug("CALL complete") + + # STRUCT <name> + addpattern def refineSPINLine(self, [pid,'STRUCT',name]): + self.inSTRUCT = True # should check not already inside anything! + self.inName = name + + # SEQ <name> + addpattern def refineSPINLine(self, [pid,'SEQ',name]): + self.inSEQ = True # should check not already inside anything! + self.seqForSEQ = "" + self.inName = name + + # END <name> + addpattern def refineSPINLine(self, [pid,'END',name]): + if self.inSTRUCT: + self.inSTRUCT = False + if self.inSEQ: + seqName = name + "_SEQ" + if seqName not in self.ref_dict_keys: + self.addCode(pid,["SEQ END: no refinement for ".format(seqName)]) + else: + codelines = self.ref_dict[seqName].rsplit('\n') + for code in codelines: + self.addCode(pid,[code.format(self.seqForSEQ)]) + self.inSEQ = False + self.seqForSEQ = "" + self.inName = "" + + # SCALAR _ <value> + addpattern def refineSPINLine(self, [pid,'SCALAR','_',value]): + # should only be used inside SEQ + self.seqForSEQ = self.seqForSEQ + " " + value + + # SCALAR <name/field> <value> + # Name (<name>/_FSCALAR) Refinement + addpattern def refineSPINLine(self, [pid,'SCALAR',name,value]): + # should not be used inside SEQ + if not self.inSTRUCT: + if name not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" SCALAR: no refinement entry for '{}'".format(name)]) + else: + code = self.ref_dict[name] + self.addCode(pid,(code.format(value)).rsplit('\n')) + else: + field = name + "_FSCALAR" + if field not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" SCALAR(field): no refinement entry for '{}'".format(field)]) + else: + code = self.ref_dict[field] + self.addCode(pid,[code.format(self.inName,value)]) + + # SCALAR <name> <index> <value> + # Name (<name>/_FSCALAR) Refinement + addpattern def refineSPINLine(self, [pid,'SCALAR',name,index,value]): + # should not be used inside SEQ + if not self.inSTRUCT: + if name not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" SCALAR-3: no refinement entry for '{}'".format(name)]) + else: + code = self.ref_dict[name] + self.addCode(pid,code.format(index,value).rsplit('\n')) + else: + field = name + "_FSCALAR" + if field not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" SCALAR(field): no refinement entry for '{}'".format(field)]) + else: + code = self.ref_dict[field] + self.addCode(pid,[code.format(self.inName,value)]) + + # STATE <tid> <name> + # Name Refinement + addpattern def refineSPINLine(self, [pid,'STATE',tid,state]): + if state not in self.ref_dict_keys: + self.addCode(pid,[self.EOLC+" STATE: no refinement entry for '{}'".format(state)]) + else: + code = self.ref_dict[state] + self.addCode(pid,[code.format(tid)]) + + # catch-all for refineSPINLine + addpattern def refineSPINLine(self, [pid,hd]+rest): + dunno = self.EOLC+" DON'T KNOW HOW TO REFINE: "+pid+" '"+str(hd) + self.addCode(pid, [dunno]) + + # catch-all for refineSPINLine + addpattern def refineSPINLine(self, [pid]): + dunno = self.EOLC+" DON'T KNOW HOW TO REFINE: "+pid + self.addCode(pid, [dunno]) + + # catch-all for refineSPINLine + addpattern def refineSPINLine(self, []): + pass + + def refineSPINLine_main(self, ln): + self.collectPIds(ln) + if self.outputSWITCH: + self.switchIfRequired(ln) + else: + pass + if self.annoteComments: + self.logSPINLine(ln) + else: + pass + self.refineSPINLine(ln) + + def test_body (self): + yield "\n" + yield self.EOLC+" ===============================================\n\n" + + + + for line in self.defCode: + yield line+"\n" + + + for line in self.declCode: + yield line+"\n" + + # def testseg_name (segno): return "TestSegment{}".format(segno) + def get_id (id_codes): return id_codes[0] + + for sno, lines in itertools.groupby (sorted (self.testCodes, key = get_id), get_id): + yield "\n" + yield self.EOLC+" ===== TEST CODE SEGMENT {} =====\n\n".format(sno) + testseg_name = self.SEGNAMEPFX.format(sno) + testseg = (self.SEGDECL).format(testseg_name,self.SEGARG) + yield "{}{}\n".format(testseg, self.SEGBEGIN) + + for lines0 in lines: + for line in lines0[1]: + yield " "+line+"\n" + + yield self.SEGEND+"\n" + + yield "\n" + yield self.EOLC+" ===============================================\n\n" diff --git a/formal/promela/src/src/spin2test.coco b/formal/promela/src/src/spin2test.coco new file mode 100644 index 00000000..e1accc4c --- /dev/null +++ b/formal/promela/src/src/spin2test.coco @@ -0,0 +1,104 @@ +# SPDX-License-Identifier: BSD-2-Clause +"""Converts Annotated SPIN counter-examples to program test code""" + +# Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# 1. Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# 2. Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in the +# documentation and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import src.refine_command as refine + +# + +import logging +import sys +import yaml + +# + +refine.logger.setLevel (logging.DEBUG) +refine.logger.addHandler (logging.StreamHandler (sys.stderr)) + +def words(string): return string.rsplit(' ') + +def main (testNumber, dir0, fileRoot): + "\n\tSpin2Test (Coconut/Python)\n\n" |> refine.logger.debug + refine.logger.debug("Test Number {}\n".format(testNumber)) + + if int(testNumber) < 0: + num = "" + else: + num = "-{}".format(testNumber) + + spinFile = dir0 + fileRoot + num + ".spn" + refFile = dir0 + fileRoot + "-rfn.yml" + preFile = dir0 + fileRoot + "-pre.h" + postFile = dir0 + fileRoot + "-post.h" + runFile = dir0 + fileRoot + "-run.h" + testFile = dir0 + "tr-" + fileRoot + num + ".c" + + summaryFormat = "!{} --({}`)-> [{};_;{};{}] >> {}\n" + refine.logger.debug(summaryFormat.format( spinFile,refFile + , preFile,postFile + , runFile,testFile )) + + print summaryFormat + + annote_lines = [] + with open(spinFile) as spinfile: + debug_lines = '' + for line in spinfile: + if line[0:4] == "@@@ ": + debug_lines = debug_lines + line + annote_lines = annote_lines + [line[4:][:-1]] + refine.logger.debug (debug_lines) + + annote_bundle = fmap (words,annote_lines) + + refine.logger.debug("Annotation Bundle:\n {}".format(annote_bundle)) + + with open(refFile) as reffile: + ref_dict = yaml.safe_load(reffile.read()) + refine.logger.debug("\nREFINE DUMP") + refine.logger.debug(yaml.dump(ref_dict)) + + cmd = refine.command(ref_dict) + + cmd.setupLanguage() + for ln in annote_bundle: + cmd.refineSPINLine_main(ln) + + refine.logger.debug("\nP-Ids in use: {}\n ".format(cmd.procIds)) + refine.logger.debug("\n\tRefinement Complete; No of processes = {}".format(len(cmd.procIds))) + + with open(testFile,'w') as tstfile: + + with open(preFile) as prefile: + tstfile.write(prefile.read()) + + for line in cmd.test_body(): + tstfile.write(line) + + with open(postFile) as postfile: + tstfile.write(postfile.read()) + + with open(runFile) as runfile: + tstfile.write(runfile.read().format(testNumber)) diff --git a/formal/promela/src/src/syntax_ml.coco b/formal/promela/src/src/syntax_ml.coco new file mode 100644 index 00000000..2199ef97 --- /dev/null +++ b/formal/promela/src/src/syntax_ml.coco @@ -0,0 +1,211 @@ + ############################################################################## + # Syntax ML + # + # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) + # + # All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are + # met: + # + # * Redistributions of source code must retain the above copyright + # notice, this list of conditions and the following disclaimer. + # + # * Redistributions in binary form must reproduce the above + # copyright notice, this list of conditions and the following + # disclaimer in the documentation and/or other materials provided + # with the distribution. + # + # * Neither the name of the copyright holders nor the names of its + # contributors may be used to endorse or promote products derived + # from this software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ############################################################################## + +from src.library import * +from src import syntax_pml + +# + +from parsec import generate, many, many1, optional, regex, re, string # explicit listing for mypy +from parsec import * # mainly for importing operators + +## +# parse + +tok_open = r'\<open>' +tok_close = r'\<close>' +tok_open0 = '\\' + tok_open +tok_close0 = '\\' + tok_close + +ant_open = r'\<' +ant_open1 = r'\<^' +ant_close = r'>' + +spaces = regex (r'\s*', re.MULTILINE) +spaces1 = regex (r'\s+', re.MULTILINE) +ident00 = regex (r"[._'a-zA-Z0-9]+") +ident0 = regex (r"[\[\]._'a-zA-Z0-9]+") +ident = regex (r"[_'a-zA-Z][_'a-zA-Z0-9]*") +filename = regex (r"[./_'a-zA-Z0-9]+") + +# + +def not_string (s, matched): + @Parser + def not_string_parser (text, index = 0): + len_s = len s + if matched <= 0: + return Value.failure (index, 'a strictly positive number of characters') + elif matched <= len_s and text[index:index + len_s] != s: + return Value.success (index + matched, text[index:index + matched]) + else: + return Value.failure (index, 'a different string') + return not_string_parser + +def enclose_flat (delim_beg, delim_end): + @generate + def enclose_flat0 (): + cts = yield spaces >> string delim_beg >> many (not_string (delim_end, 1)) << string delim_end + return ''.join cts + return enclose_flat0 + +def enclose_nested0 (delim_beg, delim_end): + @generate + def enclose_nested00 (): + @generate + def enclose_nested1 (): + cts1, cts2, cts3 = yield enclose_nested00 + return ''.join ([cts1, cts2, cts3]) + + cts1 = yield string delim_beg + cts2 = yield many (enclose_nested1 ^ not_string (delim_end, 1)) + cts3 = yield string delim_end + return (cts1, ''.join cts2, cts3) + + @generate + def enclose_nested1 (): + _, cts2, _ = yield enclose_nested00 + return cts2 + + return enclose_nested1 + +spaces_cmt = many (spaces1 ^ enclose_nested0 ('(*', '*)')) + +def enclose_nested (delim_beg, delim_end): + return spaces_cmt >> enclose_nested0 (delim_beg, delim_end) + +def quoted (scan): + @generate + def quoted0 (): + delim = r'"' + name = yield spaces_cmt >> string delim >> scan << string delim + return name + return quoted0 + +quoted_ident0 = quoted ident0 + +@generate +def antiquotation (): + name = yield spaces_cmt >> string ant_open >> ident0 << string ant_close + return ant_open + name + ant_close + +@generate +def antiquotation0 (): + name = yield spaces_cmt >> string ant_open1 >> ident0 << string ant_close + return name + +@generate +def command (): + name = yield spaces_cmt >> ident0 + arg = yield many (enclose_nested (tok_open, tok_close) ^ quoted_ident0 ^ quoted filename ^ antiquotation) + return (name, arg) + +commands = many command + +def parse_commands (cmt): + match (pos_line, cmt0) in syntax_pml.parse_comment_annot cmt: + toks = commands.parse cmt0 + if len toks >= 2 and toks[1][0].isupper () and all ([args == [] for _, args in toks]): # TODO: at the time of writing, this parsing recognition is still approximate but seems to already cover enough execution cases + return (pos_line, True, [tok for tok, _ in toks]) + else: + return (pos_line, False, [(cmd[0], cmd[1]) for cmd in commands.parse cmt0]) + else: + return None + +# + +data token_ident (ident) +data token_antiquotation (cmd, ty, name) + +@generate +def directive (): + @generate + def p_ident (): + res = yield spaces >> ident + return (token_ident res) + typ = spaces >> syntax_pml.typ + @generate + def p_antiquotation_single (): + cmd = yield antiquotation0 + arg = yield enclose_nested0 (tok_open, tok_close) + return (token_antiquotation cmd None arg) + @generate + def p_antiquotation_general (): + cmd = yield spaces >> string ("@{") >> ((spaces >> ident00) ^ quoted_ident0) + opt = yield optional (spaces >> string ("[") >> (typ ^ quoted typ) << spaces << string ("]")) + arg = yield optional (spaces >> enclose_nested0 (tok_open, tok_close)) << spaces << string ("}") + return (token_antiquotation cmd opt arg) + + arg = yield syntax_pml.directive (many (p_antiquotation_general ^ p_antiquotation_single ^ p_ident)) + return arg + +directives = optional directive + +def parse_directive (line): + match (delim, [token_ident (var1)] + toks) in directives.parse line if delim == syntax_pml.delim_refine: + match ([token_antiquotation ('promela', ty, var2)] + toks) in toks: + var2 = var2 if var2 else var1 + else: + ty = None + var2 = var1 + if toks != []: + input_continue(f'Remaining parsed tokens: {toks}') + return (var1, ty if ty else (None, None), var2) + else: + return None + +## +# unparse + +unparse_cartouche = name -> tok_open + name + tok_close + +unparse_lambda = (s_fun, s_args) -> ' '.join ([s_fun] + s_args) + +unparse_lambda_cartouche = (s_fun, s_args) -> unparse_lambda (s_fun, list (map (unparse_cartouche, s_args))) + +unparse_annotation_cmd = (cmd, args) -> syntax_pml.unparse_annotation_cmd (unparse_lambda_cartouche (cmd, args)) + +def unparse_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds): + yield + +## +# check + +def check_annotation0 (unparse_annotation, parse_commands, strip_find_annot, group_max, cmt_enclose, cmds_generic, subst_bloc, cmds): + input_continue ('Check on ML syntax not implemented') + +def check_annotation (*args): + check_annotation0 <*| (unparse_annotation, parse_commands) + args diff --git a/formal/promela/src/src/syntax_pml.coco b/formal/promela/src/src/syntax_pml.coco new file mode 100644 index 00000000..98360fbd --- /dev/null +++ b/formal/promela/src/src/syntax_pml.coco @@ -0,0 +1,396 @@ + ############################################################################## + # Syntax PML + # + # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) + # + # All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are + # met: + # + # * Redistributions of source code must retain the above copyright + # notice, this list of conditions and the following disclaimer. + # + # * Redistributions in binary form must reproduce the above + # copyright notice, this list of conditions and the following + # disclaimer in the documentation and/or other materials provided + # with the distribution. + # + # * Neither the name of the copyright holders nor the names of its + # contributors may be used to endorse or promote products derived + # from this software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ############################################################################## + +from src.library import * +from src.modules.comment_filter.comment_filter import language +from src.modules.comment_filter.comment_filter import rfc + +# + +from itertools import chain, groupby, islice, zip_longest +import os +from parsec import generate, many, many1, optional, regex, re, string # explicit listing for mypy +from parsec import * # mainly for importing operators + +# + +group_nth = (xs, n) -> zip_longest (* [islice xs i None n for i in range n]) + +# + +delim_comment = language.Lang (line_comment = ['//', '@@'], + comment_bookends = language.c.comment_bookends, + nested_comments = language.c.nested_comments) +delim_annot = '@' +delim_refine = 'refine' + +## +# annot + +command_default = binding -> binding +command_at = binding -> binding + '_at' +command_file_at = binding -> binding + '_file_at' +command_file = binding -> binding + '_file' + +binding_refine_uniform = 'refine_uniform' + +def find_not_annot (cmds, binding): + return list (filter ((cmd_args -> (cmd -> cmd != command_default binding and cmd != command_at binding and cmd != command_file_at binding and cmd != command_file binding) (cmd_args[0])), cmds)) + +discard_refine = cmds -> find_not_annot (mapi ((i_cmd_arg -> (i_cmd_arg[1][0], (i_cmd_arg[0], i_cmd_arg[1][1]))), cmds), binding_refine_uniform) + +def find_annot0 ((annots, dict_top_elem), binding): + mk_cmd0 = (pos3, key, args) -> ((pos3, key), args) + + def filter_annot0 (annots, binding): + for annot in annots: + for pos2, cmds in annot.items (): + cmds = [ mk_cmd0 (pos3, None, args) for (pos3, (cmd, args)) in cmds if cmd == command_default binding ] + if cmds: + yield (pos2, cmds) + + def filter_annot (annots, binding): + for _, cmds in filter_annot0 annots binding: + for ((pos3, _), args) in cmds: + yield (pos3, args) + + dic0 = { pos2 : cmds for pos2, cmds in filter_annot0 annots binding } + + def update_annot (pos2, cmd): + update_dic (dic0, pos2, [cmd]) + + def update_annot_key (args, f_cmd): + if args: + key = args[0][1] + if key in dict_top_elem: + update_annot (dict_top_elem[key], f_cmd (key, args[1:])) + else: + input_continue (f'Key error: {key} not in {dict_top_elem}') + else: + input_continue ('No arguments provided') + + for pos3, args in filter_annot (annots, command_at binding): + update_annot_key (args, mk_cmd0 $ pos3) + + def mk_cmd (pos3, key, args): + return mk_cmd0 (pos3, key, list (map ((dir_path -> (path -> (os.path.dirname path, fileinput_input0 path)) (os.path.abspath (dir_path[0] + '/' + dir_path[1]))), args))) + + for pos3, args in filter_annot (annots, command_file_at binding): + update_annot_key (args, mk_cmd $ pos3) + + for pos3, args in filter_annot (annots, command_file binding): + update_annot ((pos3[0], pos3[1]), mk_cmd pos3 None args) + + return dic0 + +def find_annot (annots, binding): + return {pos : list (map ((pos_cmds -> pos_cmds[1]), cmds)) for pos, cmds in find_annot0 (annots, binding).items ()} + +find_annot_bool = (annots, binding) -> (annots, binding) |*> find_annot |> bool +find_annot_list = (annots, binding) -> (annots, binding) |*> find_annot |> .values () |> flatten +find_annot_list1 = (annots, binding) -> (annots, binding) |*> find_annot_list |> map $ (map $ (dir_arg -> dir_arg[1])) +find_annot_list2 = (annots, binding) -> (annots, binding) |*> find_annot_list1 |> flatten |> list + +## +# parse + +data arg_pml +data arg_pml_file +data arg_annotation +data arg_annotation_file + +data cmt_multi_begin +data cmt_multi_begin_code +data cmt_multi_body +data cmt_multi_end +data cmt_single +data cmt_single_code + +data src_code (code) +data src_comment +data src_annotation +data src_directive (var_c, ty, var_pml) + +rfc.clear_line = line -> line + +def parse_file (lines): + parse_comment_state = rfc.State () + for line in fileinput_input (lines[1]) if lines[0] == arg_pml_file () else lines[1].splitlines True: + parse_comment_state.line = line + yield rfc.parse_line (delim_comment, parse_comment_state, code_only = False, keep_tokens = True) + +# + +split_cmt_bookends = (cmt, f) -> (map ((multi -> f (( cmt[: len (multi[0]) ] + , cmt[ len (multi[0]) :])) + if cmt.startswith (multi[0]) else None), + delim_comment.comment_bookends) + |> filter $ (x -> x != None) + |> flatten) + +def parse_comment1 (lines): + line_cpt = 1 + def gen_single (line_cpt, line): + ret_single = l -> (cmt_single (), l |> map $ (x -> (line_cpt, x)) |> list) + for code, cmt2, cmt3, cmt4 in group_nth (line, 4): + if cmt2: + l1 = len ([ cmt for cmt in delim_comment.line_comment if cmt2.startswith cmt ][0]) + l2 = - len (rfc.get_linesep cmt2) + yield ret_single ([ code + , cmt2 [: l1 ] + , cmt2 [ l1 : l2 ] if l2 != 0 else cmt2 [l1:] + , cmt2 [ l2 :] if l2 != 0 else '' ]) + elif cmt3 and cmt4: + yield ret_single (split_cmt_bookends (cmt3, (c3 -> [code, c3[0], c3[1], cmt4]))) + else: + yield (cmt_single_code (), [(line_cpt, code)]) + + def gen_multi_end (line_cpt, line): + return (cmt_multi_end (), ([(line_cpt, line[0])] if line[0] != '' else []) + [(line_cpt, line[1])]) + + for is_multi, lines in groupby (parse_file lines, key = (x -> len (x[1].multi_end_stack) >= 1)): + if is_multi: + line, _ = lines.__next__ () + def gen_multi_begin (line): + nonlocal line_cpt + yield from gen_single (line_cpt, line[2:-5]) + yield (cmt_multi_begin_code (), [(line_cpt, line[-5])]) + for line0 in split_cmt_bookends (line[-3], (x -> x)): + yield (cmt_multi_begin (), [(line_cpt, line0)]) + line_cpt += 1 + yield from gen_multi_begin line + for multi_body, lines in groupby (lines, key = (x -> len (x[0]) == 1)): + if multi_body: + for line, _ in lines: + yield (cmt_multi_body (), [(line_cpt, line[0])]) + line_cpt += 1 + else: + for line, _ in lines: + yield gen_multi_end (line_cpt, line) + yield from gen_multi_begin line + else: + line, _ = lines.__next__ () + yield gen_multi_end (line_cpt, line) + yield from gen_single (line_cpt, line[2:]) + line_cpt += 1 + for line, _ in lines: + yield from gen_single (line_cpt, line[2:]) + line_cpt += 1 + +def parse_comment0 (lines): + remove_pos = map $ (x -> x[1]) + flat_list = list <.. flatten + flat_rm = flat_list <.. remove_pos + + for is_single, lines in groupby (parse_comment1 lines, key = (x -> x[0] == cmt_single () or x[0] == cmt_single_code ())): + if is_single: + yield remove_pos lines + else: + lines0 = groupby (lines, key = (x -> x[0] == cmt_multi_end ())) + cmd1, line1 = lines0.__next__ () + if cmd1: + line_head = flat_rm line1 + else: + line1 = list line1 # WARNING: force one step of iteration before another __next__ operation + _, line2 = lines0.__next__ () + line_head = flat_rm line1 + flat_rm line2 + lines0 = map ((x -> remove_pos (x[1])), lines0) + yield [line_head] + [flat_list xs + flat_list (lines0.__next__ ()) for xs in lines0] + +def parse_comment (lines): + case lines: + match (arg_annotation (), annot): + yield (src_annotation (), annot) + match (arg_annotation_file (), annot): + yield (src_annotation (), fileinput_input0 annot) + else: + for xs in flatten (parse_comment0 lines): + if xs[0][1]: + yield (src_code (xs[0])) + if xs[1:]: + yield (src_comment (), (xs[1], xs[2:-1] if xs[2][1] else [], xs[-1])) + +def parse_comment_annot (cmt): + case cmt: + match (src_comment (), ((pos_line, _), cmt, _)): + cmt0 = map ((x -> x[1]), cmt) |> reduce $ (+) if cmt else '' + if cmt0 and cmt0.startswith delim_annot: + return (pos_line, cmt0.split (delim_annot, 1) [1]) + match (src_annotation (), cmt0): + return (1, cmt0) + return None + +def parse_annotations_directives (parse_directive, strip, dic): + def parse_line (lines): + for line in lines |> strip |> .split ('\n'): + cmd = parse_directive line + yield (src_directive <*| cmd if cmd else src_code line) + + return { pos2 : [ (pos3_arg, list <| flatten ([ parse_line lines for _, lines in cmds ]) ) + for pos3_arg, cmds in cmds0 ] + for pos2, cmds0 in dic.items () } + +def parse_annotations (parse_commands, hash_cmd_generic, lines, pos_file, pos_dir): + annot = {} + for cmt in parse_comment lines: + match (pos_line, cmds_generic, cmds) in parse_commands cmt: + update_dic (annot, + (pos_file, pos_line), + [(hash_cmd_generic, list <| map ((arg -> (pos_dir, arg)), cmds))] + if cmds_generic else + [(cmd0, list <| map ((arg -> (pos_dir, arg)), cmd1)) for cmd0, cmd1 in cmds]) + + return { (pos_file, pos_line) : list <| mapi ((i_cmd -> ((pos_file, pos_line, i_cmd[0]), i_cmd[1])), cmds) + for (pos_file, pos_line), cmds in annot.items () } + +def make_annotations (cmds_generic0, cmds0, pos_file, pos_line, pos_dir): + cmds = [(hash_cmd_generic, list <| map ((arg -> (pos_dir, arg)), cmds0))] if cmds_generic0 else [(cmd0, list <| map ((arg -> (pos_dir, arg)), cmd1)) for cmd0, cmd1 in cmds0] + return { (pos_file, pos_line) : list <| mapi ((i_cmd -> ((pos_file, pos_line, i_cmd[0]), i_cmd[1])), cmds) } + +# + +spaces = regex (r'\s*', re.MULTILINE) +ident = regex (r"[_'a-zA-Z][_'a-zA-Z0-9]*") + +@generate +def typ (): + @generate + def arity (): + arit = yield (spaces >> string (r'[') >> spaces >> regex (r"[0-9]+") << spaces << string (r']')) + return (int arit) + ty = yield ident + arity = yield (optional arity) + return (ty, arity) + +def directive (parse_arg): + @generate + def directive0 (): + name = yield spaces >> string (r'#') >> spaces >> ident + arg = yield parse_arg + return (name, arg) + return directive0 + +@generate +def directive_refine (): + @generate + def cmd_args (): + cmd = yield spaces >> ident + args = yield spaces >> regex (r'.+') + return (cmd, args) + delim, cmd_args = yield directive cmd_args + if delim == delim_refine: + return cmd_args + else: + return None + +directives_refine = optional directive_refine + +## +# unparse + +unparse_string = s -> (s.replace ('\\', r'\\') + .replace ('"', r'\"') + |> (s -> '"' + s + r'\n"')) + +unparse_annotation_cmd = toks -> delim_comment.line_comment[0] + delim_annot + ' ' + toks + +def unparse_source_directive (unparse_directive, lines): + lines_last = [] + for line in lines: + case line: + match src_directive (cvar, _, pvar): + lines0 = unparse_directive ((cvar, pvar)) + if lines0[0]: + for line0 in lines0[0]: + yield line0 + else: + input_continue ('Generating an empty line') + lines_last = lines_last + lines0[1] + match src_code (line): + yield line + for line in lines_last: + yield line + +def map_source0 (parse_comment, map_annotation, parse_commands, only_content, subst_lines, lines): + def map_annotation0 (command_ty, get_command): + yield from map_annotation ((src_annotation (), False), False, [], [(command_ty, get_command (()).split ('\n'))]) + def map_source_annotation (command_ty, get_command, map_comment, cmt_enclose, cmt): + nonlocal subst_lines + match (pos_cmt, cmds_generic, cmds) in parse_commands cmt: + exec_loop = True + subst_bloc = [] + while exec_loop: + match [(pos_line, subst_bloc)] + subst_lines0 in subst_lines: + if pos_line <= pos_cmt: + subst_lines = subst_lines0 + if pos_line != pos_cmt: + subst_bloc = [] + if pos_line >= pos_cmt: + exec_loop = False + else: + exec_loop = False + yield from map_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds) + else: + if only_content: + yield from map_annotation0 (command_ty, get_command) + else: + yield from map_comment () + inline = cmt -> '\n' not in cmt + for src in parse_comment lines: + case src: + match (src_comment (), (cmt_begin, cmt, cmt_end)): + def map_comment (): + for _, toks in chain ((cmt_begin,), cmt, (cmt_end,)): + yield toks + yield from map_source_annotation ( 'comment' + , (_ -> ''.join (map ((pos_line -> pos_line[1]), [cmt_begin] + cmt + [cmt_end]))) + , map_comment + , (src_annotation (), all <| map (pos_line -> inline (pos_line[1]), cmt)) + if only_content else + (src_comment (), (cmt_begin, cmt_end)) + , src ) + match (src_annotation (), cmt): + def map_comment (): + yield cmt + yield from map_source_annotation ('annotation', (_ -> cmt), map_comment, (src_annotation (), inline cmt), src) + match (src_code ((_, code))): + if only_content and len code > 1 and code[-1] == '\n': + yield from map_annotation0 ('code', _ -> code[:-1]) + else: + yield code + +def map_source (*args): + yield from map_source0 <*| (parse_comment,) + args diff --git a/formal/promela/src/src/syntax_yaml.coco b/formal/promela/src/src/syntax_yaml.coco new file mode 100644 index 00000000..5ebeb7e2 --- /dev/null +++ b/formal/promela/src/src/syntax_yaml.coco @@ -0,0 +1,182 @@ + ############################################################################## + # Syntax YAML + # + # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) + # + # All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are + # met: + # + # * Redistributions of source code must retain the above copyright + # notice, this list of conditions and the following disclaimer. + # + # * Redistributions in binary form must reproduce the above + # copyright notice, this list of conditions and the following + # disclaimer in the documentation and/or other materials provided + # with the distribution. + # + # * Neither the name of the copyright holders nor the names of its + # contributors may be used to endorse or promote products derived + # from this software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ############################################################################## + +from src.library import * +from src import syntax_pml +from src.syntax_pml import src_code, src_comment, src_annotation, src_directive, arg_pml, arg_annotation + +# + +import json +import yaml + +## +# parse + +def safe_load (toks): + try: + dic = yaml.safe_load (toks) + except yaml.parser.ParserError: + input_continue (f'Parser error: {toks}') + dic = None + except yaml.scanner.ScannerError: + dic = None + return dic + +# + +def parse_commands (cmt): + match (pos_line, cmt0) in syntax_pml.parse_comment_annot cmt: + toks = safe_load cmt0 + if isinst_str toks: + return (pos_line, True, toks.split (' ') |> filter $ (x -> x.strip ()) |> map $ (x -> x.strip ()) |> list) + elif isinst_list (isinst_str, toks): + return (pos_line, True, toks) + elif isinst_list (isinst_dict $ ((isinst_str, isinst_list $ (toks -> isinst_str toks or isinst_list (isinst_str, toks)))), toks): + def parse (toks): + for tok in toks: + for cmd, args in tok.items (): + yield (cmd, ['\n'.join(arg) if isinstance (arg, list) else arg for arg in args]) + return (pos_line, False, [tok for tok in parse toks]) + else: + return None + else: + return None + +def parse_directive (line): + match (var_c, arg) in syntax_pml.directives_refine.parse line: + match [[ty_name, ty_arity], var_pml] in safe_load arg if isinst_list (isinst_str, [ty_name, var_pml]) and (ty_arity is None or isinstance (ty_arity, int)): + return (var_c, (ty_name, int (ty_arity) if ty_arity else None), var_pml) + else: + return None + else: + return None + +## +# unparse + +unparse_annotation_cmd = (cmd, args) -> syntax_pml.unparse_annotation_cmd (json.dumps ([{cmd : args}])) + +unparse_commands_inline = json.dumps # Note: Not expecting newlines in the output + +def unparse_command_refine (cmd): + def unparse_source_directive (lines): + for line in lines: + case line: + match src_code (code): + yield code + match src_directive (var_c, ty, var_pml): + yield (f'#refine {var_c} ' + unparse_commands_inline ([ty, var_pml])) + return (syntax_pml.command_at syntax_pml.binding_refine_uniform if cmd[0] else syntax_pml.binding_refine_uniform, + ([cmd[0]] if cmd[0] else []) + ['\n'.join (unparse_source_directive (cmd[1]))]) + +def unparse_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds): + case cmt_enclose: + match (src_comment (), (cmt_begin, cmt_end)): + inline = cmt_begin[0] == cmt_end[0] + cmt_begin = cmt_begin[1] + syntax_pml.delim_annot + (' ' if inline else '\n') + cmt_end = cmt_end[1] + match (src_annotation (), inline): + cmt_begin = None + cmt_end = None + + if cmt_begin: + yield cmt_begin + if cmds_generic: + yield unparse_commands_inline cmds + else: + def unparse_commands (subst_bloc, cmds): + case (subst_bloc, cmds): + match ([(pos1, subst_cmd)] + subst_bloc0, [(cmd, (pos2, args))] + cmds0): + if pos1 < pos2: + subst_bloc = subst_bloc0 + yield unparse_command_refine subst_cmd + else: + cmds = cmds0 + yield (cmd, args) + yield from unparse_commands (subst_bloc, cmds) + match ([_] + _, []): + for _, subst_cmd in subst_bloc: + yield unparse_command_refine subst_cmd + match ([], [_] + _): + for cmd, (_, args) in cmds: + yield (cmd, args) + def split (arg): + arg0 = arg.split ('\n') + return arg if len arg0 <= 1 else arg0 + data0 = [{cmd: [split arg for arg in args]} for cmd, args in list (unparse_commands (subst_bloc, syntax_pml.discard_refine cmds))] + yield unparse_commands_inline data0 if inline else yaml.dump data0 + if cmt_end: + yield cmt_end + +## +# check + +def check_annotation0 (unparse_annotation, parse_commands, strip_find_annot, group_max, cmt_enclose, cmds_generic, subst_bloc, cmds): + def unparse_parse (cmt_enclose): + case cmt_enclose: + match (src_comment (), _): + arg_ty = arg_pml () + match (src_annotation (), _): + arg_ty = arg_annotation () + else: + input_continue ('Expecting a comment or an annotation') + cmds00 = list <| map (parse_commands, syntax_pml.parse_comment ((arg_ty, ''.join (unparse_annotation (cmt_enclose, cmds_generic, subst_bloc, cmds))))) + return None if any <| map ((cmd -> cmd is None), cmds00) else cmds00 + + cmds00 = unparse_parse cmt_enclose + match (None, (src_comment (), _)) in (cmds00, cmt_enclose): + # possibly in the situation where we were trying to parse nested C comments delimiters /* */, so we can try to parse again the content but this time without the outer /* */ delimiters + cmds00 = unparse_parse ((src_annotation (), False)) + case cmds00: + match [(_, True, cmds0)] if cmds_generic == True and cmds == cmds0: + pass + match [(_, False, cmds0)] if cmds_generic == False and syntax_pml.discard_refine cmds == syntax_pml.discard_refine cmds0: + pos_file = 0 + pos_line = 0 + + case group_max (syntax_pml.parse_annotations_directives $ parse_directive <*| strip_find_annot ([syntax_pml.make_annotations (False, cmds0, pos_file, pos_line, '')])): + match [(pos_file0, [(pos_line0, subst_bloc0)])] if pos_file == pos_file0 and pos_line == pos_line0 and subst_bloc == subst_bloc0: + pass + match [] if subst_bloc == []: + pass + else: + input_continue ('Expecting similar substitution annotation bloc') + else: + input_continue ('Expecting similar annotations') + +def check_annotation (*args): + check_annotation0 <*| (unparse_annotation, parse_commands) + args diff --git a/formal/promela/src/src/testgen.coco b/formal/promela/src/src/testgen.coco new file mode 100644 index 00000000..2a8a3397 --- /dev/null +++ b/formal/promela/src/src/testgen.coco @@ -0,0 +1,624 @@ + ############################################################################## + # TestGen + # + # Copyright (C) 2019-2021 Trinity College Dublin (www.tcd.ie) + # + # All rights reserved. + # + # Redistribution and use in source and binary forms, with or without + # modification, are permitted provided that the following conditions are + # met: + # + # * Redistributions of source code must retain the above copyright + # notice, this list of conditions and the following disclaimer. + # + # * Redistributions in binary form must reproduce the above + # copyright notice, this list of conditions and the following + # disclaimer in the documentation and/or other materials provided + # with the distribution. + # + # * Neither the name of the copyright holders nor the names of its + # contributors may be used to endorse or promote products derived + # from this software without specific prior written permission. + # + # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + ############################################################################## + +from src.library import * +from src import refine_command +from src import syntax_ml +from src.syntax_ml import token_ident, token_antiquotation +from src import syntax_pml +from src.syntax_pml import arg_pml, arg_pml_file, arg_annotation, arg_annotation_file, src_directive, binding_refine_uniform, find_annot, find_annot0, find_annot_bool, find_annot_list, find_annot_list1, find_annot_list2 +from src import syntax_yaml +from src.modules.promela_yacc.promela import ast +from src.modules.promela_yacc.promela import lex +from src.modules.promela_yacc.promela import yacc + +# + +import copy +import errno +from itertools import groupby +import os +import random +import re +import shutil +import subprocess +import sys +import tempfile +import traceback + +## +# parse + +def parse_refine_generic (l_refine_generic): + toks = ''.join (l_refine_generic) + dic = syntax_yaml.safe_load (toks) if toks else {} + if not syntax_yaml.isinst_dict ((syntax_yaml.isinst_str, syntax_yaml.isinst_str), dic): + input_continue (f'Type error: {dic}') + dic = {} + return dic + +## +# 'assert' analyses + +def tl (l): + if l: + _, *l = l + return l + else: + return [] + +def get_forest_ass (node): + if type node is ast.Assert: + return [ (node, ast.Assert (ast.Unary ('!', node.expr), pos = node.pos)) ] + else: + def get_forest_ass00 (cons_pair, dest_pair, node, cons_node): + ts_pred = [] + ts_succ = tl node + ts_res = [] + for tt0 in node: + t_t = dest_pair tt0 + ts_pred0 = list (reversed ts_pred) + ts_res = [ list (map ((msg_t_ass -> (cons_pair ((msg_t_ass[0], t_t[1])), cons_node (ts_pred0 + [cons_pair ((msg_t_ass[1], t_t[1]))] + ts_succ))), get_forest_ass (t_t[0]))) ] + ts_res + ts_pred = [cons_pair ((t_t[0], t_t[1]))] + ts_pred + ts_succ = tl ts_succ + return list (flatten (reversed ts_res)) + def get_forest_ass0 (node, cons_node): + return get_forest_ass00 ((t_t -> t_t[0]), (t_t -> (t_t, None)), node, cons_node) + def get_forest_ass0_proc_inl (cons_node): + return list (map ((msg_t_ass -> ((node.name, msg_t_ass[0]), msg_t_ass[1])), get_forest_ass0 ([node.body], cons_node <.. (node -> node[0])))) + + if type node is ast.Sequence: + return get_forest_ass0 (node, (node0 -> ast.Sequence (node0, context = node.context, is_option = node.is_option))) + elif type node is ast.Options: + return get_forest_ass0 (node.options, (node0 -> ast.Options (node.type, node0))) + elif type node is ast.Program: + return get_forest_ass00 ((t_t -> t_t), (t_t -> t_t), node, ast.Program) + elif isinstance (node, ast.Proctype) and not node.disable_negation: + return get_forest_ass0_proc_inl ((node0 -> ast.Proctype (node.name, node0, node.args, node.active, node.d_proc, node.priority, node.provided)) if type node is ast.Proctype else (node0 -> ast.Init (node.name, node0, node.args, node.active, node.d_proc, node.priority, node.provided))) + elif type node is ast.InlineDef and not node.disable_negation: + return get_forest_ass0_proc_inl (node0 -> ast.InlineDef (node.name, node.decl, node0)) + else: + return [] + +## +# main + +def fold_prog (args): + node_file = 0 + promela_parser = yacc.Parser (ast, lex.Lexer ()) + for arg_ty, arg in args: + parsed = [] + try: + case arg_ty: + match arg_pml (): + parsed = promela_parser.parse (arg, None) + match arg_pml_file (): + parsed = promela_parser.parse (None, arg) + except Exception: + input_continue_err (traceback.format_exc ()) + for node in parsed: + yield (node_file, node) + node_file += 1 + +def iter_files (iter_source, dict_refine_uniform0, sys_args): + arg_cpt = 0 + for lines in sys_args: + match [(pos_file, subst_lines)] + xs in dict_refine_uniform0 if pos_file == arg_cpt: + dict_refine_uniform0 = xs + else: + subst_lines = [] + iter_source (arg_cpt, subst_lines, lines) + arg_cpt += 1 + +def write_unlink (f): + tmp = tempfile.NamedTemporaryFile (delete = False) + try: + f tmp + finally: + os.unlink tmp.name + +def subprocess_exec (print_stdout, print_stderr, exec_name, exec_args): + exec_cmd = [exec_name] + exec_args + try: + with subprocess.Popen (exec_cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines = True) as proc: + stdout, stderr = proc.communicate () + returncode = proc.returncode + except Exception as e: + stdout = '' + stderr = traceback.format_exc () + f'{exec_name}: ' + ('command not found' if isinstance (e, OSError) and e.errno == errno.ENOENT else 'Subprocess error') + if print_stderr: + returncode = 1 + else: + input_continue_err stderr + returncode = 0 + + print ('+', ' '.join (exec_cmd)) + p_stdout = print_stdout and stdout + p_stderr = print_stderr and stderr + if p_stdout: + print (stdout.strip ()) + if p_stdout and p_stderr: + print ('+') + if p_stderr: + print (stderr.strip ()) + + if returncode != 0: + input_continue_warn (f'return code = {returncode}') + return stdout + +def mkdir (dir0): + try: + os.makedirs (dir0, exist_ok = True) + except FileExistsError: + input_continue (f'File exists and is not a directory: {dir0}') + except NotADirectoryError: + input_continue (f'Not a directory for an ancestor: {dir0}') + +def copy_i (src, dst): + if os.path.exists dst: + src_cts = fileinput_input0 src + dst_cts = fileinput_input0 dst + print ('Content of source file:') + print src_cts + if src_cts == dst_cts: + print (f'Destination file exists with the same content: {dst}') + else: + print ('Content of destination file:') + print dst_cts + input_continue (f'Destination file exists with a different content: {dst}') + else: + # note: in a concurrent setting, the type and content of dst could have been changed meanwhile + try: + shutil.copy (src, dst) + except NotADirectoryError: + input_continue (f'Not a directory: {os.path.dirname dst}\n(parent of {os.path.basename dst})') + except FileNotFoundError: + input_continue (f'Either not a file: {src}\n or not a directory: {os.path.dirname dst}\n(parent of {os.path.basename dst})') + except IsADirectoryError: + input_continue (f'Is a directory: {src}') + +def partition_pml_ast (f_ty, promelas): + promelas_ty, promelas_no_ty = [], [] + for pos_file, (promela, pos_line) in promelas: + (promelas_ty if f_ty promela else promelas_no_ty).append((pos_file, (promela, pos_line))) + + return (ast.Program (map ((promela -> promela[1]), promelas_no_ty)).__str__ ().encode (), promelas_no_ty, promelas_ty) + +def split_arg (split_arg0, l): + for fic in l: + match [args] :: _ in split_arg0 fic if args: + for arg in args: + yield arg + else: + yield fic + +export_yml_ext = (dir0, fic) -> dir0 + fic + '.yml' if fic else dir0 + +class read_eval_print: + def __init__ (self, seed, argv, input_output_yaml): + random.seed seed + self.hash_cmd_uniform = random.random ().__str__ () + self.hash_cmd_generic = random.random ().__str__ () + self.cwd = os.getcwd () + self.parse_commands = syntax_yaml.parse_commands if input_output_yaml else syntax_ml.parse_commands + self.parse_directive = syntax_yaml.parse_directive if input_output_yaml else syntax_ml.parse_directive + self.unparse_annotation_cmd = syntax_yaml.unparse_annotation_cmd if input_output_yaml else syntax_ml.unparse_annotation_cmd + + options_pml = ['-p', '--promela'] + options0 = ['-a', '--annotation'] + options_pml + options_annotation_file = ['-af', '--annotation_file'] + options = options_annotation_file + options0 + def map_arg (l): + case l: + match [option, cts] + xs if option in options0 or option in options_annotation_file: + yield ( arg_pml () if option in options_pml else + arg_annotation () if option in options0 else + arg_annotation_file () + , cts) + yield from map_arg xs + match [fic] + xs: + if fic.find ('-') == 0: + if fic in options: + input_continue (f'Missing parameter: {fic} {xs}') + else: + input_continue (f'Unknown option: {fic} {xs}') + elif os.path.isfile fic: + yield (arg_pml_file (), fic) + else: + input_continue (f'Expecting an existing file: {fic}') + yield from map_arg xs + def split_arg0 (fic): + for opt in options: + match [''] + args in fic.split opt: + arg = opt.join args + yield [opt, arg] if arg else None + self.argv = list (map_arg (list (split_arg (split_arg0, argv)))) + + ## + # printf insertion + + def insert_printf_node (self, node, node_args, node_pos, no_atomic, print_first): + printf = ast.Printf <| ast.Node () + dict_print_ty = { 'int' : '%d' } + print_default = ('%d', '0', 1) + proc_args = list (map ((x -> ((dict_print_ty[x[1][0]], x[1][1] if x[1][1] else 1) if x[1][0] in dict_print_ty else (None, print_default[2]), x[0])), node_args)) + printf.s = self.unparse_annotation_cmd (self.hash_cmd_uniform, list (map (str, [node_pos[0], node_pos[1]])) + list (map ((x -> map ((_ -> x[0][0]), range (x[0][1])) if x[0][0] else [print_default[0]]), proc_args) |> flatten)) |> syntax_pml.unparse_string + + if proc_args: + printf.args = ast.Sequence (map ((x -> (map ((index -> ast.VarRef (x[1], index = index)), range (x[0][1])) if x[0][1] > 1 else [ast.VarRef (x[1])]) if x[0][0] else [ast.VarRef (print_default[1])]), proc_args) |> flatten) + atomic = 'atomic' + + def insert_printf_node0 (): + sequence = ast.Sequence ([printf] + node.body if print_first else node.body + [printf]) + if not no_atomic: + sequence.context = atomic + node.body = sequence + match [seq] in node.body: + if type seq is ast.Sequence: + if not no_atomic and seq.context != atomic: + seq.context = atomic + if print_first: + seq.insert 0 printf + else: + seq.append printf + else: + insert_printf_node0 () + else: + insert_printf_node0 () + return (node.name, proc_args) + + ## + # + + def init_top_elems (self, programs, sys_args, l_check_unparse_parse = False): + annots = [ syntax_pml.parse_annotations + (self.parse_commands, + self.hash_cmd_generic, + (arg_ty, arg_cts), + pos_file, + os.path.dirname (os.path.abspath arg_cts) if arg_ty == arg_pml_file () or arg_ty == arg_annotation_file () else self.cwd) + for (pos_file, (arg_ty, arg_cts)) in mapi (args -> args, sys_args) ] + dict_top_elem = {} + + for (node_file, (node, node_line)) in programs: + if isinstance (node, ast.Proctype) or type node is ast.InlineDef or type node is ast.LTL: + dict_top_elem[node.name] = (node_file, node_line) + + def check_binding (binding): + dic = find_annot ((annots, dict_top_elem), binding) + return (pos -> pos in dic) + + binding_no_printf = 'no_printf' + check_no_printf = check_binding binding_no_printf + binding_no_atomic = 'no_atomic' + check_no_atomic = check_binding binding_no_atomic + binding_print_first = 'print_first' + check_print_first = check_binding binding_print_first + binding_disable_negation = 'disable_negation' + check_disable_negation = check_binding binding_disable_negation + + l_refine_uniform_strip = find_annot_bool ((annots, dict_top_elem), 'refine_uniform_strip') + def strip (s): + if l_refine_uniform_strip: + l = s.split ('\n') + while l and l[0].strip () == '': + l = l[1:] + return '\n'.join (l).rstrip () + else: + return s + + dict_refine_uniform0 = syntax_pml.parse_annotations_directives (self.parse_directive, strip, find_annot0 ((annots, dict_top_elem), binding_refine_uniform)) + dict_refine_uniform = { pos2 : cmds |> map $ (pos_cmds -> pos_cmds[1]) |> flatten |> list + for pos2, cmds in dict_refine_uniform0.items () } + l_refine_uniform_force = find_annot_bool ((annots, dict_top_elem), 'refine_uniform_force') + l_check_unparse_parse = l_check_unparse_parse or find_annot_bool ((annots, dict_top_elem), 'check_unparse_parse') + l_export_input_yaml = find_annot_list2 ((annots, dict_top_elem), 'export_input_yaml') + + if l_check_unparse_parse or l_export_input_yaml: + def group_max (dict_refine_uniform0): + def group (f_next, xs): + for key, xs in groupby (xs, key = (val -> val[0][0])): + yield (key, list <| f_next (map ((val -> (val[0][1:], val[1])), xs))) + return list (sorted (dict_refine_uniform0.values () + |> flatten + |> map $ (pos4_lines -> (pos4_lines[0][0], (pos4_lines[0][1], pos4_lines[1]))), + key = (pos_lines -> pos_lines[0])) + |> group $ (group $ (map $ (val -> (val[0][0], val[1]))))) + dict_refine_uniform0 = group_max dict_refine_uniform0 + if l_check_unparse_parse: + def check_source (arg_cpt, subst_lines, lines): + def check_annotation (*args): + args0 = ((annots -> (strip, find_annot0 ((annots, dict_top_elem), binding_refine_uniform))), group_max) + args + syntax_ml.check_annotation <*| args0 + syntax_yaml.check_annotation <*| args0 + yield + consume <| syntax_pml.map_source (check_annotation, self.parse_commands, False, subst_lines, lines) + iter_files (check_source, dict_refine_uniform0, sys_args) + else: + dict_refine_uniform0 = None + + procs_args = {} + for (node_file, (node, node_line)) in programs: + type_inst_proctype = isinstance (node, ast.Proctype) + type_is_ltl = type node is ast.LTL + if type_inst_proctype or type node is ast.InlineDef or type_is_ltl: + node_pos = (node_file, node_line) + + if check_no_printf node_pos: + del dict_top_elem[node.name] + elif not type_is_ltl: + node.disable_negation = check_disable_negation node_pos + node_in_dict_refine = node_pos in dict_refine_uniform + if node_in_dict_refine or l_refine_uniform_force: + node_args = {} + def add_dic (node_args, name, ty_arit): + if name in node_args: + input_continue (f'Duplicated parameters: {name}') + node_args[name] = ty_arit + return node_args + + if type_inst_proctype: + for arg in node.args if node.args else []: + node_args = add_dic (node_args, arg[0].name, (arg[0].type, None)) + else: + for name in node.decl if node.decl else []: + node_args = add_dic (node_args, name, (None, None)) + + node_args0 = { name : False for name in node_args.keys () } + + if node_in_dict_refine: + for src in dict_refine_uniform[node_pos]: + match src_directive (_, ty_arit, name) in src: + if name in node_args: + def redef_direc (msg): + if name not in node_args0 or node_args0[name]: + input_continue ('Redefinition of directive' + msg) + msg0 = f': Overwriting the old type ({node_args[name]}) for {name} with {ty_arit}' + if node_args[name] != (None, None): + if ty_arit != (None, None) and node_args[name] != ty_arit: + input_continue ('Type error' + msg0) + overwrite = True + else: + redef_direc ('') + overwrite = False + elif ty_arit != (None, None): + redef_direc (msg0) + overwrite = True + else: + redef_direc ('') + overwrite = False + else: + overwrite = True + node_args0[name] = True + if overwrite: + node_args[name] = ty_arit + + if node_pos in procs_args: + input_continue (f'Duplicated positions: {node_pos}') + procs_args[node_pos] = self.insert_printf_node (node, node_args.items (), node_pos, check_no_atomic node_pos, check_print_first node_pos) + return (programs, (annots, dict_top_elem), procs_args, dict_refine_uniform0, dict_refine_uniform, l_refine_uniform_force, l_check_unparse_parse, l_export_input_yaml) + + ## + # + + def write_promela (self, (promelas, annots, procs_args, dict_refine_uniform0, dict_refine_uniform, l_refine_uniform_force, l_check_unparse_parse, l_export_input_yaml)): + def exec_input (print_stdout, print_stderr, binding, args): + for cmd in find_annot_list1 (annots, binding): + cmd = list cmd + if cmd: + yield subprocess_exec (print_stdout, print_stderr, cmd[0], cmd[1:] + args) + else: + input_continue (f'Empty command: {binding}') + + promelas_no_ltl_str, promelas_no_ltl, promelas_ltl = partition_pml_ast ((promela -> type promela is ast.LTL), promelas) + l_refine_uniform_log = find_annot_bool (annots, 'refine_uniform_log') + l_enclose = find_annot_list1 (annots, 'enclose') |> map $ (args -> (args[0], args[1])) |> list + l_enclose_uniform = find_annot_list1 (annots, 'enclose_uniform') |> map $ (args -> (args[0], args[1])) |> list + l_export_dir = find_annot_list (annots, 'export_dir') |> flatten |> map $ (args -> args[0] + '/' + args[1]) |> list + l_export_pml = find_annot_list2 (annots, 'export_pml') + l_export_trail = find_annot_list2 (annots, 'export_trail') + l_export_code = find_annot_list2 (annots, 'export_code') + l_refine_generic = find_annot_list2 (annots, 'refine_generic') + dic_refine_generic = parse_refine_generic l_refine_generic + l_export_trail_num = find_annot_list2 (annots, 'export_trail_num') + + if l_export_dir: + def copy_cat0 (ltl_dirname): + def copy_cat (src, dir1, dst): + src.close () + for dir0 in l_export_dir: + dir0 = dir0 + '/' + ltl_dirname + ('/' + '/'.join (dir1) if dir1 else '') + mkdir dir0 + copy_i (src.name, dir0 + '/' + ''.join (dst)) + return copy_cat + else: + def copy_cat0 (ltl_dirname): + def copy_cat (src, dir1, dst): + src.close () + subprocess_exec (True, True, 'cat', [src.name]) + return copy_cat + + if l_export_input_yaml: + l_export_input_only_content = find_annot_bool (annots, 'export_input_only_content') + for dir_yaml in l_export_input_yaml: + copy_cat = copy_cat0 (dir_yaml) + def write_source (arg_cpt, subst_lines, lines): + def write_unlink0 (fic): + for toks in syntax_pml.map_source (syntax_yaml.unparse_annotation, self.parse_commands, l_export_input_only_content, subst_lines, lines): + fic.write (toks.encode ()) + copy_cat (fic, [], [export_yml_ext ('', str arg_cpt)]) + write_unlink (write_unlink0) + iter_files (write_source, dict_refine_uniform0, self.argv) + + pml_name = 'pan.pml' + if l_export_trail_num: + add_num = (num, l) -> list <| map (fic -> fic + num, l) + trails = list <| map ((num -> (pml_name + num + '.trail', add_num (num, l_export_trail), add_num (num, l_export_code))), l_export_trail_num) + model_checker_exec = 'model_checker_exec_all' + else: + trails = [(pml_name + '.trail', l_export_trail, l_export_code)] + model_checker_exec = 'model_checker_exec_one' + + with tempfile.TemporaryDirectory () as dir_test: + def write_promela0 (ltl_cpt, write_promela_fic, get_dirname, ltl_pos, ltl_name): + def write_unlink0 (ltl_dirname0, fic_pml): + ltl_dirname = get_dirname ltl_dirname0 + copy_cat = copy_cat0 ltl_dirname + def copy_cat_or_close (src, dir1, dst): + if dst: + copy_cat (src, dir1, dst) + else: + src.close () + dir_test_ltl = dir_test + '/' + ltl_dirname + mkdir dir_test_ltl + write_promela_fic fic_pml + copy_cat_or_close (fic_pml, [], l_export_pml) + os.symlink (fic_pml.name, dir_test_ltl + '/' + pml_name) + os.chdir dir_test_ltl + + consume <| exec_input (True, True, 'model_checker_verifier', [pml_name]) + consume <| exec_input (True, True, 'model_checker_compile', []) + pattern_vectorsz = 'VECTORSZ' + warn_lines = [ line for lines in exec_input (True, True, model_checker_exec, []) for line in lines.splitlines () if pattern_vectorsz in line ] + if warn_lines: + for line in warn_lines: + print line + input_continue_warn ('A trail file has been generated, but it might be not yet exploitable. To obtain a possibly different trail file with more information, one can try increase "'+ pattern_vectorsz +'" (using e.g. "gcc -D'+ pattern_vectorsz +'=4096" or higher values).') + + for trail_name, l_export_trail, l_export_code in trails: + if os.path.exists trail_name: + trail = self.unparse_annotation_cmd ('add_cmd', [self.hash_cmd_uniform]) + '\n' + ''.join (exec_input (False, True, 'model_checker_trail', [trail_name, pml_name])) + if l_export_trail: + def write_unlink01 (fic_trail): + fic_trail.write (trail.encode ()) + copy_cat (fic_trail, [], l_export_trail) + write_unlink write_unlink01 + + l_refine_uniform = dict_refine_uniform or l_refine_uniform_force + if l_refine_uniform or l_refine_generic: + _, annots_trail, _, _, _, _, _, _ = self.init_top_elems ([], [(arg_pml (), trail)], l_check_unparse_parse) + def write_c (refine_line, l_refine_other, dir0): + def write_unlink00 (fic_c): + def write_nl (lines): + for line in lines: + if line: + fic_c.write (line.encode ()) + fic_c.write ('\n'.encode ()) + def write_nl_dic (name, dic, expand_directive): + if l_refine_uniform_log: + write_nl ([self.unparse_annotation_cmd ('promela_term', [name])]) + write_nl (syntax_pml.unparse_source_directive (expand_directive, dic)) + if l_refine_uniform_log: + write_nl (['']) + write_nl (map (x -> x[0], l_enclose)) + refine_line (find_annot_list1 $ annots_trail, write_nl, write_nl_dic) + write_nl (map (x -> x[1], reversed l_enclose)) + copy_cat_or_close (fic_c, [dir0] if l_refine_other else [], l_export_code) + write_unlink write_unlink00 + + if l_refine_uniform: + def refine_line (find_annot_l, write_nl, write_nl_dic): + write_nl (map (x -> x[0], l_enclose_uniform)) + for spin_atomic in find_annot_l self.hash_cmd_uniform: + pos_proc = (int (spin_atomic[0]), int (spin_atomic[1])) + c_print = dict_refine_uniform[pos_proc] if pos_proc in dict_refine_uniform else [] + proc_name, proc_args = procs_args[pos_proc] + spin_atomic = spin_atomic[2:] + dict_args = {} + for (ty, alias_pml) in proc_args: + dict_args[alias_pml] = (ty[0], ty[1], spin_atomic[: ty[1] ]) + spin_atomic = spin_atomic[ ty[1] :] + # note: from this point, spin_atomic should have been fully consumed + def expand_directive (cvar_pvar): + # note: cvar_pvar[1] is in dict_args + ty_val = dict_args[cvar_pvar[1]] + if ty_val[0] == None: + input_continue (f'Not serializable type: printing the content of {cvar_pvar[1]} with a default value') + pos = 'pos' + concat_str = '_' + def def_name (nb): + return '{}{}{}'.format (cvar_pvar[0], concat_str, nb) + def define_undef (pat, var, *args): + return (('#define ' + pat).format (var, *args), '#undef ' + var) + + if ty_val[1] == 1: + lines = [define_undef ('{} {}', cvar_pvar[0], ty_val[2][0])] + else: + lines = [define_undef ('{} ""', def_name 0)] + list (map ((ind_val -> define_undef ('{} {} " {}"', def_name (ind_val[0] + 1), def_name (ind_val[0]), ind_val[1])), zip (range (ty_val[1]), ty_val[2]))) + [define_undef ('{}({}) {}{} ## {}', cvar_pvar[0], pos, cvar_pvar[0], concat_str, pos)] + lines = list (zip (* lines)) + return (list (lines[0]), list (lines[1])) + write_nl_dic (proc_name, c_print, expand_directive) + if ltl_pos in dict_refine_uniform: + write_nl_dic (ltl_name, dict_refine_uniform[ltl_pos], (_ -> ([], []))) + write_nl (map (x -> x[1], reversed l_enclose_uniform)) + write_c (refine_line, l_refine_generic, 'uniform') + + if l_refine_generic: + def refine_line (find_annot_l, write_nl, write_nl_dic): + cmd = refine_command.command dic_refine_generic + for spin_atomic in find_annot_l self.hash_cmd_generic: + cmd.refineSPINLine_main (list spin_atomic) + write_nl (''.join (cmd.test_body ()).splitlines ()) # LTL support for the 'generic' style is not yet supported. The writing would have to be performed inside this instruction. + write_c (refine_line, l_refine_uniform, 'generic') + else: + input_continue (f'No trail file for {fic_pml.name} ({dir_test_ltl}/{pml_name})') + write_unlink (write_unlink0 $ (str ltl_cpt)) + return ltl_cpt + 1 + + ltl_cpt = 0 + for ltl_pos_file, (ltl, ltl_pos_line) in promelas_ltl: + def write_promela_fic (fic_pml): + fic_pml.write promelas_no_ltl_str + fic_pml.write (ast.LTL (ast.Unary ('!', ltl.formula), name = ltl.name).__str__ ().encode ()) + ltl_cpt = write_promela0 (ltl_cpt, write_promela_fic, (ltl_dirname0 -> 'ltl' + '_' + (ltl.name if ltl.name else ltl_dirname0)), (ltl_pos_file, ltl_pos_line), ltl.name if ltl.name else ltl.formula.__str__ ()) + + promelas_ass = get_forest_ass (ast.Program (map ((promela -> (promela[1][0], (promela[0], promela[1][1]))), promelas_no_ltl))) + dic_dirname = {} + for ((ltl_name, _), _), _ in promelas_ass: + dic_dirname[ltl_name] = dic_dirname[ltl_name] + 1 if ltl_name in dic_dirname else 0 + for ((ltl_name, ltl), (ltl_pos_file, _)), promelas_ass0 in promelas_ass: + def write_promela_fic (fic_pml): + fic_pml.write (promelas_ass0.__str__ ().encode ()) + ltl_cpt = write_promela0 (ltl_cpt, write_promela_fic, (ltl_dirname0 -> 'assert' + '_' + ('' if dic_dirname[ltl_name] == 0 else ltl_dirname0 + '_') + ltl_name), (ltl_pos_file, ltl.pos), ltl.expr.__str__ ()) + os.chdir self.cwd + +def main (*args): + st = read_eval_print (*args) + st.write_promela (st.init_top_elems (list (fold_prog st.argv), st.argv)) |