diff --git a/README.md b/README.md index c08c86c..8ba48c9 100644 --- a/README.md +++ b/README.md @@ -78,13 +78,13 @@ State is stored separately (created on first successful apply): ## Usage -Dry-run evaluation: +Dry-run validation: ```bash -wp-materialize evaluate +wp-materialize validate ``` -Apply (evaluate, then materialize): +Apply (validate, then materialize): ```bash wp-materialize apply diff --git a/src/cli.py b/src/cli.py index c9ea642..c26b3ab 100644 --- a/src/cli.py +++ b/src/cli.py @@ -8,7 +8,7 @@ from pathlib import Path from .apply import apply_changes from .config import load_config from .errors import ConfigurationError, MaterializeError, ValidationError -from .evaluation import evaluate +from .validation import validate from .local_export import export_local from .scaffold import add_dir_to_manifest, add_file_to_manifest, create_config, create_manifest, resolve_manifest_dir from .state import load_state @@ -47,13 +47,13 @@ def main() -> int: common.add_argument( "--json", action="store_true", - help="Output evaluation summary as JSON.", + help="Output validation summary as JSON.", ) subparsers = parser.add_subparsers(dest="command", metavar="command") subparsers.add_parser( - "evaluate", + "validate", parents=[common], help="Validate config/manifests and plan changes (no WP writes).", description="Validate config/manifests, convert Markdown, and plan changes without writing to WordPress.", @@ -61,8 +61,8 @@ def main() -> int: subparsers.add_parser( "apply", parents=[common], - help="Evaluate then create/update WordPress posts and taxonomy.", - description="Evaluate, then create categories/tags and create or update posts in WordPress.", + help="Validate then create/update WordPress posts and taxonomy.", + description="Validate, then create categories/tags and create or update posts in WordPress.", ) local_parser = subparsers.add_parser( "local", @@ -172,7 +172,7 @@ def main() -> int: try: config = load_config(args.config) state = load_state(args.state) - result = evaluate( + result = validate( config, state, sync_repos=not args.no_sync, @@ -187,9 +187,9 @@ def main() -> int: return 1 if args.json: - print(_evaluation_json(result)) + print(_validation_json(result)) else: - print(_evaluation_summary(result)) + print(_validation_summary(result)) if args.command == "local": output_dir = getattr(args, "output_dir", None) @@ -224,7 +224,7 @@ def _default_state_path() -> Path: return Path.home() / ".config" / "wp-materialize" / "state.json" -def _evaluation_summary(result) -> str: +def _validation_summary(result) -> str: total = len(result.posts) updates = sum(1 for post in result.posts if post.should_update) categories = len(result.taxonomy_to_create.missing_categories) @@ -238,7 +238,7 @@ def _evaluation_summary(result) -> str: return "\n".join(lines) -def _evaluation_json(result) -> str: +def _validation_json(result) -> str: payload = { "posts": [ { diff --git a/src/evaluation.py b/src/evaluation.py deleted file mode 100644 index c71ad35..0000000 --- a/src/evaluation.py +++ /dev/null @@ -1,469 +0,0 @@ -from __future__ import annotations - -from dataclasses import dataclass -from datetime import datetime -from pathlib import Path -import shutil -from typing import Dict, List, Optional, Set - -from .config import Config -from .errors import ValidationError, ValidationIssue -from .git_utils import ensure_repo, git_first_timestamp, git_timestamp -from .manifest import load_manifest -from .markdown_utils import convert_markdown, extract_title -from .models import EvaluationResult, InheritList, PostPlan, Source, TaxonomyPlan -from .state import State -from .wp_cli import WordPressCLI - - -@dataclass -class _Context: - categories: InheritList - tags: InheritList - author: InheritList - renderer: Optional[str] - hard_line_breaks: bool - block_html: bool - subdirectories: InheritList - manifest_chain: List[Path] - - -def evaluate( - config: Config, - state: State, - sync_repos: bool, - force_new: bool = False, - skip_wp_checks: bool = False, -) -> EvaluationResult: - issues: List[ValidationIssue] = [] - - sources = _load_sources(config, sync_repos, issues) - - posts: List[PostPlan] = [] - for source, content_root in sources: - _evaluate_directory( - source=source, - directory=content_root, - context=_Context( - categories=InheritList(), - tags=InheritList(), - author=InheritList(), - renderer=config.renderer, - hard_line_breaks=config.hard_line_breaks, - block_html=config.block_html, - subdirectories=InheritList(), - manifest_chain=[], - ), - state=state, - issues=issues, - posts=posts, - force_new=force_new, - ) - - missing_categories: List[List[str]] = [] - missing_tags: List[str] = [] - if not skip_wp_checks: - if shutil.which("wp") is None: - issues.append(ValidationIssue("wp CLI not found in PATH", context=str(config.wordpress_root))) - categories = [] - tag_names: Set[str] = set() - try: - wp = WordPressCLI(config.wordpress_root) - categories = wp.list_categories() - tags = wp.list_tags() - tag_names = {tag.name for tag in tags} - except Exception as exc: - issues.append(ValidationIssue(str(exc), context=str(config.wordpress_root))) - - missing_categories, missing_tags = _plan_taxonomy(posts, categories, tag_names) - - if issues: - raise ValidationError(issues) - - return EvaluationResult( - posts=posts, - taxonomy_to_create=TaxonomyPlan(missing_categories=missing_categories, missing_tags=missing_tags), - ) - - -def _load_sources( - config: Config, - sync_repos: bool, - issues: List[ValidationIssue], -) -> List[tuple[Source, Path]]: - sources: List[tuple[Source, Path]] = [] - - for repo in config.git_repositories: - repo_path = config.repo_storage_dir / repo.name - try: - ensure_repo(repo_path, repo.url, repo.branch, sync=sync_repos) - except Exception as exc: - issues.append(ValidationIssue(str(exc), context=str(repo_path))) - continue - content_root = repo_path / repo.root_subdir if repo.root_subdir else repo_path - if not content_root.exists(): - issues.append(ValidationIssue("Repository content root missing", context=str(content_root))) - continue - sources.append( - ( - Source(name=repo.name, root_path=content_root, identity_root=repo_path, kind="git"), - content_root, - ) - ) - - for directory in config.directories: - root_path = directory.path - if not root_path.exists(): - issues.append(ValidationIssue("Directory not found", context=str(root_path))) - continue - content_root = root_path / directory.root_subdir if directory.root_subdir else root_path - if not content_root.exists(): - issues.append(ValidationIssue("Directory content root missing", context=str(content_root))) - continue - sources.append( - ( - Source(name=directory.name, root_path=content_root, identity_root=root_path, kind="dir"), - content_root, - ) - ) - - return sources - - -def _evaluate_directory( - source: Source, - directory: Path, - context: _Context, - state: State, - issues: List[ValidationIssue], - posts: List[PostPlan], - force_new: bool, -) -> None: - manifest_path = directory / ".wp-materialize.json" - manifest = load_manifest(manifest_path, issues) - if manifest is None: - return - - effective_categories = _merge_inherit(context.categories, manifest.categories) - effective_tags = _merge_inherit(context.tags, manifest.tags) - effective_author = _merge_inherit(context.author, manifest.author) - effective_renderer = manifest.renderer if manifest.renderer is not None else context.renderer - effective_hard_line_breaks = ( - manifest.hard_line_breaks - if manifest.hard_line_breaks is not None - else context.hard_line_breaks - ) - effective_block_html = ( - manifest.block_html - if manifest.block_html is not None - else context.block_html - ) - effective_subdirs = _merge_inherit(context.subdirectories, manifest.subdirectories) - - manifest_chain = context.manifest_chain + [manifest.path] - - for file_name, spec in manifest.files.items(): - file_path = directory / file_name - if not file_path.exists(): - issues.append(ValidationIssue("File not found", context=str(file_path))) - continue - - try: - content = file_path.read_text(encoding="utf-8") - except Exception as exc: - issues.append(ValidationIssue(f"Failed to read file: {exc}", context=str(file_path))) - continue - - title = spec.title - markdown_body = content - if spec.use_heading_level is not None: - extracted = extract_title( - content, - level=spec.use_heading_level, - strict=spec.use_heading_strict, - context=str(file_path), - issues=issues, - ) - if extracted is None: - continue - title, markdown_body = extracted - elif not title: - issues.append(ValidationIssue("Missing title (title or use_heading_as_title required)", context=str(file_path))) - continue - - resolved_categories = _resolve_overrides(effective_categories, spec.categories) - resolved_tags = _resolve_overrides(effective_tags, spec.tags) - - resolved_categories = _normalize_list(resolved_categories, "category", str(file_path), issues) - resolved_tags = _normalize_list(resolved_tags, "tag", str(file_path), issues) - resolved_author = _resolve_author(effective_author.content, str(file_path), issues) - - resolved_renderer = spec.renderer if spec.renderer is not None else effective_renderer - resolved_hard_line_breaks = ( - spec.hard_line_breaks - if spec.hard_line_breaks is not None - else effective_hard_line_breaks - ) - resolved_block_html = ( - spec.block_html - if spec.block_html is not None - else effective_block_html - ) - html = convert_markdown( - markdown_body, - context=str(file_path), - issues=issues, - renderer=resolved_renderer or "default", - hard_line_breaks=resolved_hard_line_breaks, - block_html=resolved_block_html, - ) - if html is None: - continue - - relative_path = _relative_path(file_path, source.identity_root, issues) - if relative_path is None: - continue - - timestamps = [] - ts = _timestamp_for_path(source, source.identity_root, relative_path, issues) - if ts is None: - continue - timestamps.append(ts) - - for manifest_file in manifest_chain: - manifest_rel = _relative_path(manifest_file, source.identity_root, issues) - if manifest_rel is None: - continue - ts_manifest = _timestamp_for_path(source, source.identity_root, manifest_rel, issues) - if ts_manifest is None: - continue - timestamps.append(ts_manifest) - - source_timestamp = max(timestamps) - identity = f"{source.name}:{relative_path}" - cached_entry = state.posts.get(identity) - cached_ts = cached_entry.source_timestamp if cached_entry else None - should_update = True if force_new else (cached_ts is None or source_timestamp > cached_ts) - created_on, last_modified = _resolve_post_datetimes( - source=source, - identity_root=source.identity_root, - relative_path=relative_path, - spec=spec, - issues=issues, - ) - - posts.append( - PostPlan( - source=source, - identity=identity, - relative_path=relative_path, - absolute_path=file_path, - title=title, - html=html, - categories=resolved_categories, - tags=resolved_tags, - author=resolved_author, - source_timestamp=source_timestamp, - cached_timestamp=cached_ts, - should_update=should_update, - created_on=created_on, - last_modified=last_modified, - ) - ) - - for subdir in effective_subdirs.content: - subdir_path = directory / subdir - if not subdir_path.exists(): - issues.append(ValidationIssue("Missing subdirectory", context=str(subdir_path))) - continue - _evaluate_directory( - source=source, - directory=subdir_path, - context=_Context( - categories=effective_categories, - tags=effective_tags, - author=effective_author, - renderer=effective_renderer, - hard_line_breaks=effective_hard_line_breaks, - block_html=effective_block_html, - subdirectories=effective_subdirs, - manifest_chain=manifest_chain, - ), - state=state, - issues=issues, - posts=posts, - force_new=force_new, - ) - - -def _merge_inherit(parent: InheritList, child: InheritList) -> InheritList: - if child.inherit: - content = parent.content + child.content - else: - content = child.content - return InheritList(content=content, inherit=True) - - -def _resolve_overrides(parent: InheritList, override: Optional[InheritList]) -> List[str]: - if override is None: - return list(parent.content) - if override.inherit: - return parent.content + override.content - return list(override.content) - - -def _normalize_list(values: List[str], label: str, context: str, issues: List[ValidationIssue]) -> List[str]: - normalized: List[str] = [] - seen: Set[str] = set() - for value in values: - cleaned = value.strip() - if not cleaned: - issues.append(ValidationIssue(f"Empty {label} entry", context=context)) - continue - if label == "category": - parts = [part.strip() for part in cleaned.split("/")] - if any(not part for part in parts): - issues.append(ValidationIssue(f"Invalid category path: {cleaned}", context=context)) - continue - cleaned = "/".join(parts) - if cleaned not in seen: - seen.add(cleaned) - normalized.append(cleaned) - return normalized - - -def _resolve_author(values: List[str], context: str, issues: List[ValidationIssue]) -> Optional[str]: - normalized = _normalize_list(values, "author", context, issues) - if not normalized: - return None - if len(normalized) > 1: - issues.append(ValidationIssue("Multiple authors specified; only one is allowed", context=context)) - return None - return normalized[0] - - -def _relative_path(path: Path, root: Path, issues: List[ValidationIssue]) -> Optional[str]: - try: - return str(path.relative_to(root)) - except ValueError: - issues.append(ValidationIssue("Path is outside identity root", context=str(path))) - return None - - -def _timestamp_for_path( - source: Source, - identity_root: Path, - relative_path: str, - issues: List[ValidationIssue], -) -> Optional[int]: - if source.kind == "git": - try: - return git_timestamp(identity_root, relative_path) - except Exception as exc: - issues.append(ValidationIssue(str(exc), context=relative_path)) - return None - try: - return int((identity_root / relative_path).stat().st_mtime) - except Exception as exc: - issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path)) - return None - - -def _resolve_post_datetimes( - source: Source, - identity_root: Path, - relative_path: str, - spec, - issues: List[ValidationIssue], -) -> tuple[Optional[str], Optional[str]]: - created_dt = spec.created_on - modified_dt = spec.last_modified - - if created_dt is None or modified_dt is None: - inferred = _infer_file_timestamps(source, identity_root, relative_path, issues) - if inferred is None: - return None, None - inferred_created, inferred_modified = inferred - if created_dt is None: - created_dt = datetime.fromtimestamp(inferred_created) - if modified_dt is None: - modified_dt = datetime.fromtimestamp(inferred_modified) - - if created_dt and modified_dt and modified_dt < created_dt: - issues.append( - ValidationIssue("last_modified cannot be earlier than created_on", context=relative_path) - ) - return None, None - - created_on = _format_wp_datetime(created_dt) if created_dt else None - last_modified = _format_wp_datetime(modified_dt) if modified_dt else None - return created_on, last_modified - - -def _infer_file_timestamps( - source: Source, - identity_root: Path, - relative_path: str, - issues: List[ValidationIssue], -) -> Optional[tuple[int, int]]: - if source.kind == "git": - try: - created_ts = git_first_timestamp(identity_root, relative_path) - modified_ts = git_timestamp(identity_root, relative_path) - return created_ts, modified_ts - except Exception: - pass - try: - stat = (identity_root / relative_path).stat() - return int(stat.st_ctime), int(stat.st_mtime) - except Exception as exc: - issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path)) - return None - - -def _format_wp_datetime(value: datetime) -> str: - return value.strftime("%Y-%m-%d %H:%M:%S") - - -def _plan_taxonomy( - posts: List[PostPlan], - categories, # list of CategoryTerm - existing_tags: Set[str], -) -> tuple[List[List[str]], List[str]]: - category_map: Dict[tuple[int, str], int] = {} - for category in categories: - category_map[(category.parent, category.name)] = category.term_id - - missing_paths: List[List[str]] = [] - seen_missing: Set[tuple[str, ...]] = set() - missing_tags: List[str] = [] - seen_tags: Set[str] = set() - - for post in posts: - if not post.should_update: - continue - for tag in post.tags: - if tag not in existing_tags: - if tag not in seen_tags: - seen_tags.add(tag) - missing_tags.append(tag) - for path in post.categories: - segments = [segment for segment in path.split("/") if segment] - if not segments: - continue - parent = 0 - missing = False - for segment in segments: - key = (parent, segment) - if key in category_map: - parent = category_map[key] - continue - missing = True - break - if missing: - key = tuple(segments) - if key not in seen_missing: - seen_missing.add(key) - missing_paths.append(list(segments)) - - return missing_paths, missing_tags