305 lines
11 KiB
Python
305 lines
11 KiB
Python
|
# Copyright (c) 2016 Comcast Cable Communications Management, LLC.
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at:
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
# Generate .siphon source fragments for later processing
|
||
|
|
||
|
import logging
|
||
|
import os, sys, re, json
|
||
|
|
||
|
"""List of (regexp, siphon_name) tuples for matching the start of C
|
||
|
initializer blocks in source files. Each siphon class registers
|
||
|
themselves on tihs list."""
|
||
|
siphon_patterns = []
|
||
|
|
||
|
class Generate(object):
|
||
|
"""Matches a siphon comment block start"""
|
||
|
siphon_block_start = re.compile("^\s*/\*\?\s*(.*)$")
|
||
|
|
||
|
"""Matches a siphon comment block stop"""
|
||
|
siphon_block_stop = re.compile("^(.*)\s*\?\*/\s*$")
|
||
|
|
||
|
"""Siphon block directive delimiter"""
|
||
|
siphon_block_delimiter = "%%"
|
||
|
|
||
|
"""Matches a siphon block directive such as
|
||
|
'%clicmd:group_label Debug CLI%'"""
|
||
|
siphon_block_directive = re.compile("(%s)\s*([a-zA-Z0-9_:]+)\s+(.*)\s*(%s)" % \
|
||
|
(siphon_block_delimiter, siphon_block_delimiter))
|
||
|
|
||
|
"""Matches the start of an initializer block"""
|
||
|
siphon_initializer = re.compile("\s*=")
|
||
|
|
||
|
"""Collated output for each siphon"""
|
||
|
output = None
|
||
|
|
||
|
"""Directory prefix to strip from input filenames to keep things tidy."""
|
||
|
input_prefix = None
|
||
|
|
||
|
"""List of known siphons"""
|
||
|
known_siphons = None
|
||
|
|
||
|
"""Logging handler"""
|
||
|
log = None
|
||
|
|
||
|
|
||
|
def __init__(self, output_directory, input_prefix):
|
||
|
super(Generate, self).__init__()
|
||
|
self.log = logging.getLogger("siphon.generate")
|
||
|
|
||
|
# Build a list of known siphons
|
||
|
self.known_siphons = []
|
||
|
for item in siphon_patterns:
|
||
|
siphon = item[1]
|
||
|
if siphon not in self.known_siphons:
|
||
|
self.known_siphons.append(siphon)
|
||
|
|
||
|
# Setup information for siphons we know about
|
||
|
self.output = {}
|
||
|
for siphon in self.known_siphons:
|
||
|
self.output[siphon] = {
|
||
|
"file": "%s/%s.siphon" % (output_directory, siphon),
|
||
|
"global": {},
|
||
|
"items": [],
|
||
|
}
|
||
|
|
||
|
self.input_prefix = input_prefix
|
||
|
|
||
|
|
||
|
"""
|
||
|
count open and close braces in str
|
||
|
return (0, index) when braces were found and count becomes 0.
|
||
|
index indicates the position at which the last closing brace was
|
||
|
found.
|
||
|
return (-1, -1) if a closing brace is found before any opening one.
|
||
|
return (count, -1) if not all opening braces are closed, count is the
|
||
|
current depth
|
||
|
"""
|
||
|
def count_braces(self, str, count=0, found=False):
|
||
|
for index in range(0, len(str)):
|
||
|
if str[index] == '{':
|
||
|
count += 1;
|
||
|
found = True
|
||
|
elif str[index] == '}':
|
||
|
if count == 0:
|
||
|
# means we never found an open brace
|
||
|
return (-1, -1)
|
||
|
count -= 1;
|
||
|
|
||
|
if count == 0 and found:
|
||
|
return (count, index)
|
||
|
|
||
|
return (count, -1)
|
||
|
|
||
|
def parse(self, filename):
|
||
|
# Strip the current directory off the start of the
|
||
|
# filename for brevity
|
||
|
if filename[0:len(self.input_prefix)] == self.input_prefix:
|
||
|
filename = filename[len(self.input_prefix):]
|
||
|
if filename[0] == "/":
|
||
|
filename = filename[1:]
|
||
|
|
||
|
# Work out the abbreviated directory name
|
||
|
directory = os.path.dirname(filename)
|
||
|
if directory[0:2] == "./":
|
||
|
directory = directory[2:]
|
||
|
elif directory[0:len(self.input_prefix)] == self.input_prefix:
|
||
|
directory = directory[len(self.input_prefix):]
|
||
|
if directory[0] == "/":
|
||
|
directory = directory[1:]
|
||
|
|
||
|
# Open the file and explore its contents...
|
||
|
self.log.info("Siphoning from %s." % filename)
|
||
|
directives = {}
|
||
|
with open(filename) as fd:
|
||
|
siphon = None
|
||
|
close_siphon = None
|
||
|
siphon_block = ""
|
||
|
in_block = False
|
||
|
line_num = 0
|
||
|
siphon_line = 0
|
||
|
|
||
|
for line in fd:
|
||
|
line_num += 1
|
||
|
str = line[:-1] # filter \n
|
||
|
|
||
|
"""See if there is a block directive and if so extract it"""
|
||
|
def process_block_directive(str, directives):
|
||
|
m = self.siphon_block_directive.search(str)
|
||
|
if m is not None:
|
||
|
k = m.group(2)
|
||
|
v = m.group(3).strip()
|
||
|
directives[k] = v
|
||
|
# Return only the parts we did not match
|
||
|
return str[0:m.start(1)] + str[m.end(4):]
|
||
|
|
||
|
return str
|
||
|
|
||
|
def process_block_prefix(str):
|
||
|
if str.startswith(" * "):
|
||
|
str = str[3:]
|
||
|
elif str == " *":
|
||
|
str = ""
|
||
|
return str
|
||
|
|
||
|
if not in_block:
|
||
|
# See if the line contains the start of a siphon doc block
|
||
|
m = self.siphon_block_start.search(str)
|
||
|
if m is not None:
|
||
|
in_block = True
|
||
|
t = m.group(1)
|
||
|
|
||
|
# Now check if the block closes on the same line
|
||
|
m = self.siphon_block_stop.search(t)
|
||
|
if m is not None:
|
||
|
t = m.group(1)
|
||
|
in_block = False
|
||
|
|
||
|
# Check for directives
|
||
|
t = process_block_directive(t, directives)
|
||
|
|
||
|
# Filter for normal comment prefixes
|
||
|
t = process_block_prefix(t)
|
||
|
|
||
|
# Add what is left
|
||
|
siphon_block += t
|
||
|
|
||
|
# Skip to next line
|
||
|
continue
|
||
|
|
||
|
else:
|
||
|
# Check to see if we have an end block marker
|
||
|
m = self.siphon_block_stop.search(str)
|
||
|
if m is not None:
|
||
|
in_block = False
|
||
|
t = m.group(1)
|
||
|
else:
|
||
|
t = str
|
||
|
|
||
|
# Check for directives
|
||
|
t = process_block_directive(t, directives)
|
||
|
|
||
|
# Filter for normal comment prefixes
|
||
|
t = process_block_prefix(t)
|
||
|
|
||
|
# Add what is left
|
||
|
siphon_block += t + "\n"
|
||
|
|
||
|
# Skip to next line
|
||
|
continue
|
||
|
|
||
|
|
||
|
if siphon is None:
|
||
|
# Look for blocks we need to siphon
|
||
|
for p in siphon_patterns:
|
||
|
if p[0].match(str):
|
||
|
siphon = [ p[1], str + "\n", 0 ]
|
||
|
siphon_line = line_num
|
||
|
|
||
|
# see if we have an initializer
|
||
|
m = self.siphon_initializer.search(str)
|
||
|
if m is not None:
|
||
|
# count the braces on this line
|
||
|
(count, index) = \
|
||
|
self.count_braces(str[m.start():])
|
||
|
siphon[2] = count
|
||
|
# TODO - it's possible we have the
|
||
|
# initializer all on the first line
|
||
|
# we should check for it, but also
|
||
|
# account for the possibility that
|
||
|
# the open brace is on the next line
|
||
|
#if count == 0:
|
||
|
# # braces balanced
|
||
|
# close_siphon = siphon
|
||
|
# siphon = None
|
||
|
else:
|
||
|
# no initializer: close the siphon right now
|
||
|
close_siphon = siphon
|
||
|
siphon = None
|
||
|
else:
|
||
|
# See if we should end the siphon here - do we have
|
||
|
# balanced braces?
|
||
|
(count, index) = self.count_braces(str,
|
||
|
count=siphon[2], found=True)
|
||
|
if count == 0:
|
||
|
# braces balanced - add the substring and
|
||
|
# close the siphon
|
||
|
siphon[1] += str[:index+1] + ";\n"
|
||
|
close_siphon = siphon
|
||
|
siphon = None
|
||
|
else:
|
||
|
# add the whole string, move on
|
||
|
siphon[2] = count
|
||
|
siphon[1] += str + "\n"
|
||
|
|
||
|
if close_siphon is not None:
|
||
|
# Write the siphoned contents to the right place
|
||
|
siphon_name = close_siphon[0]
|
||
|
|
||
|
# Copy directives for the file
|
||
|
details = {}
|
||
|
for key in directives:
|
||
|
if ":" in key:
|
||
|
(sn, label) = key.split(":")
|
||
|
if sn == siphon_name:
|
||
|
details[label] = directives[key]
|
||
|
else:
|
||
|
details[key] = directives[key]
|
||
|
|
||
|
# Copy details for this block
|
||
|
details['file'] = filename
|
||
|
details['directory'] = directory
|
||
|
details['line_start'] = siphon_line
|
||
|
details['line_end'] = line_num
|
||
|
details['siphon_block'] = siphon_block.strip()
|
||
|
details["block"] = close_siphon[1]
|
||
|
|
||
|
# Store the item
|
||
|
self.output[siphon_name]['items'].append(details)
|
||
|
|
||
|
# All done
|
||
|
close_siphon = None
|
||
|
siphon_block = ""
|
||
|
|
||
|
# Update globals
|
||
|
for key in directives.keys():
|
||
|
if ':' not in key:
|
||
|
continue
|
||
|
|
||
|
if filename.endswith("/dir.dox"):
|
||
|
# very special! use the parent directory name
|
||
|
l = directory
|
||
|
else:
|
||
|
l = filename
|
||
|
|
||
|
(sn, label) = key.split(":")
|
||
|
|
||
|
if sn not in self.output:
|
||
|
self.output[sn] = {}
|
||
|
if 'global' not in self.output[sn]:
|
||
|
self.output[sn]['global'] = {}
|
||
|
if l not in self.output[sn]['global']:
|
||
|
self.output[sn]['global'][l] = {}
|
||
|
|
||
|
self.output[sn]['global'][l][label] = directives[key]
|
||
|
|
||
|
def deliver(self):
|
||
|
# Write out the data
|
||
|
for siphon in self.output.keys():
|
||
|
self.log.info("Saving siphon data %s." % siphon)
|
||
|
s = self.output[siphon]
|
||
|
with open(s['file'], "a") as fp:
|
||
|
json.dump(s, fp,
|
||
|
separators=(',', ': '), indent=4, sort_keys=True)
|
||
|
|