1
0
mirror of https://github.com/containers/ramalama.git synced 2026-02-05 06:46:39 +01:00

Refactor code base to use pathlib.Path where possible

Signed-off-by: Michael Engel <mengel@redhat.com>
This commit is contained in:
Michael Engel
2026-01-15 11:42:20 +01:00
parent 41148c2aa4
commit 042c2b0903
15 changed files with 147 additions and 141 deletions

View File

@@ -1,5 +1,6 @@
import argparse
import os
from pathlib import Path
from typing import Optional
from ramalama.common import check_metal, check_nvidia
@@ -112,19 +113,19 @@ class RamalamaModelContext:
return self.model._get_entry_model_path(self.is_container, self.should_generate, self.dry_run)
@property
def mmproj_path(self) -> Optional[str]:
def mmproj_path(self) -> Optional[Path]:
return self.model._get_mmproj_path(self.is_container, self.should_generate, self.dry_run)
@property
def chat_template_path(self) -> Optional[str]:
def chat_template_path(self) -> Optional[Path]:
return self.model._get_chat_template_path(self.is_container, self.should_generate, self.dry_run)
@property
def draft_model_path(self) -> str:
def draft_model_path(self) -> Optional[str]:
if getattr(self.model, "draft_model", None):
assert self.model.draft_model
return self.model.draft_model._get_entry_model_path(self.is_container, self.should_generate, self.dry_run)
return ""
return None
class RamalamaHostContext:

View File

@@ -15,6 +15,7 @@ import subprocess
import sys
from collections.abc import Callable, Sequence
from functools import lru_cache
from pathlib import Path
from typing import IO, TYPE_CHECKING, Any, Literal, Protocol, TypeAlias, TypedDict, cast, get_args
import yaml
@@ -117,7 +118,7 @@ def available(cmd: str) -> bool:
def quoted(arr) -> str:
"""Return string with quotes around elements containing spaces."""
return " ".join(['"' + element + '"' if ' ' in element else element for element in arr])
return " ".join(['"' + s + '"' if ' ' in s else s for element in arr for s in [str(element)]])
def exec_cmd(args, stdout2null: bool = False, stderr2null: bool = False):
@@ -251,25 +252,25 @@ def generate_sha256(to_hash: str, with_sha_prefix: bool = True) -> str:
return generate_sha256_binary(to_hash.encode("utf-8"), with_sha_prefix)
def verify_checksum(filename: str) -> bool:
def verify_checksum(filepath: Path) -> bool:
"""
Verifies if the SHA-256 checksum of a file matches the checksum provided in
the filename.
the file path.
Args:
filename (str): The filename containing the checksum prefix
filepath (Path): The file path containing the checksum prefix
(e.g., "sha256:<checksum>")
Returns:
bool: True if the checksum matches, False otherwise.
"""
if not os.path.exists(filename):
if not filepath.exists():
return False
# Check if the filename starts with "sha256:" or "sha256-" and extract the checksum from filename
expected_checksum = ""
fn_base = os.path.basename(filename)
fn_base = filepath.name if filepath.is_file() else ""
if fn_base.startswith("sha256:"):
expected_checksum = fn_base.split(":")[1]
elif fn_base.startswith("sha256-"):
@@ -282,7 +283,7 @@ def verify_checksum(filename: str) -> bool:
# Calculate the SHA-256 checksum of the file contents
sha256_hash = hashlib.sha256()
with open(filename, "rb") as f:
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)

View File

@@ -5,6 +5,7 @@ import shutil
import sys
import time
import urllib.request
from pathlib import Path
import ramalama.console as console
from ramalama.common import perror
@@ -24,10 +25,10 @@ class HttpClient:
def __init__(self):
pass
def init(self, url, headers, output_file, show_progress, response_bytes=None):
output_file_partial = None
if output_file:
output_file_partial = output_file + ".partial"
def init(
self, url: str, headers: dict[str, str] | None, output_file: Path, show_progress: bool, response_bytes=None
):
output_file_partial = Path(f"{output_file}.partial")
self.file_size = self.set_resume_point(output_file_partial)
self.urlopen(url, headers)
@@ -49,8 +50,7 @@ class HttpClient:
finally:
del out # Ensure file is closed before rename
if output_file:
os.rename(output_file_partial, output_file)
os.rename(output_file_partial, output_file)
def urlopen(self, url, headers):
headers["Range"] = f"bytes={self.file_size}-"
@@ -162,13 +162,13 @@ class HttpClient:
return now_downloaded / elapsed_seconds
def download_file(url: str, dest_path: str, headers: dict[str, str] | None = None, show_progress: bool = True):
def download_file(url: str, dest_path: Path, headers: dict[str, str] | None = None, show_progress: bool = True):
"""
Downloads a file from a given URL to a specified destination path.
Args:
url (str): The URL to download from.
dest_path (str): The path to save the downloaded file.
dest_path (Path): The path to save the downloaded file.
headers (dict): Optional headers to include in the request.
show_progress (bool): Whether to show a progress bar during download.

View File

@@ -95,7 +95,7 @@ class Kube:
host_model_path = normalize_host_path_for_container(self.src_model_path)
if platform.system() == "Windows":
# Workaround https://github.com/containers/podman/issues/16704
host_model_path = '/mnt' + host_model_path
host_model_path = f"/mnt{host_model_path}"
mount = f"""
- mountPath: {self.dest_model_path}
name: model"""
@@ -129,7 +129,7 @@ class Kube:
host_chat_template_path = normalize_host_path_for_container(self.src_chat_template_path)
if platform.system() == "Windows":
# Workaround https://github.com/containers/podman/issues/16704
host_chat_template_path = '/mnt' + host_chat_template_path
host_chat_template_path = f"/mnt{host_chat_template_path}"
mount = f"""
- mountPath: {self.dest_chat_template_path}
name: chat_template"""
@@ -143,7 +143,7 @@ class Kube:
host_mmproj_path = normalize_host_path_for_container(self.src_mmproj_path)
if platform.system() == "Windows":
# Workaround https://github.com/containers/podman/issues/16704
host_mmproj_path = '/mnt' + host_mmproj_path
host_mmproj_path = f"/mnt{host_mmproj_path}"
mount = f"""
- mountPath: {self.dest_mmproj_path}
name: mmproj"""

View File

@@ -1,6 +1,7 @@
import io
import struct
from enum import IntEnum
from pathlib import Path
from typing import Any, Dict, cast
from ramalama.endian import GGUFEndian
@@ -112,7 +113,7 @@ GGUF_NUMBER_FORMATS: list[GGUFValueType] = [
class GGUFInfoParser:
@staticmethod
def is_model_gguf(model_path: str) -> bool:
def is_model_gguf(model_path: Path) -> bool:
try:
with open(model_path, "rb") as model_file:
magic_number = GGUFInfoParser.read_string(model_file, GGUFEndian.LITTLE, 4)
@@ -178,7 +179,7 @@ class GGUFInfoParser:
return value
@staticmethod
def get_model_endianness(model_path: str) -> GGUFEndian:
def get_model_endianness(model_path: Path) -> GGUFEndian:
# Pin model endianness to Little Endian by default.
# Models downloaded via HuggingFace are majority Little Endian.
model_endianness = GGUFEndian.LITTLE
@@ -205,7 +206,7 @@ class GGUFInfoParser:
return metadata
@staticmethod
def parse_metadata(model_path: str) -> GGUFModelMetadata:
def parse_metadata(model_path: Path) -> GGUFModelMetadata:
model_endianness = GGUFInfoParser.get_model_endianness(model_path)
with open(model_path, "rb") as model:
@@ -220,7 +221,7 @@ class GGUFInfoParser:
return GGUFModelMetadata(GGUFInfoParser._parse_metadata(model, model_endianness))
@staticmethod
def parse(model_name: str, model_registry: str, model_path: str) -> GGUFModelInfo:
def parse(model_name: str, model_registry: str, model_path: Path) -> GGUFModelInfo:
model_endianness = GGUFInfoParser.get_model_endianness(model_path)
with open(model_path, "rb") as model:
@@ -249,5 +250,5 @@ class GGUFInfoParser:
tensors.append(Tensor(name, n_dimensions, dimensions, tensor_type.name, offset))
return GGUFModelInfo(
model_name, model_registry, model_path, gguf_version, metadata, tensors, model_endianness
model_name, model_registry, str(model_path), gguf_version, metadata, tensors, model_endianness
)

View File

@@ -1,5 +1,6 @@
import json
import struct
from pathlib import Path
import ramalama.console as console
from ramalama.model_inspect.error import ParseError
@@ -18,7 +19,7 @@ class SafetensorInfoParser:
return model_name.endswith(".safetensor") or model_name.endswith(".safetensors")
@staticmethod
def parse(model_name: str, model_registry: str, model_path: str) -> SafetensorModelInfo:
def parse(model_name: str, model_registry: str, model_path: Path) -> SafetensorModelInfo:
try:
with open(model_path, "rb") as model_file:
prefix = '<'
@@ -27,7 +28,7 @@ class SafetensorInfoParser:
header_size = struct.unpack(typestring, model_file.read(8))[0]
header = json.loads(model_file.read(header_size))
return SafetensorModelInfo(model_name, model_registry, model_path, header)
return SafetensorModelInfo(model_name, model_registry, str(model_path), header)
except Exception as ex:
msg = f"Failed to parse safetensor model '{model_path}': {ex}"

View File

@@ -1,6 +1,7 @@
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from ramalama import oci_tools
@@ -22,10 +23,10 @@ class GlobalModelStore:
self,
base_path: str,
):
self._store_base_path = os.path.join(base_path, "store")
self._store_base_path = Path(os.path.join(base_path, "store"))
@property
def path(self) -> str:
def path(self) -> Path:
return self._store_base_path
def list_models(self, engine: str, show_container: bool) -> Dict[str, List[ModelFile]]:
@@ -33,16 +34,16 @@ class GlobalModelStore:
for root, subdirs, _ in os.walk(self.path):
if DIRECTORY_NAME_REFS in subdirs:
ref_dir = os.path.join(root, DIRECTORY_NAME_REFS)
ref_dir = Path(root).joinpath(DIRECTORY_NAME_REFS)
for ref_file_name in os.listdir(ref_dir):
ref_file_path = os.path.join(ref_dir, ref_file_name)
ref_file_path = ref_dir.joinpath(ref_file_name)
ref_file = migrate_reffile_to_refjsonfile(
ref_file_path, os.path.join(root, DIRECTORY_NAME_SNAPSHOTS)
ref_file_path, Path(root).joinpath(DIRECTORY_NAME_SNAPSHOTS)
)
if ref_file is None:
ref_file = RefJSONFile.from_path(ref_file_path)
model_path = root.replace(self.path, "").replace(os.sep, "", 1)
model_path = root.replace(f"{self.path}", "").replace(os.sep, "", 1)
parts = model_path.split(os.sep)
model_source = parts[0]
@@ -55,14 +56,14 @@ class GlobalModelStore:
collected_files = []
for snapshot_file in ref_file.files:
is_partially_downloaded = False
snapshot_file_path = os.path.join(
root, DIRECTORY_NAME_SNAPSHOTS, ref_file.hash, snapshot_file.name
snapshot_file_path = Path(root).joinpath(
DIRECTORY_NAME_SNAPSHOTS, ref_file.hash, snapshot_file.name
)
if not os.path.exists(snapshot_file_path):
blobs_partial_file_path = os.path.join(
root, DIRECTORY_NAME_BLOBS, ref_file.hash + ".partial"
if not snapshot_file_path.exists():
blobs_partial_file_path = Path(root).joinpath(
DIRECTORY_NAME_BLOBS, ref_file.hash + ".partial"
)
if not os.path.exists(blobs_partial_file_path):
if not blobs_partial_file_path.exists():
continue
snapshot_file_path = blobs_partial_file_path

View File

@@ -1,6 +1,6 @@
import json
import os
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Optional
@@ -21,14 +21,14 @@ class RefFile:
self.model_name: str = ""
self.chat_template_name: str = ""
self.mmproj_name: str = ""
self._path: str = ""
self._path: Path = Path("")
@property
def path(self) -> str:
def path(self) -> Path:
return self._path
@staticmethod
def from_path(path: str) -> "RefFile":
def from_path(path: Path) -> "RefFile":
ref_file = RefFile()
ref_file._path = path
with open(path, "r") as file:
@@ -82,12 +82,12 @@ class RefFile:
file.flush()
@staticmethod
def map_to_refjsonfile(ref_file_path: str, snapshot_directory: str) -> "RefJSONFile":
def map_to_refjsonfile(ref_file_path: Path, snapshot_directory: Path) -> "RefJSONFile":
ref_file = RefFile.from_path(ref_file_path)
ref = RefJSONFile(
hash=ref_file.hash,
path=f"{ref_file.path}.json",
path=Path(f"{ref_file.path}.json"),
files=[],
)
@@ -116,10 +116,10 @@ class RefFile:
#
# Temporary migration routine to ensure smooth transition to new RefFile format
#
def migrate_reffile_to_refjsonfile(ref_file_path: str, snapshot_directory: str) -> Optional["RefJSONFile"]:
def migrate_reffile_to_refjsonfile(ref_file_path: Path, snapshot_directory: Path) -> Optional["RefJSONFile"]:
# Check if a ref file in old format is present by removing the file extension
old_ref_file_path = ref_file_path.replace(".json", "")
if os.path.exists(old_ref_file_path):
old_ref_file_path = ref_file_path.with_suffix("")
if old_ref_file_path.exists():
logger.debug(f"Migrating old ref file '{old_ref_file_path}' to new format")
ref: RefJSONFile = RefFile.map_to_refjsonfile(old_ref_file_path, snapshot_directory)
ref.write_to_file()
@@ -162,13 +162,13 @@ class StoreFile:
@dataclass
class RefJSONFile:
hash: str
path: str
path: Path
files: list[StoreFile]
version: str = "v1.0.1"
def to_json(self) -> str:
return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=2)
return json.dumps(asdict(self), default=str, sort_keys=True, indent=2)
def write_to_file(self):
with open(self.path, "w") as file:
@@ -198,7 +198,7 @@ class RefJSONFile:
break
@staticmethod
def from_path(path: str) -> "RefJSONFile":
def from_path(path: Path) -> "RefJSONFile":
with open(path, "r") as f:
data = json.loads(f.read())
@@ -219,7 +219,7 @@ class RefJSONFile:
files=ref_files,
)
# ref file has moved
if ref_file.path != data["path"]:
if f"{ref_file.path}" != data["path"]:
logger.debug(f"Updating ref file path to '{ref_file.path}'")
ref_file.write_to_file()
return ref_file

View File

@@ -1,5 +1,6 @@
import os
from enum import IntEnum
from pathlib import Path
from typing import Dict, Sequence
from ramalama.common import generate_sha256_binary, perror
@@ -36,8 +37,8 @@ class SnapshotFile:
self.should_verify_checksum: bool = should_verify_checksum
self.required: bool = required
def download(self, blob_file_path: str, snapshot_dir: str) -> str:
if not os.path.exists(blob_file_path):
def download(self, blob_file_path: Path, snapshot_dir: Path) -> str:
if not blob_file_path.exists():
if self.should_show_progress:
perror(f"Downloading {self.name}")
download_file(
@@ -50,7 +51,7 @@ class SnapshotFile:
logger.debug(f"Using cached blob for {self.name} ({os.path.basename(blob_file_path)})")
prefix = os.path.dirname(self.name)
if prefix:
snapshot_dir = os.path.join(snapshot_dir, prefix)
snapshot_dir = snapshot_dir.joinpath(prefix)
return os.path.relpath(blob_file_path, start=snapshot_dir)

View File

@@ -53,7 +53,7 @@ class ModelStore:
self._model_organization = model_organization
@property
def base_path(self) -> str:
def base_path(self) -> Path:
return self._store.path
@property
@@ -69,32 +69,29 @@ class ModelStore:
return self._model_type
@property
def model_base_directory(self) -> str:
return os.path.join(self.base_path, self.model_type, self.model_organization, self.model_name)
def model_base_directory(self) -> Path:
return self.base_path.joinpath(self.model_type, self.model_organization, self.model_name)
@property
def blobs_directory(self) -> str:
return os.path.join(self.model_base_directory, DIRECTORY_NAME_BLOBS)
def blobs_directory(self) -> Path:
return self.model_base_directory.joinpath(DIRECTORY_NAME_BLOBS)
@property
def refs_directory(self) -> str:
return os.path.join(self.model_base_directory, DIRECTORY_NAME_REFS)
def refs_directory(self) -> Path:
return self.model_base_directory.joinpath(DIRECTORY_NAME_REFS)
@property
def snapshots_directory(self) -> str:
return os.path.join(self.model_base_directory, DIRECTORY_NAME_SNAPSHOTS)
def snapshots_directory(self) -> Path:
return self.model_base_directory.joinpath(DIRECTORY_NAME_SNAPSHOTS)
def file_exists(self, file_path: str) -> bool:
return os.path.exists(file_path)
def get_ref_file_path(self, model_tag: str) -> str:
return os.path.join(self.refs_directory, f"{model_tag}.json")
def get_ref_file_path(self, model_tag: str) -> Path:
return self.refs_directory.joinpath(f"{model_tag}.json")
def get_ref_file(self, model_tag: str) -> Optional[RefJSONFile]:
ref_file_path = self.get_ref_file_path(model_tag)
ref_file = migrate_reffile_to_refjsonfile(ref_file_path, self.snapshots_directory)
if ref_file is None:
if os.path.exists(ref_file_path):
if ref_file_path.exists():
ref_file = RefJSONFile.from_path(ref_file_path)
if ref_file is not None:
if ref_file.version != RefJSONFile.version:
@@ -130,19 +127,19 @@ class ModelStore:
return ""
return sanitize_filename(ref_file.hash)
def get_snapshot_directory_from_tag(self, model_tag: str) -> str:
return os.path.join(self.snapshots_directory, self.get_snapshot_hash(model_tag))
def get_snapshot_directory_from_tag(self, model_tag: str) -> Path:
return self.snapshots_directory.joinpath(self.get_snapshot_hash(model_tag))
def get_snapshot_directory(self, hash: str) -> str:
return os.path.join(self.snapshots_directory, hash)
def get_snapshot_directory(self, hash: str) -> Path:
return self.snapshots_directory.joinpath(hash)
def get_snapshot_file_path(self, tag_hash: str, filename: str) -> str:
return os.path.join(self.snapshots_directory, sanitize_filename(tag_hash), filename)
def get_snapshot_file_path(self, tag_hash: str, filename: str) -> Path:
return self.snapshots_directory.joinpath(sanitize_filename(tag_hash), filename)
def get_blob_file_path(self, file_hash: str) -> str:
return os.path.join(self.blobs_directory, sanitize_filename(file_hash))
def get_blob_file_path(self, file_hash: str) -> Path:
return self.blobs_directory.joinpath(sanitize_filename(file_hash))
def get_safetensor_blob_path(self, model_tag: str, requested_filename: str) -> Optional[str]:
def get_safetensor_blob_path(self, model_tag: str, requested_filename: str) -> Optional[Path]:
ref_file = self.get_ref_file(model_tag)
if ref_file is None:
return None
@@ -153,26 +150,23 @@ class ModelStore:
chosen = matched if matched is not None else safetensor_files[0]
return self.get_blob_file_path(chosen.hash)
def get_blob_file_path_by_name(self, tag_hash: str, filename: str) -> str:
return str(Path(self.get_snapshot_file_path(tag_hash, filename)).resolve())
def get_blob_file_path_by_name(self, tag_hash: str, filename: str) -> Path:
return self.get_snapshot_file_path(tag_hash, filename).resolve()
def get_blob_file_hash(self, tag_hash: str, filename: str) -> str:
return os.path.basename(self.get_blob_file_path_by_name(tag_hash, filename))
blob_file = self.get_blob_file_path_by_name(tag_hash, filename)
return blob_file.name if blob_file.is_file() else ""
def get_partial_blob_file_path(self, file_hash: str) -> str:
return self.get_blob_file_path(file_hash) + ".partial"
def get_partial_blob_file_path(self, file_hash: str) -> Path:
return Path(f"{self.get_blob_file_path(file_hash)}.partial")
def ensure_directory_setup(self) -> None:
os.makedirs(self.blobs_directory, exist_ok=True)
os.makedirs(self.refs_directory, exist_ok=True)
os.makedirs(self.snapshots_directory, exist_ok=True)
self.blobs_directory.mkdir(exist_ok=True, parents=True)
self.refs_directory.mkdir(exist_ok=True, parents=True)
self.snapshots_directory.mkdir(exist_ok=True, parents=True)
def directory_setup_exists(self) -> bool:
return (
os.path.exists(self.blobs_directory)
and os.path.exists(self.refs_directory)
and os.path.exists(self.snapshots_directory)
)
return self.blobs_directory.exists() and self.refs_directory.exists() and self.snapshots_directory.exists()
def get_cached_files(self, model_tag: str) -> Tuple[str, list[str], bool]:
cached_files: list[str] = []
@@ -213,7 +207,7 @@ class ModelStore:
ref_file.write_to_file()
snapshot_directory = self.get_snapshot_directory(snapshot_hash)
os.makedirs(snapshot_directory, exist_ok=True)
snapshot_directory.mkdir(exist_ok=True, parents=True)
return ref_file
def _download_snapshot_files(
@@ -325,7 +319,7 @@ class ModelStore:
for file in ref_file.files:
if file.name == "chat_template_converted":
# Should not exist but 0.13.0 needs_conversion logic was inverted
self._remove_blob_path(Path(self.get_snapshot_file_path(ref_file.hash, file.name)))
self._remove_blob_path(self.get_snapshot_file_path(ref_file.hash, file.name))
ref_file.remove_file(file.hash)
break
self._update_snapshot(ref_file, snapshot_hash, files)
@@ -416,7 +410,7 @@ class ModelStore:
model_tags = [
Path(entry).stem
for entry in os.listdir(self.refs_directory)
if os.path.isfile(os.path.join(self.refs_directory, entry))
if self.refs_directory.joinpath(entry).is_file()
]
refs = [ref for tag in model_tags if (ref := self.get_ref_file(tag))]
@@ -438,22 +432,19 @@ class ModelStore:
for file in ref_file.files:
blob_refcount = blob_refcounts.get(file.name, 0)
if blob_refcount <= 1:
blob_absolute_path = Path(self.get_blob_file_path(file.hash))
self._remove_blob_path(blob_absolute_path)
self._remove_blob_path(self.get_blob_file_path(file.hash))
else:
logger.debug(f"Not removing blob {file} refcount={blob_refcount}")
# Remove snapshot directory
if snapshot_refcount <= 1:
# FIXME: this only cleans up .partial files where the blob hash equals the snapshot hash
partial_blob_file_path = Path(self.get_partial_blob_file_path(ref_file.hash))
self._remove_blob_path(partial_blob_file_path)
snapshot_directory = self.get_snapshot_directory_from_tag(model_tag)
shutil.rmtree(snapshot_directory, ignore_errors=True)
self._remove_blob_path(self.get_partial_blob_file_path(ref_file.hash))
shutil.rmtree(self.get_snapshot_directory_from_tag(model_tag), ignore_errors=True)
logger.debug(f"Snapshot removed {ref_file.hash}")
else:
logger.debug(f"Not removing snapshot {ref_file.hash} refcount={snapshot_refcount}")
# Remove ref file, ignore if file is not found
Path(self.get_ref_file_path(model_tag)).unlink(missing_ok=True)
self.get_ref_file_path(model_tag).unlink(missing_ok=True)
return True

View File

@@ -5,7 +5,7 @@ import platform
from pathlib import Path, PureWindowsPath
def normalize_host_path_for_container(host_path: str) -> str:
def normalize_host_path_for_container(host_path: Path) -> Path:
"""
Convert a host filesystem path to a format suitable for container volume mounts.
@@ -35,22 +35,22 @@ def normalize_host_path_for_container(host_path: str) -> str:
# Docker Desktop for Windows expects paths in the format /c/Users/... instead of C:\Users\...
# First, resolve symlinks and make the path absolute.
path = Path(host_path).resolve()
path = host_path.resolve()
# Handle UNC paths to container filesystem
# e.g if the model store is placed on the podman machine VM to reduce copying
# \\wsl.localhost\podman-machine-default\home\user\.local\share\ramalama\store
# NOTE: UNC paths cannot be accessed implicitly from the container, would need to smb mount
if path.drive.startswith("\\\\"):
return '/' + path.relative_to(path.drive).as_posix()
return Path('/' + path.relative_to(path.drive).as_posix())
if not path.drive:
return path.as_posix()
return Path(path.as_posix())
# Handle paths with drive letters
drive_letter = path.drive[0].lower()
# path.as_posix() on Windows is 'C:/Users/...', so we partition on ':' and take the rest.
return f"/{drive_letter}{path.as_posix().partition(':')[2]}"
return Path(f"/{drive_letter}{path.as_posix().partition(':')[2]}")
def is_windows_absolute_path(path: str) -> bool:
@@ -69,7 +69,7 @@ def is_windows_absolute_path(path: str) -> bool:
return PureWindowsPath(path).is_absolute()
def resolve_real_path(path: str) -> str:
def resolve_real_path(path: Path) -> Path:
"""
Resolve a path to its real absolute path, handling symlinks.
@@ -82,10 +82,10 @@ def resolve_real_path(path: str) -> str:
Returns:
Absolute path with symlinks resolved
"""
return os.path.realpath(path)
return Path(os.path.realpath(path))
def get_container_mount_path(host_path: str) -> str:
def get_container_mount_path(host_path: Path) -> Path:
"""
Get the properly formatted path for use in container mount arguments.
@@ -110,7 +110,7 @@ def get_container_mount_path(host_path: str) -> str:
return normalize_host_path_for_container(real_path)
def create_file_link(src: str, dst: str) -> None:
def create_file_link(src: Path, dst: Path) -> None:
"""
Create a link from dst to src using the best available method for the platform.
@@ -134,15 +134,15 @@ def create_file_link(src: str, dst: str) -> None:
- Hardlinks share the same inode, so deleting one doesn't affect the other
- On Windows, hardlinks are preferred over symlinks for file operations
"""
if not os.path.exists(src):
if not src.exists():
raise FileNotFoundError(f"Source file does not exist: {src}")
# Ensure destination directory exists
os.makedirs(os.path.dirname(dst), exist_ok=True)
dst.parent.mkdir(exist_ok=True, parents=True)
# Remove existing destination if it exists
if os.path.exists(dst) or os.path.islink(dst):
os.unlink(dst)
if dst.exists() or dst.is_symlink():
dst.unlink()
# Strategy 1: Try hardlink first (best for Windows, works without admin)
try:

View File

@@ -1,5 +1,6 @@
import os
import platform
from pathlib import Path
import ramalama.kube as kube
import ramalama.quadlet as quadlet
@@ -60,10 +61,10 @@ class Stack:
return volume_mounts
def _gen_volumes(self):
host_model_path = normalize_host_path_for_container(self.model._get_entry_model_path(False, False, False))
host_model_path = normalize_host_path_for_container(Path(self.model._get_entry_model_path(False, False, False)))
if platform.system() == "Windows":
# Workaround https://github.com/containers/podman/issues/16704
host_model_path = '/mnt' + host_model_path
host_model_path = f"/mnt{host_model_path}"
volumes = f"""
- hostPath:
path: {host_model_path}

View File

@@ -7,6 +7,7 @@ import sys
import time
from abc import ABC, abstractmethod
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional
if TYPE_CHECKING:
@@ -225,7 +226,7 @@ class Transport(TransportBase):
safetensor_files = ref_file.safetensor_model_files
if safetensor_files:
# Safetensor models use directory mounts, not individual files
src_path = self.model_store.get_snapshot_directory_from_tag(self.model_tag)
src_path = str(self.model_store.get_snapshot_directory_from_tag(self.model_tag))
if use_container or should_generate:
dest_path = MNT_DIR
else:
@@ -239,16 +240,20 @@ class Transport(TransportBase):
if use_container or should_generate:
dest_path = f"{MNT_DIR}/{model_file.name}"
else:
dest_path = self.model_store.get_blob_file_path(model_file.hash)
src_path = self.model_store.get_blob_file_path(model_file.hash)
dest_path = str(self.model_store.get_blob_file_path(model_file.hash))
src_path = str(self.model_store.get_blob_file_path(model_file.hash))
model_parts.append((src_path, dest_path))
# Sort multi-part models by filename to ensure correct order
if len(model_parts) > 1 and any("-00001-of-" in name for _, name in model_parts):
if len(model_parts) > 1 and any("-00001-of-" in str(name) for _, name in model_parts):
model_parts.sort(key=lambda x: x[1])
return model_parts
#
# Keep returning str here due to the possible return value of oci:// which would be
# reduced to oci:/ by pathlib.Path.
#
def _get_entry_model_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> str:
"""
Returns the path to the model blob on the host if use_container and should_generate are both False.
@@ -279,7 +284,7 @@ class Transport(TransportBase):
if safetensor_files:
if use_container or should_generate:
return MNT_DIR
return self.model_store.get_snapshot_directory_from_tag(self.model_tag)
return str(self.model_store.get_snapshot_directory_from_tag(self.model_tag))
elif not gguf_files:
raise NoGGUFModelFileFound()
@@ -294,7 +299,7 @@ class Transport(TransportBase):
if use_container or should_generate:
return f"{MNT_DIR}/{model_file.name}"
return self.model_store.get_blob_file_path(model_file.hash)
return str(self.model_store.get_blob_file_path(model_file.hash))
def _get_inspect_model_path(self, dry_run: bool) -> str:
"""Return a concrete file path for inspection.
@@ -304,16 +309,17 @@ class Transport(TransportBase):
return "/path/to/model"
if self.model_type == 'oci':
return self._get_entry_model_path(False, False, dry_run)
safetensor_blob = self.model_store.get_safetensor_blob_path(self.model_tag, self.filename)
safetensor_blob_path = self.model_store.get_safetensor_blob_path(self.model_tag, self.filename)
safetensor_blob = None if safetensor_blob_path is None else str(safetensor_blob_path)
return safetensor_blob or self._get_entry_model_path(False, False, dry_run)
def _get_mmproj_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[str]:
def _get_mmproj_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[Path]:
"""
Returns the path to the mmproj blob on the host if use_container and should_generate are both False.
Or returns the path to the mounted file inside a container.
"""
if dry_run:
return ""
return Path("")
if self.model_type == 'oci':
return None
@@ -328,16 +334,16 @@ class Transport(TransportBase):
# Use the first mmproj file
mmproj_file = ref_file.mmproj_files[0]
if use_container or should_generate:
return f"{MNT_DIR}/{mmproj_file.name}"
return Path(f"{MNT_DIR}/{mmproj_file.name}")
return self.model_store.get_blob_file_path(mmproj_file.hash)
def _get_chat_template_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[str]:
def _get_chat_template_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[Path]:
"""
Returns the path to the chat template blob on the host if use_container and should_generate are both False.
Or returns the path to the mounted file inside a container.
"""
if dry_run:
return ""
return Path("")
if self.model_type == 'oci':
return None
@@ -352,7 +358,7 @@ class Transport(TransportBase):
# Use the last chat template file (may have been go template converted to jinja)
chat_template_file = ref_file.chat_templates[-1]
if use_container or should_generate:
return f"{MNT_DIR}/{chat_template_file.name}"
return Path(f"{MNT_DIR}/{chat_template_file.name}")
return self.model_store.get_blob_file_path(chat_template_file.hash)
def remove(self, args) -> bool:
@@ -445,7 +451,7 @@ class Transport(TransportBase):
if self.draft_model:
draft_model = self.draft_model._get_entry_model_path(args.container, args.generate, args.dryrun)
# Convert path to container-friendly format (handles Windows path conversion)
container_draft_model = get_container_mount_path(draft_model)
container_draft_model = get_container_mount_path(Path(draft_model))
mount_opts = f"--mount=type=bind,src={container_draft_model},destination={MNT_FILE_DRAFT}"
mount_opts += f",ro{self.engine.relabel()}"
self.engine.add([mount_opts])
@@ -748,7 +754,7 @@ class Transport(TransportBase):
compose.generate().write(output_dir)
def inspect_metadata(self) -> Dict[str, Any]:
model_path = self._get_entry_model_path(False, False, False)
model_path = Path(self._get_entry_model_path(False, False, False))
if GGUFInfoParser.is_model_gguf(model_path):
return GGUFInfoParser.parse_metadata(model_path).data
return {}
@@ -763,7 +769,10 @@ class Transport(TransportBase):
) -> Any:
model_name = self.filename
model_registry = self.type.lower()
model_path = self._get_inspect_model_path(dryrun)
model_path = Path(self._get_inspect_model_path(dryrun))
if not model_path.exists():
raise NoRefFileFound(model_name)
if GGUFInfoParser.is_model_gguf(model_path):
if not show_all_metadata and get_field == "":
gguf_info: GGUFModelInfo = GGUFInfoParser.parse(model_name, model_registry, model_path)
@@ -782,7 +791,7 @@ class Transport(TransportBase):
safetensor_info: SafetensorModelInfo = SafetensorInfoParser.parse(model_name, model_registry, model_path)
return safetensor_info.serialize(json=as_json, all=show_all)
return ModelInfoBase(model_name, model_registry, model_path).serialize(json=as_json)
return ModelInfoBase(model_name, model_registry, str(model_path)).serialize(json=as_json)
def print_pull_message(self, model_name) -> None:
model_name = trim_model_name(model_name)

View File

@@ -116,4 +116,4 @@ def test_ramalama_model_context_without_draft_model():
dry_run=True,
)
assert ctx.draft_model_path == ""
assert ctx.draft_model_path is None

View File

@@ -1,4 +1,3 @@
import os
import shutil
import subprocess
from contextlib import ExitStack
@@ -81,11 +80,11 @@ def test_verify_checksum(
if ":" in input_file_name and platform == "win32":
return
full_dir_path = os.path.join(Path(__file__).parent, "verify_checksum")
file_path = os.path.join(full_dir_path, input_file_name)
full_dir_path = Path(__file__).parent / "verify_checksum"
file_path = full_dir_path / input_file_name
try:
os.makedirs(full_dir_path, exist_ok=True)
full_dir_path.mkdir(exist_ok=True, parents=True)
with open(file_path, "w") as f:
f.write(content)