Source code for goodmap.db

import json
import logging
import os
import tempfile
from functools import partial
from typing import Any

from goodmap.core import get_queried_data
from goodmap.data_models.location import LocationBase
from goodmap.exceptions import (
    AlreadyExistsError,
    LocationAlreadyExistsError,
    LocationNotFoundError,
    ReportNotFoundError,
)

logger = logging.getLogger(__name__)

# TODO file is temporary solution to be compatible with old, static code,
#  it should be replaced with dynamic solution


def __parse_pagination_params(query):
    """Extract and validate pagination parameters from query."""
    try:
        page = max(1, int(query.get("page", ["1"])[0]))
    except (ValueError, IndexError, TypeError):
        page = 1

    per_page_raw = query.get("per_page", ["20"])[0] if query.get("per_page") else "20"
    if per_page_raw == "all":
        per_page = None
    else:
        try:
            per_page = max(1, min(int(per_page_raw), 1000))  # Cap at 1000
        except (ValueError, TypeError):
            per_page = 20

    sort_by = query.get("sort_by", [None])[0]
    sort_order = query.get("sort_order", ["asc"])[0] if query.get("sort_order") else "asc"

    return page, per_page, sort_by, sort_order.lower()


def __build_pagination_response(items, total, page, per_page):
    """Build standardized pagination response."""
    if per_page:
        total_pages = (total + per_page - 1) // per_page
    else:
        total_pages = 1
        per_page = total

    return {
        "items": items,
        "pagination": {
            "total": total,
            "page": page,
            "per_page": per_page,
            "total_pages": total_pages,
        },
    }


[docs] def json_file_atomic_dump(data, file_path): """Write JSON data to a file atomically using a temporary file and rename. Args: data: Data to serialize as JSON. file_path: Destination file path. """ dir_name = os.path.dirname(file_path) with tempfile.NamedTemporaryFile("w", dir=dir_name, delete=False) as temp_file: json.dump(data, temp_file) temp_file.flush() os.fsync(temp_file.fileno()) os.replace(temp_file.name, file_path)
[docs] class PaginationHelper: """Common pagination utility to eliminate duplication across backends."""
[docs] @staticmethod def get_sort_key(item, sort_by): """Extract sort key from item for both dict and object types.""" if sort_by == "name" and hasattr(item, "name"): value = item.name elif isinstance(item, dict): value = item.get(sort_by) else: value = getattr(item, sort_by, None) return (value is not None, value or "")
[docs] @staticmethod def apply_pagination_and_sorting(items, page, per_page, sort_by, sort_order): """Apply sorting and pagination to a list of items.""" # Apply sorting if sort_by: reverse = sort_order == "desc" items.sort( key=lambda item: PaginationHelper.get_sort_key(item, sort_by), # type: ignore[reportUnknownLambdaType] reverse=reverse, ) total_count = len(items) # Apply pagination if per_page: start_idx = (page - 1) * per_page end_idx = start_idx + per_page paginated_items = items[start_idx:end_idx] else: paginated_items = items return paginated_items, total_count
[docs] @staticmethod def apply_filters(items, filters): """Apply filtering based on provided filters dictionary.""" filtered_items = items # Apply status filtering if "status" in filters: statuses = filters["status"] if statuses: filtered_items = [ item for item in filtered_items if ( item.get("status") if isinstance(item, dict) else getattr(item, "status", None) ) in statuses ] # Apply priority filtering if "priority" in filters: priorities = filters["priority"] if priorities: filtered_items = [ item for item in filtered_items if ( item.get("priority") if isinstance(item, dict) else getattr(item, "priority", None) ) in priorities ] return filtered_items
[docs] @staticmethod def serialize_items(items): """Convert items to dict if needed (for location models).""" if items and hasattr(items[0], "model_dump"): return [x.model_dump() for x in items] else: return items
[docs] @staticmethod def create_paginated_response(items, query, extract_filters_func=None): """Create a complete paginated response with all common logic.""" # Parse pagination parameters using the existing function try: page = max(1, int(query.get("page", ["1"])[0])) except (ValueError, IndexError, TypeError): page = 1 per_page_raw = query.get("per_page", ["20"])[0] if query.get("per_page") else "20" if per_page_raw == "all": per_page = None else: try: per_page = max(1, min(int(per_page_raw), 1000)) # Cap at 1000 except (ValueError, TypeError): per_page = 20 sort_by = query.get("sort_by", [None])[0] sort_order = query.get("sort_order", ["asc"])[0] if query.get("sort_order") else "asc" sort_order = sort_order.lower() # Apply filters if any filters = {} if query: if "status" in query: filters["status"] = query["status"] if "priority" in query: filters["priority"] = query["priority"] # Allow custom filter extraction if extract_filters_func: custom_filters = extract_filters_func(query) filters.update(custom_filters) if filters: items = PaginationHelper.apply_filters(items, filters) # Apply pagination and sorting paginated_items, total_count = PaginationHelper.apply_pagination_and_sorting( items, page, per_page, sort_by, sort_order ) # Serialize items if needed serialized_items = PaginationHelper.serialize_items(paginated_items) # Build pagination response directly if per_page: total_pages = (total_count + per_page - 1) // per_page else: total_pages = 1 per_page = total_count return { "items": serialized_items, "pagination": { "total": total_count, "page": page, "per_page": per_page, "total_pages": total_pages, }, }
[docs] class FileIOHelper: """Common file I/O utilities to eliminate duplication."""
[docs] @staticmethod def read_json_file(file_path): """Read and parse JSON file.""" with open(file_path, "r") as file: return json.load(file)
[docs] @staticmethod def write_json_file_atomic(data, file_path): """Write JSON data to file atomically.""" json_file_atomic_dump(data, file_path)
[docs] @staticmethod def get_data_from_file(file_path, data_key="map"): """Get data from JSON file with specified key structure.""" json_data = FileIOHelper.read_json_file(file_path) return json_data.get(data_key, {})
[docs] class CRUDHelper: """Common CRUD operation utilities to eliminate duplication."""
[docs] @staticmethod def add_item_to_json_db(db_data, collection_name, item_data, default_status=None): """Add item to JSON in-memory database.""" collection = db_data.setdefault(collection_name, []) uuid = item_data.get("uuid") resource_type = collection_name.rstrip("s").capitalize() # Check if item already exists existing = next( ( item for item in collection if (item.get("uuid") if isinstance(item, dict) else getattr(item, "uuid", None)) == uuid ), None, ) if existing: raise AlreadyExistsError(uuid, resource_type) record = dict(item_data) if default_status: record["status"] = default_status collection.append(record)
[docs] @staticmethod def add_item_to_json_file_db(file_path, collection_name, item_data, default_status=None): """Add item to JSON file database.""" json_file = FileIOHelper.read_json_file(file_path) collection = json_file["map"].get(collection_name, []) uuid = item_data.get("uuid") resource_type = collection_name.rstrip("s").capitalize() # Check if item already exists existing = next( ( item for item in collection if (item.get("uuid") if isinstance(item, dict) else getattr(item, "uuid", None)) == uuid ), None, ) if existing: raise AlreadyExistsError(uuid, resource_type) record = dict(item_data) if default_status: record["status"] = default_status collection.append(record) json_file["map"][collection_name] = collection FileIOHelper.write_json_file_atomic(json_file, file_path)
[docs] @staticmethod def add_item_to_mongodb(db_collection, item_data, item_type, default_status=None): """Add item to MongoDB database.""" existing = db_collection.find_one({"uuid": item_data.get("uuid")}) if existing: raise AlreadyExistsError(item_data.get("uuid"), item_type) record = dict(item_data) if default_status: record["status"] = default_status db_collection.insert_one(record)
# ------------------------------------------------ # get_location_obligatory_fields
[docs] def json_db_get_location_obligatory_fields(db): """Return location obligatory fields from in-memory JSON database.""" return db.data["location_obligatory_fields"]
[docs] def json_file_db_get_location_obligatory_fields(db): """Return location obligatory fields from JSON file database.""" with open(db.data_file_path, "r") as file: return json.load(file)["map"]["location_obligatory_fields"]
[docs] def google_json_db_get_location_obligatory_fields(db): """Return location obligatory fields from Google Cloud Storage JSON.""" return db.data.get("map", {}).get("location_obligatory_fields", [])
[docs] def mongodb_db_get_location_obligatory_fields(db): """Return location obligatory fields from MongoDB.""" config_doc = db.db.config.find_one({"_id": "map_config"}) if config_doc and "location_obligatory_fields" in config_doc: return config_doc["location_obligatory_fields"] return []
[docs] def get_location_obligatory_fields(db): """Dispatch to the backend-specific get_location_obligatory_fields function.""" return globals()[f"{db.module_name}_get_location_obligatory_fields"](db)
# ------------------------------------------------ # get_issue_options
[docs] def json_db_get_issue_options(self): """Return reported issue types from in-memory JSON database.""" return self.data.get("reported_issue_types", [])
[docs] def json_file_db_get_issue_options(self): """Return reported issue types from JSON file database.""" with open(self.data_file_path, "r") as file: return json.load(file)["map"].get("reported_issue_types", [])
[docs] def google_json_db_get_issue_options(self): """Return reported issue types from Google Cloud Storage JSON.""" return self.data.get("map", {}).get("reported_issue_types", [])
[docs] def mongodb_db_get_issue_options(self): """Return reported issue types from MongoDB.""" config_doc = self.db.config.find_one({"_id": "map_config"}) if config_doc and "reported_issue_types" in config_doc: return config_doc["reported_issue_types"] return []
[docs] def get_issue_options(db): """Dispatch to the backend-specific get_issue_options function.""" return globals()[f"{db.module_name}_get_issue_options"]
# ------------------------------------------------ # get_data
[docs] def google_json_db_get_data(self): """Return map data from Google Cloud Storage JSON.""" return self.data.get("map", {})
[docs] def json_file_db_get_data(self): """Return map data from JSON file database.""" with open(self.data_file_path, "r") as file: return json.load(file)["map"]
[docs] def json_db_get_data(self): """Return map data from in-memory JSON database.""" return self.data
[docs] def mongodb_db_get_data(self): """Return map data from MongoDB, including locations and config.""" config_doc = self.db.config.find_one({"_id": "map_config"}) if config_doc: return { "data": list(self.db.locations.find({}, {"_id": 0})), "categories": config_doc.get("categories", {}), "location_obligatory_fields": config_doc.get("location_obligatory_fields", []), # Backward-compat keys expected by core_api today "visible_data": config_doc.get("visible_data", {}), "meta_data": config_doc.get("meta_data", {}), } return { "data": [], "categories": {}, "location_obligatory_fields": [], "visible_data": {}, "meta_data": {}, }
[docs] def get_data(db): """Dispatch to the backend-specific get_data function.""" return globals()[f"{db.module_name}_get_data"]
# ------------------------------------------------ # get_visible_data
[docs] def google_json_db_get_visible_data(self) -> dict[str, Any]: """ Retrieve visible data configuration from Google Cloud Storage JSON blob. Returns: dict: Dictionary containing field visibility configuration. Returns empty dict if not found. """ return self.data.get("map", {}).get("visible_data", {})
[docs] def json_file_db_get_visible_data(self) -> dict[str, Any]: """ Retrieve visible data configuration from JSON file database. Returns: dict: Dictionary containing field visibility configuration. Returns empty dict if not found. """ return self.data.get("map", {}).get("visible_data", {})
[docs] def json_db_get_visible_data(self) -> dict[str, Any]: """ Retrieve visible data configuration from in-memory JSON database. Returns: dict: Dictionary containing field visibility configuration. Returns empty dict if not found. """ return self.data.get("visible_data", {})
[docs] def mongodb_db_get_visible_data(self) -> dict[str, Any]: """ Retrieve visible data configuration from MongoDB. Returns: dict: Dictionary containing field visibility configuration. Returns empty dict if config document not found or field missing. Raises: pymongo.errors.ConnectionFailure: If database connection fails. pymongo.errors.OperationFailure: If database operation fails. """ config_doc = self.db.config.find_one({"_id": "map_config"}) if config_doc: return config_doc.get("visible_data", {}) return {}
[docs] def get_visible_data(db): """ Get the appropriate get_visible_data function for the given database backend. Args: db: Database instance (must have module_name attribute). Returns: callable: Backend-specific get_visible_data function. """ return globals()[f"{db.module_name}_get_visible_data"]
# ------------------------------------------------ # get_meta_data
[docs] def google_json_db_get_meta_data(self) -> dict[str, Any]: """ Retrieve metadata configuration from Google Cloud Storage JSON blob. Returns: dict: Dictionary containing metadata configuration. Returns empty dict if not found. """ return self.data.get("map", {}).get("meta_data", {})
[docs] def json_file_db_get_meta_data(self) -> dict[str, Any]: """ Retrieve metadata configuration from JSON file database. Returns: dict: Dictionary containing metadata configuration. Returns empty dict if not found. """ return self.data.get("map", {}).get("meta_data", {})
[docs] def json_db_get_meta_data(self) -> dict[str, Any]: """ Retrieve metadata configuration from in-memory JSON database. Returns: dict: Dictionary containing metadata configuration. Returns empty dict if not found. """ return self.data.get("meta_data", {})
[docs] def mongodb_db_get_meta_data(self) -> dict[str, Any]: """ Retrieve metadata configuration from MongoDB. Returns: dict: Dictionary containing metadata configuration. Returns empty dict if config document not found or field missing. Raises: pymongo.errors.ConnectionFailure: If database connection fails. pymongo.errors.OperationFailure: If database operation fails. """ config_doc = self.db.config.find_one({"_id": "map_config"}) if config_doc: return config_doc.get("meta_data", {}) return {}
[docs] def get_meta_data(db): """ Get the appropriate get_meta_data function for the given database backend. Args: db: Database instance (must have module_name attribute). Returns: callable: Backend-specific get_meta_data function. """ return globals()[f"{db.module_name}_get_meta_data"]
# ------------------------------------------------ # get_categories
[docs] def json_db_get_categories(self): """Return category keys from in-memory JSON database.""" return self.data["categories"].keys()
[docs] def json_file_db_get_categories(self): """Return category keys from JSON file database.""" with open(self.data_file_path, "r") as file: return json.load(file)["map"]["categories"].keys()
[docs] def google_json_db_get_categories(self): """Return category keys from Google Cloud Storage JSON.""" return self.data.get("map", {}).get("categories", {}).keys()
[docs] def mongodb_db_get_categories(self): """Return category keys from MongoDB.""" config_doc = self.db.config.find_one({"_id": "map_config"}) if config_doc and "categories" in config_doc: return list(config_doc["categories"].keys()) return []
[docs] def get_categories(db): """Dispatch to the backend-specific get_categories function.""" return globals()[f"{db.module_name}_get_categories"]
# ------------------------------------------------ # get_category_data
[docs] def json_db_get_category_data(self, category_type=None): """Return category data from in-memory JSON database, optionally filtered by type.""" if category_type: return { "categories": {category_type: self.data["categories"].get(category_type, [])}, "categories_help": self.data.get("categories_help", []), "categories_options_help": { category_type: self.data.get("categories_options_help", {}).get(category_type, []) }, } return { "categories": self.data["categories"], "categories_help": self.data.get("categories_help", []), "categories_options_help": self.data.get("categories_options_help", {}), }
[docs] def json_file_db_get_category_data(self, category_type=None): """Return category data from JSON file database, optionally filtered by type.""" with open(self.data_file_path, "r") as file: data = json.load(file)["map"] if category_type: return { "categories": {category_type: data["categories"].get(category_type, [])}, "categories_help": data.get("categories_help", []), "categories_options_help": { category_type: data.get("categories_options_help", {}).get(category_type, []) }, } return { "categories": data["categories"], "categories_help": data.get("categories_help", []), "categories_options_help": data.get("categories_options_help", {}), }
[docs] def google_json_db_get_category_data(self, category_type=None): """Return category data from Google Cloud Storage JSON, optionally filtered by type.""" data = self.data.get("map", {}) if category_type: return { "categories": {category_type: data.get("categories", {}).get(category_type, [])}, "categories_help": data.get("categories_help", []), "categories_options_help": { category_type: data.get("categories_options_help", {}).get(category_type, []) }, } return { "categories": data.get("categories", {}), "categories_help": data.get("categories_help", []), "categories_options_help": data.get("categories_options_help", {}), }
[docs] def mongodb_db_get_category_data(self, category_type=None): """Return category data from MongoDB, optionally filtered by type.""" config_doc = self.db.config.find_one({"_id": "map_config"}) if config_doc: if category_type: return { "categories": { category_type: config_doc.get("categories", {}).get(category_type, []) }, "categories_help": config_doc.get("categories_help", []), "categories_options_help": { category_type: config_doc.get("categories_options_help", {}).get( category_type, [] ) }, } return { "categories": config_doc.get("categories", {}), "categories_help": config_doc.get("categories_help", []), "categories_options_help": config_doc.get("categories_options_help", {}), } return {"categories": {}, "categories_help": [], "categories_options_help": {}}
[docs] def get_category_data(db): """Dispatch to the backend-specific get_category_data function.""" return globals()[f"{db.module_name}_get_category_data"]
# ------------------------------------------------ # get_location
[docs] def get_location_from_raw_data(raw_data, uuid, location_model): """Find and validate a single location by UUID from raw data. Args: raw_data: Dict containing a 'data' key with a list of location dicts. uuid: UUID string of the location to find. location_model: Pydantic model class to validate the location. Returns: Validated location model instance, or None if not found. """ point = next((point for point in raw_data["data"] if point["uuid"] == uuid), None) return location_model.model_validate(point) if point else None
[docs] def google_json_db_get_location(self, uuid, location_model): """Retrieve a single location by UUID from Google Cloud Storage JSON.""" return get_location_from_raw_data(self.data.get("map", {}), uuid, location_model)
[docs] def json_file_db_get_location(self, uuid, location_model): """Retrieve a single location by UUID from JSON file database.""" with open(self.data_file_path, "r") as file: point = get_location_from_raw_data(json.load(file)["map"], uuid, location_model) return point
[docs] def json_db_get_location(self, uuid, location_model): """Retrieve a single location by UUID from in-memory JSON database.""" return get_location_from_raw_data(self.data, uuid, location_model)
[docs] def mongodb_db_get_location(self, uuid, location_model): """Retrieve a single location by UUID from MongoDB.""" location_doc = self.db.locations.find_one({"uuid": uuid}, {"_id": 0}) return location_model.model_validate(location_doc) if location_doc else None
[docs] def get_location(db, location_model): """Dispatch to the backend-specific get_location function.""" return partial(globals()[f"{db.module_name}_get_location"], location_model=location_model)
# ------------------------------------------------ # get_locations
[docs] def get_locations_list_from_raw_data(map_data, query, location_model): """Filter and validate locations from raw map data based on query parameters. Args: map_data: Dict containing 'data' and 'categories' keys. query: Dict of query parameters for filtering. location_model: Pydantic model class to validate each location. Returns: List of validated location model instances. """ filtered_locations = get_queried_data(map_data["data"], map_data["categories"], query) return [location_model.model_validate(point) for point in filtered_locations]
[docs] def google_json_db_get_locations(self, query, location_model): """Retrieve filtered locations from Google Cloud Storage JSON.""" return get_locations_list_from_raw_data(self.data.get("map", {}), query, location_model)
[docs] def json_file_db_get_locations(self, query, location_model): """Retrieve filtered locations from JSON file database.""" with open(self.data_file_path, "r") as file: return get_locations_list_from_raw_data(json.load(file)["map"], query, location_model)
[docs] def json_db_get_locations(self, query, location_model): """Retrieve filtered locations from in-memory JSON database.""" return get_locations_list_from_raw_data(self.data, query, location_model)
[docs] def mongodb_db_get_locations(self, query, location_model): """Retrieve filtered locations from MongoDB.""" mongo_query = {} for key, values in query.items(): if values: mongo_query[key] = {"$in": values} projection = {"_id": 0, "uuid": 1, "position": 1, "remark": 1} data = self.db.locations.find(mongo_query, projection) return (LocationBase.model_validate(loc) for loc in data)
[docs] def get_locations(db, location_model): """Dispatch to the backend-specific get_locations function.""" return partial(globals()[f"{db.module_name}_get_locations"], location_model=location_model)
[docs] def google_json_db_get_locations_paginated(self, query, location_model): """Google JSON locations with improved pagination.""" # Get all locations from raw data data = self.data.get("map", {}) all_locations = list(get_locations_list_from_raw_data(data, query, location_model)) return PaginationHelper.create_paginated_response(all_locations, query)
[docs] def json_db_get_locations_paginated(self, query, location_model): """JSON locations with improved pagination.""" # Get all locations from raw data all_locations = list(get_locations_list_from_raw_data(self.data, query, location_model)) return PaginationHelper.create_paginated_response(all_locations, query)
[docs] def json_file_db_get_locations_paginated(self, query, location_model): """JSON file locations with improved pagination.""" data = FileIOHelper.get_data_from_file(self.data_file_path) # Get all locations from raw data all_locations = list(get_locations_list_from_raw_data(data, query, location_model)) return PaginationHelper.create_paginated_response(all_locations, query)
[docs] def mongodb_db_get_locations_paginated(self, query, location_model): """MongoDB locations with improved pagination.""" page, per_page, sort_by, sort_order = __parse_pagination_params(query) # Build MongoDB query mongo_query = {} for key, values in query.items(): if values: mongo_query[key] = {"$in": values} # Get total count total_count = self.db.locations.count_documents(mongo_query) # Build aggregation pipeline pipeline = [{"$match": mongo_query}] # Add sorting if sort_by: sort_direction = -1 if sort_order == "desc" else 1 pipeline.append({"$sort": {sort_by: sort_direction}}) # Add pagination if per_page: pipeline.extend([{"$skip": (page - 1) * per_page}, {"$limit": per_page}]) # type: ignore # Remove MongoDB _id field pipeline.append({"$project": {"_id": 0}}) # Execute query cursor = self.db.locations.aggregate(pipeline) locations = [location_model.model_validate(loc) for loc in cursor] # Convert items to dict if needed (for location models) if locations and hasattr(locations[0], "model_dump"): serialized_locations = [x.model_dump() for x in locations] else: serialized_locations = locations return __build_pagination_response(serialized_locations, total_count, page, per_page)
[docs] def get_locations_paginated(db, location_model): """Dispatch to the backend-specific get_locations_paginated function.""" return partial( globals()[f"{db.module_name}_get_locations_paginated"], location_model=location_model )
# ------------------------------------------------ # add_location
[docs] def json_file_db_add_location(self, location_data, location_model): """Add a new location to the JSON file database. Raises: LocationAlreadyExistsError: If a location with the same UUID already exists. """ location = location_model.model_validate(location_data) with open(self.data_file_path, "r") as file: json_file = json.load(file) map_data = json_file["map"].get("data", []) idx = next( (i for i, point in enumerate(map_data) if point.get("uuid") == location_data["uuid"]), None ) if idx is not None: raise LocationAlreadyExistsError(location_data["uuid"]) map_data.append(location.model_dump()) json_file["map"]["data"] = map_data json_file_atomic_dump(json_file, self.data_file_path)
[docs] def json_db_add_location(self, location_data, location_model): """Add a new location to the in-memory JSON database. Raises: LocationAlreadyExistsError: If a location with the same UUID already exists. """ location = location_model.model_validate(location_data) idx = next( ( i for i, point in enumerate(self.data.get("data", [])) if point.get("uuid") == location_data["uuid"] ), None, ) if idx is not None: raise LocationAlreadyExistsError(location_data["uuid"]) self.data["data"].append(location.model_dump())
[docs] def mongodb_db_add_location(self, location_data, location_model): """Add a new location to MongoDB. Raises: LocationAlreadyExistsError: If a location with the same UUID already exists. """ location = location_model.model_validate(location_data) existing = self.db.locations.find_one({"uuid": location_data["uuid"]}) if existing: raise LocationAlreadyExistsError(location_data["uuid"]) self.db.locations.insert_one(location.model_dump())
[docs] def add_location(db, location_data, location_model): """Dispatch to the backend-specific add_location function.""" return globals()[f"{db.module_name}_add_location"](db, location_data, location_model)
# ------------------------------------------------ # update_location
[docs] def json_file_db_update_location(self, uuid, location_data, location_model): """Update an existing location in the JSON file database. Raises: LocationNotFoundError: If no location with the given UUID exists. """ location = location_model.model_validate(location_data) with open(self.data_file_path, "r") as file: json_file = json.load(file) map_data = json_file["map"].get("data", []) idx = next((i for i, point in enumerate(map_data) if point.get("uuid") == uuid), None) if idx is None: raise LocationNotFoundError(uuid) map_data[idx] = location.model_dump() json_file["map"]["data"] = map_data json_file_atomic_dump(json_file, self.data_file_path)
[docs] def json_db_update_location(self, uuid, location_data, location_model): """Update an existing location in the in-memory JSON database. Raises: LocationNotFoundError: If no location with the given UUID exists. """ location = location_model.model_validate(location_data) idx = next( (i for i, point in enumerate(self.data.get("data", [])) if point.get("uuid") == uuid), None ) if idx is None: raise LocationNotFoundError(uuid) self.data["data"][idx] = location.model_dump()
[docs] def mongodb_db_update_location(self, uuid, location_data, location_model): """Update an existing location in MongoDB. Raises: LocationNotFoundError: If no location with the given UUID exists. """ location = location_model.model_validate(location_data) result = self.db.locations.update_one({"uuid": uuid}, {"$set": location.model_dump()}) if result.matched_count == 0: raise LocationNotFoundError(uuid)
[docs] def update_location(db, uuid, location_data, location_model): """Dispatch to the backend-specific update_location function.""" return globals()[f"{db.module_name}_update_location"](db, uuid, location_data, location_model)
# ------------------------------------------------ # delete_location
[docs] def json_file_db_delete_location(self, uuid): """Delete a location from the JSON file database. Raises: LocationNotFoundError: If no location with the given UUID exists. """ with open(self.data_file_path, "r") as file: json_file = json.load(file) map_data = json_file["map"].get("data", []) idx = next((i for i, point in enumerate(map_data) if point.get("uuid") == uuid), None) if idx is None: raise LocationNotFoundError(uuid) del map_data[idx] json_file["map"]["data"] = map_data json_file_atomic_dump(json_file, self.data_file_path)
[docs] def json_db_delete_location(self, uuid): """Delete a location from the in-memory JSON database. Raises: LocationNotFoundError: If no location with the given UUID exists. """ idx = next( (i for i, point in enumerate(self.data.get("data", [])) if point.get("uuid") == uuid), None ) if idx is None: raise LocationNotFoundError(uuid) del self.data["data"][idx]
[docs] def mongodb_db_delete_location(self, uuid): """Delete a location from MongoDB. Raises: LocationNotFoundError: If no location with the given UUID exists. """ result = self.db.locations.delete_one({"uuid": uuid}) if result.deleted_count == 0: raise LocationNotFoundError(uuid)
[docs] def delete_location(db, uuid): """Dispatch to the backend-specific delete_location function.""" return globals()[f"{db.module_name}_delete_location"](db, uuid)
# ------------------------------------------------ # add_suggestion
[docs] def json_db_add_suggestion(self, suggestion_data): """Add a suggestion to the in-memory JSON database with 'pending' status.""" CRUDHelper.add_item_to_json_db(self.data, "suggestions", suggestion_data, "pending")
[docs] def json_file_db_add_suggestion(self, suggestion_data): """Add a suggestion to the JSON file database with 'pending' status.""" CRUDHelper.add_item_to_json_file_db( self.data_file_path, "suggestions", suggestion_data, "pending" )
[docs] def mongodb_db_add_suggestion(self, suggestion_data): """Add a suggestion to MongoDB with 'pending' status.""" CRUDHelper.add_item_to_mongodb(self.db.suggestions, suggestion_data, "Suggestion", "pending")
[docs] def google_json_db_add_suggestion(self, suggestion_data): """No-op for Google Cloud Storage JSON (read-only backend).""" # Temporary workaround: just use notifier without storing # Full implementation would require writing back to Google Cloud Storage pass
[docs] def add_suggestion(db, suggestion_data): """Dispatch to the backend-specific add_suggestion function.""" return globals()[f"{db.module_name}_add_suggestion"](db, suggestion_data)
# ------------------------------------------------ # get_suggestions
[docs] def json_db_get_suggestions(self, query_params): """Return suggestions from in-memory JSON database, optionally filtered by status.""" suggestions = self.data.get("suggestions", []) statuses = query_params.get("status") if statuses: suggestions = [s for s in suggestions if s.get("status") in statuses] return suggestions
[docs] def json_db_get_suggestions_paginated(self, query): """JSON suggestions with improved pagination.""" suggestions = self.data.get("suggestions", []) return PaginationHelper.create_paginated_response(suggestions, query)
[docs] def json_file_db_get_suggestions(self, query_params): """Return suggestions from JSON file database, optionally filtered by status.""" with open(self.data_file_path, "r") as file: json_file = json.load(file) suggestions = json_file["map"].get("suggestions", []) statuses = query_params.get("status") if statuses: suggestions = [s for s in suggestions if s.get("status") in statuses] return suggestions
[docs] def json_file_db_get_suggestions_paginated(self, query): """JSON file suggestions with improved pagination.""" with open(self.data_file_path, "r") as file: json_file = json.load(file) suggestions = json_file["map"].get("suggestions", []) return PaginationHelper.create_paginated_response(suggestions, query)
[docs] def mongodb_db_get_suggestions(self, query_params): """Return suggestions from MongoDB, optionally filtered by status.""" query = {} statuses = query_params.get("status") if statuses: query["status"] = {"$in": statuses} return list(self.db.suggestions.find(query, {"_id": 0}))
[docs] def mongodb_db_get_suggestions_paginated(self, query): """MongoDB suggestions with improved pagination.""" page, per_page, sort_by, sort_order = __parse_pagination_params(query) # Build MongoDB query mongo_query = {} statuses = query.get("status") if statuses: mongo_query["status"] = {"$in": statuses} # Get total count total_count = self.db.suggestions.count_documents(mongo_query) # Build aggregation pipeline pipeline = [{"$match": mongo_query}] # Add sorting if sort_by: sort_direction = -1 if sort_order == "desc" else 1 pipeline.append({"$sort": {sort_by: sort_direction}}) # Add pagination if per_page: pipeline.extend([{"$skip": (page - 1) * per_page}, {"$limit": per_page}]) # type: ignore # Remove MongoDB _id field pipeline.append({"$project": {"_id": 0}}) # Execute query cursor = self.db.suggestions.aggregate(pipeline) items = list(cursor) return __build_pagination_response(items, total_count, page, per_page)
[docs] def google_json_db_get_suggestions(self, query_params): """Return empty list for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, suggestions not stored in blob return []
[docs] def google_json_db_get_suggestions_paginated(self, query): """Google JSON suggestions with pagination (read-only).""" return PaginationHelper.create_paginated_response([], query)
[docs] def get_suggestions(db): """Dispatch to the backend-specific get_suggestions function.""" return globals()[f"{db.module_name}_get_suggestions"]
[docs] def get_suggestions_paginated(db): """Dispatch to the backend-specific get_suggestions_paginated function.""" return globals()[f"{db.module_name}_get_suggestions_paginated"]
# ------------------------------------------------ # get_suggestion
[docs] def json_db_get_suggestion(self, suggestion_id): """Return a single suggestion by UUID from in-memory JSON database.""" return next( (s for s in self.data.get("suggestions", []) if s.get("uuid") == suggestion_id), None )
[docs] def json_file_db_get_suggestion(self, suggestion_id): """Return a single suggestion by UUID from JSON file database.""" with open(self.data_file_path, "r") as file: json_file = json.load(file) return next( (s for s in json_file["map"].get("suggestions", []) if s.get("uuid") == suggestion_id), None )
[docs] def mongodb_db_get_suggestion(self, suggestion_id): """Return a single suggestion by UUID from MongoDB.""" return self.db.suggestions.find_one({"uuid": suggestion_id}, {"_id": 0})
[docs] def google_json_db_get_suggestion(self, suggestion_id): """Return None for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, suggestions not stored in blob return None
[docs] def get_suggestion(db): """Dispatch to the backend-specific get_suggestion function.""" return globals()[f"{db.module_name}_get_suggestion"]
# ------------------------------------------------ # update_suggestion
[docs] def json_db_update_suggestion(self, suggestion_id, status): """Update a suggestion's status in the in-memory JSON database. Raises: ValueError: If no suggestion with the given UUID exists. """ suggestions = self.data.get("suggestions", []) for s in suggestions: if s.get("uuid") == suggestion_id: s["status"] = status return raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
[docs] def json_file_db_update_suggestion(self, suggestion_id, status): """Update a suggestion's status in the JSON file database. Raises: ValueError: If no suggestion with the given UUID exists. """ with open(self.data_file_path, "r") as file: json_file = json.load(file) suggestions = json_file["map"].get("suggestions", []) for s in suggestions: if s.get("uuid") == suggestion_id: s["status"] = status break else: raise ValueError(f"Suggestion with uuid {suggestion_id} not found") json_file["map"]["suggestions"] = suggestions json_file_atomic_dump(json_file, self.data_file_path)
[docs] def mongodb_db_update_suggestion(self, suggestion_id, status): """Update a suggestion's status in MongoDB. Raises: ValueError: If no suggestion with the given UUID exists. """ result = self.db.suggestions.update_one({"uuid": suggestion_id}, {"$set": {"status": status}}) if result.matched_count == 0: raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
[docs] def google_json_db_update_suggestion(self, suggestion_id, status): """No-op for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, no-op pass
[docs] def update_suggestion(db, suggestion_id, status): """Dispatch to the backend-specific update_suggestion function.""" return globals()[f"{db.module_name}_update_suggestion"](db, suggestion_id, status)
# ------------------------------------------------ # delete_suggestion
[docs] def json_db_delete_suggestion(self, suggestion_id): """Delete a suggestion from the in-memory JSON database. Raises: ValueError: If no suggestion with the given UUID exists. """ suggestions = self.data.get("suggestions", []) idx = next((i for i, s in enumerate(suggestions) if s.get("uuid") == suggestion_id), None) if idx is None: raise ValueError(f"Suggestion with uuid {suggestion_id} not found") del suggestions[idx]
[docs] def json_file_db_delete_suggestion(self, suggestion_id): """Delete a suggestion from the JSON file database. Raises: ValueError: If no suggestion with the given UUID exists. """ with open(self.data_file_path, "r") as file: json_file = json.load(file) suggestions = json_file["map"].get("suggestions", []) idx = next((i for i, s in enumerate(suggestions) if s.get("uuid") == suggestion_id), None) if idx is None: raise ValueError(f"Suggestion with uuid {suggestion_id} not found") del suggestions[idx] json_file["map"]["suggestions"] = suggestions json_file_atomic_dump(json_file, self.data_file_path)
[docs] def mongodb_db_delete_suggestion(self, suggestion_id): """Delete a suggestion from MongoDB. Raises: ValueError: If no suggestion with the given UUID exists. """ result = self.db.suggestions.delete_one({"uuid": suggestion_id}) if result.deleted_count == 0: raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
[docs] def google_json_db_delete_suggestion(self, suggestion_id): """No-op for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, no-op pass
[docs] def delete_suggestion(db, suggestion_id): """Dispatch to the backend-specific delete_suggestion function.""" return globals()[f"{db.module_name}_delete_suggestion"](db, suggestion_id)
# ------------------------------------------------ # add_report
[docs] def json_db_add_report(self, report_data): """Add a report to the in-memory JSON database. Raises: ValueError: If a report with the same UUID already exists. """ reports = self.data.setdefault("reports", []) if any(r.get("uuid") == report_data.get("uuid") for r in reports): raise ValueError(f"Report with uuid {report_data['uuid']} already exists") reports.append(report_data)
[docs] def json_file_db_add_report(self, report_data): """Add a report to the JSON file database. Raises: ValueError: If a report with the same UUID already exists. """ with open(self.data_file_path, "r") as file: json_file = json.load(file) reports = json_file["map"].get("reports", []) if any(r.get("uuid") == report_data.get("uuid") for r in reports): raise ValueError(f"Report with uuid {report_data['uuid']} already exists") reports.append(report_data) json_file["map"]["reports"] = reports json_file_atomic_dump(json_file, self.data_file_path)
[docs] def mongodb_db_add_report(self, report_data): """Add a report to MongoDB. Raises: ValueError: If a report with the same UUID already exists. """ existing = self.db.reports.find_one({"uuid": report_data.get("uuid")}) if existing: raise ValueError(f"Report with uuid {report_data['uuid']} already exists") self.db.reports.insert_one(report_data)
[docs] def google_json_db_add_report(self, report_data): """No-op for Google Cloud Storage JSON (read-only backend).""" # Temporary workaround: just use notifier without storing # Full implementation would require writing back to Google Cloud Storage pass
[docs] def add_report(db, report_data): """Dispatch to the backend-specific add_report function.""" return globals()[f"{db.module_name}_add_report"](db, report_data)
# ------------------------------------------------ # get_reports
[docs] def json_db_get_reports(self, query_params): """Return reports from in-memory JSON database, optionally filtered by status and priority.""" reports = self.data.get("reports", []) statuses = query_params.get("status") if statuses: reports = [r for r in reports if r.get("status") in statuses] priorities = query_params.get("priority") if priorities: reports = [r for r in reports if r.get("priority") in priorities] return reports
[docs] def json_db_get_reports_paginated(self, query): """JSON reports with improved pagination.""" reports = self.data.get("reports", []) return PaginationHelper.create_paginated_response(reports, query)
[docs] def json_file_db_get_reports(self, query_params): """Return reports from JSON file database, optionally filtered by status and priority.""" with open(self.data_file_path, "r") as file: json_file = json.load(file) reports = json_file["map"].get("reports", []) statuses = query_params.get("status") if statuses: reports = [r for r in reports if r.get("status") in statuses] priorities = query_params.get("priority") if priorities: reports = [r for r in reports if r.get("priority") in priorities] return reports
[docs] def json_file_db_get_reports_paginated(self, query): """JSON file reports with improved pagination.""" data = FileIOHelper.get_data_from_file(self.data_file_path) reports = data.get("reports", []) return PaginationHelper.create_paginated_response(reports, query)
[docs] def mongodb_db_get_reports(self, query_params): """Return reports from MongoDB, optionally filtered by status and priority.""" query = {} statuses = query_params.get("status") if statuses: query["status"] = {"$in": statuses} priorities = query_params.get("priority") if priorities: query["priority"] = {"$in": priorities} return list(self.db.reports.find(query, {"_id": 0}))
[docs] def mongodb_db_get_reports_paginated(self, query): """MongoDB reports with improved pagination.""" page, per_page, sort_by, sort_order = __parse_pagination_params(query) # Build MongoDB query mongo_query = {} statuses = query.get("status") if statuses: mongo_query["status"] = {"$in": statuses} priorities = query.get("priority") if priorities: mongo_query["priority"] = {"$in": priorities} # Get total count total_count = self.db.reports.count_documents(mongo_query) # Build aggregation pipeline pipeline = [{"$match": mongo_query}] # Add sorting if sort_by: sort_direction = -1 if sort_order == "desc" else 1 pipeline.append({"$sort": {sort_by: sort_direction}}) # Add pagination if per_page: pipeline.extend([{"$skip": (page - 1) * per_page}, {"$limit": per_page}]) # type: ignore # Remove MongoDB _id field pipeline.append({"$project": {"_id": 0}}) # Execute query cursor = self.db.reports.aggregate(pipeline) items = list(cursor) return __build_pagination_response(items, total_count, page, per_page)
[docs] def google_json_db_get_reports(self, query_params): """Return empty list for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, reports not stored in blob return []
[docs] def google_json_db_get_reports_paginated(self, query): """Google JSON reports with pagination (read-only).""" return PaginationHelper.create_paginated_response([], query)
[docs] def get_reports(db): """Dispatch to the backend-specific get_reports function.""" return globals()[f"{db.module_name}_get_reports"]
[docs] def get_reports_paginated(db): """Dispatch to the backend-specific get_reports_paginated function.""" return globals()[f"{db.module_name}_get_reports_paginated"]
# ------------------------------------------------ # get_report
[docs] def json_db_get_report(self, report_id): """Return a single report by UUID from in-memory JSON database.""" return next((r for r in self.data.get("reports", []) if r.get("uuid") == report_id), None)
[docs] def json_file_db_get_report(self, report_id): """Return a single report by UUID from JSON file database.""" with open(self.data_file_path, "r") as file: json_file = json.load(file) return next( (r for r in json_file["map"].get("reports", []) if r.get("uuid") == report_id), None )
[docs] def mongodb_db_get_report(self, report_id): """Return a single report by UUID from MongoDB.""" return self.db.reports.find_one({"uuid": report_id}, {"_id": 0})
[docs] def google_json_db_get_report(self, report_id): """Return None for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, reports not stored in blob return None
[docs] def get_report(db): """Dispatch to the backend-specific get_report function.""" return globals()[f"{db.module_name}_get_report"]
# ------------------------------------------------ # update_report
[docs] def json_db_update_report(self, report_id, status=None, priority=None): """Update a report's status and/or priority in the in-memory JSON database. Raises: ReportNotFoundError: If no report with the given UUID exists. """ reports = self.data.get("reports", []) for r in reports: if r.get("uuid") == report_id: if status: r["status"] = status if priority: r["priority"] = priority return raise ReportNotFoundError(report_id)
[docs] def json_file_db_update_report(self, report_id, status=None, priority=None): """Update a report's status and/or priority in the JSON file database. Raises: ReportNotFoundError: If no report with the given UUID exists. """ with open(self.data_file_path, "r") as file: json_file = json.load(file) reports = json_file["map"].get("reports", []) for r in reports: if r.get("uuid") == report_id: if status: r["status"] = status if priority: r["priority"] = priority break else: raise ReportNotFoundError(report_id) json_file["map"]["reports"] = reports json_file_atomic_dump(json_file, self.data_file_path)
[docs] def mongodb_db_update_report(self, report_id, status=None, priority=None): """Update a report's status and/or priority in MongoDB. Raises: ReportNotFoundError: If no report with the given UUID exists. """ update_doc = {} if status: update_doc["status"] = status if priority: update_doc["priority"] = priority if update_doc: result = self.db.reports.update_one({"uuid": report_id}, {"$set": update_doc}) if result.matched_count == 0: raise ReportNotFoundError(report_id)
[docs] def google_json_db_update_report(self, report_id, status=None, priority=None): """No-op for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, no-op pass
[docs] def update_report(db, report_id, status=None, priority=None): """Dispatch to the backend-specific update_report function.""" return globals()[f"{db.module_name}_update_report"](db, report_id, status, priority)
# ------------------------------------------------ # delete_report
[docs] def json_db_delete_report(self, report_id): """Delete a report from the in-memory JSON database. Raises: ReportNotFoundError: If no report with the given UUID exists. """ reports = self.data.get("reports", []) idx = next((i for i, r in enumerate(reports) if r.get("uuid") == report_id), None) if idx is None: raise ReportNotFoundError(report_id) del reports[idx]
[docs] def json_file_db_delete_report(self, report_id): """Delete a report from the JSON file database. Raises: ReportNotFoundError: If no report with the given UUID exists. """ with open(self.data_file_path, "r") as file: json_file = json.load(file) reports = json_file["map"].get("reports", []) idx = next((i for i, r in enumerate(reports) if r.get("uuid") == report_id), None) if idx is None: raise ReportNotFoundError(report_id) del reports[idx] json_file["map"]["reports"] = reports json_file_atomic_dump(json_file, self.data_file_path)
[docs] def mongodb_db_delete_report(self, report_id): """Delete a report from MongoDB. Raises: ReportNotFoundError: If no report with the given UUID exists. """ result = self.db.reports.delete_one({"uuid": report_id}) if result.deleted_count == 0: raise ReportNotFoundError(report_id)
[docs] def google_json_db_delete_report(self, report_id): """No-op for Google Cloud Storage JSON (read-only backend).""" # GoogleJsonDb is read-only, no-op pass
[docs] def delete_report(db, report_id): """Dispatch to the backend-specific delete_report function.""" return globals()[f"{db.module_name}_delete_report"](db, report_id)
# TODO extension function should be replaced with simple extend which would take a db plugin # it could look like that: # `db.extend(goodmap_db_plugin)` in plugin all those functions would be organized
[docs] def extend_db_with_goodmap_queries(db, location_model): """Register all goodmap-specific query functions on the database instance. Binds backend-specific implementations of all CRUD operations for locations, suggestions, and reports to the given database object. Args: db: Database instance to extend with query functions. location_model: Pydantic model class used for location validation. Returns: The extended database instance. """ db.extend("get_issue_options", get_issue_options(db)) db.extend("get_data", get_data(db)) db.extend("get_visible_data", get_visible_data(db)) db.extend("get_meta_data", get_meta_data(db)) db.extend("get_locations", get_locations(db, location_model)) db.extend("get_locations_paginated", get_locations_paginated(db, location_model)) db.extend("get_location", get_location(db, location_model)) db.extend("add_location", partial(add_location, location_model=location_model)) db.extend("update_location", partial(update_location, location_model=location_model)) db.extend("delete_location", delete_location) db.extend("get_categories", get_categories(db)) db.extend("get_category_data", get_category_data(db)) db.extend("add_suggestion", add_suggestion) db.extend("get_suggestions", get_suggestions(db)) db.extend("get_suggestions_paginated", get_suggestions_paginated(db)) db.extend("get_suggestion", get_suggestion(db)) db.extend("update_suggestion", update_suggestion) db.extend("delete_suggestion", delete_suggestion) db.extend("add_report", add_report) db.extend("get_reports", get_reports(db)) db.extend("get_reports_paginated", get_reports_paginated(db)) db.extend("get_report", get_report(db)) db.extend("update_report", update_report) db.extend("delete_report", delete_report) return db