convert : detect filesystem block size for reflinks

* convert : use direct copies when possible

Using os.copy_file_range where available,
and falling back to shutil.copyfileobj otherwise.

* gguf : handle misaligned offset more cleanly
This commit is contained in:
Francis Couture-Harpin
2025-09-04 17:40:11 -04:00
parent 791bd97b3c
commit c3738cfcef
5 changed files with 132 additions and 104 deletions

View File

@@ -7,6 +7,7 @@ from typing import Literal
import os
import json
import shutil
import logging
import numpy as np
@@ -281,99 +282,116 @@ class SafetensorRemote:
# Describes one contiguous byte range inside a local file.
@dataclass
class LocalTensorRange:
# path of the file holding the range; the copy code opens it with open(filename, "rb")
filename: Path
# block size used to align copies; the copy code only attempts aligned copies
# when this is > 0 (SafetensorsLocal passes -1 when reflinks are not requested)
block_size: int
# byte position of the start of the range within the file
offset: int
# length of the range in bytes
size: int
# NOTE(review): this span appears to interleave two versions of one helper
# (two consecutive `def` lines, two variants of the histogram update and of the
# size comparison) and its indentation is lost, so it is not runnable as shown.
# Confirm against the real file which lines are live before relying on the
# notes below.
#
# Purpose: pick the alignment remainder shared by the most bytes. Each range's
# offset is reduced modulo an alignment, the sizes of the ranges that share a
# remainder are summed, and the remainder with the largest total is returned
# (0 when nothing was counted).
def best_alignment_offset(ranges: tuple[LocalTensorRange, ...], alignment: int):
def best_extra_offset(ranges: tuple[LocalTensorRange, ...], current_offset: int) -> int:
# histogram: alignment remainder -> total bytes of the ranges that have it
hist: dict[int, int] = {}
# largest block_size seen in the loop below (stays 0 when none is positive)
max_block_size = 0
for r in ranges:
# variant using the single `alignment` argument for every range
align_offset = r.offset % alignment
if align_offset not in hist:
hist[align_offset] = 0
hist[align_offset] += r.size
# Ensure minimal alignment is 8 bytes (common with safetensors)
# and that the block size is valid
if r.offset % 8 == 0 and r.block_size > 0:
# variant using each range's own block size as the alignment
align_offset = r.offset % r.block_size
if align_offset not in hist:
hist[align_offset] = 0
hist[align_offset] += r.size
if r.block_size > max_block_size:
max_block_size = r.block_size
best_offset = 0
best_size = 0
# strict `>` means the first remainder inserted wins when totals are equal
for offset, size in hist.items():
# Ensure minimal alignment is 8-bytes (common with safetensors)
if size > best_size and offset % 8 == 0:
if size > best_size:
best_size = size
best_offset = offset
if max_block_size > 0:
# the offset needs to be aligned properly
# or else there's probably a block size mismatch
# NOTE(review): this check is an `assert`, so it disappears under `python -O`;
# raise an exception instead if a misaligned current_offset must always be rejected.
assert current_offset % max_block_size == 0, current_offset % max_block_size
return best_offset
# NOTE(review): this span appears to interleave two versions of one function
# (`reflink_tensor_ranges` and `copy_tensor_ranges`; both `def` lines and both
# sets of body lines are present) and its indentation is lost, so it is not
# runnable as shown. The added notes describe individual statements only;
# confirm against the real file before acting on them.
# (assuming this is only called where os.copy_file_range is present)
#
# Copy tensor ranges using os.copy_file_range with aligned offsets and sizes
# to make it more likely that copy-on-write is used where possible.
# Block alignment is necessary for BTRFS and XFS (and likely for ZFS too).
def reflink_tensor_ranges(fout: BufferedWriter, ranges: tuple[LocalTensorRange, ...], alignment: int = 4096):
#
# Falls back to shutil.copyfileobj when os.copy_file_range is not present.
def copy_tensor_ranges(fout: BufferedWriter, ranges: tuple[LocalTensorRange, ...]):
# Copies the bytes described by `ranges` from their source files into `fout`,
# starting at fout's current position, and seeks fout to the end of the copied
# data when done. Requires at least one range.
assert len(ranges) > 0
# destination write position; tracked by hand because os.copy_file_range is
# called with explicit offsets on the raw file descriptors
dst_offset = fout.tell()
assert dst_offset % alignment == 0, dst_offset % alignment
align_offset = best_alignment_offset(ranges, alignment)
if len(ranges) == 1:
r = ranges[0]
with open(r.filename, "rb") as src:
offset_src = r.offset - align_offset
offset_src_end = r.offset + r.size
if offset_src_end % alignment != 0:
offset_src_end += alignment - (offset_src_end % alignment)
size = offset_src_end - offset_src
# NOTE(review): os.copy_file_range returns the number of bytes it copied, which
# can be less than requested; the return value is ignored here and at every
# other os.copy_file_range call in this span, so a short copy would go
# unnoticed. Looping until the requested count is reached would close the gap.
os.copy_file_range(src.fileno(), fout.fileno(), size, offset_src, dst_offset)
dst_offset += r.size + align_offset
else:
# All ranges need to have the same alignment offset
# Non-consecutive ranges need a patch block in between when the alignment offset is non-zero
src_files: dict[Path, BufferedReader] = {}
for r in ranges:
if r.filename not in src_files:
src_files[r.filename] = open(r.filename, "rb")
# remainder (modulo the block size) shared by the most bytes in `ranges`
extra_offset = best_extra_offset(ranges, dst_offset)
for i, r in enumerate(ranges):
this_align_offset = r.offset % alignment
src = src_files[r.filename]
if this_align_offset != align_offset:
logger.debug(f"copy-on-write can't be used ({i}/{len(ranges)})")
# relying on os.copy_file_range to fallback to a non-aligned copy
if extra_offset > 0:
# initial padding
fout.write(b"\x00" * extra_offset)
# Block 0, 1, 2, 3, 4,
# |___0000|0000000|0001111|1111111|111____|
#
# 1. blocks 0, 1 and 2 are copied from range[0] using os.copy_file_range
# 2. block 2 is partially overwritten with contents from range[1]
# 3. blocks 3 and 4 are copied from range[1] using os.copy_file_range
#
# (2 and 3 are repeated with further blocks if there are more ranges)
if i == 0:
extra_size = -align_offset
elif dst_offset % alignment == 0:
extra_size = 0
# account for the padding written above; start_offset remembers where the
# first range begins in the destination
dst_offset += extra_offset
start_offset = dst_offset
# open each distinct source file once, keyed by its path
# NOTE(review): the files are opened without `with`/`try`, and no `try`/`finally`
# is visible in this span, so an exception raised while copying would leave
# them open. contextlib.ExitStack or try/finally would guarantee the close.
src_files: dict[Path, BufferedReader] = {}
for r in ranges:
if r.filename not in src_files:
src_files[r.filename] = open(r.filename, "rb")
# feature check: os.copy_file_range is not available on every platform
has_copy_file_range = hasattr(os, "copy_file_range")
for i, r in enumerate(ranges):
src = src_files[r.filename]
if has_copy_file_range:
# aligned path: only when the range's offset and the first destination offset
# have the same remainder modulo this range's block size
if r.block_size > 0 and (r.offset % r.block_size) == (start_offset % r.block_size):
# Attempting to align copies for reflinking
# Block 0, 1, 2, 3, 4,
# |___0000|0000000|0001111|1111111|111____|
#
# 1. block 0 is partially overwritten with contents from range[0]
# 2. blocks 1 and 2 are copied from range[0] using os.copy_file_range
# 3. block 2 is partially overwritten with contents from range[1]
# 4. blocks 3 and 4 are copied from range[1] using os.copy_file_range
# (repeated for further ranges)
if dst_offset % r.block_size == 0:
extra_size = 0
else:
# bytes needed to reach the next block boundary in the destination,
# capped at the size of the range
extra_size = r.block_size - (dst_offset % r.block_size)
extra_size = min(extra_size, r.size)
# those leading bytes are copied with an ordinary read/write
src.seek(r.offset)
buf = src.read(extra_size)
fout.seek(dst_offset)
fout.write(buf)
dst_offset += extra_size
# the whole range fit before the block boundary: nothing left to copy
if extra_size == r.size:
continue
assert dst_offset % r.block_size == 0, dst_offset % r.block_size
offset_src = r.offset + extra_size
offset_src_end = r.offset + r.size
# round the source end up to a block boundary, so `size` can exceed the bytes
# that remain in the range
if offset_src_end % r.block_size != 0:
offset_src_end += r.block_size - (offset_src_end % r.block_size)
size = offset_src_end - offset_src
os.copy_file_range(src.fileno(), fout.fileno(), size, offset_src, dst_offset)
# advance by the range's own bytes only, not by the rounded-up `size`
dst_offset += r.size - extra_size
else:
extra_size = alignment - (dst_offset % alignment)
extra_size = min(extra_size, r.size)
src.seek(r.offset)
buf = src.read(extra_size)
fout.seek(dst_offset)
fout.write(buf)
dst_offset += extra_size
if extra_size == r.size:
continue
if r.block_size > 0:
logger.debug(f"misaligned for reflinking, falling back to copy ({i}/{len(ranges)})")
# not trying to use reflinks, but still using os.copy_file_range for speed
os.copy_file_range(src.fileno(), fout.fileno(), r.size, r.offset, dst_offset)
dst_offset += r.size
else:
# not using reflinks, fallback when os.copy_file_range is not supported
src.seek(r.offset)
fout.seek(dst_offset)
# NOTE(review): the third argument of shutil.copyfileobj is the chunk (buffer)
# length, not a limit on the total bytes copied, so this call copies from
# r.offset to the end of `src` rather than exactly r.size bytes. dst_offset
# still advances by r.size only, so the surplus is overwritten by the next
# range or left after the last one. A read/write loop bounded by r.size is
# needed for an exact copy.
shutil.copyfileobj(src, fout, r.size)
dst_offset += r.size
assert dst_offset % alignment == 0, dst_offset % alignment
offset_src = r.offset + extra_size
offset_src_end = r.offset + r.size
if offset_src_end % alignment != 0:
offset_src_end += alignment - (offset_src_end % alignment)
size = offset_src_end - offset_src
os.copy_file_range(src.fileno(), fout.fileno(), size, offset_src, dst_offset)
dst_offset += r.size - extra_size
for f in src_files.values():
f.close()
for f in src_files.values():
f.close()
# position fout at the end of the copied data for whatever is written next
fout.seek(dst_offset)
@@ -399,10 +417,13 @@ class SafetensorsLocal:
tensors: dict[str, LocalTensor]
def __init__(self, filename: Path):
def __init__(self, filename: Path, *, reflink: bool = False):
stat = os.stat(filename)
# using the preferred block size to signal whether reflinks are desired when copying
block_size = stat.st_blksize if reflink else -1
with open(filename, "rb") as f:
metadata_length = int.from_bytes(f.read(8), byteorder='little')
file_size = os.stat(filename).st_size
file_size = stat.st_size
if file_size < 8 + metadata_length:
raise ValueError(f"Could not read complete metadata. Need {8 + metadata_length} bytes, got {file_size}")
@@ -427,9 +448,10 @@ class SafetensorsLocal:
dtype=meta["dtype"],
shape=tuple(meta["shape"]),
data_range=LocalTensorRange(
filename,
data_start_offset + meta["data_offsets"][0],
meta["data_offsets"][1] - meta["data_offsets"][0],
filename=filename,
block_size=block_size,
offset=data_start_offset + meta["data_offsets"][0],
size=meta["data_offsets"][1] - meta["data_offsets"][0],
),
)