110 lines
3.3 KiB
Python
110 lines
3.3 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
import shlex
|
|
import unicodedata
|
|
from pathlib import Path
|
|
from typing import List, Set
|
|
|
|
from .errors import MaterializeError
|
|
from .models import EvaluationResult, PostPlan
|
|
|
|
|
|
def export_local(result: EvaluationResult, output_dir: Path) -> None:
|
|
if not output_dir.exists():
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
if not output_dir.is_dir():
|
|
raise MaterializeError(f"Output path is not a directory: {output_dir}")
|
|
|
|
used_names: Set[str] = set()
|
|
for post in result.posts:
|
|
metadata = _build_metadata(post)
|
|
command = _build_wp_command(post)
|
|
|
|
base_name = _normalize_name(f"{post.source.name}/{post.relative_path}")
|
|
title_name = _normalize_name(post.title)
|
|
if title_name:
|
|
dir_name = f"{base_name}-{title_name}"
|
|
else:
|
|
dir_name = base_name
|
|
dir_name = _dedupe_name(dir_name, used_names)
|
|
used_names.add(dir_name)
|
|
|
|
target_dir = output_dir / dir_name
|
|
target_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
(target_dir / "post.html").write_text(post.html, encoding="utf-8")
|
|
(target_dir / "metadata.json").write_text(
|
|
json.dumps(metadata, indent=2, sort_keys=True),
|
|
encoding="utf-8",
|
|
)
|
|
(target_dir / "wp-command.txt").write_text(command + "\n", encoding="utf-8")
|
|
|
|
|
|
def _build_metadata(post: PostPlan) -> dict:
|
|
metadata = {
|
|
"post_type": "post",
|
|
"post_status": "publish",
|
|
"post_title": post.title,
|
|
"post_content": post.html,
|
|
"post_category": post.categories,
|
|
"tags_input": post.tags,
|
|
"meta_input": {"_wp_materialize_source": post.identity},
|
|
}
|
|
if post.created_on:
|
|
metadata["post_date"] = post.created_on
|
|
if post.last_modified:
|
|
metadata["post_modified"] = post.last_modified
|
|
if post.author:
|
|
metadata["post_author"] = post.author
|
|
return metadata
|
|
|
|
|
|
def _build_wp_command(post: PostPlan) -> str:
|
|
payload = json.dumps({"_wp_materialize_source": post.identity})
|
|
args = [
|
|
"wp",
|
|
"post",
|
|
"create",
|
|
"--post_type=post",
|
|
"--post_status=publish",
|
|
f"--post_title={post.title}",
|
|
f"--post_content={post.html}",
|
|
f"--post_category={','.join(post.categories)}",
|
|
f"--tags_input={','.join(post.tags)}",
|
|
f"--meta_input={payload}",
|
|
"--porcelain",
|
|
]
|
|
if post.created_on:
|
|
args.append(f"--post_date={post.created_on}")
|
|
if post.last_modified:
|
|
args.append(f"--post_modified={post.last_modified}")
|
|
if post.author:
|
|
args.append(f"--post_author={post.author}")
|
|
return " ".join(shlex.quote(arg) for arg in args)
|
|
|
|
|
|
def _normalize_name(value: str) -> str:
|
|
text = value.strip()
|
|
text = text.replace("\\", "/")
|
|
text = text.replace("/", "-")
|
|
text = unicodedata.normalize("NFKD", text)
|
|
text = text.encode("ascii", "ignore").decode("ascii")
|
|
text = text.lower()
|
|
text = re.sub(r"[^a-z0-9._-]+", "-", text)
|
|
text = re.sub(r"-+", "-", text)
|
|
text = text.strip("-_.")
|
|
return text or "post"
|
|
|
|
|
|
def _dedupe_name(name: str, used: Set[str]) -> str:
|
|
if name not in used:
|
|
return name
|
|
index = 2
|
|
while True:
|
|
candidate = f"{name}-{index}"
|
|
if candidate not in used:
|
|
return candidate
|
|
index += 1
|