Hi @botasb Are you talking about semgrep or the pattern matcher? semgrep already does parallel processing.
Hi. The pattern matcher. I have many, many rules... Would it be easy to rewrite them for semgrep? Any advice?
I will look into possible performance improvements.
Regarding rewriting for semgrep, it really depends. Pattern matching is faster for simpler patterns, whereas semgrep can handle complex, syntax-aware rules. That added complexity can increase scan times.
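For context, a pattern-matcher rule here is essentially a dict. The sketch below is inferred purely from the keys the class shared later in this thread reads ('id', 'type', 'pattern', 'message', 'severity', plus the optional 'input_case' and 'metadata'); treat it as illustrative, not the canonical rule schema:

```python
# Illustrative rule shape only, inferred from the keys the
# PatternMatcher code below reads; not the official schema.
example_rule = {
    'id': 'hardcoded_password',        # groups findings in the output
    'type': 'Regex',                   # must name a matcher class (starts with 'R')
    'pattern': r'password\s*=\s*[\'"].+[\'"]',
    'message': 'Possible hardcoded password.',
    'severity': 'warning',
    'input_case': 'lower',             # optional: 'lower', 'upper', or omitted
    'metadata': {},                    # optional extra metadata
}
```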
Hi! Just FYI. Maybe not the most elegant solution, but at least I managed to speed things up with this class modification:
```python
import itertools
from copy import deepcopy
from operator import itemgetter
# The snippet calls Pool(max_workers=...); concurrent.futures'
# ThreadPoolExecutor matches that signature, so it is assumed here.
from concurrent.futures import ThreadPoolExecutor as Pool

# matchers, exceptions, common, get_rules, strip_comments and
# strip_comments2 are assumed to come from the original module's imports.


class PatternMatcher:

    def __init__(self, options: dict) -> None:
        self.matcher = matchers.MatchCommand()
        self.scan_rules = get_rules(options.get('match_rules'))
        self.show_progress = options.get('show_progress')
        exts = options.get('match_extensions')
        if exts:
            self.exts = [ext.lower() for ext in exts]
        else:
            self.exts = []
        self.findings = {}

    def validate(self, rule):
        """Validate a single rule."""
        if not isinstance(rule, dict):
            raise exceptions.InvalidRuleFormatError(
                'Pattern Matcher Rule format is invalid.')
        if not rule.get('type'):
            raise exceptions.TypeKeyMissingError(
                'The rule is missing the key \'type\'')
        if not rule.get('pattern'):
            raise exceptions.PatternKeyMissingError(
                'The rule is missing the key \'pattern\'')
        all_mts = [m for m in dir(matchers) if m.startswith('R')]
        pattern_name = rule['type']
        if pattern_name not in all_mts:
            supported = ', '.join(all_mts)
            raise exceptions.MatcherNotFoundError(
                f'Matcher \'{pattern_name}\' is not supported.'
                f' Available matchers are {supported}',
            )

    def scan(self, paths: list) -> dict:
        """Scan file(s) or directory."""
        if not (self.scan_rules and paths):
            return
        self.validate_rules()
        if self.show_progress:
            pbar = common.ProgressBar('Pattern Match', len(paths))
            paths = pbar.progrees_loop(paths)  # (sic) upstream method name
        for sfile in paths:
            ext = sfile.suffix.lower()
            if self.exts and ext not in self.exts:
                continue
            if sfile.stat().st_size / 1000 / 1000 > 5:
                # Skip scanning files greater than 5 MB
                print(f'Skipping large file {sfile.as_posix()}')
                continue
            data = sfile.read_text('utf-8', 'ignore')
            self.pattern_matcher(data, sfile, ext)
        return self.findings

    def validate_rules(self):
        """Validate Rules before scanning."""
        with Pool(max_workers=100) as pool:
            # Executor.map is lazy; consume it so that any
            # validation error is actually raised here.
            list(pool.map(self.validate, self.scan_rules))

    def scan_rule(self, data, file_path, ext, rule):
        """Run a single rule against one file's contents."""
        case = rule.get('input_case')
        if case == 'lower':
            tmp_data = data.lower()
        elif case == 'upper':
            tmp_data = data.upper()
        else:
            tmp_data = data
        if ext in ('.html', '.xml'):
            fmt_data = strip_comments2(tmp_data)
        else:
            fmt_data = strip_comments(tmp_data)
        response = {'matches': None, 'rule': rule}
        matches = self.matcher._find_match(
            rule['type'],
            fmt_data,
            rule)
        if matches:
            response['matches'] = matches
        return response

    def pattern_matcher(self, data, file_path, ext):
        """Static Analysis Pattern Matcher."""
        try:
            with Pool(max_workers=100) as pool:
                # Run every rule against this file in parallel; map()
                # stops at the shortest iterable, so the repeats need
                # no explicit count.
                res = pool.map(
                    self.scan_rule,
                    itertools.repeat(data),
                    itertools.repeat(file_path),
                    itertools.repeat(ext),
                    self.scan_rules)
                for r in res:
                    if r['matches']:
                        self.add_finding(file_path, r['rule'], r['matches'])
        except Exception as exc:
            raise exceptions.RuleProcessingError(
                'Rule processing error.') from exc

    def find(self, file_path, rule, match):
        """Build the finding entry for one match."""
        crule = deepcopy(rule)
        file_details = {
            'file_path': file_path.as_posix(),
            'match_string': match[0],
            'match_position': match[1],
            'match_lines': match[2],
        }
        metadata = crule.get('metadata', {})
        metadata['description'] = crule['message']
        metadata['severity'] = crule['severity']
        return {'file_details': file_details, 'metadata': metadata}

    def add_finding(self, file_path, rule, matches):
        """Add Code Analysis Findings."""
        with Pool(max_workers=100) as pool:
            res = pool.map(
                self.find,
                itertools.repeat(file_path),
                itertools.repeat(rule),
                matches)
            for r in res:
                if rule['id'] in self.findings:
                    self.findings[rule['id']]['files'].append(
                        r['file_details'])
                else:
                    self.findings[rule['id']] = {
                        'files': [r['file_details']],
                        'metadata': r['metadata'],
                    }
        to_sort = self.findings[rule['id']]['files']
        self.findings[rule['id']]['files'] = sorted(
            to_sort,
            key=itemgetter('file_path', 'match_string', 'match_lines'))
```
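In case it helps anyone trying this out, here is a minimal, hypothetical driver; the option keys ('match_rules', 'match_extensions', 'show_progress') are exactly what __init__ above reads, while the paths and rules file are placeholders:

```python
from pathlib import Path

# Hypothetical options for illustration; the keys mirror what
# PatternMatcher.__init__ reads above. The rules path is a placeholder.
options = {
    'match_rules': 'rules/pattern_rules.yaml',
    'match_extensions': ['.java', '.xml'],
    'show_progress': False,
}

matcher = PatternMatcher(options)
# scan() expects pathlib.Path objects (it uses .suffix, .stat(), .read_text()).
files = [p for p in Path('src').rglob('*') if p.is_file()]
findings = matcher.scan(files) or {}
for rule_id, result in findings.items():
    print(rule_id, '->', len(result['files']), 'file match(es)')
```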
Thanks, I will refer to this when I work on this.
Hi there,
Thank you for your code. It's quite useful. I am wondering whether it could be improved by parallelizing the for loops over the rules. When there are many rules, it is quite slow. Any thoughts about this?
Regards!
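For what it's worth, the class posted above already parallelizes the per-rule loop (Pool.map over self.scan_rules). As a standalone sketch of the same idea, with all names illustrative rather than taken from the project: since CPython's GIL limits how much a thread pool can speed up CPU-bound regex matching, a process pool may scale better when there are very many rules, at the cost of pickling the file contents for each task:

```python
import re
from functools import partial
from concurrent.futures import ProcessPoolExecutor

def scan_one_rule(data, rule):
    """Illustrative worker: apply one rule's regex to the file text."""
    match = re.search(rule['pattern'], data)
    return (rule['id'], match.group(0)) if match else None

def scan_rules_parallel(data, rules, workers=8):
    # Processes sidestep the GIL for CPU-bound matching; `data` is
    # pickled for each task, so this pays off mainly with many rules.
    with ProcessPoolExecutor(max_workers=workers) as pool:
        results = pool.map(partial(scan_one_rule, data), rules)
        return [r for r in results if r is not None]
```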