-
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathinit.py
More file actions
executable file
·420 lines (351 loc) · 16.4 KB
/
Copy pathinit.py
File metadata and controls
executable file
·420 lines (351 loc) · 16.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
#!/usr/bin/env python3
"""
OpenSecOps Installer Workspace Initialization Script

This script sets up (or updates) an OpenSecOps client workspace for either
OpenSecOps SOAR or OpenSecOps Foundation. It must be run from the Installer
directory; component repositories are placed next to it, in the parent directory.

What it does:
- Determines the application (SOAR or Foundation) from the optional argument, or
  from the SOAR*/Foundation* directories already present in the parent directory
- Checks that the prerequisites (aws, sam, pyenv, git) are installed
- Selects the Python version named in .python-version via pyenv (installing it
  if necessary) and installs the Installer's pinned, hash-verified dependencies
  from requirements.txt
- Verifies the Installer's own checkout against a signed release, unless
  --no-verify is given or the verifier cannot be imported
- Clones every repo listed in apps/<app>/repos.toml, or resets an existing clean
  clone to origin/main, and verifies each component's signed release

This is typically run once when setting up a new workspace, and can be re-run
later to bring the component repositories up to date.

Usage:
./init [SOAR|Foundation] [--no-verify]

--no-verify skips release signature verification and is intended for
development only.
"""
import argparse
import os
import subprocess
import sys
# ANSI SGR escape sequences used by printc() to colorize terminal output.
# Each color constant is emitted before the text; END is emitted after it.
YELLOW = "\033[93m"  # bright yellow foreground
LIGHT_BLUE = "\033[94m"  # bright blue foreground
GREEN = "\033[92m"  # bright green foreground
RED = "\033[91m"  # bright red foreground
END = "\033[0m"  # reset all attributes
BOLD = "\033[1m"  # bold / increased intensity
def printc(color, string, **kwargs):
    """Print ``string`` preceded by ``color`` and followed by the END reset code.

    Any extra keyword arguments (for example ``end=""``) are forwarded to
    the built-in ``print`` unchanged.
    """
    text = "{}{}{}".format(color, string, END)
    print(text, **kwargs)
def check_software(software):
    """Return True if ``software --version`` can be run and exits with status 0.

    Args:
        software: Name (looked up on PATH) or path of the executable to probe.

    Returns:
        True when the command ran successfully, False when it could not be
        started or exited with a non-zero status. All output of the probed
        command is discarded.
    """
    try:
        subprocess.check_call(
            [software, '--version'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.STDOUT,
        )
    except (OSError, subprocess.CalledProcessError):
        # OSError covers FileNotFoundError (nothing on PATH) as well as
        # PermissionError and similar launch failures, e.g. a file that
        # exists but is not executable. Either way the tool is unusable,
        # so report it as missing instead of crashing the prerequisite check.
        return False
    return True
def get_required_python_version():
    """Return the contents of ``.python-version`` with surrounding whitespace removed.

    The file is looked up relative to the current working directory, so the
    result depends on where the caller has chdir'ed to.
    """
    with open('.python-version', 'r') as version_file:
        contents = version_file.read()
    return contents.strip()
def setup_python_environment(required_version):
    """Make ``required_version`` the pyenv-local Python for the current directory.

    The version is installed through ``pyenv install`` first when it is not
    listed by ``pyenv versions --bare``. Any failing pyenv command raises
    ``subprocess.CalledProcessError``.
    """
    listing = subprocess.check_output(['pyenv', 'versions', '--bare'], encoding='utf-8')
    already_installed = required_version in listing.splitlines()
    if not already_installed:
        printc(YELLOW, f"Installing Python {required_version}... ", end="")
        subprocess.check_call(['pyenv', 'install', required_version])
    # Always (re)select the version, whether or not it had to be installed.
    subprocess.check_call(['pyenv', 'local', required_version])
def _installer_root():
    # Returns the directory two levels above this file's real location.
    # init.py sits in <Installer>/scripts/, while requirements.{in,txt} sit
    # in <Installer>/ itself. realpath() resolves symlinks first, so the
    # answer is the same when the script is started through the top-level
    # `./init` symlink as when `scripts/init.py` is run directly.
    script_path = os.path.realpath(__file__)
    scripts_dir = os.path.dirname(script_path)
    return os.path.dirname(scripts_dir)
def _load_verifier():
    # Loads _verify_release.py from the directory holding this script and
    # returns its verify_release function. The import is deliberately late:
    # the verifier depends on sigstore (a pinned Python dep), which
    # install_python_packages() must have had a chance to install first.
    # Returns None when executing the module raises ImportError; any other
    # error (for instance a missing verifier file) propagates to the caller.
    from importlib import util as importlib_util

    script_dir = os.path.dirname(os.path.realpath(__file__))
    verifier_path = os.path.join(script_dir, '_verify_release.py')
    spec = importlib_util.spec_from_file_location('_verify_release', verifier_path)
    module = importlib_util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
    except ImportError:
        return None
    return module.verify_release
def verify_installer_self(verifier):
    # Runs the supplied verifier against the Installer's own checkout and
    # returns its result. The point is to close the bootstrap gap: a
    # tampered Installer pulled by a customer is refused here before
    # anything destructive happens. This cannot help if the attacker also
    # removed this very check, so trust-on-first-use applies to the first
    # install only; every later update is verified before it is acted on.
    installer_dir = _installer_root()
    verdict = verifier("Installer", repo_dir=installer_dir)
    return verdict
def _read_direct_deps_with_versions():
    """Return [(name, pinned_version), ...] for the Installer's direct deps.

    Direct-dep *names* come from requirements.in, following any ``-r``
    includes relative to the including file. *Versions* come from the
    matching ``==`` pin in requirements.txt. Both files are read from the
    Installer checkout on disk, so no network access is needed.

    Names are compared in normalized form (case-insensitive, with runs of
    ``-``, ``_`` and ``.`` treated as equal), so ``typing_extensions`` in
    requirements.in matches ``typing-extensions`` in requirements.txt.

    Returns:
        A list of ``(name, version)`` tuples in first-seen order, with the
        name spelled as in the ``.in`` file. ``version`` is ``"?"`` when no
        pin for that name exists in requirements.txt.

    Raises:
        OSError: if requirements.in, an included file, or requirements.txt
            cannot be opened.
    """
    import re

    root = _installer_root()
    in_path = os.path.join(root, "requirements.in")
    txt_path = os.path.join(root, "requirements.txt")

    def canonical(name):
        # Project names differing only in case or in -/_/. are the same
        # project; pinned requirements files usually use the normalized form.
        return re.sub(r"[-_.]+", "-", name).lower()

    direct_names = []
    seen = set()

    def read_in_file(path):
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                stripped = line.strip()
                if not stripped or stripped.startswith("#"):
                    continue
                if stripped.startswith("-r "):
                    # Resolve the include relative to the including file,
                    # dropping any trailing inline comment.
                    target = re.split(r"\s+#", stripped[3:], maxsplit=1)[0].strip()
                    read_in_file(os.path.join(os.path.dirname(path), target))
                    continue
                if stripped.startswith("-"):
                    # Any other option line (-c, --index-url, ...) is not a
                    # dependency and must not be mistaken for a package name.
                    continue
                # Cut at the first extras bracket, version specifier,
                # marker, whitespace or comment to get the bare name.
                name = re.split(r"[\[<>=!~;\s#]", stripped, maxsplit=1)[0]
                key = canonical(name)
                if name and key not in seen:
                    seen.add(key)
                    direct_names.append(name)

    read_in_file(in_path)

    # Map normalized name -> pinned version from requirements.txt.
    # Transitive deps are recorded too but only direct ones are returned.
    # Accepts optional extras, an optional environment marker, and the
    # trailing line-continuation backslash that precedes --hash lines.
    pin_re = re.compile(
        r"^([A-Za-z0-9._-]+)(?:\[[^\]]*\])?==([^\s;\\]+)\s*(?:;[^\\]*)?(?:\\)?\s*$")
    pinned = {}
    with open(txt_path, "r", encoding="utf-8") as f:
        for line in f:
            m = pin_re.match(line)
            if m:
                pinned[canonical(m.group(1))] = m.group(2)

    return [(name, pinned.get(canonical(name), "?")) for name in direct_names]
def _all_direct_deps_at_pinned_versions(direct_deps):
    """Report whether every direct dep is installed at exactly its pinned version.

    This is the fast path that lets repeated `./init` runs skip invoking
    pip when nothing has changed. ``direct_deps`` is an iterable of
    ``(name, pinned_version)`` pairs. The answer is False as soon as one
    distribution is missing or reports a different version, and also when
    ``importlib.metadata`` itself cannot be imported.
    """
    try:
        from importlib.metadata import PackageNotFoundError, version as installed_version
    except ImportError:
        return False

    def is_at_pin(package, wanted):
        try:
            return installed_version(package) == wanted
        except PackageNotFoundError:
            return False

    return all(is_at_pin(package, wanted) for package, wanted in direct_deps)
def install_python_packages():
    """Install the Installer's pinned, hash-verified runtime dependencies.

    The dependencies and their hashes are read from
    <Installer>/requirements.txt and installed into the interpreter that is
    running this script via `pip install --require-hashes -r requirements.txt`,
    so pip rejects any package whose hash does not match.

    Nothing is printed and pip is not invoked when every direct dependency
    is already at its pinned version. Otherwise the direct dependencies are
    listed and pip is run with its stdout suppressed; a pip failure raises
    ``subprocess.CalledProcessError``.
    """
    deps = _read_direct_deps_with_versions()
    if _all_direct_deps_at_pinned_versions(deps):
        return

    printc(GREEN, "Pinned, hash-verified dependencies need (re)installing:")
    for dep_name, dep_version in deps:
        printc(GREEN, f" {dep_name}=={dep_version}")

    requirements_file = os.path.join(_installer_root(), "requirements.txt")
    printc(GREEN, " ... installing (hash-verified)... ", end="")
    pip_command = [
        sys.executable, "-m", "pip", "install",
        "--require-hashes", "-r", requirements_file,
    ]
    subprocess.check_call(pip_command, stdout=subprocess.DEVNULL)
    printc(GREEN, "OK")
def _resync_existing_clone(path, name, no_verify=False):
    # Brings an existing clone at `path` in line with origin/main and prints
    # a one-line status. Returns False when the working tree is dirty or a
    # git fetch/reset fails, True otherwise.
    #
    # Customer-side clones are treated as vendored copies with no legitimate
    # local edits or commits, so local state is simply replaced by origin's.
    # That one approach covers stale clones, detached HEAD, a wrong branch
    # and a recreated remote with unrelated history (reset --hard does not
    # care). A dirty working tree is the only state that is refused, since
    # it may hold unsaved work the customer wants to inspect.
    def git(*args):
        return subprocess.run(
            ['git', '-C', path, *args], capture_output=True, text=True)

    def head_commit():
        return git('rev-parse', 'HEAD').stdout.strip()

    def fail(headline, detail):
        printc(RED, headline)
        printc(RED, detail)
        return False

    before = head_commit()

    if git('status', '--porcelain').stdout.strip():
        return fail(
            "Dirty working tree",
            f" {name} has uncommitted local changes. Inspect with "
            f"`git -C {path} status`, then either commit/stash or "
            f"`rm -rf {path}` and re-run ./init to fetch a fresh copy.")

    # -P (--prune-tags) matters for the recreated-remote case: stale local
    # tags would otherwise confuse the later
    # `git describe --tags --exact-match HEAD` in _verify_release.py.
    fetch = git('fetch', '-p', '-P', 'origin')
    if fetch.returncode != 0:
        return fail("Fetch failed", f" {fetch.stderr.strip()}")

    # One hard reset handles stale, divergent, detached, wrong-branch and
    # recreated-remote clones alike.
    reset = git('reset', '--hard', 'origin/main')
    if reset.returncode != 0:
        return fail("Reset failed", f" {reset.stderr.strip()}")

    after = head_commit()
    moved = before != after

    tag = git('describe', '--tags', '--exact-match', 'HEAD')
    if tag.returncode == 0:
        label = tag.stdout.strip()
        if moved:
            printc(LIGHT_BLUE, f"Changes ({label})")
        else:
            printc(GREEN, f"No changes ({label})")
        return True

    if no_verify:
        # With verification disabled the tag state is irrelevant; only
        # report whether the resync actually moved HEAD.
        if moved:
            printc(LIGHT_BLUE, "Changes")
        else:
            printc(GREEN, "No changes")
        return True

    # origin/main sits between releases. That is a maintainer-side
    # condition; say so here rather than letting verification fail later
    # with a less actionable message.
    printc(YELLOW, "No release tag on HEAD")
    printc(YELLOW,
           f" {name}: origin/main is not on a release tag. "
           f"./deploy verification will refuse until the next "
           f"signed release is published.")
    return True
def clone_repo(url, path, name, no_verify=False):
    """Clone a component repo, or resync an existing clone, then prepare its Python.

    Args:
        url: Git URL to clone from when ``path`` does not exist yet.
        path: Local directory of the component checkout.
        name: Component name, used in progress messages.
        no_verify: Forwarded to ``_resync_existing_clone``; only affects how
            a missing release tag is reported there.

    Returns:
        None. The function returns early, after printing the reason, when
        an existing clone could not be resynced or when ``git clone`` fails.
        The Python environment steps are skipped in that case.
    """
    if os.path.exists(path):
        printc(YELLOW, f"\rUpdating repo {name}... ", end="")
        if not _resync_existing_clone(path, name, no_verify=no_verify):
            return
    else:
        printc(YELLOW, f"\rDownloading repo {path}... ", end="")
        clone = subprocess.run(
            ['git', 'clone', url, path, '--quiet'],
            stdout=subprocess.DEVNULL, stderr=subprocess.PIPE, text=True)
        if clone.returncode != 0:
            # Without this check a failed clone would be reported as "OK"
            # and the chdir below would crash on the missing directory.
            printc(RED, "Clone failed")
            printc(RED, f" {clone.stderr.strip()}")
            return
        printc(GREEN, "OK")

    # Remember where we are so the working directory can be restored.
    original_dir = os.getcwd()
    try:
        # Enter the repo only after it has been cloned/updated successfully;
        # .python-version and `pyenv local` are relative to the cwd.
        os.chdir(path)
        required_python_version = get_required_python_version()
        setup_python_environment(required_python_version)
        # Silent unless a (re)install is actually needed.
        install_python_packages()
    finally:
        # Always return to the original working directory.
        os.chdir(original_dir)
def main():
    """Set up or update an OpenSecOps client workspace.

    Must be run from the Installer directory: ``.python-version`` and
    ``apps/<app>/repos.toml`` are resolved relative to the current working
    directory, and component repos are placed in its parent directory.

    Steps, in order: choose the application (argument or auto-detection),
    check prerequisites, set up Python and the pinned dependencies, verify
    the Installer's own release (unless ``--no-verify``), then clone or
    resync every configured repo and verify each one.

    Returns:
        0 when everything succeeded; 1 when the application could not be
        determined, a prerequisite is missing, Installer self-verification
        failed, the configuration file is missing, or at least one
        component failed release verification.
    """
    # Print header
    printc(LIGHT_BLUE + BOLD, "Setting up your OpenSecOps client workspace...")

    # Component repos live next to the Installer, in the parent of the CWD.
    current_dir = os.getcwd()
    parent_dir = os.path.dirname(current_dir)

    # `app` is an optional positional (SOAR/Foundation); --no-verify is the
    # development escape hatch that skips release verification entirely
    # (loud, audited; never for customer installs).
    parser = argparse.ArgumentParser()
    parser.add_argument('app', nargs='?', default=None,
                        help="The application to set up (SOAR or Foundation).")
    parser.add_argument('--no-verify', dest='no_verify', action='store_true',
                        help='Skip release signature verification (development only; '
                             'loud override, printed for audit)')
    args = parser.parse_args()
    no_verify = args.no_verify

    if args.app:
        app = args.app.lower()
    else:
        # No argument: infer the application from what is already installed.
        installed_dirs = os.listdir(parent_dir)
        installed_soar_repos = [d for d in installed_dirs if d.startswith('SOAR')]
        installed_foundation_repos = [d for d in installed_dirs if d.startswith('Foundation')]
        if installed_soar_repos and installed_foundation_repos:
            printc(RED, "Both OpenSecOps SOAR and OpenSecOps Foundation repos are installed.")
            printc(RED, "Please specify 'SOAR' or 'Foundation' as an argument.")
            return 1
        elif installed_soar_repos:
            app = 'soar'
            printc(LIGHT_BLUE, "Only OpenSecOps SOAR is installed, assuming 'SOAR' is what you want.")
        elif installed_foundation_repos:
            app = 'foundation'
            printc(LIGHT_BLUE, "Only OpenSecOps Foundation is installed, assuming 'Foundation' is what you want.")
        else:
            printc(RED, "No OpenSecOps SOAR or OpenSecOps Foundation repos are installed.")
            printc(RED, "Please specify 'SOAR' or 'Foundation' as an argument.")
            return 1
    print()

    # Check for necessary software
    necessary_software = ['aws', 'sam', 'pyenv', 'git']
    printc(YELLOW, f"Checking prerequisites ({', '.join(necessary_software)})... ", end="")
    not_installed = []
    for software in necessary_software:
        if not check_software(software):
            # append, not +=: `list += str` would add the individual
            # characters of the name rather than the name itself.
            not_installed.append(software)
            printc(RED, f"Please install {software} before running this script.")
    if not_installed:
        return 1
    printc(GREEN, "OK")
    print()

    required_python_version = get_required_python_version()
    setup_python_environment(required_python_version)
    install_python_packages()

    # --- Release verification setup ---------------------------------------
    # The pinned deps (including sigstore) were installed just above, so
    # this is the earliest point at which the shared verifier module can be
    # imported on a first-ever install.
    verifier = _load_verifier()
    if no_verify:
        # Single notice for the whole run. Per-repo verification is skipped
        # entirely below.
        printc(RED + BOLD, "\nRelease verification disabled (--no-verify).")
    elif verifier is None:
        printc(YELLOW,
               "sigstore not available; release verification is unavailable for this run. "
               "Re-run ./init to pick it up.")
    else:
        # Self-verify the Installer's own current checkout. If the customer
        # pulled a tampered Installer, this is where it is caught.
        printc(LIGHT_BLUE, "\nVerifying Installer release signature...")
        if not verify_installer_self(verifier):
            printc(RED, "Installer self-verification FAILED. Refusing to proceed.")
            return 1

    # toml is one of the packages installed above, hence the late import.
    import toml

    # Load configuration file based on the argument
    config_file = f"apps/{app}/repos.toml"
    if not os.path.exists(config_file):
        printc(RED, f"Configuration file {config_file} does not exist.")
        return 1
    config = toml.load(config_file)

    # Clone necessary repos, verifying each one's signed release as we go,
    # so that problems surface at init time rather than only at deploy time.
    base_url = config['GitHub']['source_base_url']
    verify_failures = []
    for repo in config['repos']:
        repo_name = repo['name']
        repo_url = base_url + repo_name + '.git'
        repo_path = os.path.join(parent_dir, repo_name)
        clone_repo(repo_url, repo_path, repo_name, no_verify=no_verify)
        if verifier is not None and not no_verify:
            if not verifier(repo_name, repo_dir=repo_path):
                verify_failures.append(repo_name)

    if verify_failures:
        printc(RED,
               f"\n{len(verify_failures)} component(s) failed release verification: "
               f"{', '.join(verify_failures)}")
        printc(RED,
               "Do not run ./deploy in these components until the issue is resolved.")
        return 1
    return 0


if __name__ == '__main__':
    # Propagate main()'s status as the process exit code so that callers
    # and shell scripts can detect a failed setup.
    sys.exit(main())