renamed evaluate to validate

This commit is contained in:
2026-02-08 18:30:11 -05:00
parent b272c928d4
commit 094a283ba9
3 changed files with 13 additions and 482 deletions

View File

@@ -78,13 +78,13 @@ State is stored separately (created on first successful apply):
## Usage
Dry-run evaluation:
Dry-run validation:
```bash
wp-materialize evaluate
wp-materialize validate
```
Apply (evaluate, then materialize):
Apply (validate, then materialize):
```bash
wp-materialize apply

View File

@@ -8,7 +8,7 @@ from pathlib import Path
from .apply import apply_changes
from .config import load_config
from .errors import ConfigurationError, MaterializeError, ValidationError
from .evaluation import evaluate
from .validation import validate
from .local_export import export_local
from .scaffold import add_dir_to_manifest, add_file_to_manifest, create_config, create_manifest, resolve_manifest_dir
from .state import load_state
@@ -47,13 +47,13 @@ def main() -> int:
common.add_argument(
"--json",
action="store_true",
help="Output evaluation summary as JSON.",
help="Output validation summary as JSON.",
)
subparsers = parser.add_subparsers(dest="command", metavar="command")
subparsers.add_parser(
"evaluate",
"validate",
parents=[common],
help="Validate config/manifests and plan changes (no WP writes).",
description="Validate config/manifests, convert Markdown, and plan changes without writing to WordPress.",
@@ -61,8 +61,8 @@ def main() -> int:
subparsers.add_parser(
"apply",
parents=[common],
help="Evaluate then create/update WordPress posts and taxonomy.",
description="Evaluate, then create categories/tags and create or update posts in WordPress.",
help="Validate then create/update WordPress posts and taxonomy.",
description="Validate, then create categories/tags and create or update posts in WordPress.",
)
local_parser = subparsers.add_parser(
"local",
@@ -172,7 +172,7 @@ def main() -> int:
try:
config = load_config(args.config)
state = load_state(args.state)
result = evaluate(
result = validate(
config,
state,
sync_repos=not args.no_sync,
@@ -187,9 +187,9 @@ def main() -> int:
return 1
if args.json:
print(_evaluation_json(result))
print(_validation_json(result))
else:
print(_evaluation_summary(result))
print(_validation_summary(result))
if args.command == "local":
output_dir = getattr(args, "output_dir", None)
@@ -224,7 +224,7 @@ def _default_state_path() -> Path:
return Path.home() / ".config" / "wp-materialize" / "state.json"
def _evaluation_summary(result) -> str:
def _validation_summary(result) -> str:
total = len(result.posts)
updates = sum(1 for post in result.posts if post.should_update)
categories = len(result.taxonomy_to_create.missing_categories)
@@ -238,7 +238,7 @@ def _evaluation_summary(result) -> str:
return "\n".join(lines)
def _evaluation_json(result) -> str:
def _validation_json(result) -> str:
payload = {
"posts": [
{

View File

@@ -1,469 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
import shutil
from typing import Dict, List, Optional, Set
from .config import Config
from .errors import ValidationError, ValidationIssue
from .git_utils import ensure_repo, git_first_timestamp, git_timestamp
from .manifest import load_manifest
from .markdown_utils import convert_markdown, extract_title
from .models import EvaluationResult, InheritList, PostPlan, Source, TaxonomyPlan
from .state import State
from .wp_cli import WordPressCLI
@dataclass
class _Context:
categories: InheritList
tags: InheritList
author: InheritList
renderer: Optional[str]
hard_line_breaks: bool
block_html: bool
subdirectories: InheritList
manifest_chain: List[Path]
def evaluate(
config: Config,
state: State,
sync_repos: bool,
force_new: bool = False,
skip_wp_checks: bool = False,
) -> EvaluationResult:
issues: List[ValidationIssue] = []
sources = _load_sources(config, sync_repos, issues)
posts: List[PostPlan] = []
for source, content_root in sources:
_evaluate_directory(
source=source,
directory=content_root,
context=_Context(
categories=InheritList(),
tags=InheritList(),
author=InheritList(),
renderer=config.renderer,
hard_line_breaks=config.hard_line_breaks,
block_html=config.block_html,
subdirectories=InheritList(),
manifest_chain=[],
),
state=state,
issues=issues,
posts=posts,
force_new=force_new,
)
missing_categories: List[List[str]] = []
missing_tags: List[str] = []
if not skip_wp_checks:
if shutil.which("wp") is None:
issues.append(ValidationIssue("wp CLI not found in PATH", context=str(config.wordpress_root)))
categories = []
tag_names: Set[str] = set()
try:
wp = WordPressCLI(config.wordpress_root)
categories = wp.list_categories()
tags = wp.list_tags()
tag_names = {tag.name for tag in tags}
except Exception as exc:
issues.append(ValidationIssue(str(exc), context=str(config.wordpress_root)))
missing_categories, missing_tags = _plan_taxonomy(posts, categories, tag_names)
if issues:
raise ValidationError(issues)
return EvaluationResult(
posts=posts,
taxonomy_to_create=TaxonomyPlan(missing_categories=missing_categories, missing_tags=missing_tags),
)
def _load_sources(
config: Config,
sync_repos: bool,
issues: List[ValidationIssue],
) -> List[tuple[Source, Path]]:
sources: List[tuple[Source, Path]] = []
for repo in config.git_repositories:
repo_path = config.repo_storage_dir / repo.name
try:
ensure_repo(repo_path, repo.url, repo.branch, sync=sync_repos)
except Exception as exc:
issues.append(ValidationIssue(str(exc), context=str(repo_path)))
continue
content_root = repo_path / repo.root_subdir if repo.root_subdir else repo_path
if not content_root.exists():
issues.append(ValidationIssue("Repository content root missing", context=str(content_root)))
continue
sources.append(
(
Source(name=repo.name, root_path=content_root, identity_root=repo_path, kind="git"),
content_root,
)
)
for directory in config.directories:
root_path = directory.path
if not root_path.exists():
issues.append(ValidationIssue("Directory not found", context=str(root_path)))
continue
content_root = root_path / directory.root_subdir if directory.root_subdir else root_path
if not content_root.exists():
issues.append(ValidationIssue("Directory content root missing", context=str(content_root)))
continue
sources.append(
(
Source(name=directory.name, root_path=content_root, identity_root=root_path, kind="dir"),
content_root,
)
)
return sources
def _evaluate_directory(
source: Source,
directory: Path,
context: _Context,
state: State,
issues: List[ValidationIssue],
posts: List[PostPlan],
force_new: bool,
) -> None:
manifest_path = directory / ".wp-materialize.json"
manifest = load_manifest(manifest_path, issues)
if manifest is None:
return
effective_categories = _merge_inherit(context.categories, manifest.categories)
effective_tags = _merge_inherit(context.tags, manifest.tags)
effective_author = _merge_inherit(context.author, manifest.author)
effective_renderer = manifest.renderer if manifest.renderer is not None else context.renderer
effective_hard_line_breaks = (
manifest.hard_line_breaks
if manifest.hard_line_breaks is not None
else context.hard_line_breaks
)
effective_block_html = (
manifest.block_html
if manifest.block_html is not None
else context.block_html
)
effective_subdirs = _merge_inherit(context.subdirectories, manifest.subdirectories)
manifest_chain = context.manifest_chain + [manifest.path]
for file_name, spec in manifest.files.items():
file_path = directory / file_name
if not file_path.exists():
issues.append(ValidationIssue("File not found", context=str(file_path)))
continue
try:
content = file_path.read_text(encoding="utf-8")
except Exception as exc:
issues.append(ValidationIssue(f"Failed to read file: {exc}", context=str(file_path)))
continue
title = spec.title
markdown_body = content
if spec.use_heading_level is not None:
extracted = extract_title(
content,
level=spec.use_heading_level,
strict=spec.use_heading_strict,
context=str(file_path),
issues=issues,
)
if extracted is None:
continue
title, markdown_body = extracted
elif not title:
issues.append(ValidationIssue("Missing title (title or use_heading_as_title required)", context=str(file_path)))
continue
resolved_categories = _resolve_overrides(effective_categories, spec.categories)
resolved_tags = _resolve_overrides(effective_tags, spec.tags)
resolved_categories = _normalize_list(resolved_categories, "category", str(file_path), issues)
resolved_tags = _normalize_list(resolved_tags, "tag", str(file_path), issues)
resolved_author = _resolve_author(effective_author.content, str(file_path), issues)
resolved_renderer = spec.renderer if spec.renderer is not None else effective_renderer
resolved_hard_line_breaks = (
spec.hard_line_breaks
if spec.hard_line_breaks is not None
else effective_hard_line_breaks
)
resolved_block_html = (
spec.block_html
if spec.block_html is not None
else effective_block_html
)
html = convert_markdown(
markdown_body,
context=str(file_path),
issues=issues,
renderer=resolved_renderer or "default",
hard_line_breaks=resolved_hard_line_breaks,
block_html=resolved_block_html,
)
if html is None:
continue
relative_path = _relative_path(file_path, source.identity_root, issues)
if relative_path is None:
continue
timestamps = []
ts = _timestamp_for_path(source, source.identity_root, relative_path, issues)
if ts is None:
continue
timestamps.append(ts)
for manifest_file in manifest_chain:
manifest_rel = _relative_path(manifest_file, source.identity_root, issues)
if manifest_rel is None:
continue
ts_manifest = _timestamp_for_path(source, source.identity_root, manifest_rel, issues)
if ts_manifest is None:
continue
timestamps.append(ts_manifest)
source_timestamp = max(timestamps)
identity = f"{source.name}:{relative_path}"
cached_entry = state.posts.get(identity)
cached_ts = cached_entry.source_timestamp if cached_entry else None
should_update = True if force_new else (cached_ts is None or source_timestamp > cached_ts)
created_on, last_modified = _resolve_post_datetimes(
source=source,
identity_root=source.identity_root,
relative_path=relative_path,
spec=spec,
issues=issues,
)
posts.append(
PostPlan(
source=source,
identity=identity,
relative_path=relative_path,
absolute_path=file_path,
title=title,
html=html,
categories=resolved_categories,
tags=resolved_tags,
author=resolved_author,
source_timestamp=source_timestamp,
cached_timestamp=cached_ts,
should_update=should_update,
created_on=created_on,
last_modified=last_modified,
)
)
for subdir in effective_subdirs.content:
subdir_path = directory / subdir
if not subdir_path.exists():
issues.append(ValidationIssue("Missing subdirectory", context=str(subdir_path)))
continue
_evaluate_directory(
source=source,
directory=subdir_path,
context=_Context(
categories=effective_categories,
tags=effective_tags,
author=effective_author,
renderer=effective_renderer,
hard_line_breaks=effective_hard_line_breaks,
block_html=effective_block_html,
subdirectories=effective_subdirs,
manifest_chain=manifest_chain,
),
state=state,
issues=issues,
posts=posts,
force_new=force_new,
)
def _merge_inherit(parent: InheritList, child: InheritList) -> InheritList:
if child.inherit:
content = parent.content + child.content
else:
content = child.content
return InheritList(content=content, inherit=True)
def _resolve_overrides(parent: InheritList, override: Optional[InheritList]) -> List[str]:
if override is None:
return list(parent.content)
if override.inherit:
return parent.content + override.content
return list(override.content)
def _normalize_list(values: List[str], label: str, context: str, issues: List[ValidationIssue]) -> List[str]:
normalized: List[str] = []
seen: Set[str] = set()
for value in values:
cleaned = value.strip()
if not cleaned:
issues.append(ValidationIssue(f"Empty {label} entry", context=context))
continue
if label == "category":
parts = [part.strip() for part in cleaned.split("/")]
if any(not part for part in parts):
issues.append(ValidationIssue(f"Invalid category path: {cleaned}", context=context))
continue
cleaned = "/".join(parts)
if cleaned not in seen:
seen.add(cleaned)
normalized.append(cleaned)
return normalized
def _resolve_author(values: List[str], context: str, issues: List[ValidationIssue]) -> Optional[str]:
normalized = _normalize_list(values, "author", context, issues)
if not normalized:
return None
if len(normalized) > 1:
issues.append(ValidationIssue("Multiple authors specified; only one is allowed", context=context))
return None
return normalized[0]
def _relative_path(path: Path, root: Path, issues: List[ValidationIssue]) -> Optional[str]:
try:
return str(path.relative_to(root))
except ValueError:
issues.append(ValidationIssue("Path is outside identity root", context=str(path)))
return None
def _timestamp_for_path(
source: Source,
identity_root: Path,
relative_path: str,
issues: List[ValidationIssue],
) -> Optional[int]:
if source.kind == "git":
try:
return git_timestamp(identity_root, relative_path)
except Exception as exc:
issues.append(ValidationIssue(str(exc), context=relative_path))
return None
try:
return int((identity_root / relative_path).stat().st_mtime)
except Exception as exc:
issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
return None
def _resolve_post_datetimes(
source: Source,
identity_root: Path,
relative_path: str,
spec,
issues: List[ValidationIssue],
) -> tuple[Optional[str], Optional[str]]:
created_dt = spec.created_on
modified_dt = spec.last_modified
if created_dt is None or modified_dt is None:
inferred = _infer_file_timestamps(source, identity_root, relative_path, issues)
if inferred is None:
return None, None
inferred_created, inferred_modified = inferred
if created_dt is None:
created_dt = datetime.fromtimestamp(inferred_created)
if modified_dt is None:
modified_dt = datetime.fromtimestamp(inferred_modified)
if created_dt and modified_dt and modified_dt < created_dt:
issues.append(
ValidationIssue("last_modified cannot be earlier than created_on", context=relative_path)
)
return None, None
created_on = _format_wp_datetime(created_dt) if created_dt else None
last_modified = _format_wp_datetime(modified_dt) if modified_dt else None
return created_on, last_modified
def _infer_file_timestamps(
source: Source,
identity_root: Path,
relative_path: str,
issues: List[ValidationIssue],
) -> Optional[tuple[int, int]]:
if source.kind == "git":
try:
created_ts = git_first_timestamp(identity_root, relative_path)
modified_ts = git_timestamp(identity_root, relative_path)
return created_ts, modified_ts
except Exception:
pass
try:
stat = (identity_root / relative_path).stat()
return int(stat.st_ctime), int(stat.st_mtime)
except Exception as exc:
issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
return None
def _format_wp_datetime(value: datetime) -> str:
return value.strftime("%Y-%m-%d %H:%M:%S")
def _plan_taxonomy(
posts: List[PostPlan],
categories, # list of CategoryTerm
existing_tags: Set[str],
) -> tuple[List[List[str]], List[str]]:
category_map: Dict[tuple[int, str], int] = {}
for category in categories:
category_map[(category.parent, category.name)] = category.term_id
missing_paths: List[List[str]] = []
seen_missing: Set[tuple[str, ...]] = set()
missing_tags: List[str] = []
seen_tags: Set[str] = set()
for post in posts:
if not post.should_update:
continue
for tag in post.tags:
if tag not in existing_tags:
if tag not in seen_tags:
seen_tags.add(tag)
missing_tags.append(tag)
for path in post.categories:
segments = [segment for segment in path.split("/") if segment]
if not segments:
continue
parent = 0
missing = False
for segment in segments:
key = (parent, segment)
if key in category_map:
parent = category_map[key]
continue
missing = True
break
if missing:
key = tuple(segments)
if key not in seen_missing:
seen_missing.add(key)
missing_paths.append(list(segments))
return missing_paths, missing_tags