From 042c2b0903d438ea00df7b145ba1e01d3e4d9854 Mon Sep 17 00:00:00 2001
From: Michael Engel
Date: Thu, 15 Jan 2026 11:42:20 +0100
Subject: [PATCH] Refactor code base to use pathlib.Path where possible

Signed-off-by: Michael Engel
---
 ramalama/command/context.py                 |  9 ++-
 ramalama/common.py                          | 15 ++--
 ramalama/http_client.py                     | 16 ++--
 ramalama/kube.py                            |  6 +-
 ramalama/model_inspect/gguf_parser.py       | 11 +--
 ramalama/model_inspect/safetensor_parser.py |  5 +-
 ramalama/model_store/global_store.py        | 25 +++---
 ramalama/model_store/reffile.py             | 26 +++----
 ramalama/model_store/snapshot_file.py       |  7 +-
 ramalama/model_store/store.py               | 85 +++++++++------------
 ramalama/path_utils.py                      | 26 +++----
 ramalama/stack.py                           |  5 +-
 ramalama/transports/base.py                 | 43 ++++++-----
 test/unit/command/test_context.py           |  2 +-
 test/unit/test_common.py                    |  7 +-
 15 files changed, 147 insertions(+), 141 deletions(-)

diff --git a/ramalama/command/context.py b/ramalama/command/context.py
index 9d80a3b0..9f13779f 100644
--- a/ramalama/command/context.py
+++ b/ramalama/command/context.py
@@ -1,5 +1,6 @@
 import argparse
 import os
+from pathlib import Path
 from typing import Optional
 
 from ramalama.common import check_metal, check_nvidia
@@ -112,19 +113,19 @@ class RamalamaModelContext:
         return self.model._get_entry_model_path(self.is_container, self.should_generate, self.dry_run)
 
     @property
-    def mmproj_path(self) -> Optional[str]:
+    def mmproj_path(self) -> Optional[Path]:
         return self.model._get_mmproj_path(self.is_container, self.should_generate, self.dry_run)
 
     @property
-    def chat_template_path(self) -> Optional[str]:
+    def chat_template_path(self) -> Optional[Path]:
         return self.model._get_chat_template_path(self.is_container, self.should_generate, self.dry_run)
 
     @property
-    def draft_model_path(self) -> str:
+    def draft_model_path(self) -> Optional[str]:
         if getattr(self.model, "draft_model", None):
             assert self.model.draft_model
             return self.model.draft_model._get_entry_model_path(self.is_container, self.should_generate, self.dry_run)
-        return ""
+        return None
 
 
 class RamalamaHostContext:
diff --git a/ramalama/common.py b/ramalama/common.py
index ece729ee..154afee7 100644
--- a/ramalama/common.py
+++ b/ramalama/common.py
@@ -15,6 +15,7 @@ import subprocess
 import sys
 from collections.abc import Callable, Sequence
 from functools import lru_cache
+from pathlib import Path
 from typing import IO, TYPE_CHECKING, Any, Literal, Protocol, TypeAlias, TypedDict, cast, get_args
 
 import yaml
@@ -117,7 +118,7 @@ def available(cmd: str) -> bool:
 
 def quoted(arr) -> str:
     """Return string with quotes around elements containing spaces."""
-    return " ".join(['"' + element + '"' if ' ' in element else element for element in arr])
+    return " ".join(f'"{s}"' if ' ' in s else s for s in map(str, arr))
 
 
 def exec_cmd(args, stdout2null: bool = False, stderr2null: bool = False):
@@ -251,25 +252,25 @@ def generate_sha256(to_hash: str, with_sha_prefix: bool = True) -> str:
     return generate_sha256_binary(to_hash.encode("utf-8"), with_sha_prefix)
 
 
-def verify_checksum(filename: str) -> bool:
+def verify_checksum(filepath: Path) -> bool:
     """
     Verifies if the SHA-256 checksum of a file matches the checksum provided in
-    the filename.
+    the file path.
 
     Args:
-        filename (str): The filename containing the checksum prefix
+        filepath (Path): The file path containing the checksum prefix
         (e.g., "sha256:")
 
     Returns:
         bool: True if the checksum matches, False otherwise.
""" - if not os.path.exists(filename): + if not filepath.exists(): return False # Check if the filename starts with "sha256:" or "sha256-" and extract the checksum from filename expected_checksum = "" - fn_base = os.path.basename(filename) + fn_base = filepath.name if filepath.is_file() else "" if fn_base.startswith("sha256:"): expected_checksum = fn_base.split(":")[1] elif fn_base.startswith("sha256-"): @@ -282,7 +283,7 @@ def verify_checksum(filename: str) -> bool: # Calculate the SHA-256 checksum of the file contents sha256_hash = hashlib.sha256() - with open(filename, "rb") as f: + with open(filepath, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): sha256_hash.update(byte_block) diff --git a/ramalama/http_client.py b/ramalama/http_client.py index f9413565..e5d37f0d 100644 --- a/ramalama/http_client.py +++ b/ramalama/http_client.py @@ -5,6 +5,7 @@ import shutil import sys import time import urllib.request +from pathlib import Path import ramalama.console as console from ramalama.common import perror @@ -24,10 +25,10 @@ class HttpClient: def __init__(self): pass - def init(self, url, headers, output_file, show_progress, response_bytes=None): - output_file_partial = None - if output_file: - output_file_partial = output_file + ".partial" + def init( + self, url: str, headers: dict[str, str] | None, output_file: Path, show_progress: bool, response_bytes=None + ): + output_file_partial = Path(f"{output_file}.partial") self.file_size = self.set_resume_point(output_file_partial) self.urlopen(url, headers) @@ -49,8 +50,7 @@ class HttpClient: finally: del out # Ensure file is closed before rename - if output_file: - os.rename(output_file_partial, output_file) + os.rename(output_file_partial, output_file) def urlopen(self, url, headers): headers["Range"] = f"bytes={self.file_size}-" @@ -162,13 +162,13 @@ class HttpClient: return now_downloaded / elapsed_seconds -def download_file(url: str, dest_path: str, headers: dict[str, str] | None = None, show_progress: bool = True): +def download_file(url: str, dest_path: Path, headers: dict[str, str] | None = None, show_progress: bool = True): """ Downloads a file from a given URL to a specified destination path. Args: url (str): The URL to download from. - dest_path (str): The path to save the downloaded file. + dest_path (Path): The path to save the downloaded file. headers (dict): Optional headers to include in the request. show_progress (bool): Whether to show a progress bar during download. 
diff --git a/ramalama/kube.py b/ramalama/kube.py
index 8fd37215..b6437ebb 100644
--- a/ramalama/kube.py
+++ b/ramalama/kube.py
@@ -95,7 +95,7 @@ class Kube:
             host_model_path = normalize_host_path_for_container(self.src_model_path)
             if platform.system() == "Windows":
                 # Workaround https://github.com/containers/podman/issues/16704
-                host_model_path = '/mnt' + host_model_path
+                host_model_path = f"/mnt{host_model_path}"
             mount = f"""
      - mountPath: {self.dest_model_path}
        name: model"""
@@ -129,7 +129,7 @@ class Kube:
             host_chat_template_path = normalize_host_path_for_container(self.src_chat_template_path)
             if platform.system() == "Windows":
                 # Workaround https://github.com/containers/podman/issues/16704
-                host_chat_template_path = '/mnt' + host_chat_template_path
+                host_chat_template_path = f"/mnt{host_chat_template_path}"
             mount = f"""
      - mountPath: {self.dest_chat_template_path}
        name: chat_template"""
@@ -143,7 +143,7 @@ class Kube:
             host_mmproj_path = normalize_host_path_for_container(self.src_mmproj_path)
             if platform.system() == "Windows":
                 # Workaround https://github.com/containers/podman/issues/16704
-                host_mmproj_path = '/mnt' + host_mmproj_path
+                host_mmproj_path = f"/mnt{host_mmproj_path}"
             mount = f"""
      - mountPath: {self.dest_mmproj_path}
        name: mmproj"""
diff --git a/ramalama/model_inspect/gguf_parser.py b/ramalama/model_inspect/gguf_parser.py
index 10dc7162..f5816757 100644
--- a/ramalama/model_inspect/gguf_parser.py
+++ b/ramalama/model_inspect/gguf_parser.py
@@ -1,6 +1,7 @@
 import io
 import struct
 from enum import IntEnum
+from pathlib import Path
 from typing import Any, Dict, cast
 
 from ramalama.endian import GGUFEndian
@@ -112,7 +113,7 @@ GGUF_NUMBER_FORMATS: list[GGUFValueType] = [
 
 class GGUFInfoParser:
     @staticmethod
-    def is_model_gguf(model_path: str) -> bool:
+    def is_model_gguf(model_path: Path) -> bool:
         try:
             with open(model_path, "rb") as model_file:
                 magic_number = GGUFInfoParser.read_string(model_file, GGUFEndian.LITTLE, 4)
@@ -178,7 +179,7 @@ class GGUFInfoParser:
         return value
 
     @staticmethod
-    def get_model_endianness(model_path: str) -> GGUFEndian:
+    def get_model_endianness(model_path: Path) -> GGUFEndian:
         # Pin model endianness to Little Endian by default.
         # Models downloaded via HuggingFace are majority Little Endian.
         model_endianness = GGUFEndian.LITTLE
@@ -205,7 +206,7 @@ class GGUFInfoParser:
         return metadata
 
     @staticmethod
-    def parse_metadata(model_path: str) -> GGUFModelMetadata:
+    def parse_metadata(model_path: Path) -> GGUFModelMetadata:
         model_endianness = GGUFInfoParser.get_model_endianness(model_path)
 
         with open(model_path, "rb") as model:
@@ -220,7 +221,7 @@ class GGUFInfoParser:
         return GGUFModelMetadata(GGUFInfoParser._parse_metadata(model, model_endianness))
 
     @staticmethod
-    def parse(model_name: str, model_registry: str, model_path: str) -> GGUFModelInfo:
+    def parse(model_name: str, model_registry: str, model_path: Path) -> GGUFModelInfo:
         model_endianness = GGUFInfoParser.get_model_endianness(model_path)
 
         with open(model_path, "rb") as model:
@@ -249,5 +250,5 @@ class GGUFInfoParser:
             tensors.append(Tensor(name, n_dimensions, dimensions, tensor_type.name, offset))
 
         return GGUFModelInfo(
-            model_name, model_registry, model_path, gguf_version, metadata, tensors, model_endianness
+            model_name, model_registry, str(model_path), gguf_version, metadata, tensors, model_endianness
         )
diff --git a/ramalama/model_inspect/safetensor_parser.py b/ramalama/model_inspect/safetensor_parser.py
index f0f0f7d7..34a4d197 100644
--- a/ramalama/model_inspect/safetensor_parser.py
+++ b/ramalama/model_inspect/safetensor_parser.py
@@ -1,5 +1,6 @@
 import json
 import struct
+from pathlib import Path
 
 import ramalama.console as console
 from ramalama.model_inspect.error import ParseError
@@ -18,7 +19,7 @@ class SafetensorInfoParser:
         return model_name.endswith(".safetensor") or model_name.endswith(".safetensors")
 
     @staticmethod
-    def parse(model_name: str, model_registry: str, model_path: str) -> SafetensorModelInfo:
+    def parse(model_name: str, model_registry: str, model_path: Path) -> SafetensorModelInfo:
         try:
             with open(model_path, "rb") as model_file:
                 prefix = '<'
@@ -27,7 +28,7 @@ class SafetensorInfoParser:
                 header_size = struct.unpack(typestring, model_file.read(8))[0]
                 header = json.loads(model_file.read(header_size))
 
-                return SafetensorModelInfo(model_name, model_registry, model_path, header)
+                return SafetensorModelInfo(model_name, model_registry, str(model_path), header)
 
         except Exception as ex:
             msg = f"Failed to parse safetensor model '{model_path}': {ex}"
diff --git a/ramalama/model_store/global_store.py b/ramalama/model_store/global_store.py
index 123f9258..68b4d7a3 100644
--- a/ramalama/model_store/global_store.py
+++ b/ramalama/model_store/global_store.py
@@ -1,6 +1,7 @@
 import os
 from dataclasses import dataclass
 from datetime import datetime
+from pathlib import Path
 from typing import Dict, List
 
 from ramalama import oci_tools
@@ -22,10 +23,10 @@ class GlobalModelStore:
         self,
         base_path: str,
     ):
-        self._store_base_path = os.path.join(base_path, "store")
+        self._store_base_path = Path(os.path.join(base_path, "store"))
 
     @property
-    def path(self) -> str:
+    def path(self) -> Path:
        return self._store_base_path
 
     def list_models(self, engine: str, show_container: bool) -> Dict[str, List[ModelFile]]:
@@ -33,16 +34,16 @@ class GlobalModelStore:
         for root, subdirs, _ in os.walk(self.path):
             if DIRECTORY_NAME_REFS in subdirs:
-                ref_dir = os.path.join(root, DIRECTORY_NAME_REFS)
+                ref_dir = Path(root).joinpath(DIRECTORY_NAME_REFS)
                 for ref_file_name in os.listdir(ref_dir):
-                    ref_file_path = os.path.join(ref_dir, ref_file_name)
+                    ref_file_path = ref_dir.joinpath(ref_file_name)
                     ref_file = migrate_reffile_to_refjsonfile(
-                        ref_file_path, os.path.join(root, DIRECTORY_NAME_SNAPSHOTS)
+                        ref_file_path, Path(root).joinpath(DIRECTORY_NAME_SNAPSHOTS)
                     )
                     if ref_file is None:
                         ref_file = RefJSONFile.from_path(ref_file_path)
 
-                    model_path = root.replace(self.path, "").replace(os.sep, "", 1)
+                    model_path = root.replace(f"{self.path}", "").replace(os.sep, "", 1)
                     parts = model_path.split(os.sep)
                     model_source = parts[0]
@@ -55,14 +56,14 @@ class GlobalModelStore:
                     collected_files = []
                     for snapshot_file in ref_file.files:
                         is_partially_downloaded = False
-                        snapshot_file_path = os.path.join(
-                            root, DIRECTORY_NAME_SNAPSHOTS, ref_file.hash, snapshot_file.name
+                        snapshot_file_path = Path(root).joinpath(
+                            DIRECTORY_NAME_SNAPSHOTS, ref_file.hash, snapshot_file.name
                         )
-                        if not os.path.exists(snapshot_file_path):
-                            blobs_partial_file_path = os.path.join(
-                                root, DIRECTORY_NAME_BLOBS, ref_file.hash + ".partial"
+                        if not snapshot_file_path.exists():
+                            blobs_partial_file_path = Path(root).joinpath(
+                                DIRECTORY_NAME_BLOBS, ref_file.hash + ".partial"
                             )
-                            if not os.path.exists(blobs_partial_file_path):
+                            if not blobs_partial_file_path.exists():
                                 continue
                             snapshot_file_path = blobs_partial_file_path
diff --git a/ramalama/model_store/reffile.py b/ramalama/model_store/reffile.py
index 5dcd7ef1..0abc8b07 100644
--- a/ramalama/model_store/reffile.py
+++ b/ramalama/model_store/reffile.py
@@ -1,6 +1,6 @@
 import json
 import os
-from dataclasses import dataclass
+from dataclasses import asdict, dataclass
 from pathlib import Path
 from typing import Optional
 
@@ -21,14 +21,14 @@ class RefFile:
         self.model_name: str = ""
         self.chat_template_name: str = ""
         self.mmproj_name: str = ""
-        self._path: str = ""
+        self._path: Path = Path("")
 
     @property
-    def path(self) -> str:
+    def path(self) -> Path:
         return self._path
 
     @staticmethod
-    def from_path(path: str) -> "RefFile":
+    def from_path(path: Path) -> "RefFile":
         ref_file = RefFile()
         ref_file._path = path
         with open(path, "r") as file:
@@ -82,12 +82,12 @@ class RefFile:
             file.flush()
 
     @staticmethod
-    def map_to_refjsonfile(ref_file_path: str, snapshot_directory: str) -> "RefJSONFile":
+    def map_to_refjsonfile(ref_file_path: Path, snapshot_directory: Path) -> "RefJSONFile":
         ref_file = RefFile.from_path(ref_file_path)
 
         ref = RefJSONFile(
             hash=ref_file.hash,
-            path=f"{ref_file.path}.json",
+            path=Path(f"{ref_file.path}.json"),
             files=[],
         )
 
@@ -116,10 +116,10 @@ class RefFile:
 #
 # Temporary migration routine to ensure smooth transition to new RefFile format
 #
-def migrate_reffile_to_refjsonfile(ref_file_path: str, snapshot_directory: str) -> Optional["RefJSONFile"]:
+def migrate_reffile_to_refjsonfile(ref_file_path: Path, snapshot_directory: Path) -> Optional["RefJSONFile"]:
     # Check if a ref file in old format is present by removing the file extension
-    old_ref_file_path = ref_file_path.replace(".json", "")
-    if os.path.exists(old_ref_file_path):
+    old_ref_file_path = ref_file_path.with_suffix("")
+    if old_ref_file_path.exists():
         logger.debug(f"Migrating old ref file '{old_ref_file_path}' to new format")
         ref: RefJSONFile = RefFile.map_to_refjsonfile(old_ref_file_path, snapshot_directory)
         ref.write_to_file()
@@ -162,13 +162,13 @@ class StoreFile:
 @dataclass
 class RefJSONFile:
     hash: str
-    path: str
+    path: Path
     files: list[StoreFile]
 
     version: str = "v1.0.1"
 
     def to_json(self) -> str:
-        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=2)
+        return json.dumps(asdict(self), default=str, sort_keys=True, indent=2)
 
     def write_to_file(self):
         with open(self.path, "w") as file:
@@ -198,7 +198,7 @@ class RefJSONFile:
             break
 
     @staticmethod
-    def from_path(path: str) -> "RefJSONFile":
+    def from_path(path: Path) -> "RefJSONFile":
         with open(path, "r") as f:
             data = json.loads(f.read())
 
@@ -219,7 +219,7 @@ class RefJSONFile:
             files=ref_files,
         )
         # ref file has moved
-        if ref_file.path != data["path"]:
+        if f"{ref_file.path}" != data["path"]:
             logger.debug(f"Updating ref file path to '{ref_file.path}'")
             ref_file.write_to_file()
         return ref_file
diff --git a/ramalama/model_store/snapshot_file.py b/ramalama/model_store/snapshot_file.py
index ef4551ec..5ae190fc 100644
--- a/ramalama/model_store/snapshot_file.py
+++ b/ramalama/model_store/snapshot_file.py
@@ -1,5 +1,6 @@
 import os
 from enum import IntEnum
+from pathlib import Path
 from typing import Dict, Sequence
 
 from ramalama.common import generate_sha256_binary, perror
@@ -36,8 +37,8 @@ class SnapshotFile:
         self.should_verify_checksum: bool = should_verify_checksum
         self.required: bool = required
 
-    def download(self, blob_file_path: str, snapshot_dir: str) -> str:
-        if not os.path.exists(blob_file_path):
+    def download(self, blob_file_path: Path, snapshot_dir: Path) -> str:
+        if not blob_file_path.exists():
             if self.should_show_progress:
                 perror(f"Downloading {self.name}")
             download_file(
@@ -50,7 +51,7 @@ class SnapshotFile:
             logger.debug(f"Using cached blob for {self.name} ({os.path.basename(blob_file_path)})")
         prefix = os.path.dirname(self.name)
         if prefix:
-            snapshot_dir = os.path.join(snapshot_dir, prefix)
+            snapshot_dir = snapshot_dir.joinpath(prefix)
         return os.path.relpath(blob_file_path, start=snapshot_dir)
 
diff --git a/ramalama/model_store/store.py b/ramalama/model_store/store.py
index 26f343d8..1db888ba 100644
--- a/ramalama/model_store/store.py
+++ b/ramalama/model_store/store.py
@@ -53,7 +53,7 @@ class ModelStore:
         self._model_organization = model_organization
 
     @property
-    def base_path(self) -> str:
+    def base_path(self) -> Path:
         return self._store.path
 
     @property
@@ -69,32 +69,29 @@ class ModelStore:
         return self._model_type
 
     @property
-    def model_base_directory(self) -> str:
-        return os.path.join(self.base_path, self.model_type, self.model_organization, self.model_name)
+    def model_base_directory(self) -> Path:
+        return self.base_path.joinpath(self.model_type, self.model_organization, self.model_name)
 
     @property
-    def blobs_directory(self) -> str:
-        return os.path.join(self.model_base_directory, DIRECTORY_NAME_BLOBS)
+    def blobs_directory(self) -> Path:
+        return self.model_base_directory.joinpath(DIRECTORY_NAME_BLOBS)
 
     @property
-    def refs_directory(self) -> str:
-        return os.path.join(self.model_base_directory, DIRECTORY_NAME_REFS)
+    def refs_directory(self) -> Path:
+        return self.model_base_directory.joinpath(DIRECTORY_NAME_REFS)
 
     @property
-    def snapshots_directory(self) -> str:
-        return os.path.join(self.model_base_directory, DIRECTORY_NAME_SNAPSHOTS)
+    def snapshots_directory(self) -> Path:
+        return self.model_base_directory.joinpath(DIRECTORY_NAME_SNAPSHOTS)
 
-    def file_exists(self, file_path: str) -> bool:
-        return os.path.exists(file_path)
-
-    def get_ref_file_path(self, model_tag: str) -> str:
-        return os.path.join(self.refs_directory, f"{model_tag}.json")
+    def get_ref_file_path(self, model_tag: str) -> Path:
+        return self.refs_directory.joinpath(f"{model_tag}.json")
 
     def get_ref_file(self, model_tag: str) -> Optional[RefJSONFile]:
         ref_file_path = self.get_ref_file_path(model_tag)
         ref_file = migrate_reffile_to_refjsonfile(ref_file_path, self.snapshots_directory)
         if ref_file is None:
-            if os.path.exists(ref_file_path):
+            if ref_file_path.exists():
                 ref_file = RefJSONFile.from_path(ref_file_path)
         if ref_file is not None:
             if ref_file.version != RefJSONFile.version:
@@ -130,19 +127,19 @@ class ModelStore:
             return ""
         return sanitize_filename(ref_file.hash)
 
-    def get_snapshot_directory_from_tag(self, model_tag: str) -> str:
-        return os.path.join(self.snapshots_directory, self.get_snapshot_hash(model_tag))
+    def get_snapshot_directory_from_tag(self, model_tag: str) -> Path:
+        return self.snapshots_directory.joinpath(self.get_snapshot_hash(model_tag))
 
-    def get_snapshot_directory(self, hash: str) -> str:
-        return os.path.join(self.snapshots_directory, hash)
+    def get_snapshot_directory(self, hash: str) -> Path:
+        return self.snapshots_directory.joinpath(hash)
 
-    def get_snapshot_file_path(self, tag_hash: str, filename: str) -> str:
-        return os.path.join(self.snapshots_directory, sanitize_filename(tag_hash), filename)
+    def get_snapshot_file_path(self, tag_hash: str, filename: str) -> Path:
+        return self.snapshots_directory.joinpath(sanitize_filename(tag_hash), filename)
 
-    def get_blob_file_path(self, file_hash: str) -> str:
-        return os.path.join(self.blobs_directory, sanitize_filename(file_hash))
+    def get_blob_file_path(self, file_hash: str) -> Path:
+        return self.blobs_directory.joinpath(sanitize_filename(file_hash))
 
-    def get_safetensor_blob_path(self, model_tag: str, requested_filename: str) -> Optional[str]:
+    def get_safetensor_blob_path(self, model_tag: str, requested_filename: str) -> Optional[Path]:
         ref_file = self.get_ref_file(model_tag)
         if ref_file is None:
             return None
@@ -153,26 +150,23 @@ class ModelStore:
         chosen = matched if matched is not None else safetensor_files[0]
         return self.get_blob_file_path(chosen.hash)
 
-    def get_blob_file_path_by_name(self, tag_hash: str, filename: str) -> str:
-        return str(Path(self.get_snapshot_file_path(tag_hash, filename)).resolve())
+    def get_blob_file_path_by_name(self, tag_hash: str, filename: str) -> Path:
+        return self.get_snapshot_file_path(tag_hash, filename).resolve()
 
     def get_blob_file_hash(self, tag_hash: str, filename: str) -> str:
-        return os.path.basename(self.get_blob_file_path_by_name(tag_hash, filename))
+        blob_file = self.get_blob_file_path_by_name(tag_hash, filename)
+        return blob_file.name if blob_file.is_file() else ""
 
-    def get_partial_blob_file_path(self, file_hash: str) -> str:
-        return self.get_blob_file_path(file_hash) + ".partial"
+    def get_partial_blob_file_path(self, file_hash: str) -> Path:
+        return Path(f"{self.get_blob_file_path(file_hash)}.partial")
 
     def ensure_directory_setup(self) -> None:
-        os.makedirs(self.blobs_directory, exist_ok=True)
-        os.makedirs(self.refs_directory, exist_ok=True)
-        os.makedirs(self.snapshots_directory, exist_ok=True)
+        self.blobs_directory.mkdir(exist_ok=True, parents=True)
+        self.refs_directory.mkdir(exist_ok=True, parents=True)
+        self.snapshots_directory.mkdir(exist_ok=True, parents=True)
 
     def directory_setup_exists(self) -> bool:
-        return (
-            os.path.exists(self.blobs_directory)
-            and os.path.exists(self.refs_directory)
-            and os.path.exists(self.snapshots_directory)
-        )
+        return self.blobs_directory.exists() and self.refs_directory.exists() and self.snapshots_directory.exists()
 
     def get_cached_files(self, model_tag: str) -> Tuple[str, list[str], bool]:
         cached_files: list[str] = []
@@ -213,7 +207,7 @@ class ModelStore:
         ref_file.write_to_file()
 
         snapshot_directory = self.get_snapshot_directory(snapshot_hash)
-        os.makedirs(snapshot_directory, exist_ok=True)
+        snapshot_directory.mkdir(exist_ok=True, parents=True)
         return ref_file
 
     def _download_snapshot_files(
@@ -325,7 +319,7 @@ class ModelStore:
         for file in ref_file.files:
             if file.name == "chat_template_converted":
                 # Should not exist but 0.13.0 needs_conversion logic was inverted
-                self._remove_blob_path(Path(self.get_snapshot_file_path(ref_file.hash, file.name)))
+                self._remove_blob_path(self.get_snapshot_file_path(ref_file.hash, file.name))
                 ref_file.remove_file(file.hash)
                 break
         self._update_snapshot(ref_file, snapshot_hash, files)
@@ -416,7 +410,7 @@ class ModelStore:
         model_tags = [
             Path(entry).stem
             for entry in os.listdir(self.refs_directory)
-            if os.path.isfile(os.path.join(self.refs_directory, entry))
+            if self.refs_directory.joinpath(entry).is_file()
         ]
         refs = [ref for tag in model_tags if (ref := self.get_ref_file(tag))]
 
@@ -438,22 +432,19 @@ class ModelStore:
         for file in ref_file.files:
             blob_refcount = blob_refcounts.get(file.name, 0)
             if blob_refcount <= 1:
-                blob_absolute_path = Path(self.get_blob_file_path(file.hash))
-                self._remove_blob_path(blob_absolute_path)
+                self._remove_blob_path(self.get_blob_file_path(file.hash))
             else:
                 logger.debug(f"Not removing blob {file} refcount={blob_refcount}")
 
         # Remove snapshot directory
         if snapshot_refcount <= 1:
             # FIXME: this only cleans up .partial files where the blob hash equals the snapshot hash
-            partial_blob_file_path = Path(self.get_partial_blob_file_path(ref_file.hash))
-            self._remove_blob_path(partial_blob_file_path)
-            snapshot_directory = self.get_snapshot_directory_from_tag(model_tag)
-            shutil.rmtree(snapshot_directory, ignore_errors=True)
+            self._remove_blob_path(self.get_partial_blob_file_path(ref_file.hash))
+            shutil.rmtree(self.get_snapshot_directory_from_tag(model_tag), ignore_errors=True)
             logger.debug(f"Snapshot removed {ref_file.hash}")
         else:
             logger.debug(f"Not removing snapshot {ref_file.hash} refcount={snapshot_refcount}")
 
         # Remove ref file, ignore if file is not found
-        Path(self.get_ref_file_path(model_tag)).unlink(missing_ok=True)
+        self.get_ref_file_path(model_tag).unlink(missing_ok=True)
         return True
diff --git a/ramalama/path_utils.py b/ramalama/path_utils.py
index 04b2b353..a9bf7a1d 100644
--- a/ramalama/path_utils.py
+++ b/ramalama/path_utils.py
@@ -5,7 +5,7 @@ import platform
 from pathlib import Path, PureWindowsPath
 
 
-def normalize_host_path_for_container(host_path: str) -> str:
+def normalize_host_path_for_container(host_path: Path) -> Path:
     """
     Convert a host filesystem path to a format suitable for container volume mounts.
 
@@ -35,22 +35,22 @@ def normalize_host_path_for_container(host_path: str) -> str:
     # Docker Desktop for Windows expects paths in the format /c/Users/... instead of C:\Users\...
     # First, resolve symlinks and make the path absolute.
-    path = Path(host_path).resolve()
+    path = host_path.resolve()
 
     # Handle UNC paths to container filesystem
     # e.g if the model store is placed on the podman machine VM to reduce copying
     # \\wsl.localhost\podman-machine-default\home\user\.local\share\ramalama\store
     # NOTE: UNC paths cannot be accessed implicitly from the container, would need to smb mount
     if path.drive.startswith("\\\\"):
-        return '/' + path.relative_to(path.drive).as_posix()
+        return Path('/' + path.relative_to(path.drive).as_posix())
 
     if not path.drive:
-        return path.as_posix()
+        return Path(path.as_posix())
 
     # Handle paths with drive letters
     drive_letter = path.drive[0].lower()
 
     # path.as_posix() on Windows is 'C:/Users/...', so we partition on ':' and take the rest.
- return f"/{drive_letter}{path.as_posix().partition(':')[2]}" + return Path(f"/{drive_letter}{path.as_posix().partition(':')[2]}") def is_windows_absolute_path(path: str) -> bool: @@ -69,7 +69,7 @@ def is_windows_absolute_path(path: str) -> bool: return PureWindowsPath(path).is_absolute() -def resolve_real_path(path: str) -> str: +def resolve_real_path(path: Path) -> Path: """ Resolve a path to its real absolute path, handling symlinks. @@ -82,10 +82,10 @@ def resolve_real_path(path: str) -> str: Returns: Absolute path with symlinks resolved """ - return os.path.realpath(path) + return Path(os.path.realpath(path)) -def get_container_mount_path(host_path: str) -> str: +def get_container_mount_path(host_path: Path) -> Path: """ Get the properly formatted path for use in container mount arguments. @@ -110,7 +110,7 @@ def get_container_mount_path(host_path: str) -> str: return normalize_host_path_for_container(real_path) -def create_file_link(src: str, dst: str) -> None: +def create_file_link(src: Path, dst: Path) -> None: """ Create a link from dst to src using the best available method for the platform. @@ -134,15 +134,15 @@ def create_file_link(src: str, dst: str) -> None: - Hardlinks share the same inode, so deleting one doesn't affect the other - On Windows, hardlinks are preferred over symlinks for file operations """ - if not os.path.exists(src): + if not src.exists(): raise FileNotFoundError(f"Source file does not exist: {src}") # Ensure destination directory exists - os.makedirs(os.path.dirname(dst), exist_ok=True) + dst.parent.mkdir(exist_ok=True, parents=True) # Remove existing destination if it exists - if os.path.exists(dst) or os.path.islink(dst): - os.unlink(dst) + if dst.exists() or dst.is_symlink(): + dst.unlink() # Strategy 1: Try hardlink first (best for Windows, works without admin) try: diff --git a/ramalama/stack.py b/ramalama/stack.py index fbe7196c..d2a48766 100644 --- a/ramalama/stack.py +++ b/ramalama/stack.py @@ -1,5 +1,6 @@ import os import platform +from pathlib import Path import ramalama.kube as kube import ramalama.quadlet as quadlet @@ -60,10 +61,10 @@ class Stack: return volume_mounts def _gen_volumes(self): - host_model_path = normalize_host_path_for_container(self.model._get_entry_model_path(False, False, False)) + host_model_path = normalize_host_path_for_container(Path(self.model._get_entry_model_path(False, False, False))) if platform.system() == "Windows": # Workaround https://github.com/containers/podman/issues/16704 - host_model_path = '/mnt' + host_model_path + host_model_path = f"/mnt{host_model_path}" volumes = f""" - hostPath: path: {host_model_path} diff --git a/ramalama/transports/base.py b/ramalama/transports/base.py index 6f853ec0..0a7cabd5 100644 --- a/ramalama/transports/base.py +++ b/ramalama/transports/base.py @@ -7,6 +7,7 @@ import sys import time from abc import ABC, abstractmethod from functools import cached_property +from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Optional if TYPE_CHECKING: @@ -225,7 +226,7 @@ class Transport(TransportBase): safetensor_files = ref_file.safetensor_model_files if safetensor_files: # Safetensor models use directory mounts, not individual files - src_path = self.model_store.get_snapshot_directory_from_tag(self.model_tag) + src_path = str(self.model_store.get_snapshot_directory_from_tag(self.model_tag)) if use_container or should_generate: dest_path = MNT_DIR else: @@ -239,16 +240,20 @@ class Transport(TransportBase): if use_container or should_generate: dest_path = 
f"{MNT_DIR}/{model_file.name}" else: - dest_path = self.model_store.get_blob_file_path(model_file.hash) - src_path = self.model_store.get_blob_file_path(model_file.hash) + dest_path = str(self.model_store.get_blob_file_path(model_file.hash)) + src_path = str(self.model_store.get_blob_file_path(model_file.hash)) model_parts.append((src_path, dest_path)) # Sort multi-part models by filename to ensure correct order - if len(model_parts) > 1 and any("-00001-of-" in name for _, name in model_parts): + if len(model_parts) > 1 and any("-00001-of-" in str(name) for _, name in model_parts): model_parts.sort(key=lambda x: x[1]) return model_parts + # + # Keep returning str here due to the possible return value of oci:// which would be + # reduced to oci:/ by pathlib.Path. + # def _get_entry_model_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> str: """ Returns the path to the model blob on the host if use_container and should_generate are both False. @@ -279,7 +284,7 @@ class Transport(TransportBase): if safetensor_files: if use_container or should_generate: return MNT_DIR - return self.model_store.get_snapshot_directory_from_tag(self.model_tag) + return str(self.model_store.get_snapshot_directory_from_tag(self.model_tag)) elif not gguf_files: raise NoGGUFModelFileFound() @@ -294,7 +299,7 @@ class Transport(TransportBase): if use_container or should_generate: return f"{MNT_DIR}/{model_file.name}" - return self.model_store.get_blob_file_path(model_file.hash) + return str(self.model_store.get_blob_file_path(model_file.hash)) def _get_inspect_model_path(self, dry_run: bool) -> str: """Return a concrete file path for inspection. @@ -304,16 +309,17 @@ class Transport(TransportBase): return "/path/to/model" if self.model_type == 'oci': return self._get_entry_model_path(False, False, dry_run) - safetensor_blob = self.model_store.get_safetensor_blob_path(self.model_tag, self.filename) + safetensor_blob_path = self.model_store.get_safetensor_blob_path(self.model_tag, self.filename) + safetensor_blob = None if safetensor_blob_path is None else str(safetensor_blob_path) return safetensor_blob or self._get_entry_model_path(False, False, dry_run) - def _get_mmproj_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[str]: + def _get_mmproj_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[Path]: """ Returns the path to the mmproj blob on the host if use_container and should_generate are both False. Or returns the path to the mounted file inside a container. """ if dry_run: - return "" + return Path("") if self.model_type == 'oci': return None @@ -328,16 +334,16 @@ class Transport(TransportBase): # Use the first mmproj file mmproj_file = ref_file.mmproj_files[0] if use_container or should_generate: - return f"{MNT_DIR}/{mmproj_file.name}" + return Path(f"{MNT_DIR}/{mmproj_file.name}") return self.model_store.get_blob_file_path(mmproj_file.hash) - def _get_chat_template_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[str]: + def _get_chat_template_path(self, use_container: bool, should_generate: bool, dry_run: bool) -> Optional[Path]: """ Returns the path to the chat template blob on the host if use_container and should_generate are both False. Or returns the path to the mounted file inside a container. 
""" if dry_run: - return "" + return Path("") if self.model_type == 'oci': return None @@ -352,7 +358,7 @@ class Transport(TransportBase): # Use the last chat template file (may have been go template converted to jinja) chat_template_file = ref_file.chat_templates[-1] if use_container or should_generate: - return f"{MNT_DIR}/{chat_template_file.name}" + return Path(f"{MNT_DIR}/{chat_template_file.name}") return self.model_store.get_blob_file_path(chat_template_file.hash) def remove(self, args) -> bool: @@ -445,7 +451,7 @@ class Transport(TransportBase): if self.draft_model: draft_model = self.draft_model._get_entry_model_path(args.container, args.generate, args.dryrun) # Convert path to container-friendly format (handles Windows path conversion) - container_draft_model = get_container_mount_path(draft_model) + container_draft_model = get_container_mount_path(Path(draft_model)) mount_opts = f"--mount=type=bind,src={container_draft_model},destination={MNT_FILE_DRAFT}" mount_opts += f",ro{self.engine.relabel()}" self.engine.add([mount_opts]) @@ -748,7 +754,7 @@ class Transport(TransportBase): compose.generate().write(output_dir) def inspect_metadata(self) -> Dict[str, Any]: - model_path = self._get_entry_model_path(False, False, False) + model_path = Path(self._get_entry_model_path(False, False, False)) if GGUFInfoParser.is_model_gguf(model_path): return GGUFInfoParser.parse_metadata(model_path).data return {} @@ -763,7 +769,10 @@ class Transport(TransportBase): ) -> Any: model_name = self.filename model_registry = self.type.lower() - model_path = self._get_inspect_model_path(dryrun) + model_path = Path(self._get_inspect_model_path(dryrun)) + if not model_path.exists(): + raise NoRefFileFound(model_name) + if GGUFInfoParser.is_model_gguf(model_path): if not show_all_metadata and get_field == "": gguf_info: GGUFModelInfo = GGUFInfoParser.parse(model_name, model_registry, model_path) @@ -782,7 +791,7 @@ class Transport(TransportBase): safetensor_info: SafetensorModelInfo = SafetensorInfoParser.parse(model_name, model_registry, model_path) return safetensor_info.serialize(json=as_json, all=show_all) - return ModelInfoBase(model_name, model_registry, model_path).serialize(json=as_json) + return ModelInfoBase(model_name, model_registry, str(model_path)).serialize(json=as_json) def print_pull_message(self, model_name) -> None: model_name = trim_model_name(model_name) diff --git a/test/unit/command/test_context.py b/test/unit/command/test_context.py index 35a8f939..165d9e69 100644 --- a/test/unit/command/test_context.py +++ b/test/unit/command/test_context.py @@ -116,4 +116,4 @@ def test_ramalama_model_context_without_draft_model(): dry_run=True, ) - assert ctx.draft_model_path == "" + assert ctx.draft_model_path is None diff --git a/test/unit/test_common.py b/test/unit/test_common.py index 05b01901..30e70baf 100644 --- a/test/unit/test_common.py +++ b/test/unit/test_common.py @@ -1,4 +1,3 @@ -import os import shutil import subprocess from contextlib import ExitStack @@ -81,11 +80,11 @@ def test_verify_checksum( if ":" in input_file_name and platform == "win32": return - full_dir_path = os.path.join(Path(__file__).parent, "verify_checksum") - file_path = os.path.join(full_dir_path, input_file_name) + full_dir_path = Path(__file__).parent / "verify_checksum" + file_path = full_dir_path / input_file_name try: - os.makedirs(full_dir_path, exist_ok=True) + full_dir_path.mkdir(exist_ok=True, parents=True) with open(file_path, "w") as f: f.write(content)