renamed evaluate to validate
This commit is contained in:
469
src/validation.py
Normal file
469
src/validation.py
Normal file
@@ -0,0 +1,469 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
import shutil
|
||||||
|
from typing import Dict, List, Optional, Set
|
||||||
|
|
||||||
|
from .config import Config
|
||||||
|
from .errors import ValidationError, ValidationIssue
|
||||||
|
from .git_utils import ensure_repo, git_first_timestamp, git_timestamp
|
||||||
|
from .manifest import load_manifest
|
||||||
|
from .markdown_utils import convert_markdown, extract_title
|
||||||
|
from .models import EvaluationResult, InheritList, PostPlan, Source, TaxonomyPlan
|
||||||
|
from .state import State
|
||||||
|
from .wp_cli import WordPressCLI
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class _Context:
|
||||||
|
categories: InheritList
|
||||||
|
tags: InheritList
|
||||||
|
author: InheritList
|
||||||
|
renderer: Optional[str]
|
||||||
|
hard_line_breaks: bool
|
||||||
|
block_html: bool
|
||||||
|
subdirectories: InheritList
|
||||||
|
manifest_chain: List[Path]
|
||||||
|
|
||||||
|
|
||||||
|
def validate(
    config: Config,
    state: State,
    sync_repos: bool,
    force_new: bool = False,
    skip_wp_checks: bool = False,
) -> EvaluationResult:
    """Validate every configured source and plan the posts to materialize.

    Args:
        config: Supplies the sources, the default rendering options and the
            WordPress root.
        state: Previously recorded posts, consulted to decide what changed.
        sync_repos: Forwarded to ``_load_sources`` for the git repositories.
        force_new: When true, every planned post is flagged for update.
        skip_wp_checks: When true, the ``wp`` CLI is not consulted and the
            returned taxonomy plan is empty.

    Returns:
        The planned posts plus the categories/tags that still need creating.

    Raises:
        ValidationError: If any issue was collected during validation.
    """
    issues: List[ValidationIssue] = []
    posts: List[PostPlan] = []

    for source, content_root in _load_sources(config, sync_repos, issues):
        # Each source starts from the global defaults with nothing inherited.
        root_context = _Context(
            categories=InheritList(),
            tags=InheritList(),
            author=InheritList(),
            renderer=config.renderer,
            hard_line_breaks=config.hard_line_breaks,
            block_html=config.block_html,
            subdirectories=InheritList(),
            manifest_chain=[],
        )
        _validate_directory(
            source=source,
            directory=content_root,
            context=root_context,
            state=state,
            issues=issues,
            posts=posts,
            force_new=force_new,
        )

    missing_categories: List[List[str]] = []
    missing_tags: List[str] = []
    if not skip_wp_checks:
        wordpress_context = str(config.wordpress_root)
        if shutil.which("wp") is None:
            issues.append(ValidationIssue("wp CLI not found in PATH", context=wordpress_context))

        # Start from "nothing exists" so a failed lookup still yields a plan;
        # the recorded issue makes the run fail below anyway.
        existing_categories = []
        existing_tag_names: Set[str] = set()
        try:
            wp = WordPressCLI(config.wordpress_root)
            existing_categories = wp.list_categories()
            existing_tag_names = {tag.name for tag in wp.list_tags()}
        except Exception as exc:
            issues.append(ValidationIssue(str(exc), context=wordpress_context))

        missing_categories, missing_tags = _plan_taxonomy(
            posts, existing_categories, existing_tag_names
        )

    if issues:
        raise ValidationError(issues)

    taxonomy_plan = TaxonomyPlan(missing_categories=missing_categories, missing_tags=missing_tags)
    return EvaluationResult(posts=posts, taxonomy_to_create=taxonomy_plan)
|
||||||
|
|
||||||
|
|
||||||
|
def _load_sources(
|
||||||
|
config: Config,
|
||||||
|
sync_repos: bool,
|
||||||
|
issues: List[ValidationIssue],
|
||||||
|
) -> List[tuple[Source, Path]]:
|
||||||
|
sources: List[tuple[Source, Path]] = []
|
||||||
|
|
||||||
|
for repo in config.git_repositories:
|
||||||
|
repo_path = config.repo_storage_dir / repo.name
|
||||||
|
try:
|
||||||
|
ensure_repo(repo_path, repo.url, repo.branch, sync=sync_repos)
|
||||||
|
except Exception as exc:
|
||||||
|
issues.append(ValidationIssue(str(exc), context=str(repo_path)))
|
||||||
|
continue
|
||||||
|
content_root = repo_path / repo.root_subdir if repo.root_subdir else repo_path
|
||||||
|
if not content_root.exists():
|
||||||
|
issues.append(ValidationIssue("Repository content root missing", context=str(content_root)))
|
||||||
|
continue
|
||||||
|
sources.append(
|
||||||
|
(
|
||||||
|
Source(name=repo.name, root_path=content_root, identity_root=repo_path, kind="git"),
|
||||||
|
content_root,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
for directory in config.directories:
|
||||||
|
root_path = directory.path
|
||||||
|
if not root_path.exists():
|
||||||
|
issues.append(ValidationIssue("Directory not found", context=str(root_path)))
|
||||||
|
continue
|
||||||
|
content_root = root_path / directory.root_subdir if directory.root_subdir else root_path
|
||||||
|
if not content_root.exists():
|
||||||
|
issues.append(ValidationIssue("Directory content root missing", context=str(content_root)))
|
||||||
|
continue
|
||||||
|
sources.append(
|
||||||
|
(
|
||||||
|
Source(name=directory.name, root_path=content_root, identity_root=root_path, kind="dir"),
|
||||||
|
content_root,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
return sources
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_directory(
    source: Source,
    directory: Path,
    context: _Context,
    state: State,
    issues: List[ValidationIssue],
    posts: List[PostPlan],
    force_new: bool,
) -> None:
    """Validate one directory's manifest, plan its files, then recurse.

    The manifest is read from ``.wp-materialize.json`` inside ``directory``;
    when ``load_manifest`` returns ``None`` the directory is skipped
    entirely. Every file listed in the manifest is read, converted and
    appended to ``posts``; a file that fails a mandatory step is reported
    through ``issues`` and skipped. Finally each effective subdirectory is
    validated with the settings resolved at this level.

    Args:
        source: Source the directory belongs to.
        directory: Directory whose manifest is processed.
        context: Settings inherited from the parent level.
        state: Cached post data used to decide whether a post changed.
        issues: Collector for validation problems.
        posts: Output list that receives one ``PostPlan`` per valid file.
        force_new: When true, every post is flagged for update.
    """
    manifest = load_manifest(directory / ".wp-materialize.json", issues)
    if manifest is None:
        return

    # Combine what was inherited with what this manifest declares.
    effective_categories = _merge_inherit(context.categories, manifest.categories)
    effective_tags = _merge_inherit(context.tags, manifest.tags)
    effective_author = _merge_inherit(context.author, manifest.author)
    effective_subdirs = _merge_inherit(context.subdirectories, manifest.subdirectories)
    effective_renderer = context.renderer if manifest.renderer is None else manifest.renderer
    effective_hard_line_breaks = (
        context.hard_line_breaks if manifest.hard_line_breaks is None else manifest.hard_line_breaks
    )
    effective_block_html = context.block_html if manifest.block_html is None else manifest.block_html

    manifest_chain = [*context.manifest_chain, manifest.path]

    for file_name, spec in manifest.files.items():
        file_path = directory / file_name
        file_context = str(file_path)

        if not file_path.exists():
            issues.append(ValidationIssue("File not found", context=file_context))
            continue

        try:
            content = file_path.read_text(encoding="utf-8")
        except Exception as exc:
            issues.append(ValidationIssue(f"Failed to read file: {exc}", context=file_context))
            continue

        # The title comes either from a heading in the file or from the spec.
        title = spec.title
        markdown_body = content
        if spec.use_heading_level is not None:
            extracted = extract_title(
                content,
                level=spec.use_heading_level,
                strict=spec.use_heading_strict,
                context=file_context,
                issues=issues,
            )
            if extracted is None:
                continue
            title, markdown_body = extracted
        elif not title:
            issues.append(
                ValidationIssue("Missing title (title or use_heading_as_title required)", context=file_context)
            )
            continue

        resolved_categories = _normalize_list(
            _resolve_overrides(effective_categories, spec.categories), "category", file_context, issues
        )
        resolved_tags = _normalize_list(
            _resolve_overrides(effective_tags, spec.tags), "tag", file_context, issues
        )
        resolved_author = _resolve_author(effective_author.content, file_context, issues)

        # Per-file rendering options override the directory-level ones.
        resolved_renderer = effective_renderer if spec.renderer is None else spec.renderer
        resolved_hard_line_breaks = (
            effective_hard_line_breaks if spec.hard_line_breaks is None else spec.hard_line_breaks
        )
        resolved_block_html = effective_block_html if spec.block_html is None else spec.block_html

        html = convert_markdown(
            markdown_body,
            context=file_context,
            issues=issues,
            renderer=resolved_renderer or "default",
            hard_line_breaks=resolved_hard_line_breaks,
            block_html=resolved_block_html,
        )
        if html is None:
            continue

        relative_path = _relative_path(file_path, source.identity_root, issues)
        if relative_path is None:
            continue

        file_ts = _timestamp_for_path(source, source.identity_root, relative_path, issues)
        if file_ts is None:
            continue

        # A post counts as changed when the file or any manifest above it
        # changed, so the newest of all those timestamps is used.
        timestamps = [file_ts]
        for manifest_file in manifest_chain:
            manifest_rel = _relative_path(manifest_file, source.identity_root, issues)
            if manifest_rel is None:
                continue
            manifest_ts = _timestamp_for_path(source, source.identity_root, manifest_rel, issues)
            if manifest_ts is not None:
                timestamps.append(manifest_ts)
        source_timestamp = max(timestamps)

        identity = f"{source.name}:{relative_path}"
        cached_entry = state.posts.get(identity)
        cached_ts = cached_entry.source_timestamp if cached_entry else None
        if force_new:
            should_update = True
        else:
            should_update = cached_ts is None or source_timestamp > cached_ts

        created_on, last_modified = _resolve_post_datetimes(
            source=source,
            identity_root=source.identity_root,
            relative_path=relative_path,
            spec=spec,
            issues=issues,
        )

        plan = PostPlan(
            source=source,
            identity=identity,
            relative_path=relative_path,
            absolute_path=file_path,
            title=title,
            html=html,
            categories=resolved_categories,
            tags=resolved_tags,
            author=resolved_author,
            source_timestamp=source_timestamp,
            cached_timestamp=cached_ts,
            should_update=should_update,
            created_on=created_on,
            last_modified=last_modified,
        )
        posts.append(plan)

    for subdir in effective_subdirs.content:
        subdir_path = directory / subdir
        if not subdir_path.exists():
            issues.append(ValidationIssue("Missing subdirectory", context=str(subdir_path)))
            continue

        child_context = _Context(
            categories=effective_categories,
            tags=effective_tags,
            author=effective_author,
            renderer=effective_renderer,
            hard_line_breaks=effective_hard_line_breaks,
            block_html=effective_block_html,
            subdirectories=effective_subdirs,
            manifest_chain=manifest_chain,
        )
        _validate_directory(
            source=source,
            directory=subdir_path,
            context=child_context,
            state=state,
            issues=issues,
            posts=posts,
            force_new=force_new,
        )
|
||||||
|
|
||||||
|
|
||||||
|
def _merge_inherit(parent: InheritList, child: InheritList) -> InheritList:
    """Combine a parent list with a child list according to ``child.inherit``.

    When the child inherits, its entries are appended to the parent's;
    otherwise only the child's entries are kept. The result is always
    created with ``inherit=True``.
    """
    merged = parent.content + child.content if child.inherit else child.content
    return InheritList(content=merged, inherit=True)
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_overrides(parent: InheritList, override: Optional[InheritList]) -> List[str]:
|
||||||
|
if override is None:
|
||||||
|
return list(parent.content)
|
||||||
|
if override.inherit:
|
||||||
|
return parent.content + override.content
|
||||||
|
return list(override.content)
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_list(values: List[str], label: str, context: str, issues: List[ValidationIssue]) -> List[str]:
|
||||||
|
normalized: List[str] = []
|
||||||
|
seen: Set[str] = set()
|
||||||
|
for value in values:
|
||||||
|
cleaned = value.strip()
|
||||||
|
if not cleaned:
|
||||||
|
issues.append(ValidationIssue(f"Empty {label} entry", context=context))
|
||||||
|
continue
|
||||||
|
if label == "category":
|
||||||
|
parts = [part.strip() for part in cleaned.split("/")]
|
||||||
|
if any(not part for part in parts):
|
||||||
|
issues.append(ValidationIssue(f"Invalid category path: {cleaned}", context=context))
|
||||||
|
continue
|
||||||
|
cleaned = "/".join(parts)
|
||||||
|
if cleaned not in seen:
|
||||||
|
seen.add(cleaned)
|
||||||
|
normalized.append(cleaned)
|
||||||
|
return normalized
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_author(values: List[str], context: str, issues: List[ValidationIssue]) -> Optional[str]:
    """Reduce the author entries to a single author, if there is exactly one.

    Args:
        values: Raw author entries.
        context: Location string attached to any issue raised.
        issues: Collector for validation problems.

    Returns:
        The single normalized author, or ``None`` when there is none or when
        more than one remains after normalization (the latter is reported).
    """
    authors = _normalize_list(values, "author", context, issues)
    if len(authors) == 1:
        return authors[0]
    if authors:
        issues.append(ValidationIssue("Multiple authors specified; only one is allowed", context=context))
    return None
|
||||||
|
|
||||||
|
|
||||||
|
def _relative_path(path: Path, root: Path, issues: List[ValidationIssue]) -> Optional[str]:
|
||||||
|
try:
|
||||||
|
return str(path.relative_to(root))
|
||||||
|
except ValueError:
|
||||||
|
issues.append(ValidationIssue("Path is outside identity root", context=str(path)))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _timestamp_for_path(
|
||||||
|
source: Source,
|
||||||
|
identity_root: Path,
|
||||||
|
relative_path: str,
|
||||||
|
issues: List[ValidationIssue],
|
||||||
|
) -> Optional[int]:
|
||||||
|
if source.kind == "git":
|
||||||
|
try:
|
||||||
|
return git_timestamp(identity_root, relative_path)
|
||||||
|
except Exception as exc:
|
||||||
|
issues.append(ValidationIssue(str(exc), context=relative_path))
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
return int((identity_root / relative_path).stat().st_mtime)
|
||||||
|
except Exception as exc:
|
||||||
|
issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_post_datetimes(
    source: Source,
    identity_root: Path,
    relative_path: str,
    spec,
    issues: List[ValidationIssue],
) -> tuple[Optional[str], Optional[str]]:
    """Determine the created/modified date strings for a post.

    Values given on ``spec`` take precedence; whichever is missing is
    inferred via ``_infer_file_timestamps``.

    Args:
        source: Source the file belongs to.
        identity_root: Root that ``relative_path`` is relative to.
        relative_path: Path of the file below ``identity_root``.
        spec: File spec providing optional ``created_on``/``last_modified``.
        issues: Collector for validation problems.

    Returns:
        ``(created_on, last_modified)`` formatted by ``_format_wp_datetime``,
        or ``(None, None)`` when inference failed or the modification date
        precedes the creation date (the latter is reported here).
    """
    created_dt = spec.created_on
    modified_dt = spec.last_modified

    needs_inference = created_dt is None or modified_dt is None
    if needs_inference:
        inferred = _infer_file_timestamps(source, identity_root, relative_path, issues)
        if inferred is None:
            return None, None
        inferred_created, inferred_modified = inferred
        # Only fill the gaps; explicit spec values are never overwritten.
        if created_dt is None:
            created_dt = datetime.fromtimestamp(inferred_created)
        if modified_dt is None:
            modified_dt = datetime.fromtimestamp(inferred_modified)

    if created_dt and modified_dt and modified_dt < created_dt:
        issues.append(
            ValidationIssue("last_modified cannot be earlier than created_on", context=relative_path)
        )
        return None, None

    created_on = None
    if created_dt:
        created_on = _format_wp_datetime(created_dt)
    last_modified = None
    if modified_dt:
        last_modified = _format_wp_datetime(modified_dt)
    return created_on, last_modified
|
||||||
|
|
||||||
|
|
||||||
|
def _infer_file_timestamps(
|
||||||
|
source: Source,
|
||||||
|
identity_root: Path,
|
||||||
|
relative_path: str,
|
||||||
|
issues: List[ValidationIssue],
|
||||||
|
) -> Optional[tuple[int, int]]:
|
||||||
|
if source.kind == "git":
|
||||||
|
try:
|
||||||
|
created_ts = git_first_timestamp(identity_root, relative_path)
|
||||||
|
modified_ts = git_timestamp(identity_root, relative_path)
|
||||||
|
return created_ts, modified_ts
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
stat = (identity_root / relative_path).stat()
|
||||||
|
return int(stat.st_ctime), int(stat.st_mtime)
|
||||||
|
except Exception as exc:
|
||||||
|
issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _format_wp_datetime(value: datetime) -> str:
|
||||||
|
return value.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
||||||
|
|
||||||
|
def _plan_taxonomy(
|
||||||
|
posts: List[PostPlan],
|
||||||
|
categories, # list of CategoryTerm
|
||||||
|
existing_tags: Set[str],
|
||||||
|
) -> tuple[List[List[str]], List[str]]:
|
||||||
|
category_map: Dict[tuple[int, str], int] = {}
|
||||||
|
for category in categories:
|
||||||
|
category_map[(category.parent, category.name)] = category.term_id
|
||||||
|
|
||||||
|
missing_paths: List[List[str]] = []
|
||||||
|
seen_missing: Set[tuple[str, ...]] = set()
|
||||||
|
missing_tags: List[str] = []
|
||||||
|
seen_tags: Set[str] = set()
|
||||||
|
|
||||||
|
for post in posts:
|
||||||
|
if not post.should_update:
|
||||||
|
continue
|
||||||
|
for tag in post.tags:
|
||||||
|
if tag not in existing_tags:
|
||||||
|
if tag not in seen_tags:
|
||||||
|
seen_tags.add(tag)
|
||||||
|
missing_tags.append(tag)
|
||||||
|
for path in post.categories:
|
||||||
|
segments = [segment for segment in path.split("/") if segment]
|
||||||
|
if not segments:
|
||||||
|
continue
|
||||||
|
parent = 0
|
||||||
|
missing = False
|
||||||
|
for segment in segments:
|
||||||
|
key = (parent, segment)
|
||||||
|
if key in category_map:
|
||||||
|
parent = category_map[key]
|
||||||
|
continue
|
||||||
|
missing = True
|
||||||
|
break
|
||||||
|
if missing:
|
||||||
|
key = tuple(segments)
|
||||||
|
if key not in seen_missing:
|
||||||
|
seen_missing.add(key)
|
||||||
|
missing_paths.append(list(segments))
|
||||||
|
|
||||||
|
return missing_paths, missing_tags
|
||||||
Reference in New Issue
Block a user