initial commit: codex draft

This commit is contained in:
2026-02-04 21:29:17 -05:00
commit 68bfab9c17
19 changed files with 1838 additions and 0 deletions

3
src/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
__all__ = ["__version__"]
__version__ = "0.1.0"

101
src/apply.py Normal file
View File

@@ -0,0 +1,101 @@
from __future__ import annotations
import time
from typing import Dict, List, Set
from .errors import WordPressError
from .models import EvaluationResult, PostPlan
from .state import PostState, State, save_state
from .wp_cli import CategoryTerm, WordPressCLI
def apply_changes(
    result: EvaluationResult,
    wp: WordPressCLI,
    state: State,
    state_path,
) -> None:
    """Push every pending post of ``result`` into WordPress and record it in ``state``.

    Missing categories are created first. Each post flagged ``should_update``
    is then created or updated, and its state entry is refreshed with the
    post's source timestamp and the current wall-clock time.

    If any post fails, the state accumulated so far is written to
    ``state_path`` (only when at least one post succeeded) and the original
    exception is re-raised. On full success the state is always written.
    """
    category_map = _build_category_map(wp.list_categories())
    _create_missing_categories(result, wp, category_map)

    applied: Set[str] = set()
    pending = (plan for plan in result.posts if plan.should_update)
    try:
        for plan in pending:
            _apply_post(plan, wp, category_map)
            state.posts[plan.identity] = PostState(
                source_timestamp=plan.source_timestamp,
                materialized_at=int(time.time()),
            )
            applied.add(plan.identity)
    except Exception:
        # Keep the progress made before the failure so those posts are not
        # re-applied on the next run.
        if applied:
            save_state(state_path, state)
        raise
    save_state(state_path, state)
def _build_category_map(categories: List[CategoryTerm]) -> Dict[tuple[int, str], int]:
    """Index category terms by ``(parent term id, name)`` -> term id."""
    lookup: Dict[tuple[int, str], int] = {}
    for term in categories:
        lookup[(term.parent, term.name)] = term.term_id
    return lookup
def _create_missing_categories(
    result: EvaluationResult,
    wp: WordPressCLI,
    category_map: Dict[tuple[int, str], int],
) -> None:
    """Create every category named in ``result.categories_to_create.missing_paths``.

    Paths are processed shortest first and duplicates are skipped. Each path
    is walked from the root (parent id 0); segments already present in
    ``category_map`` are reused, the rest are created through ``wp`` and
    added to ``category_map`` in place.
    """
    handled: Set[tuple[str, ...]] = set()
    for path in sorted(result.categories_to_create.missing_paths, key=len):
        path_key = tuple(path)
        if path_key in handled:
            continue
        handled.add(path_key)

        parent_id = 0
        for name in path:
            slot = (parent_id, name)
            if slot not in category_map:
                category_map[slot] = wp.create_category(name, parent_id)
            parent_id = category_map[slot]
def _apply_post(post: PostPlan, wp: WordPressCLI, category_map: Dict[tuple[int, str], int]) -> None:
    """Create or update the WordPress post described by ``post``.

    Each category path is resolved to the term id of its last segment by
    walking ``category_map`` from the root (parent id 0).

    Raises:
        WordPressError: if any segment of a category path is absent from
            ``category_map``.
    """
    term_ids: List[int] = []
    for path in post.categories:
        names = [name for name in path.split("/") if name]
        if not names:
            continue
        current = 0
        for name in names:
            lookup = (current, name)
            if lookup not in category_map:
                raise WordPressError(f"Missing category during apply: {path}")
            current = category_map[lookup]
        term_ids.append(current)

    existing_id = wp.find_post_id(post.identity)
    if existing_id is not None:
        wp.update_post(
            post_id=existing_id,
            title=post.title,
            content=post.html,
            categories=term_ids,
            tags=post.tags,
        )
        return
    # No post carries this source identity yet: create one.
    wp.create_post(
        title=post.title,
        content=post.html,
        categories=term_ids,
        tags=post.tags,
        source_identity=post.identity,
    )

101
src/cli.py Normal file
View File

@@ -0,0 +1,101 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from .apply import apply_changes
from .config import load_config
from .errors import ConfigurationError, MaterializeError, ValidationError
from .evaluation import evaluate
from .state import load_state
from .wp_cli import WordPressCLI
def main() -> int:
    """Run the ``wp-materialize`` command line and return a process exit code.

    The evaluation always runs and its summary (text, or JSON with
    ``--json``) is printed. With the ``apply`` command the evaluated changes
    are then applied to WordPress. Returns 0 on success and 1 when
    configuration, validation, or apply fails.
    """
    parser = argparse.ArgumentParser(description="wp-materialize")
    parser.add_argument("command", nargs="?", choices=["evaluate", "apply"], default="evaluate")
    parser.add_argument("--config", type=Path, default=_default_config_path())
    parser.add_argument("--state", type=Path, default=_default_state_path())
    parser.add_argument("--no-sync", action="store_true", help="Skip git clone/pull")
    parser.add_argument("--json", action="store_true", help="Output evaluation summary as JSON")
    args = parser.parse_args()

    try:
        config = load_config(args.config)
        state = load_state(args.state)
        result = evaluate(config, state, sync_repos=not args.no_sync)
    except ValidationError as exc:
        _print_validation_error(exc)
        return 1
    except (ConfigurationError, MaterializeError) as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1

    render = _evaluation_json if args.json else _evaluation_summary
    print(render(result))

    if args.command != "apply":
        return 0

    wp = WordPressCLI(config.wordpress_root)
    try:
        apply_changes(result, wp, state, args.state)
    except MaterializeError as exc:
        print(f"Error: {exc}", file=sys.stderr)
        return 1
    print("Apply complete")
    return 0
def _default_config_path() -> Path:
    """Return ``~/.config/wp-materialize/config.json`` for the current user."""
    return Path.home().joinpath(".config", "wp-materialize", "config.json")
def _default_state_path() -> Path:
    """Return ``~/.config/wp-materialize/state.json`` for the current user."""
    return Path.home().joinpath(".config", "wp-materialize", "state.json")
def _evaluation_summary(result) -> str:
    """Render a three-line, human-readable summary of an evaluation result.

    Reports the total number of posts, how many are flagged
    ``should_update``, and how many category paths need to be created.
    """
    pending = len([post for post in result.posts if post.should_update])
    return "\n".join(
        (
            f"Posts: {len(result.posts)}",
            f"Posts to update: {pending}",
            f"Categories to create: {len(result.categories_to_create.missing_paths)}",
        )
    )
def _evaluation_json(result) -> str:
    """Serialize an evaluation result as indented JSON.

    The document has a ``posts`` array (one object per post, with the fields
    listed below in this order) and a ``categories_to_create`` array of
    category paths.
    """
    post_fields = (
        "identity",
        "relative_path",
        "title",
        "source_timestamp",
        "cached_timestamp",
        "should_update",
        "categories",
        "tags",
    )
    serialized_posts = []
    for post in result.posts:
        serialized_posts.append({name: getattr(post, name) for name in post_fields})
    payload = {
        "posts": serialized_posts,
        "categories_to_create": result.categories_to_create.missing_paths,
    }
    return json.dumps(payload, indent=2)
def _print_validation_error(exc: ValidationError) -> None:
    """Write a ``Validation failed:`` header and one bullet per issue to stderr."""
    report = sys.stderr
    print("Validation failed:", file=report)
    for line in (f"- {issue.format()}" for issue in exc.issues):
        print(line, file=report)
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

104
src/config.py Normal file
View File

@@ -0,0 +1,104 @@
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional
from .errors import ConfigurationError
@dataclass(frozen=True)
class GitRepository:
    """A git repository configured as a content source (immutable)."""

    # Source name; load_config requires a non-empty string.
    name: str
    # Clone URL; load_config requires a non-empty string.
    url: str
    # Branch name; load_config falls back to "main" when the key is absent.
    branch: str
    # Optional subdirectory from the config entry; None when not configured.
    root_subdir: Optional[str]
@dataclass(frozen=True)
class DirectorySpec:
    """A plain local directory configured as a content source (immutable)."""

    # Source name; load_config requires a non-empty string.
    name: str
    # Directory location, built by load_config from the entry's "path" string.
    path: Path
    # Optional subdirectory from the config entry; None when not configured.
    root_subdir: Optional[str]
@dataclass(frozen=True)
class Config:
    """Parsed application configuration, as produced by load_config (immutable)."""

    # Value of the required "wordpress_root" config key.
    wordpress_root: Path
    # Value of the required "repo_storage_dir" config key.
    repo_storage_dir: Path
    # Entries of the optional "git_repositories" list (empty when absent).
    git_repositories: List[GitRepository]
    # Entries of the optional "directories" list (empty when absent).
    directories: List[DirectorySpec]
def _expect_keys(obj: dict, allowed: set[str], context: str) -> None:
    """Reject any key of ``obj`` that is not listed in ``allowed``.

    Raises:
        ConfigurationError: naming ``context`` and the sorted unexpected keys.
    """
    unknown = sorted(key for key in obj if key not in allowed)
    if unknown:
        raise ConfigurationError(f"Unexpected keys in {context}: {unknown}")
def load_config(path: Path) -> Config:
    """Load and validate the JSON configuration file at ``path``.

    The document must be a JSON object with the required string keys
    ``wordpress_root`` and ``repo_storage_dir`` and the optional lists
    ``git_repositories`` and ``directories``. Unknown keys are rejected at
    every level.

    Raises:
        ConfigurationError: if the file is missing, unreadable, not valid
            JSON, or fails any structural check.
    """
    if not path.exists():
        raise ConfigurationError(f"Config not found: {path}")
    try:
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's locale encoding.
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ConfigurationError(f"Invalid JSON in config: {exc}") from exc
    except (OSError, UnicodeDecodeError) as exc:
        # Surface read/decoding problems as a configuration error instead of
        # an unhandled traceback.
        raise ConfigurationError(f"Unable to read config {path}: {exc}") from exc
    if not isinstance(data, dict):
        raise ConfigurationError("Config must be a JSON object")
    _expect_keys(data, {"wordpress_root", "repo_storage_dir", "git_repositories", "directories"}, "config")
    wordpress_root = _require_path(data, "wordpress_root", required=True)
    repo_storage_dir = _require_path(data, "repo_storage_dir", required=True)

    # A missing or falsy value means "no repositories"; anything else must be
    # a list so that the per-item error messages below are meaningful.
    raw_repositories = data.get("git_repositories") or []
    if not isinstance(raw_repositories, list):
        raise ConfigurationError("git_repositories must be a list")
    git_repositories = []
    for idx, repo in enumerate(raw_repositories):
        if not isinstance(repo, dict):
            raise ConfigurationError(f"git_repositories[{idx}] must be an object")
        _expect_keys(repo, {"name", "url", "branch", "root_subdir"}, f"git_repositories[{idx}]")
        name = _require_str(repo, "name", context=f"git_repositories[{idx}]")
        url = _require_str(repo, "url", context=f"git_repositories[{idx}]")
        branch = repo.get("branch", "main")
        if not isinstance(branch, str):
            raise ConfigurationError(f"git_repositories[{idx}].branch must be a string")
        root_subdir = repo.get("root_subdir")
        if root_subdir is not None and not isinstance(root_subdir, str):
            raise ConfigurationError(f"git_repositories[{idx}].root_subdir must be a string")
        git_repositories.append(GitRepository(name=name, url=url, branch=branch, root_subdir=root_subdir))

    raw_directories = data.get("directories") or []
    if not isinstance(raw_directories, list):
        raise ConfigurationError("directories must be a list")
    directories = []
    for idx, entry in enumerate(raw_directories):
        if not isinstance(entry, dict):
            raise ConfigurationError(f"directories[{idx}] must be an object")
        _expect_keys(entry, {"name", "path", "root_subdir"}, f"directories[{idx}]")
        name = _require_str(entry, "name", context=f"directories[{idx}]")
        path_value = _require_str(entry, "path", context=f"directories[{idx}]")
        root_subdir = entry.get("root_subdir")
        if root_subdir is not None and not isinstance(root_subdir, str):
            raise ConfigurationError(f"directories[{idx}].root_subdir must be a string")
        directories.append(DirectorySpec(name=name, path=Path(path_value), root_subdir=root_subdir))

    return Config(
        wordpress_root=wordpress_root,
        repo_storage_dir=repo_storage_dir,
        git_repositories=git_repositories,
        directories=directories,
    )
def _require_str(data: dict, key: str, context: str) -> str:
    """Return ``data[key]`` unchanged if it is a string with non-blank content.

    Raises:
        ConfigurationError: if the key is absent, not a string, or only
            whitespace.
    """
    value = data.get(key)
    if isinstance(value, str) and value.strip():
        return value
    raise ConfigurationError(f"{context}.{key} must be a non-empty string")
def _require_path(data: dict, key: str, required: bool) -> Path:
    """Return ``data[key]`` as a ``Path``.

    When the key is absent (or null) and ``required`` is false, ``Path(".")``
    is returned instead.

    Raises:
        ConfigurationError: if the value is not a string with non-blank
            content.
    """
    value = data.get(key)
    if value is None and not required:
        return Path(".")
    if isinstance(value, str) and value.strip():
        return Path(value)
    raise ConfigurationError(f"{key} must be a non-empty string")

34
src/errors.py Normal file
View File

@@ -0,0 +1,34 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, List
class MaterializeError(Exception):
    """Base error for wp-materialize.

    Catching this type covers every error class defined in this module.
    """
@dataclass
class ValidationIssue:
    """A single validation problem, optionally tagged with where it occurred."""

    message: str
    context: str | None = None

    def format(self) -> str:
        """Return ``"<context>: <message>"``, or just the message when no context is set."""
        if not self.context:
            return self.message
        return f"{self.context}: {self.message}"
class ValidationError(MaterializeError):
    """Raised with the full list of validation issues found in one pass."""

    def __init__(self, issues: Iterable[ValidationIssue]):
        """Store ``issues`` as a list and use their formatted lines as the message."""
        collected: List[ValidationIssue] = list(issues)
        self.issues: List[ValidationIssue] = collected
        super().__init__("\n".join(item.format() for item in collected))
class ConfigurationError(MaterializeError):
    """Error raised for invalid configuration or state files and failed git commands."""
class WordPressError(MaterializeError):
    """Error raised when a WordPress CLI call fails or returns unusable output."""

331
src/evaluation.py Normal file
View File

@@ -0,0 +1,331 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import shutil
from typing import Dict, List, Optional, Set
from .config import Config
from .errors import ValidationError, ValidationIssue
from .git_utils import ensure_repo, git_timestamp
from .manifest import load_manifest
from .markdown_utils import convert_markdown, extract_title
from .models import CategoryPlan, EvaluationResult, InheritList, Manifest, PostPlan, Source
from .state import State
from .wp_cli import WordPressCLI
@dataclass
class _Context:
    """Settings handed from a directory's evaluation down to its subdirectories."""

    # Effective category list of the parent directory.
    categories: InheritList
    # Effective tag list of the parent directory.
    tags: InheritList
    # Effective subdirectory list of the parent directory.
    subdirectories: InheritList
    # Paths of the manifests already traversed, outermost first.
    manifest_chain: List[Path]
def evaluate(config: Config, state: State, sync_repos: bool) -> EvaluationResult:
    """Evaluate every configured source and plan the WordPress changes.

    Walks each source's manifest tree to build post plans, then queries
    WordPress for existing categories and tags in order to validate tags and
    determine which category paths must be created.

    Args:
        config: Parsed configuration.
        state: Previously persisted state, used to decide which posts changed.
        sync_repos: Whether git sources may be cloned or pulled first.

    Returns:
        The planned posts and the category paths that need to be created.

    Raises:
        ValidationError: carrying every issue collected during evaluation,
            if there was at least one.
    """
    issues: List[ValidationIssue] = []
    sources = _load_sources(config, sync_repos, issues)
    posts: List[PostPlan] = []
    for source, content_root in sources:
        _evaluate_directory(
            source=source,
            directory=content_root,
            context=_Context(
                categories=InheritList(),
                tags=InheritList(),
                subdirectories=InheritList(),
                manifest_chain=[],
            ),
            state=state,
            issues=issues,
            posts=posts,
        )

    categories = []
    tag_names: Set[str] = set()
    terms_loaded = False
    if shutil.which("wp") is None:
        # Without the binary the queries below cannot succeed; report the
        # root cause once instead of adding a second, lower-level error.
        issues.append(ValidationIssue("wp CLI not found in PATH", context=str(config.wordpress_root)))
    else:
        try:
            wp = WordPressCLI(config.wordpress_root)
            categories = wp.list_categories()
            tag_names = {tag.name for tag in wp.list_tags()}
            terms_loaded = True
        except Exception as exc:
            # Deliberately broad: any failure here becomes a reported issue
            # alongside the other validation problems.
            issues.append(ValidationIssue(str(exc), context=str(config.wordpress_root)))

    missing_categories: List[List[str]] = []
    if terms_loaded:
        # Only validate tags/categories against real WordPress data. With
        # empty fallback sets every tag would be reported as nonexistent.
        missing_categories = _plan_categories(posts, categories, issues, tag_names)

    if issues:
        raise ValidationError(issues)
    return EvaluationResult(posts=posts, categories_to_create=CategoryPlan(missing_paths=missing_categories))
def _load_sources(
    config: Config,
    sync_repos: bool,
    issues: List[ValidationIssue],
) -> List[tuple[Source, Path]]:
    """Resolve the configured git repositories and directories into sources.

    Git repositories come first, then plain directories, each in
    configuration order. A source that cannot be prepared or whose content
    root does not exist is skipped and an issue is appended to ``issues``.

    Returns:
        ``(source, content_root)`` pairs for every usable source.
    """
    collected: List[tuple[Source, Path]] = []

    for repo in config.git_repositories:
        checkout = config.repo_storage_dir / repo.name
        try:
            ensure_repo(checkout, repo.url, repo.branch, sync=sync_repos)
        except Exception as exc:
            issues.append(ValidationIssue(str(exc), context=str(checkout)))
            continue
        content_root = checkout / repo.root_subdir if repo.root_subdir else checkout
        if content_root.exists():
            git_source = Source(name=repo.name, root_path=content_root, identity_root=checkout, kind="git")
            collected.append((git_source, content_root))
        else:
            issues.append(ValidationIssue("Repository content root missing", context=str(content_root)))

    for spec in config.directories:
        base = spec.path
        if not base.exists():
            issues.append(ValidationIssue("Directory not found", context=str(base)))
            continue
        content_root = base / spec.root_subdir if spec.root_subdir else base
        if content_root.exists():
            dir_source = Source(name=spec.name, root_path=content_root, identity_root=base, kind="dir")
            collected.append((dir_source, content_root))
        else:
            issues.append(ValidationIssue("Directory content root missing", context=str(content_root)))

    return collected
def _evaluate_directory(
    source: Source,
    directory: Path,
    context: _Context,
    state: State,
    issues: List[ValidationIssue],
    posts: List[PostPlan],
) -> None:
    """Evaluate one directory's manifest and recurse into its subdirectories.

    Loads ``.wp-materialize.json`` from ``directory``, merges its categories,
    tags and subdirectories with the inherited ``context``, and appends a
    ``PostPlan`` to ``posts`` for every file listed in the manifest that
    passes validation. Problems are appended to ``issues``; a file with a
    problem is skipped, and a directory whose manifest cannot be loaded is
    skipped entirely (including its subdirectories).
    """
    manifest_path = directory / ".wp-materialize.json"
    manifest = load_manifest(manifest_path, issues)
    if manifest is None:
        # load_manifest has already recorded why; nothing to evaluate here.
        return
    effective_categories = _merge_inherit(context.categories, manifest.categories)
    effective_tags = _merge_inherit(context.tags, manifest.tags)
    effective_subdirs = _merge_inherit(context.subdirectories, manifest.subdirectories)
    # New list, so the parent's chain is not mutated.
    manifest_chain = context.manifest_chain + [manifest.path]
    for file_name, spec in manifest.files.items():
        file_path = directory / file_name
        if not file_path.exists():
            issues.append(ValidationIssue("File not found", context=str(file_path)))
            continue
        try:
            content = file_path.read_text(encoding="utf-8")
        except Exception as exc:
            issues.append(ValidationIssue(f"Failed to read file: {exc}", context=str(file_path)))
            continue
        title = spec.title
        markdown_body = content
        if spec.use_heading_level is not None:
            # Heading extraction takes precedence over an explicit title and
            # also yields the body that is converted below.
            extracted = extract_title(
                content,
                level=spec.use_heading_level,
                strict=spec.use_heading_strict,
                context=str(file_path),
                issues=issues,
            )
            if extracted is None:
                continue
            title, markdown_body = extracted
        elif not title:
            issues.append(ValidationIssue("Missing title (title or use_heading_as_title required)", context=str(file_path)))
            continue
        resolved_categories = _resolve_overrides(effective_categories, spec.categories)
        resolved_tags = _resolve_overrides(effective_tags, spec.tags)
        resolved_categories = _normalize_list(resolved_categories, "category", str(file_path), issues)
        resolved_tags = _normalize_list(resolved_tags, "tag", str(file_path), issues)
        html = convert_markdown(markdown_body, context=str(file_path), issues=issues)
        if html is None:
            continue
        relative_path = _relative_path(file_path, source.identity_root, issues)
        if relative_path is None:
            continue
        # The post's timestamp is the newest among the file itself and every
        # manifest in the chain.
        timestamps = []
        ts = _timestamp_for_path(source, source.identity_root, relative_path, issues)
        if ts is None:
            continue
        timestamps.append(ts)
        for manifest_file in manifest_chain:
            manifest_rel = _relative_path(manifest_file, source.identity_root, issues)
            if manifest_rel is None:
                continue
            ts_manifest = _timestamp_for_path(source, source.identity_root, manifest_rel, issues)
            if ts_manifest is None:
                continue
            timestamps.append(ts_manifest)
        source_timestamp = max(timestamps)
        identity = f"{source.name}:{relative_path}"
        cached_entry = state.posts.get(identity)
        cached_ts = cached_entry.source_timestamp if cached_entry else None
        # Update when the post has no state entry or its source is newer
        # than the recorded timestamp.
        should_update = cached_ts is None or source_timestamp > cached_ts
        posts.append(
            PostPlan(
                source=source,
                identity=identity,
                relative_path=relative_path,
                absolute_path=file_path,
                title=title,
                html=html,
                categories=resolved_categories,
                tags=resolved_tags,
                source_timestamp=source_timestamp,
                cached_timestamp=cached_ts,
                should_update=should_update,
            )
        )
    # Recurse after this directory's files, passing the merged settings down.
    for subdir in effective_subdirs.content:
        subdir_path = directory / subdir
        if not subdir_path.exists():
            issues.append(ValidationIssue("Missing subdirectory", context=str(subdir_path)))
            continue
        _evaluate_directory(
            source=source,
            directory=subdir_path,
            context=_Context(
                categories=effective_categories,
                tags=effective_tags,
                subdirectories=effective_subdirs,
                manifest_chain=manifest_chain,
            ),
            state=state,
            issues=issues,
            posts=posts,
        )
def _merge_inherit(parent: InheritList, child: InheritList) -> InheritList:
    """Combine a parent list with a child list according to the child's ``inherit`` flag.

    With ``child.inherit`` set, the result is the parent's content followed
    by the child's; otherwise only the child's content. The returned list
    always has ``inherit=True``.
    """
    merged = parent.content + child.content if child.inherit else child.content
    return InheritList(content=merged, inherit=True)
def _resolve_overrides(parent: InheritList, override: Optional[InheritList]) -> List[str]:
    """Return the values that apply to a file given an optional per-file override.

    No override yields a copy of the parent's content. An inheriting override
    yields the parent's content followed by the override's. A non-inheriting
    override yields a copy of the override's content alone.
    """
    if override is not None and override.inherit:
        return parent.content + override.content
    chosen = parent if override is None else override
    return list(chosen.content)
def _normalize_list(values: List[str], label: str, context: str, issues: List[ValidationIssue]) -> List[str]:
    """Trim, validate and de-duplicate category or tag values, keeping first-seen order.

    Blank entries are reported to ``issues`` and dropped. When ``label`` is
    ``"category"`` each value is treated as a ``/``-separated path: every
    segment is trimmed, and a path with an empty segment is reported and
    dropped.
    """
    is_category = label == "category"
    result: List[str] = []
    emitted: Set[str] = set()
    for raw in values:
        entry = raw.strip()
        if not entry:
            issues.append(ValidationIssue(f"Empty {label} entry", context=context))
            continue
        if is_category:
            segments = [segment.strip() for segment in entry.split("/")]
            if not all(segments):
                issues.append(ValidationIssue(f"Invalid category path: {entry}", context=context))
                continue
            entry = "/".join(segments)
        if entry in emitted:
            continue
        emitted.add(entry)
        result.append(entry)
    return result
def _relative_path(path: Path, root: Path, issues: List[ValidationIssue]) -> Optional[str]:
    """Return ``path`` relative to ``root`` as a string.

    If ``path`` does not lie under ``root``, an issue is appended to
    ``issues`` and ``None`` is returned.
    """
    try:
        relative = path.relative_to(root)
    except ValueError:
        issues.append(ValidationIssue("Path is outside identity root", context=str(path)))
        return None
    return str(relative)
def _timestamp_for_path(
    source: Source,
    identity_root: Path,
    relative_path: str,
    issues: List[ValidationIssue],
) -> Optional[int]:
    """Return the modification timestamp of ``relative_path`` as whole seconds.

    Sources of kind ``"git"`` use ``git_timestamp``; every other kind uses
    the file's ``st_mtime``. On any failure an issue is appended to
    ``issues`` and ``None`` is returned.
    """
    if source.kind != "git":
        try:
            return int((identity_root / relative_path).stat().st_mtime)
        except Exception as exc:
            issues.append(ValidationIssue(f"Timestamp lookup failed: {exc}", context=relative_path))
            return None
    try:
        return git_timestamp(identity_root, relative_path)
    except Exception as exc:
        issues.append(ValidationIssue(str(exc), context=relative_path))
        return None
def _plan_categories(
    posts: List[PostPlan],
    categories,  # list of CategoryTerm
    issues: List[ValidationIssue],
    existing_tags: Set[str],
) -> List[List[str]]:
    """Validate tags and collect the category paths that do not fully exist yet.

    Only posts flagged ``should_update`` are considered. Each tag missing
    from ``existing_tags`` is reported to ``issues``. A category path is
    returned (once, in first-seen order, as its list of segments) when any
    of its segments cannot be resolved through ``categories``.
    """
    known: Dict[tuple[int, str], int] = {
        (term.parent, term.name): term.term_id for term in categories
    }
    missing_paths: List[List[str]] = []
    recorded: Set[tuple[str, ...]] = set()

    for post in posts:
        if not post.should_update:
            continue
        for tag in post.tags:
            if tag not in existing_tags:
                issues.append(ValidationIssue(f"Tag does not exist: {tag}", context=post.relative_path))
        for path in post.categories:
            segments = [segment for segment in path.split("/") if segment]
            if not segments:
                continue
            parent = 0
            for segment in segments:
                step = (parent, segment)
                if step not in known:
                    break
                parent = known[step]
            else:
                # Every segment resolved: the whole path already exists.
                continue
            path_key = tuple(segments)
            if path_key not in recorded:
                recorded.add(path_key)
                missing_paths.append(list(segments))
    return missing_paths

52
src/git_utils.py Normal file
View File

@@ -0,0 +1,52 @@
from __future__ import annotations
import subprocess
from pathlib import Path
from .errors import ConfigurationError
def ensure_repo(repo_path: Path, url: str, branch: str, sync: bool) -> None:
    """Make sure a checkout of ``url`` exists at ``repo_path``.

    The parent directory is created if needed. An existing checkout
    (detected by its ``.git`` entry) is fetched, switched to ``branch`` and
    fast-forwarded when ``sync`` is true, and left untouched otherwise. A
    missing checkout is cloned on ``branch``.

    Raises:
        ConfigurationError: if the checkout is missing while ``sync`` is
            false, or if a git command fails.
    """
    repo_path.parent.mkdir(parents=True, exist_ok=True)
    has_checkout = (repo_path / ".git").exists()
    if has_checkout:
        if sync:
            update_steps = (
                ["git", "fetch", "--all", "--prune"],
                ["git", "checkout", branch],
                ["git", "pull", "--ff-only"],
            )
            for step in update_steps:
                _run(step, cwd=repo_path)
        return
    if not sync:
        raise ConfigurationError(f"Repository missing and sync disabled: {repo_path}")
    _run(["git", "clone", "--branch", branch, url, str(repo_path)], cwd=repo_path.parent)
def git_timestamp(repo_root: Path, relative_path: str) -> int:
    """Return the committer timestamp of the latest commit touching ``relative_path``.

    Runs ``git log -1 --format=%ct -- <relative_path>`` inside ``repo_root``.

    Raises:
        ConfigurationError: if git fails, prints nothing for the path, or
            prints something that is not an integer.
    """
    completed = _run(
        ["git", "log", "-1", "--format=%ct", "--", relative_path],
        cwd=repo_root,
        capture_output=True,
    )
    raw = completed.stdout.strip()
    if not raw:
        raise ConfigurationError(f"No git timestamp for {relative_path}")
    try:
        timestamp = int(raw)
    except ValueError as exc:
        raise ConfigurationError(f"Invalid git timestamp for {relative_path}: {raw}") from exc
    return timestamp
def _run(cmd: list[str], cwd: Path, capture_output: bool = False) -> subprocess.CompletedProcess:
    """Run ``cmd`` in ``cwd`` in text mode and return the completed process.

    Raises:
        ConfigurationError: if the command exits with a non-zero status; the
            message contains the command line and any captured stderr.
    """
    options = {
        "cwd": str(cwd),
        "check": True,
        "text": True,
        "capture_output": capture_output,
    }
    try:
        completed = subprocess.run(cmd, **options)
    except subprocess.CalledProcessError as exc:
        stderr = (exc.stderr or "").strip()
        raise ConfigurationError(f"Command failed: {' '.join(cmd)}\n{stderr}") from exc
    return completed

131
src/manifest.py Normal file
View File

@@ -0,0 +1,131 @@
from __future__ import annotations
import json
from pathlib import Path
from typing import Dict
from .errors import ValidationIssue
from .models import FileSpec, InheritList, Manifest
def load_manifest(path: Path, issues: list[ValidationIssue]) -> Manifest | None:
    """Load and validate the manifest JSON file at ``path``.

    The manifest must be a JSON object with the optional keys ``categories``,
    ``tags``, ``subdirectories`` (inherit-list objects) and ``files`` (an
    object mapping file names to per-file settings). Problems are appended
    to ``issues`` rather than raised.

    Returns:
        The parsed manifest, or ``None`` when the file is missing,
        unreadable, not valid JSON, not an object, has unexpected top-level
        keys, or has a non-object ``files`` value. Invalid individual file
        entries are reported and left out of the result.
    """
    if not path.exists():
        issues.append(ValidationIssue("Missing manifest", context=str(path)))
        return None
    try:
        # Decode explicitly as UTF-8 so parsing does not depend on the
        # platform's locale encoding.
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        issues.append(ValidationIssue(f"Invalid JSON: {exc}", context=str(path)))
        return None
    except (OSError, UnicodeDecodeError) as exc:
        # Report unreadable manifests like every other manifest problem
        # instead of letting the exception abort the whole evaluation.
        issues.append(ValidationIssue(f"Failed to read manifest: {exc}", context=str(path)))
        return None
    if not isinstance(data, dict):
        issues.append(ValidationIssue("Manifest must be a JSON object", context=str(path)))
        return None
    allowed = {"categories", "tags", "subdirectories", "files"}
    extra = set(data.keys()) - allowed
    if extra:
        issues.append(ValidationIssue(f"Unexpected keys: {sorted(extra)}", context=str(path)))
        return None
    categories = _parse_inherit_list(data.get("categories"), issues, f"{path}:categories")
    tags = _parse_inherit_list(data.get("tags"), issues, f"{path}:tags")
    subdirectories = _parse_inherit_list(data.get("subdirectories"), issues, f"{path}:subdirectories")

    files: Dict[str, FileSpec] = {}
    raw_files = data.get("files", {}) or {}
    if not isinstance(raw_files, dict):
        issues.append(ValidationIssue("files must be an object", context=str(path)))
        return None
    for file_name, file_cfg in raw_files.items():
        if not isinstance(file_name, str):
            issues.append(ValidationIssue("file name must be a string", context=str(path)))
            continue
        if not isinstance(file_cfg, dict):
            issues.append(ValidationIssue(f"{file_name} must be an object", context=str(path)))
            continue
        extra_file = set(file_cfg.keys()) - {"title", "use_heading_as_title", "categories", "tags"}
        if extra_file:
            issues.append(
                ValidationIssue(f"{file_name} has unexpected keys: {sorted(extra_file)}", context=str(path))
            )
            continue
        title = file_cfg.get("title")
        if title is not None and (not isinstance(title, str) or not title.strip()):
            issues.append(ValidationIssue(f"{file_name}.title must be a non-empty string", context=str(path)))
            continue
        use_heading = file_cfg.get("use_heading_as_title")
        use_level = None
        use_strict = True
        if use_heading is not None:
            if not isinstance(use_heading, dict):
                issues.append(ValidationIssue(f"{file_name}.use_heading_as_title must be an object", context=str(path)))
                continue
            extra_heading = set(use_heading.keys()) - {"level", "strict"}
            if extra_heading:
                issues.append(
                    ValidationIssue(
                        f"{file_name}.use_heading_as_title has unexpected keys: {sorted(extra_heading)}",
                        context=str(path),
                    )
                )
                continue
            level = use_heading.get("level")
            strict = use_heading.get("strict", True)
            # bool is a subclass of int, so JSON `true` would otherwise be
            # accepted as level 1.
            level_is_int = isinstance(level, int) and not isinstance(level, bool)
            if not level_is_int or level < 1 or level > 6:
                issues.append(ValidationIssue(f"{file_name}.use_heading_as_title.level must be 1-6", context=str(path)))
                continue
            if not isinstance(strict, bool):
                issues.append(
                    ValidationIssue(f"{file_name}.use_heading_as_title.strict must be boolean", context=str(path))
                )
                continue
            use_level = level
            use_strict = strict
        categories_override = _parse_inherit_list(file_cfg.get("categories"), issues, f"{path}:{file_name}:categories")
        tags_override = _parse_inherit_list(file_cfg.get("tags"), issues, f"{path}:{file_name}:tags")
        files[file_name] = FileSpec(
            title=title,
            use_heading_level=use_level,
            use_heading_strict=use_strict,
            categories=categories_override,
            tags=tags_override,
        )

    return Manifest(
        path=path,
        categories=categories,
        tags=tags,
        subdirectories=subdirectories,
        files=files,
    )
def _parse_inherit_list(value: object, issues: list[ValidationIssue], context: str) -> InheritList:
    """Parse an ``{"content": [...], "inherit": bool}`` object into an ``InheritList``.

    ``None`` yields a default ``InheritList``. A non-object value or one with
    unexpected keys is reported and also yields the default. An invalid
    ``content`` is reported and replaced by an empty list; an invalid
    ``inherit`` is reported and replaced by ``True``.
    """
    if value is None:
        return InheritList()
    if not isinstance(value, dict):
        issues.append(ValidationIssue("Must be an object", context=context))
        return InheritList()
    unexpected = set(value.keys()) - {"content", "inherit"}
    if unexpected:
        issues.append(ValidationIssue(f"Unexpected keys: {sorted(unexpected)}", context=context))
        return InheritList()

    content = value.get("content", [])
    inherit = value.get("inherit", True)
    content_ok = isinstance(content, list) and all(isinstance(item, str) for item in content)
    if not content_ok:
        issues.append(ValidationIssue("content must be a list of strings", context=context))
        content = []
    if not isinstance(inherit, bool):
        issues.append(ValidationIssue("inherit must be boolean", context=context))
        inherit = True
    # At this point content is a list containing only strings; copy it so the
    # result does not alias the parsed JSON data.
    return InheritList(content=list(content), inherit=inherit)

62
src/markdown_utils.py Normal file
View File

@@ -0,0 +1,62 @@
from __future__ import annotations
import re
import markdown as md_lib
from .errors import ValidationIssue
# One heading line: 1-6 '#' characters followed by whitespace and the text.
# Applied to single lines only (see _promote_headings).
_HEADING_RE = re.compile(r"^(#{1,6})(\s+.*)$")


def extract_title(markdown_text: str, level: int, strict: bool, context: str, issues: list[ValidationIssue]) -> tuple[str, str] | None:
    """Pull the title out of a level-``level`` heading and return ``(title, body)``.

    The first matching heading line is removed from the text and the
    remaining headings are promoted by one level.

    Args:
        markdown_text: Full markdown source.
        level: Heading level (number of ``#``) that carries the title.
        strict: When true, exactly one heading of that level must exist.
        context: Location string attached to reported issues.
        issues: Collector that receives a ``ValidationIssue`` on failure.

    Returns:
        ``(title, body)``, or ``None`` after reporting an issue when the
        heading is missing, ambiguous under ``strict``, or has empty text.
    """
    # Use [ \t]+ rather than \s+ after the hashes: \s also matches newlines,
    # which let a bare "#" line swallow the following line as its title.
    pattern = re.compile(rf"^{'#' * level}[ \t]+(.*)$", re.MULTILINE)
    matches = list(pattern.finditer(markdown_text))
    if strict and len(matches) != 1:
        issues.append(
            ValidationIssue(
                f"Expected exactly one level-{level} heading, found {len(matches)}",
                context=context,
            )
        )
        return None
    if not matches:
        issues.append(ValidationIssue(f"Missing level-{level} heading", context=context))
        return None
    match = matches[0]
    title = match.group(1).strip()
    if not title:
        issues.append(ValidationIssue("Heading title cannot be empty", context=context))
        return None
    # Split on "\n" only, so the index computed by counting "\n" characters
    # addresses the heading line even when the text contains other
    # characters that str.splitlines() treats as line boundaries.
    lines = markdown_text.split("\n")
    line_index = markdown_text.count("\n", 0, match.start())
    del lines[line_index]
    body = _promote_headings("\n".join(lines))
    return title, body


def _promote_headings(text: str) -> str:
    """Reduce every heading of level 2-6 by one level; level-1 headings stay as they are."""
    promoted_lines = []
    for line in text.splitlines():
        match = _HEADING_RE.match(line)
        if match is None:
            promoted_lines.append(line)
            continue
        hashes, rest = match.groups()
        level = max(len(hashes) - 1, 1)
        promoted_lines.append("#" * level + rest)
    return "\n".join(promoted_lines)
def convert_markdown(markdown_text: str, context: str, issues: list[ValidationIssue]) -> str | None:
    """Render markdown to HTML5 with the ``extra`` extension enabled.

    Returns the HTML, or ``None`` after appending an issue to ``issues``
    when the conversion raises.
    """
    try:
        rendered = md_lib.markdown(markdown_text, extensions=["extra"], output_format="html5")
    except Exception as exc:  # pragma: no cover - depends on markdown internals
        issues.append(ValidationIssue(f"Markdown conversion failed: {exc}", context=context))
        return None
    return rendered

63
src/models.py Normal file
View File

@@ -0,0 +1,63 @@
from __future__ import annotations
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, List, Optional
@dataclass(frozen=True)
class InheritList:
    """A list of string values plus a flag saying whether inherited values are kept."""

    # The values declared at this level; a fresh empty list by default.
    content: List[str] = field(default_factory=list)
    # Whether values from the enclosing level are kept in addition to content.
    inherit: bool = True
@dataclass(frozen=True)
class FileSpec:
    """Per-file settings taken from a manifest's ``files`` entry (immutable)."""

    # Explicit title, or None when not given.
    title: Optional[str]
    # Heading level to take the title from, or None when heading extraction is off.
    use_heading_level: Optional[int]
    # Strictness flag that accompanies use_heading_level.
    use_heading_strict: bool
    # Per-file category override, or None for no override.
    categories: Optional[InheritList]
    # Per-file tag override, or None for no override.
    tags: Optional[InheritList]
@dataclass(frozen=True)
class Manifest:
    """Parsed contents of one manifest file (immutable)."""

    # Location of the manifest file itself.
    path: Path
    # Categories declared by this manifest.
    categories: InheritList
    # Tags declared by this manifest.
    tags: InheritList
    # Subdirectories declared by this manifest.
    subdirectories: InheritList
    # Per-file settings keyed by file name.
    files: Dict[str, FileSpec]
@dataclass(frozen=True)
class Source:
    """A content source (immutable)."""

    # Name of the source.
    name: str
    # Root path of the source's content.
    root_path: Path
    # Root used for identities — presumably the base that post paths are
    # made relative to; confirm against the evaluation code.
    identity_root: Path
    kind: str  # "git" or "dir"
@dataclass
class PostPlan:
    """Everything known about one post after evaluation."""

    # The source the post belongs to.
    source: Source
    # Identifier string for the post.
    identity: str
    # Path of the post's file in relative form.
    relative_path: str
    # Path of the post's file in absolute form.
    absolute_path: Path
    # Post title.
    title: str
    # Post content as HTML.
    html: str
    # Category values for the post.
    categories: List[str]
    # Tag values for the post.
    tags: List[str]
    # Timestamp of the source content.
    source_timestamp: int
    # Previously cached timestamp, or None when there is none.
    cached_timestamp: Optional[int]
    # Whether the post needs to be written to WordPress.
    should_update: bool
@dataclass
class CategoryPlan:
    """Category paths that are missing and still have to be created."""

    # Each entry is one category path, given as a list of strings.
    missing_paths: List[List[str]]
@dataclass
class EvaluationResult:
    """Outcome of an evaluation: the planned posts and the categories to create."""

    # One plan per evaluated post.
    posts: List[PostPlan]
    # Category paths that have to be created.
    categories_to_create: CategoryPlan

61
src/state.py Normal file
View File

@@ -0,0 +1,61 @@
from __future__ import annotations
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict
from .errors import ConfigurationError
@dataclass
class PostState:
    """Persisted record for one post."""

    # Stored under the "source_timestamp" key of the state file entry.
    source_timestamp: int
    # Stored under the "materialized_at" key of the state file entry.
    materialized_at: int
@dataclass
class State:
    """Persisted state: one PostState per post."""

    # Keyed by the post identity string; empty by default.
    posts: Dict[str, PostState] = field(default_factory=dict)
def load_state(path: Path) -> State:
    """Read the state file at ``path``; a missing file yields an empty ``State``.

    Raises:
        ConfigurationError: if the file is not valid JSON, is not an object,
            has a non-object ``posts`` value, or contains an entry that is
            not an object with integer ``source_timestamp`` and
            ``materialized_at`` values.
    """
    if not path.exists():
        return State()
    try:
        data = json.loads(path.read_text())
    except json.JSONDecodeError as exc:
        raise ConfigurationError(f"Invalid JSON in state file: {exc}") from exc
    if not isinstance(data, dict):
        raise ConfigurationError("State must be a JSON object")
    raw_posts = data.get("posts", {})
    if not isinstance(raw_posts, dict):
        raise ConfigurationError("State.posts must be an object")

    loaded: Dict[str, PostState] = {}
    for identity, entry in raw_posts.items():
        if not isinstance(entry, dict):
            raise ConfigurationError(f"State.posts.{identity} must be an object")
        source_ts, materialized_at = entry.get("source_timestamp"), entry.get("materialized_at")
        if not (isinstance(source_ts, int) and isinstance(materialized_at, int)):
            raise ConfigurationError(f"State.posts.{identity} timestamps must be integers")
        loaded[identity] = PostState(source_timestamp=source_ts, materialized_at=materialized_at)
    return State(posts=loaded)
def save_state(path: Path, state: State) -> None:
    """Write ``state`` to ``path`` as indented, key-sorted JSON.

    The parent directory is created if needed. The document is first written
    to a sibling ``<name>.tmp`` file and then renamed over ``path``, so an
    interrupted write cannot leave a truncated state file behind.
    """
    payload = {
        "posts": {
            identity: {
                "source_timestamp": entry.source_timestamp,
                "materialized_at": entry.materialized_at,
            }
            for identity, entry in state.posts.items()
        }
    }
    path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, indent=2, sort_keys=True)
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(serialized, encoding="utf-8")
    # Path.replace overwrites an existing destination in a single rename.
    tmp_path.replace(path)

170
src/wp_cli.py Normal file
View File

@@ -0,0 +1,170 @@
from __future__ import annotations
import json
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional
from .errors import WordPressError
@dataclass(frozen=True)
class CategoryTerm:
    """A category term (immutable)."""

    # Numeric id of the term.
    term_id: int
    # Name of the term.
    name: str
    # Id of the parent term.
    parent: int
@dataclass(frozen=True)
class TagTerm:
    """A tag term (immutable)."""

    # Numeric id of the term.
    term_id: int
    # Name of the term.
    name: str
class WordPressCLI:
    """Wrapper around the ``wp`` command-line tool, executed inside a WordPress root.

    Every failure of the underlying command, including failure to start it,
    is reported as ``WordPressError``.
    """

    def __init__(self, root: Path):
        """Remember ``root`` as the working directory for every ``wp`` call."""
        self.root = root

    def list_categories(self) -> List[CategoryTerm]:
        """Return all category terms; a null parent is reported as 0."""
        data = self._run_json([
            "wp",
            "term",
            "list",
            "category",
            "--fields=term_id,name,parent",
            "--format=json",
        ])
        categories: List[CategoryTerm] = []
        for entry in data:
            categories.append(
                CategoryTerm(
                    term_id=int(entry["term_id"]),
                    name=entry["name"],
                    parent=int(entry["parent"]) if entry.get("parent") is not None else 0,
                )
            )
        return categories

    def list_tags(self) -> List[TagTerm]:
        """Return all ``post_tag`` terms."""
        data = self._run_json([
            "wp",
            "term",
            "list",
            "post_tag",
            "--fields=term_id,name",
            "--format=json",
        ])
        tags: List[TagTerm] = []
        for entry in data:
            tags.append(TagTerm(term_id=int(entry["term_id"]), name=entry["name"]))
        return tags

    def create_category(self, name: str, parent: int) -> int:
        """Create a category under ``parent`` and return the new term id.

        Raises:
            WordPressError: if the command fails or does not print an integer.
        """
        result = self._run(
            [
                "wp",
                "term",
                "create",
                "category",
                name,
                f"--parent={parent}",
                "--porcelain",
            ],
            capture_output=True,
        )
        output = result.stdout.strip()
        try:
            return int(output)
        except ValueError as exc:
            raise WordPressError(f"Invalid category id from wp cli: {output}") from exc

    def find_post_id(self, source_identity: str) -> Optional[int]:
        """Return the id of the post whose ``_wp_materialize_source`` meta equals ``source_identity``.

        Returns ``None`` when the command prints nothing; when several ids
        are printed, the first line is used.

        Raises:
            WordPressError: if the command fails or the first line is not an
                integer.
        """
        result = self._run(
            [
                "wp",
                "post",
                "list",
                "--post_type=post",
                "--meta_key=_wp_materialize_source",
                f"--meta_value={source_identity}",
                "--field=ID",
            ],
            capture_output=True,
        )
        output = result.stdout.strip()
        if not output:
            return None
        try:
            return int(output.splitlines()[0])
        except ValueError as exc:
            raise WordPressError(f"Invalid post id from wp cli: {output}") from exc

    def create_post(
        self,
        title: str,
        content: str,
        categories: List[int],
        tags: List[str],
        source_identity: str,
    ) -> int:
        """Create a published post tagged with ``source_identity`` and return its id.

        The identity is stored in the ``_wp_materialize_source`` meta field,
        which is what ``find_post_id`` searches for.

        Raises:
            WordPressError: if the command fails or does not print an integer.
        """
        payload = json.dumps({"_wp_materialize_source": source_identity})
        args = [
            "wp",
            "post",
            "create",
            "--post_type=post",
            "--post_status=publish",
            f"--post_title={title}",
            f"--post_content={content}",
            f"--post_category={','.join(str(cat) for cat in categories)}",
            f"--tags_input={','.join(tags)}",
            f"--meta_input={payload}",
            "--porcelain",
        ]
        result = self._run(args, capture_output=True)
        output = result.stdout.strip()
        try:
            return int(output)
        except ValueError as exc:
            raise WordPressError(f"Invalid post id from wp cli: {output}") from exc

    def update_post(
        self,
        post_id: int,
        title: str,
        content: str,
        categories: List[int],
        tags: List[str],
    ) -> None:
        """Overwrite title, content, categories and tags of the post ``post_id``.

        Raises:
            WordPressError: if the command fails.
        """
        args = [
            "wp",
            "post",
            "update",
            str(post_id),
            f"--post_title={title}",
            f"--post_content={content}",
            f"--post_category={','.join(str(cat) for cat in categories)}",
            f"--tags_input={','.join(tags)}",
        ]
        self._run(args)

    def _run_json(self, cmd: List[str]):
        """Run ``cmd`` and return its stdout parsed as JSON.

        Raises:
            WordPressError: if the command fails or its output is not valid
                JSON.
        """
        result = self._run(cmd, capture_output=True)
        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError as exc:
            raise WordPressError(f"Invalid JSON from wp cli: {exc}\n{result.stdout}") from exc

    def _run(self, cmd: List[str], capture_output: bool = False) -> subprocess.CompletedProcess:
        """Run ``cmd`` in the WordPress root in text mode.

        Raises:
            WordPressError: if the command exits with a non-zero status or
                cannot be started at all.
        """
        try:
            return subprocess.run(
                cmd,
                cwd=str(self.root),
                check=True,
                text=True,
                capture_output=capture_output,
            )
        except subprocess.CalledProcessError as exc:
            stderr = exc.stderr.strip() if exc.stderr else ""
            raise WordPressError(f"WordPress CLI failed: {' '.join(cmd)}\n{stderr}") from exc
        except OSError as exc:
            # Missing executable or unusable working directory: report it as
            # a WordPressError so callers handling the package's error
            # hierarchy see it too.
            raise WordPressError(f"WordPress CLI could not be started: {' '.join(cmd)}\n{exc}") from exc