diff --git a/src/validation.py b/src/validation.py
new file mode 100644
index 0000000..c7fb169
--- /dev/null
+++ b/src/validation.py
@@ -0,0 +1,469 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+import shutil
+from typing import Dict, List, Optional, Set
+
+from .config import Config
+from .errors import ValidationError, ValidationIssue
+from .git_utils import ensure_repo, git_first_timestamp, git_timestamp
+from .manifest import load_manifest
+from .markdown_utils import convert_markdown, extract_title
+from .models import EvaluationResult, InheritList, PostPlan, Source, TaxonomyPlan
+from .state import State
+from .wp_cli import WordPressCLI
+
+
+@dataclass
+class _Context:
+    categories: InheritList
+    tags: InheritList
+    author: InheritList
+    renderer: Optional[str]
+    hard_line_breaks: bool
+    block_html: bool
+    subdirectories: InheritList
+    manifest_chain: List[Path]
+
+
+def validate(
+    config: Config,
+    state: State,
+    sync_repos: bool,
+    force_new: bool = False,
+    skip_wp_checks: bool = False,
+) -> EvaluationResult:
+    issues: List[ValidationIssue] = []
+
+    sources = _load_sources(config, sync_repos, issues)
+
+    posts: List[PostPlan] = []
+    for source, content_root in sources:
+        _validate_directory(
+            source=source,
+            directory=content_root,
+            context=_Context(
+                categories=InheritList(),
+                tags=InheritList(),
+                author=InheritList(),
+                renderer=config.renderer,
+                hard_line_breaks=config.hard_line_breaks,
+                block_html=config.block_html,
+                subdirectories=InheritList(),
+                manifest_chain=[],
+            ),
+            state=state,
+            issues=issues,
+            posts=posts,
+            force_new=force_new,
+        )
+
+    missing_categories: List[List[str]] = []
+    missing_tags: List[str] = []
+    if not skip_wp_checks:
+        if shutil.which("wp") is None:
+            issues.append(ValidationIssue("wp CLI not found in PATH", context=str(config.wordpress_root)))
+        categories = []
+        tag_names: Set[str] = set()
+        try:
+            wp = WordPressCLI(config.wordpress_root)
+            categories = wp.list_categories()
+            tags = wp.list_tags()
+            tag_names = {tag.name for tag in tags}
+        except Exception as exc:
+            issues.append(ValidationIssue(str(exc), context=str(config.wordpress_root)))
+
+        missing_categories, missing_tags = _plan_taxonomy(posts, categories, tag_names)
+
+    if issues:
+        raise ValidationError(issues)
+
+    return EvaluationResult(
+        posts=posts,
+        taxonomy_to_create=TaxonomyPlan(missing_categories=missing_categories, missing_tags=missing_tags),
+    )
+
+
+def _load_sources(
+    config: Config,
+    sync_repos: bool,
+    issues: List[ValidationIssue],
+) -> List[tuple[Source, Path]]:
+    sources: List[tuple[Source, Path]] = []
+
+    for repo in config.git_repositories:
+        repo_path = config.repo_storage_dir / repo.name
+        try:
+            ensure_repo(repo_path, repo.url, repo.branch, sync=sync_repos)
+        except Exception as exc:
+            issues.append(ValidationIssue(str(exc), context=str(repo_path)))
+            continue
+        content_root = repo_path / repo.root_subdir if repo.root_subdir else repo_path
+        if not content_root.exists():
+            issues.append(ValidationIssue("Repository content root missing", context=str(content_root)))
+            continue
+        sources.append(
+            (
+                Source(name=repo.name, root_path=content_root, identity_root=repo_path, kind="git"),
+                content_root,
+            )
+        )
+
+    for directory in config.directories:
+        root_path = directory.path
+        if not root_path.exists():
+            issues.append(ValidationIssue("Directory not found", context=str(root_path)))
+            continue
+        content_root = root_path / directory.root_subdir if directory.root_subdir else root_path
+        if not content_root.exists():
+            issues.append(ValidationIssue("Directory content root missing", context=str(content_root)))
+            continue
+        sources.append(
+            (
+                Source(name=directory.name, root_path=content_root, identity_root=root_path, kind="dir"),
+                content_root,
+            )
+        )
+
+    return sources
+
+
+def _validate_directory(
+    source: Source,
+    directory: Path,
+    context: _Context,
+    state: State,
+    issues: List[ValidationIssue],
+    posts: List[PostPlan],
+    force_new: bool,
+) -> None:
+    manifest_path = directory / ".wp-materialize.json"
+    manifest = load_manifest(manifest_path, issues)
+    if manifest is None:
+        return
+
+    effective_categories = _merge_inherit(context.categories, manifest.categories)
+    effective_tags = _merge_inherit(context.tags, manifest.tags)
+    effective_author = _merge_inherit(context.author, manifest.author)
+    effective_renderer = manifest.renderer if manifest.renderer is not None else context.renderer
+    effective_hard_line_breaks = (
+        manifest.hard_line_breaks
+        if manifest.hard_line_breaks is not None
+        else context.hard_line_breaks
+    )
+    effective_block_html = (
+        manifest.block_html
+        if manifest.block_html is not None
+        else context.block_html
+    )
+    effective_subdirs = _merge_inherit(context.subdirectories, manifest.subdirectories)
+
+    manifest_chain = context.manifest_chain + [manifest.path]
+
+    for file_name, spec in manifest.files.items():
+        file_path = directory / file_name
+        if not file_path.exists():
+            issues.append(ValidationIssue("File not found", context=str(file_path)))
+            continue
+
+        try:
+            content = file_path.read_text(encoding="utf-8")
+        except Exception as exc:
+            issues.append(ValidationIssue(f"Failed to read file: {exc}", context=str(file_path)))
+            continue
+
+        title = spec.title
+        markdown_body = content
+        if spec.use_heading_level is not None:
+            extracted = extract_title(
+                content,
+                level=spec.use_heading_level,
+                strict=spec.use_heading_strict,
+                context=str(file_path),
+                issues=issues,
+            )
+            if extracted is None:
+                continue
+            title, markdown_body = extracted
+        elif not title:
+            issues.append(ValidationIssue("Missing title (title or use_heading_as_title required)", context=str(file_path)))
+            continue
+
+        resolved_categories = _resolve_overrides(effective_categories, spec.categories)
+        resolved_tags = _resolve_overrides(effective_tags, spec.tags)
+
+        resolved_categories = _normalize_list(resolved_categories, "category", str(file_path), issues)
+        resolved_tags = _normalize_list(resolved_tags, "tag", str(file_path), issues)
+        resolved_author = _resolve_author(effective_author.content, str(file_path), issues)
+
+        resolved_renderer = spec.renderer if spec.renderer is not None else effective_renderer
+        resolved_hard_line_breaks = (
+            spec.hard_line_breaks
+            if spec.hard_line_breaks is not None
+            else effective_hard_line_breaks
+        )
+        resolved_block_html = (
+            spec.block_html
+            if spec.block_html is not None
+            else effective_block_html
+        )
+        html = convert_markdown(
+            markdown_body,
+            context=str(file_path),
+            issues=issues,
+            renderer=resolved_renderer or "default",
+            hard_line_breaks=resolved_hard_line_breaks,
+            block_html=resolved_block_html,
+        )
+        if html is None:
+            continue
+
+        relative_path = _relative_path(file_path, source.identity_root, issues)
+        if relative_path is None:
+            continue
+
+        timestamps = []
+        ts = _timestamp_for_path(source, source.identity_root, relative_path, issues)
+        if ts is None:
+            continue
+        timestamps.append(ts)
+
+        for manifest_file in manifest_chain:
+            manifest_rel = _relative_path(manifest_file, source.identity_root, issues)
+            if manifest_rel is None:
+                continue
+            ts_manifest = _timestamp_for_path(source, source.identity_root, manifest_rel, issues)
+            if ts_manifest is None:
+                continue
+            timestamps.append(ts_manifest)
+
+        source_timestamp = max(timestamps)
+        identity = f"{source.name}:{relative_path}"
+        cached_entry = state.posts.get(identity)
+        cached_ts = cached_entry.source_timestamp if cached_entry else None
+        should_update = True if force_new else (cached_ts is None or source_timestamp > cached_ts)
+        created_on, last_modified = _resolve_post_datetimes(
+            source=source,
+            identity_root=source.identity_root,
+            relative_path=relative_path,
+            spec=spec,
+            issues=issues,
+        )
+
+        posts.append(
+            PostPlan(
+                source=source,
+                identity=identity,
+                relative_path=relative_path,
+                absolute_path=file_path,
+                title=title,
+                html=html,
+                categories=resolved_categories,
+                tags=resolved_tags,
+                author=resolved_author,
+                source_timestamp=source_timestamp,
+                cached_timestamp=cached_ts,
+                should_update=should_update,
+                created_on=created_on,
+                last_modified=last_modified,
+            )
+        )
+
+    for subdir in effective_subdirs.content:
+        subdir_path = directory / subdir
+        if not subdir_path.exists():
+            issues.append(ValidationIssue("Missing subdirectory", context=str(subdir_path)))
+            continue
+        _validate_directory(
+            source=source,
+            directory=subdir_path,
+            context=_Context(
+                categories=effective_categories,
+                tags=effective_tags,
+                author=effective_author,
+                renderer=effective_renderer,
+                hard_line_breaks=effective_hard_line_breaks,
+                block_html=effective_block_html,
+                subdirectories=effective_subdirs,
+                manifest_chain=manifest_chain,
+            ),
+            state=state,
+            issues=issues,
+            posts=posts,
+            force_new=force_new,
+        )
+
+
+def _merge_inherit(parent: InheritList, child: InheritList) -> InheritList:
+    if child.inherit:
+        content = parent.content + child.content
+    else:
+        content = child.content
+    return InheritList(content=content, inherit=True)
+
+
+def _resolve_overrides(parent: InheritList, override: Optional[InheritList]) -> List[str]:
+    if override is None:
+        return list(parent.content)
+    if override.inherit:
+        return parent.content + override.content
+    return list(override.content)
+
+
+def _normalize_list(values: List[str], label: str, context: str, issues: List[ValidationIssue]) -> List[str]:
+    normalized: List[str] = []
+    seen: Set[str] = set()
+    for value in values:
+        cleaned = value.strip()
+        if not cleaned:
+            issues.append(ValidationIssue(f"Empty {label} entry", context=context))
+            continue
+        if label == "category":
+            parts = [part.strip() for part in cleaned.split("/")]
+            if any(not part for part in parts):
+                issues.append(ValidationIssue(f"Invalid category path: {cleaned}", context=context))
+                continue
+            cleaned = "/".join(parts)
+        if cleaned not in seen:
+            seen.add(cleaned)
+            normalized.append(cleaned)
+    return normalized
+
+
+def _resolve_author(values: List[str], context: str, issues: List[ValidationIssue]) -> Optional[str]:
+    normalized = _normalize_list(values, "author", context, issues)
+    if not normalized:
+        return None
+    if len(normalized) > 1:
+        issues.append(ValidationIssue("Multiple authors specified; only one is allowed", context=context))
+        return None
+    return normalized[0]
+
+
+def _relative_path(path: Path, root: Path, issues: List[ValidationIssue]) -> Optional[str]:
+    try:
+        return str(path.relative_to(root))
+    except ValueError:
+        issues.append(ValidationIssue("Path is outside identity root", context=str(path)))
+        return None
+
+
+def _timestamp_for_path(
+    source: Source,
+    identity_root: Path,
+    relative_path: str,
+    issues: List[ValidationIssue],
+) -> Optional[int]:
+    if source.kind == "git":
+        try:
+            return git_timestamp(identity_root, relative_path)
+        except Exception as exc:
+            issues.append(ValidationIssue(str(exc), context=relative_path))
+            return None
+    try:
+        return int((identity_root / relative_path).stat().st_mtime)
+    except Exception as exc:
+        issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
+        return None
+
+
+def _resolve_post_datetimes(
+    source: Source,
+    identity_root: Path,
+    relative_path: str,
+    spec,
+    issues: List[ValidationIssue],
+) -> tuple[Optional[str], Optional[str]]:
+    created_dt = spec.created_on
+    modified_dt = spec.last_modified
+
+    if created_dt is None or modified_dt is None:
+        inferred = _infer_file_timestamps(source, identity_root, relative_path, issues)
+        if inferred is None:
+            return None, None
+        inferred_created, inferred_modified = inferred
+        if created_dt is None:
+            created_dt = datetime.fromtimestamp(inferred_created)
+        if modified_dt is None:
+            modified_dt = datetime.fromtimestamp(inferred_modified)
+
+    if created_dt and modified_dt and modified_dt < created_dt:
+        issues.append(
+            ValidationIssue("last_modified cannot be earlier than created_on", context=relative_path)
+        )
+        return None, None
+
+    created_on = _format_wp_datetime(created_dt) if created_dt else None
+    last_modified = _format_wp_datetime(modified_dt) if modified_dt else None
+    return created_on, last_modified
+
+
+def _infer_file_timestamps(
+    source: Source,
+    identity_root: Path,
+    relative_path: str,
+    issues: List[ValidationIssue],
+) -> Optional[tuple[int, int]]:
+    if source.kind == "git":
+        try:
+            created_ts = git_first_timestamp(identity_root, relative_path)
+            modified_ts = git_timestamp(identity_root, relative_path)
+            return created_ts, modified_ts
+        except Exception:
+            pass
+    try:
+        stat = (identity_root / relative_path).stat()
+        return int(stat.st_ctime), int(stat.st_mtime)
+    except Exception as exc:
+        issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
+        return None
+
+
+def _format_wp_datetime(value: datetime) -> str:
+    return value.strftime("%Y-%m-%d %H:%M:%S")
+
+
+def _plan_taxonomy(
+    posts: List[PostPlan],
+    categories,  # list of CategoryTerm
+    existing_tags: Set[str],
+) -> tuple[List[List[str]], List[str]]:
+    category_map: Dict[tuple[int, str], int] = {}
+    for category in categories:
+        category_map[(category.parent, category.name)] = category.term_id
+
+    missing_paths: List[List[str]] = []
+    seen_missing: Set[tuple[str, ...]] = set()
+    missing_tags: List[str] = []
+    seen_tags: Set[str] = set()
+
+    for post in posts:
+        if not post.should_update:
+            continue
+        for tag in post.tags:
+            if tag not in existing_tags:
+                if tag not in seen_tags:
+                    seen_tags.add(tag)
+                    missing_tags.append(tag)
+        for path in post.categories:
+            segments = [segment for segment in path.split("/") if segment]
+            if not segments:
+                continue
+            parent = 0
+            missing = False
+            for segment in segments:
+                key = (parent, segment)
+                if key in category_map:
+                    parent = category_map[key]
+                    continue
+                missing = True
+                break
+            if missing:
+                key = tuple(segments)
+                if key not in seen_missing:
+                    seen_missing.add(key)
+                    missing_paths.append(list(segments))
+
+    return missing_paths, missing_tags