import json
import logging
import os
import tempfile
from functools import partial
from typing import Any
from goodmap.core import get_queried_data
from goodmap.data_models.location import LocationBase
from goodmap.exceptions import (
AlreadyExistsError,
LocationAlreadyExistsError,
LocationNotFoundError,
ReportNotFoundError,
)
logger = logging.getLogger(__name__)
# TODO: this file is a temporary solution kept for compatibility with the old, static code;
# it should be replaced with a dynamic solution.
def __parse_pagination_params(query):
    """Extract and validate pagination parameters from a parsed query.

    Every value in ``query`` is indexed with ``[0]``, i.e. it is treated as a
    list of raw values of which only the first one is used. Missing keys,
    empty lists and unparsable values fall back to defaults instead of raising.

    Args:
        query: Mapping of parameter name to a list of raw values.

    Returns:
        Tuple ``(page, per_page, sort_by, sort_order)``: ``page`` is an int
        >= 1 (default 1); ``per_page`` is an int clamped to ``1..1000``
        (default 20) or ``None`` when ``"all"`` was requested; ``sort_by`` is
        the raw first value or ``None``; ``sort_order`` is lower-cased
        (default ``"asc"``).
    """

    def first_value(key, default):
        # A missing key, an empty list or a non-indexable value all mean
        # "not provided".
        values = query.get(key)
        if not values:
            return default
        try:
            return values[0]
        except (IndexError, KeyError, TypeError):
            return default

    try:
        page = max(1, int(first_value("page", "1")))
    except (ValueError, TypeError):
        page = 1

    per_page_raw = first_value("per_page", "20")
    if per_page_raw == "all":
        per_page = None
    else:
        try:
            per_page = max(1, min(int(per_page_raw), 1000))  # Cap at 1000
        except (ValueError, TypeError):
            per_page = 20

    sort_by = first_value("sort_by", None)
    sort_order = first_value("sort_order", "asc")
    if not isinstance(sort_order, str):
        # Only strings can be lower-cased; anything else means "use default".
        sort_order = "asc"
    return page, per_page, sort_by, sort_order.lower()
def __build_pagination_response(items, total, page, per_page):
    """Wrap a page of items together with its pagination metadata.

    Args:
        items: Items belonging to the requested page.
        total: Total number of items across all pages.
        page: Page number being returned.
        per_page: Page size; a falsy value means "everything on one page".

    Returns:
        Dict with ``"items"`` and a ``"pagination"`` dict holding ``total``,
        ``page``, ``per_page`` and ``total_pages``.
    """
    if not per_page:
        # Without a page size the whole result set forms a single page.
        page_count = 1
        page_size = total
    else:
        page_size = per_page
        page_count = (total + per_page - 1) // per_page  # ceiling division

    pagination = {
        "total": total,
        "page": page,
        "per_page": page_size,
        "total_pages": page_count,
    }
    return {"items": items, "pagination": pagination}
[docs]
def json_file_atomic_dump(data, file_path):
    """Write JSON data to a file atomically using a temporary file and rename.

    The data is serialized into a temporary file created in the destination's
    directory, flushed and fsynced, and then moved over ``file_path`` with
    ``os.replace``. If serialization or the rename fails, the temporary file
    is removed and the destination is left untouched.

    Args:
        data: Data to serialize as JSON.
        file_path: Destination file path.

    Raises:
        TypeError, ValueError: If ``data`` cannot be serialized by ``json.dump``.
        OSError: If writing, syncing or replacing the file fails.
    """
    dir_name = os.path.dirname(file_path)
    temp_path = None
    try:
        # The temp file lives next to the destination so the final rename
        # never crosses a filesystem boundary.
        with tempfile.NamedTemporaryFile("w", dir=dir_name, delete=False) as temp_file:
            temp_path = temp_file.name
            json.dump(data, temp_file)
            temp_file.flush()
            os.fsync(temp_file.fileno())
        os.replace(temp_path, file_path)
    except BaseException:
        # delete=False means nobody else cleans up a half-written temp file.
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        raise
[docs]
class FileIOHelper:
    """Shared helpers for reading and writing the JSON files used as storage."""

    @staticmethod
    def read_json_file(file_path):
        """Open ``file_path`` for reading and return its parsed JSON content."""
        with open(file_path, "r") as handle:
            parsed = json.load(handle)
        return parsed

    @staticmethod
    def write_json_file_atomic(data, file_path):
        """Serialize ``data`` to ``file_path`` via ``json_file_atomic_dump``."""
        json_file_atomic_dump(data, file_path)

    @staticmethod
    def get_data_from_file(file_path, data_key="map"):
        """Return the value stored under ``data_key`` in the JSON file.

        Args:
            file_path: Path of the JSON file to read.
            data_key: Top-level key to extract (defaults to ``"map"``).

        Returns:
            The value under ``data_key``, or an empty dict when the key is absent.
        """
        document = FileIOHelper.read_json_file(file_path)
        return document.get(data_key, {})
[docs]
class CRUDHelper:
    """Common CRUD operation utilities to eliminate duplication."""

    @staticmethod
    def _contains_uuid(collection, uuid):
        """Return True when any item of ``collection`` carries ``uuid``.

        Dict items are looked up by the ``"uuid"`` key, any other item by its
        ``uuid`` attribute (``None`` when the attribute is absent).
        """
        return any(
            (item.get("uuid") if isinstance(item, dict) else getattr(item, "uuid", None)) == uuid
            for item in collection
        )

    @staticmethod
    def _build_record(item_data, default_status):
        """Copy ``item_data`` into a new dict, applying ``default_status`` if given."""
        record = dict(item_data)
        if default_status:
            record["status"] = default_status
        return record

    @staticmethod
    def add_item_to_json_db(db_data, collection_name, item_data, default_status=None):
        """Add item to JSON in-memory database.

        Args:
            db_data: Mutable mapping holding the collections; the collection is
                created when missing.
            collection_name: Key of the target collection inside ``db_data``.
            item_data: Mapping describing the item; its ``"uuid"`` identifies it.
            default_status: When truthy, stored as the record's ``"status"``.

        Raises:
            AlreadyExistsError: If the collection already holds that uuid.
        """
        collection = db_data.setdefault(collection_name, [])
        uuid = item_data.get("uuid")
        if CRUDHelper._contains_uuid(collection, uuid):
            raise AlreadyExistsError(uuid, collection_name.rstrip("s").capitalize())
        collection.append(CRUDHelper._build_record(item_data, default_status))

    @staticmethod
    def add_item_to_json_file_db(file_path, collection_name, item_data, default_status=None):
        """Add item to JSON file database.

        The file is read, the item is appended to
        ``["map"][collection_name]`` and the whole document is written back
        atomically.

        Args:
            file_path: Path of the JSON database file.
            collection_name: Key of the target collection under ``"map"``.
            item_data: Mapping describing the item; its ``"uuid"`` identifies it.
            default_status: When truthy, stored as the record's ``"status"``.

        Raises:
            AlreadyExistsError: If the collection already holds that uuid.
        """
        json_file = FileIOHelper.read_json_file(file_path)
        collection = json_file["map"].get(collection_name, [])
        uuid = item_data.get("uuid")
        if CRUDHelper._contains_uuid(collection, uuid):
            raise AlreadyExistsError(uuid, collection_name.rstrip("s").capitalize())
        collection.append(CRUDHelper._build_record(item_data, default_status))
        # Re-attach explicitly: the collection may be a fresh list when the key was absent.
        json_file["map"][collection_name] = collection
        FileIOHelper.write_json_file_atomic(json_file, file_path)

    @staticmethod
    def add_item_to_mongodb(db_collection, item_data, item_type, default_status=None):
        """Add item to MongoDB database.

        Args:
            db_collection: Collection object providing ``find_one`` and ``insert_one``.
            item_data: Mapping describing the item; its ``"uuid"`` identifies it.
            item_type: Resource type name passed to ``AlreadyExistsError``.
            default_status: When truthy, stored as the record's ``"status"``.

        Raises:
            AlreadyExistsError: If a document with the same uuid is found.
        """
        uuid = item_data.get("uuid")
        if db_collection.find_one({"uuid": uuid}):
            raise AlreadyExistsError(uuid, item_type)
        db_collection.insert_one(CRUDHelper._build_record(item_data, default_status))
# ------------------------------------------------
# get_location_obligatory_fields
[docs]
def json_db_get_location_obligatory_fields(db):
    """Return the obligatory location fields held by the in-memory JSON backend.

    Raises:
        KeyError: If ``db.data`` has no ``"location_obligatory_fields"`` entry.
    """
    in_memory = db.data
    return in_memory["location_obligatory_fields"]
[docs]
def json_file_db_get_location_obligatory_fields(db):
    """Read the JSON database file and return its obligatory location fields.

    The file at ``db.data_file_path`` is parsed on every call.

    Raises:
        KeyError: If ``"map"`` or ``"location_obligatory_fields"`` is missing.
    """
    with open(db.data_file_path, "r") as handle:
        document = json.load(handle)
    return document["map"]["location_obligatory_fields"]
[docs]
def google_json_db_get_location_obligatory_fields(db):
    """Return the obligatory location fields from the Google Cloud Storage JSON.

    Falls back to an empty list when ``"map"`` or the field list is absent.
    """
    map_section = db.data.get("map", {})
    return map_section.get("location_obligatory_fields", [])
[docs]
def mongodb_db_get_location_obligatory_fields(db):
    """Return the obligatory location fields stored in the MongoDB config document.

    The ``map_config`` document of the ``config`` collection is consulted; an
    empty list is returned when the document or the field is missing.
    """
    config_doc = db.db.config.find_one({"_id": "map_config"})
    if not config_doc or "location_obligatory_fields" not in config_doc:
        return []
    return config_doc["location_obligatory_fields"]
[docs]
def get_location_obligatory_fields(db):
    """Call the backend-specific ``*_get_location_obligatory_fields`` for ``db``.

    The implementation is looked up in this module's globals by prefixing the
    function name with ``db.module_name``; its result is returned directly.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_get_location_obligatory_fields"]
    return backend_fn(db)
# ------------------------------------------------
# get_issue_options
[docs]
def json_db_get_issue_options(self):
    """Return the reported issue types of the in-memory JSON backend.

    An empty list is returned when ``"reported_issue_types"`` is not set.
    """
    in_memory = self.data
    return in_memory.get("reported_issue_types", [])
[docs]
def json_file_db_get_issue_options(self):
    """Read the JSON database file and return its reported issue types.

    Returns an empty list when ``"reported_issue_types"`` is absent from the
    ``"map"`` section.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)
    return document["map"].get("reported_issue_types", [])
[docs]
def google_json_db_get_issue_options(self):
    """Return the reported issue types from the Google Cloud Storage JSON.

    Falls back to an empty list when ``"map"`` or the issue list is absent.
    """
    map_section = self.data.get("map", {})
    return map_section.get("reported_issue_types", [])
[docs]
def mongodb_db_get_issue_options(self):
    """Return the reported issue types stored in the MongoDB config document.

    An empty list is returned when the ``map_config`` document or its
    ``"reported_issue_types"`` field is missing.
    """
    config_doc = self.db.config.find_one({"_id": "map_config"})
    if not config_doc or "reported_issue_types" not in config_doc:
        return []
    return config_doc["reported_issue_types"]
[docs]
def get_issue_options(db):
    """Return the backend-specific ``*_get_issue_options`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_issue_options"
    return globals()[handler_name]
# ------------------------------------------------
# get_data
[docs]
def google_json_db_get_data(self):
    """Return the ``"map"`` section of the Google Cloud Storage JSON.

    An empty dict is returned when the section is absent.
    """
    blob_content = self.data
    return blob_content.get("map", {})
[docs]
def json_file_db_get_data(self):
    """Read the JSON database file and return its ``"map"`` section.

    Raises:
        KeyError: If the file has no ``"map"`` key.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)
    return document["map"]
[docs]
def json_db_get_data(self):
    """Return the in-memory JSON backend's ``data`` object as-is (no copy)."""
    map_data = self.data
    return map_data
[docs]
def mongodb_db_get_data(self):
    """Assemble the map data dict from MongoDB.

    Returns:
        Dict with ``data`` (all location documents without ``_id``),
        ``categories``, ``location_obligatory_fields``, ``visible_data`` and
        ``meta_data`` taken from the ``map_config`` document. When that
        document is missing, every entry is empty and locations are not queried.
    """
    config_doc = self.db.config.find_one({"_id": "map_config"})
    if not config_doc:
        return {
            "data": [],
            "categories": {},
            "location_obligatory_fields": [],
            "visible_data": {},
            "meta_data": {},
        }

    locations = list(self.db.locations.find({}, {"_id": 0}))
    return {
        "data": locations,
        "categories": config_doc.get("categories", {}),
        "location_obligatory_fields": config_doc.get("location_obligatory_fields", []),
        # Backward-compat keys expected by core_api today
        "visible_data": config_doc.get("visible_data", {}),
        "meta_data": config_doc.get("meta_data", {}),
    }
[docs]
def get_data(db):
    """Return the backend-specific ``*_get_data`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_data"
    return globals()[handler_name]
# ------------------------------------------------
# get_visible_data
[docs]
def google_json_db_get_visible_data(self) -> dict[str, Any]:
    """
    Return the field visibility configuration from the Google Cloud Storage JSON.

    Returns:
        dict: Content of ``["map"]["visible_data"]``, or an empty dict when
            either level is missing.
    """
    map_section = self.data.get("map", {})
    return map_section.get("visible_data", {})
[docs]
def json_file_db_get_visible_data(self) -> dict[str, Any]:
    """
    Return the field visibility configuration of the JSON file backend.

    Note that this reads the ``self.data`` mapping rather than re-opening
    ``self.data_file_path``.

    Returns:
        dict: Content of ``["map"]["visible_data"]``, or an empty dict when
            either level is missing.
    """
    map_section = self.data.get("map", {})
    return map_section.get("visible_data", {})
[docs]
def json_db_get_visible_data(self) -> dict[str, Any]:
    """
    Return the field visibility configuration of the in-memory JSON backend.

    Returns:
        dict: Content of ``"visible_data"``, or an empty dict when it is missing.
    """
    in_memory = self.data
    return in_memory.get("visible_data", {})
[docs]
def mongodb_db_get_visible_data(self) -> dict[str, Any]:
    """
    Return the field visibility configuration stored in MongoDB.

    Returns:
        dict: ``"visible_data"`` of the ``map_config`` document, or an empty
            dict when the document or the field is missing.

    Errors raised by the underlying ``find_one`` call propagate unchanged.
    """
    config_doc = self.db.config.find_one({"_id": "map_config"})
    if not config_doc:
        return {}
    return config_doc.get("visible_data", {})
[docs]
def get_visible_data(db):
    """
    Return the backend-specific ``*_get_visible_data`` function for ``db``.

    Args:
        db: Database instance (must have a ``module_name`` attribute).

    Returns:
        callable: The matching function from this module's globals, uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_visible_data"
    return globals()[handler_name]
# ------------------------------------------------
# get_meta_data
# ------------------------------------------------
# get_categories
[docs]
def json_db_get_categories(self):
    """Return the keys view of the in-memory backend's ``"categories"`` mapping.

    Raises:
        KeyError: If ``self.data`` has no ``"categories"`` entry.
    """
    categories = self.data["categories"]
    return categories.keys()
[docs]
def json_file_db_get_categories(self):
    """Read the JSON database file and return the keys view of its categories.

    Raises:
        KeyError: If ``"map"`` or ``"categories"`` is missing from the file.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)
    return document["map"]["categories"].keys()
[docs]
def google_json_db_get_categories(self):
    """Return the keys view of the categories in the Google Cloud Storage JSON.

    Missing ``"map"`` or ``"categories"`` yields an empty keys view.
    """
    map_section = self.data.get("map", {})
    categories = map_section.get("categories", {})
    return categories.keys()
[docs]
def mongodb_db_get_categories(self):
    """Return the category names stored in MongoDB as a list.

    An empty list is returned when the ``map_config`` document or its
    ``"categories"`` field is missing.
    """
    config_doc = self.db.config.find_one({"_id": "map_config"})
    if not config_doc or "categories" not in config_doc:
        return []
    return list(config_doc["categories"].keys())
[docs]
def get_categories(db):
    """Return the backend-specific ``*_get_categories`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_categories"
    return globals()[handler_name]
# ------------------------------------------------
# get_category_data
[docs]
def json_db_get_category_data(self, category_type=None):
    """Return category data of the in-memory JSON backend.

    Args:
        category_type: When truthy, ``categories`` and
            ``categories_options_help`` are narrowed to this single key
            (an empty list when the key is unknown).

    Returns:
        Dict with ``categories``, ``categories_help`` and
        ``categories_options_help``.

    Raises:
        KeyError: If ``self.data`` has no ``"categories"`` entry.
    """
    categories = self.data["categories"]
    categories_help = self.data.get("categories_help", [])
    options_help = self.data.get("categories_options_help", {})

    if category_type:
        categories = {category_type: categories.get(category_type, [])}
        options_help = {category_type: options_help.get(category_type, [])}

    return {
        "categories": categories,
        "categories_help": categories_help,
        "categories_options_help": options_help,
    }
[docs]
def json_file_db_get_category_data(self, category_type=None):
    """Read the JSON database file and return its category data.

    Args:
        category_type: When truthy, ``categories`` and
            ``categories_options_help`` are narrowed to this single key
            (an empty list when the key is unknown).

    Returns:
        Dict with ``categories``, ``categories_help`` and
        ``categories_options_help``.

    Raises:
        KeyError: If ``"map"`` or ``"categories"`` is missing from the file.
    """
    with open(self.data_file_path, "r") as handle:
        map_section = json.load(handle)["map"]

    categories = map_section["categories"]
    categories_help = map_section.get("categories_help", [])
    options_help = map_section.get("categories_options_help", {})

    if category_type:
        categories = {category_type: categories.get(category_type, [])}
        options_help = {category_type: options_help.get(category_type, [])}

    return {
        "categories": categories,
        "categories_help": categories_help,
        "categories_options_help": options_help,
    }
[docs]
def google_json_db_get_category_data(self, category_type=None):
    """Return category data from the Google Cloud Storage JSON.

    Args:
        category_type: When truthy, ``categories`` and
            ``categories_options_help`` are narrowed to this single key
            (an empty list when the key is unknown).

    Returns:
        Dict with ``categories``, ``categories_help`` and
        ``categories_options_help``; missing sections default to empty values.
    """
    map_section = self.data.get("map", {})
    categories = map_section.get("categories", {})
    categories_help = map_section.get("categories_help", [])
    options_help = map_section.get("categories_options_help", {})

    if category_type:
        categories = {category_type: categories.get(category_type, [])}
        options_help = {category_type: options_help.get(category_type, [])}

    return {
        "categories": categories,
        "categories_help": categories_help,
        "categories_options_help": options_help,
    }
[docs]
def mongodb_db_get_category_data(self, category_type=None):
    """Return category data stored in the MongoDB ``map_config`` document.

    Args:
        category_type: When truthy, ``categories`` and
            ``categories_options_help`` are narrowed to this single key
            (an empty list when the key is unknown).

    Returns:
        Dict with ``categories``, ``categories_help`` and
        ``categories_options_help``; all empty when the config document is
        missing.
    """
    config_doc = self.db.config.find_one({"_id": "map_config"})
    if not config_doc:
        return {"categories": {}, "categories_help": [], "categories_options_help": {}}

    categories = config_doc.get("categories", {})
    categories_help = config_doc.get("categories_help", [])
    options_help = config_doc.get("categories_options_help", {})

    if category_type:
        categories = {category_type: categories.get(category_type, [])}
        options_help = {category_type: options_help.get(category_type, [])}

    return {
        "categories": categories,
        "categories_help": categories_help,
        "categories_options_help": options_help,
    }
[docs]
def get_category_data(db):
    """Return the backend-specific ``*_get_category_data`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_category_data"
    return globals()[handler_name]
# ------------------------------------------------
# get_location
[docs]
def get_location_from_raw_data(raw_data, uuid, location_model):
    """Find and validate a single location by UUID from raw data.

    A missing ``'data'`` key is treated as "no locations", and entries without
    a ``'uuid'`` key are skipped rather than raising ``KeyError``.

    Args:
        raw_data: Dict that may contain a 'data' key with a list of location dicts.
        uuid: UUID string of the location to find.
        location_model: Model class exposing ``model_validate`` used to
            validate the matching location.

    Returns:
        Validated location model instance, or None if not found.
    """
    for point in raw_data.get("data", []):
        if point.get("uuid") == uuid:
            return location_model.model_validate(point)
    return None
[docs]
def google_json_db_get_location(self, uuid, location_model):
    """Look up one location by UUID in the Google Cloud Storage JSON.

    The ``"map"`` section (or an empty dict when absent) is handed to
    ``get_location_from_raw_data``.
    """
    map_section = self.data.get("map", {})
    return get_location_from_raw_data(map_section, uuid, location_model)
[docs]
def json_file_db_get_location(self, uuid, location_model):
    """Look up one location by UUID in the JSON database file.

    The file is parsed on every call and its ``"map"`` section is handed to
    ``get_location_from_raw_data``.
    """
    with open(self.data_file_path, "r") as handle:
        map_section = json.load(handle)["map"]
    return get_location_from_raw_data(map_section, uuid, location_model)
[docs]
def json_db_get_location(self, uuid, location_model):
    """Look up one location by UUID in the in-memory JSON backend.

    ``self.data`` is handed to ``get_location_from_raw_data`` unchanged.
    """
    in_memory = self.data
    return get_location_from_raw_data(in_memory, uuid, location_model)
[docs]
def mongodb_db_get_location(self, uuid, location_model):
    """Fetch one location by UUID from MongoDB and validate it.

    Returns:
        The result of ``location_model.model_validate`` on the document
        (fetched without ``_id``), or None when no document matches.
    """
    location_doc = self.db.locations.find_one({"uuid": uuid}, {"_id": 0})
    if not location_doc:
        return None
    return location_model.model_validate(location_doc)
[docs]
def get_location(db, location_model):
    """Return the backend-specific ``*_get_location`` bound to ``location_model``.

    The function is looked up in this module's globals by ``db.module_name``
    and wrapped in a ``functools.partial`` that pre-fills the
    ``location_model`` keyword argument.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_get_location"]
    return partial(backend_fn, location_model=location_model)
# ------------------------------------------------
# get_locations
[docs]
def get_locations_list_from_raw_data(map_data, query, location_model):
    """Filter raw map data by ``query`` and validate every remaining location.

    Args:
        map_data: Dict containing 'data' and 'categories' keys.
        query: Dict of query parameters passed to ``get_queried_data``.
        location_model: Model class whose ``model_validate`` is applied to
            each filtered location.

    Returns:
        List of validated location model instances.
    """
    validated = []
    for point in get_queried_data(map_data["data"], map_data["categories"], query):
        validated.append(location_model.model_validate(point))
    return validated
[docs]
def google_json_db_get_locations(self, query, location_model):
    """Return the filtered, validated locations from the Google Cloud Storage JSON.

    The ``"map"`` section (or an empty dict when absent) is handed to
    ``get_locations_list_from_raw_data``.
    """
    map_section = self.data.get("map", {})
    return get_locations_list_from_raw_data(map_section, query, location_model)
[docs]
def json_file_db_get_locations(self, query, location_model):
    """Return the filtered, validated locations from the JSON database file.

    The file is parsed on every call and its ``"map"`` section is handed to
    ``get_locations_list_from_raw_data``.
    """
    with open(self.data_file_path, "r") as handle:
        map_section = json.load(handle)["map"]
    return get_locations_list_from_raw_data(map_section, query, location_model)
[docs]
def json_db_get_locations(self, query, location_model):
    """Return the filtered, validated locations from the in-memory JSON backend.

    ``self.data`` is handed to ``get_locations_list_from_raw_data`` unchanged.
    """
    in_memory = self.data
    return get_locations_list_from_raw_data(in_memory, query, location_model)
[docs]
def mongodb_db_get_locations(self, query, location_model):
    """Return a lazy iterable of filtered locations from MongoDB.

    Every query entry with a non-empty value becomes an ``$in`` filter. Only
    ``uuid``, ``position`` and ``remark`` are fetched, and each document is
    validated with ``LocationBase``; the ``location_model`` argument is not
    used here.

    Returns:
        Generator yielding ``LocationBase`` instances.
    """
    mongo_query = {key: {"$in": values} for key, values in query.items() if values}
    wanted_fields = {"_id": 0, "uuid": 1, "position": 1, "remark": 1}
    cursor = self.db.locations.find(mongo_query, wanted_fields)
    return (LocationBase.model_validate(doc) for doc in cursor)
[docs]
def get_locations(db, location_model):
    """Return the backend-specific ``*_get_locations`` bound to ``location_model``.

    The function is looked up in this module's globals by ``db.module_name``
    and wrapped in a ``functools.partial`` that pre-fills the
    ``location_model`` keyword argument.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_get_locations"]
    return partial(backend_fn, location_model=location_model)
[docs]
def google_json_db_get_locations_paginated(self, query, location_model):
    """Return a paginated response of locations from the Google Cloud Storage JSON.

    All matching locations are materialized first; slicing into pages is
    delegated to ``PaginationHelper.create_paginated_response``.
    """
    map_section = self.data.get("map", {})
    matching = get_locations_list_from_raw_data(map_section, query, location_model)
    return PaginationHelper.create_paginated_response(list(matching), query)
[docs]
def json_db_get_locations_paginated(self, query, location_model):
    """Return a paginated response of locations from the in-memory JSON backend.

    All matching locations are materialized first; slicing into pages is
    delegated to ``PaginationHelper.create_paginated_response``.
    """
    matching = get_locations_list_from_raw_data(self.data, query, location_model)
    return PaginationHelper.create_paginated_response(list(matching), query)
[docs]
def json_file_db_get_locations_paginated(self, query, location_model):
    """Return a paginated response of locations from the JSON database file.

    The ``"map"`` section is read through ``FileIOHelper``, all matching
    locations are materialized, and slicing into pages is delegated to
    ``PaginationHelper.create_paginated_response``.
    """
    map_section = FileIOHelper.get_data_from_file(self.data_file_path)
    matching = get_locations_list_from_raw_data(map_section, query, location_model)
    return PaginationHelper.create_paginated_response(list(matching), query)
[docs]
def mongodb_db_get_locations_paginated(self, query, location_model):
    """Return a paginated response of locations from MongoDB.

    Filtering, sorting and paging are done in an aggregation pipeline. The
    pagination controls (``page``, ``per_page``, ``sort_by``, ``sort_order``)
    are read from ``query`` but are excluded from the field filters.

    Args:
        query: Mapping of parameter name to list of values; holds both the
            field filters and the pagination controls.
        location_model: Model class whose ``model_validate`` is applied to
            every returned document.

    Returns:
        Dict with ``"items"`` and ``"pagination"`` as built by
        ``__build_pagination_response``.
    """
    page, per_page, sort_by, sort_order = __parse_pagination_params(query)

    # The pagination controls travel in the same mapping as the filters. If
    # they were turned into $in filters, $match would require fields such as
    # "page" to exist on every location document.
    control_keys = {"page", "per_page", "sort_by", "sort_order"}
    mongo_query = {
        key: {"$in": values}
        for key, values in query.items()
        if values and key not in control_keys
    }

    # Total is counted before paging so the response can report all pages.
    total_count = self.db.locations.count_documents(mongo_query)

    pipeline = [{"$match": mongo_query}]
    if sort_by:
        sort_direction = -1 if sort_order == "desc" else 1
        pipeline.append({"$sort": {sort_by: sort_direction}})
    if per_page:
        pipeline.extend([{"$skip": (page - 1) * per_page}, {"$limit": per_page}])  # type: ignore
    # Remove MongoDB _id field
    pipeline.append({"$project": {"_id": 0}})

    cursor = self.db.locations.aggregate(pipeline)
    locations = [location_model.model_validate(loc) for loc in cursor]

    # Convert items to dict if needed (for location models)
    if locations and hasattr(locations[0], "model_dump"):
        serialized_locations = [x.model_dump() for x in locations]
    else:
        serialized_locations = locations
    return __build_pagination_response(serialized_locations, total_count, page, per_page)
[docs]
def get_locations_paginated(db, location_model):
    """Return the backend-specific ``*_get_locations_paginated`` bound to the model.

    The function is looked up in this module's globals by ``db.module_name``
    and wrapped in a ``functools.partial`` that pre-fills the
    ``location_model`` keyword argument.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_get_locations_paginated"]
    return partial(backend_fn, location_model=location_model)
# ------------------------------------------------
# add_location
[docs]
def json_file_db_add_location(self, location_data, location_model):
    """Validate a location and append it to the JSON database file.

    The file is read, the dumped model is appended to ``["map"]["data"]`` and
    the whole document is written back atomically.

    Raises:
        LocationAlreadyExistsError: If a location with the same UUID already exists.
    """
    location = location_model.model_validate(location_data)
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)

    stored = document["map"].get("data", [])
    if any(point.get("uuid") == location_data["uuid"] for point in stored):
        raise LocationAlreadyExistsError(location_data["uuid"])

    stored.append(location.model_dump())
    # Re-attach explicitly: the list is a fresh one when "data" was absent.
    document["map"]["data"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def json_db_add_location(self, location_data, location_model):
    """Add a new location to the in-memory JSON database.

    The ``"data"`` list is created on demand, so adding the first location to
    a database without that key no longer fails with ``KeyError``.

    Args:
        location_data: Mapping describing the location; must contain ``"uuid"``.
        location_model: Model class used to validate ``location_data``; the
            validated instance's ``model_dump()`` is what gets stored.

    Raises:
        LocationAlreadyExistsError: If a location with the same UUID already exists.
    """
    location = location_model.model_validate(location_data)
    locations = self.data.setdefault("data", [])
    if any(point.get("uuid") == location_data["uuid"] for point in locations):
        raise LocationAlreadyExistsError(location_data["uuid"])
    locations.append(location.model_dump())
[docs]
def mongodb_db_add_location(self, location_data, location_model):
    """Validate a location and insert it into the MongoDB ``locations`` collection.

    Raises:
        LocationAlreadyExistsError: If a location with the same UUID already exists.
    """
    location = location_model.model_validate(location_data)
    locations = self.db.locations
    if locations.find_one({"uuid": location_data["uuid"]}):
        raise LocationAlreadyExistsError(location_data["uuid"])
    locations.insert_one(location.model_dump())
[docs]
def add_location(db, location_data, location_model):
    """Call the backend-specific ``*_add_location`` for ``db``.

    The implementation is looked up in this module's globals by
    ``db.module_name`` and its result is returned.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_add_location"]
    return backend_fn(db, location_data, location_model)
# ------------------------------------------------
# update_location
[docs]
def json_file_db_update_location(self, uuid, location_data, location_model):
    """Replace an existing location in the JSON database file.

    The file is read, the entry with the matching UUID is replaced by the
    dumped, validated model, and the document is written back atomically.

    Raises:
        LocationNotFoundError: If no location with the given UUID exists.
    """
    location = location_model.model_validate(location_data)
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)

    stored = document["map"].get("data", [])
    for position, point in enumerate(stored):
        if point.get("uuid") == uuid:
            stored[position] = location.model_dump()
            break
    else:
        raise LocationNotFoundError(uuid)

    document["map"]["data"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def json_db_update_location(self, uuid, location_data, location_model):
    """Replace an existing location in the in-memory JSON database.

    The first entry whose UUID matches is overwritten with the dumped,
    validated model.

    Raises:
        LocationNotFoundError: If no location with the given UUID exists.
    """
    location = location_model.model_validate(location_data)
    for position, point in enumerate(self.data.get("data", [])):
        if point.get("uuid") == uuid:
            self.data["data"][position] = location.model_dump()
            return
    raise LocationNotFoundError(uuid)
[docs]
def mongodb_db_update_location(self, uuid, location_data, location_model):
    """Update an existing location document in MongoDB.

    The validated model's dump is applied with ``$set`` to the document whose
    ``uuid`` matches.

    Raises:
        LocationNotFoundError: If no location with the given UUID exists.
    """
    location = location_model.model_validate(location_data)
    changes = {"$set": location.model_dump()}
    outcome = self.db.locations.update_one({"uuid": uuid}, changes)
    if outcome.matched_count != 0:
        return
    raise LocationNotFoundError(uuid)
[docs]
def update_location(db, uuid, location_data, location_model):
    """Call the backend-specific ``*_update_location`` for ``db``.

    The implementation is looked up in this module's globals by
    ``db.module_name`` and its result is returned.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_update_location"]
    return backend_fn(db, uuid, location_data, location_model)
# ------------------------------------------------
# delete_location
[docs]
def json_file_db_delete_location(self, uuid):
    """Remove a location from the JSON database file.

    The file is read, the first entry with the matching UUID is removed and
    the document is written back atomically.

    Raises:
        LocationNotFoundError: If no location with the given UUID exists.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)

    stored = document["map"].get("data", [])
    for position, point in enumerate(stored):
        if point.get("uuid") == uuid:
            del stored[position]
            break
    else:
        raise LocationNotFoundError(uuid)

    document["map"]["data"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def json_db_delete_location(self, uuid):
    """Remove a location from the in-memory JSON database.

    Only the first entry whose UUID matches is removed.

    Raises:
        LocationNotFoundError: If no location with the given UUID exists.
    """
    for position, point in enumerate(self.data.get("data", [])):
        if point.get("uuid") == uuid:
            del self.data["data"][position]
            return
    raise LocationNotFoundError(uuid)
[docs]
def mongodb_db_delete_location(self, uuid):
    """Delete the location document with the given UUID from MongoDB.

    Raises:
        LocationNotFoundError: If no location with the given UUID exists.
    """
    outcome = self.db.locations.delete_one({"uuid": uuid})
    if outcome.deleted_count != 0:
        return
    raise LocationNotFoundError(uuid)
[docs]
def delete_location(db, uuid):
    """Call the backend-specific ``*_delete_location`` for ``db``.

    The implementation is looked up in this module's globals by
    ``db.module_name`` and its result is returned.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_delete_location"]
    return backend_fn(db, uuid)
# ------------------------------------------------
# add_suggestion
[docs]
def json_db_add_suggestion(self, suggestion_data):
    """Store a suggestion in the in-memory JSON database with status 'pending'.

    Delegates to ``CRUDHelper.add_item_to_json_db`` on the ``"suggestions"``
    collection.
    """
    CRUDHelper.add_item_to_json_db(
        self.data,
        "suggestions",
        suggestion_data,
        "pending",
    )
[docs]
def json_file_db_add_suggestion(self, suggestion_data):
    """Store a suggestion in the JSON database file with status 'pending'.

    Delegates to ``CRUDHelper.add_item_to_json_file_db`` on the
    ``"suggestions"`` collection.
    """
    target_file = self.data_file_path
    CRUDHelper.add_item_to_json_file_db(target_file, "suggestions", suggestion_data, "pending")
[docs]
def mongodb_db_add_suggestion(self, suggestion_data):
    """Store a suggestion in MongoDB with status 'pending'.

    Delegates to ``CRUDHelper.add_item_to_mongodb`` on the ``suggestions``
    collection.
    """
    suggestions = self.db.suggestions
    CRUDHelper.add_item_to_mongodb(suggestions, suggestion_data, "Suggestion", "pending")
[docs]
def google_json_db_add_suggestion(self, suggestion_data):
    """Do nothing: the Google Cloud Storage JSON backend does not store suggestions.

    This is a temporary workaround; a full implementation would require
    writing back to Google Cloud Storage.
    """
    return None
[docs]
def add_suggestion(db, suggestion_data):
    """Call the backend-specific ``*_add_suggestion`` for ``db``.

    The implementation is looked up in this module's globals by
    ``db.module_name`` and its result is returned.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    backend_fn = globals()[f"{db.module_name}_add_suggestion"]
    return backend_fn(db, suggestion_data)
# ------------------------------------------------
# get_suggestions
[docs]
def json_db_get_suggestions(self, query_params):
    """Return suggestions from the in-memory JSON database.

    Args:
        query_params: Mapping; a non-empty ``"status"`` entry restricts the
            result to suggestions whose status is contained in it.

    Returns:
        The stored list itself when no status filter is given, otherwise a
        new filtered list.
    """
    stored = self.data.get("suggestions", [])
    wanted = query_params.get("status")
    if not wanted:
        return stored
    return [entry for entry in stored if entry.get("status") in wanted]
[docs]
def json_db_get_suggestions_paginated(self, query):
    """Return a paginated response of the in-memory backend's suggestions.

    The full suggestions list and ``query`` are handed to
    ``PaginationHelper.create_paginated_response``.
    """
    stored = self.data.get("suggestions", [])
    response = PaginationHelper.create_paginated_response(stored, query)
    return response
[docs]
def json_file_db_get_suggestions(self, query_params):
    """Return suggestions from the JSON database file.

    Args:
        query_params: Mapping; a non-empty ``"status"`` entry restricts the
            result to suggestions whose status is contained in it.

    Returns:
        List of suggestion dicts read from ``["map"]["suggestions"]``.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)

    stored = document["map"].get("suggestions", [])
    wanted = query_params.get("status")
    if not wanted:
        return stored
    return [entry for entry in stored if entry.get("status") in wanted]
[docs]
def json_file_db_get_suggestions_paginated(self, query):
    """Return a paginated response of the suggestions in the JSON database file.

    The file is parsed on every call; the full suggestions list and ``query``
    are handed to ``PaginationHelper.create_paginated_response``.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)
    stored = document["map"].get("suggestions", [])
    return PaginationHelper.create_paginated_response(stored, query)
[docs]
def mongodb_db_get_suggestions(self, query_params):
    """Return suggestions from MongoDB as a list, without ``_id``.

    Args:
        query_params: Mapping; a non-empty ``"status"`` entry becomes an
            ``$in`` filter on the ``status`` field.
    """
    statuses = query_params.get("status")
    mongo_filter = {"status": {"$in": statuses}} if statuses else {}
    cursor = self.db.suggestions.find(mongo_filter, {"_id": 0})
    return list(cursor)
[docs]
def mongodb_db_get_suggestions_paginated(self, query):
    """Return a paginated response of suggestions from MongoDB.

    Only the ``"status"`` entry of ``query`` is used as a filter; sorting and
    paging are applied in an aggregation pipeline.

    Returns:
        Dict with ``"items"`` and ``"pagination"`` as built by
        ``__build_pagination_response``.
    """
    page, per_page, sort_by, sort_order = __parse_pagination_params(query)

    statuses = query.get("status")
    mongo_query = {"status": {"$in": statuses}} if statuses else {}

    # Counted before paging so the response can report the overall total.
    total_count = self.db.suggestions.count_documents(mongo_query)

    stages = [{"$match": mongo_query}]
    if sort_by:
        stages.append({"$sort": {sort_by: -1 if sort_order == "desc" else 1}})
    if per_page:
        stages.append({"$skip": (page - 1) * per_page})
        stages.append({"$limit": per_page})
    # Strip MongoDB's _id from the returned documents.
    stages.append({"$project": {"_id": 0}})

    items = list(self.db.suggestions.aggregate(stages))
    return __build_pagination_response(items, total_count, page, per_page)
[docs]
def google_json_db_get_suggestions(self, query_params):
    """Return an empty list: suggestions are not stored in the Google JSON blob.

    The GoogleJsonDb backend is read-only, so ``query_params`` is ignored.
    """
    no_suggestions = []
    return no_suggestions
[docs]
def google_json_db_get_suggestions_paginated(self, query):
    """Return a paginated response over an empty suggestions list (read-only backend)."""
    no_suggestions = []
    return PaginationHelper.create_paginated_response(no_suggestions, query)
[docs]
def get_suggestions(db):
    """Return the backend-specific ``*_get_suggestions`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_suggestions"
    return globals()[handler_name]
[docs]
def get_suggestions_paginated(db):
    """Return the backend-specific ``*_get_suggestions_paginated`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_suggestions_paginated"
    return globals()[handler_name]
# ------------------------------------------------
# get_suggestion
[docs]
def json_db_get_suggestion(self, suggestion_id):
    """Return the first in-memory suggestion whose UUID matches, or None."""
    for entry in self.data.get("suggestions", []):
        if entry.get("uuid") == suggestion_id:
            return entry
    return None
[docs]
def json_file_db_get_suggestion(self, suggestion_id):
    """Return the first suggestion in the JSON database file whose UUID matches.

    The file is parsed on every call; None is returned when nothing matches.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)
    for entry in document["map"].get("suggestions", []):
        if entry.get("uuid") == suggestion_id:
            return entry
    return None
[docs]
def mongodb_db_get_suggestion(self, suggestion_id):
    """Return the suggestion document with the given UUID from MongoDB.

    The document is fetched without ``_id``; whatever ``find_one`` yields for
    no match is returned unchanged.
    """
    by_uuid = {"uuid": suggestion_id}
    return self.db.suggestions.find_one(by_uuid, {"_id": 0})
[docs]
def google_json_db_get_suggestion(self, suggestion_id):
    """Return None: suggestions are not stored in the Google JSON blob.

    The GoogleJsonDb backend is read-only, so ``suggestion_id`` is ignored.
    """
    not_found = None
    return not_found
[docs]
def get_suggestion(db):
    """Return the backend-specific ``*_get_suggestion`` function for ``db``.

    The function is looked up in this module's globals by ``db.module_name``
    and returned uncalled.

    Raises:
        KeyError: If no implementation exists for ``db.module_name``.
    """
    handler_name = f"{db.module_name}_get_suggestion"
    return globals()[handler_name]
# ------------------------------------------------
# update_suggestion
[docs]
def json_db_update_suggestion(self, suggestion_id, status):
    """Set the status of a suggestion in the in-memory JSON database.

    Only the first suggestion whose UUID matches is updated.

    Raises:
        ValueError: If no suggestion with the given UUID exists.
    """
    stored = self.data.get("suggestions", [])
    target = next((entry for entry in stored if entry.get("uuid") == suggestion_id), None)
    if target is None:
        raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
    target["status"] = status
[docs]
def json_file_db_update_suggestion(self, suggestion_id, status):
    """Set the status of a suggestion in the JSON database file.

    The file is read, the first suggestion with the matching UUID is updated
    and the document is written back atomically.

    Raises:
        ValueError: If no suggestion with the given UUID exists.
    """
    with open(self.data_file_path, "r") as handle:
        document = json.load(handle)

    stored = document["map"].get("suggestions", [])
    target = next((entry for entry in stored if entry.get("uuid") == suggestion_id), None)
    if target is None:
        raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
    target["status"] = status

    document["map"]["suggestions"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def mongodb_db_update_suggestion(self, suggestion_id, status):
    """Set the status of a suggestion document in MongoDB.

    Raises:
        ValueError: If no suggestion with the given UUID exists.
    """
    change = {"$set": {"status": status}}
    outcome = self.db.suggestions.update_one({"uuid": suggestion_id}, change)
    if outcome.matched_count != 0:
        return
    raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
[docs]
def google_json_db_update_suggestion(self, suggestion_id, status):
    """Accept and ignore a suggestion update on the Google Cloud Storage backend.

    The backend is read-only, so the call deliberately does nothing.

    Args:
        suggestion_id: UUID of the suggestion (ignored).
        status: Requested status (ignored).
    """
    return None
[docs]
def update_suggestion(db, suggestion_id, status):
    """Run the backend-specific update_suggestion function for ``db``.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.
        suggestion_id: UUID of the suggestion to update.
        status: New status value.

    Returns:
        Whatever the ``<module_name>_update_suggestion`` function returns.
    """
    handler = globals()["{}_update_suggestion".format(db.module_name)]
    return handler(db, suggestion_id, status)
# ------------------------------------------------
# delete_suggestion
[docs]
def json_db_delete_suggestion(self, suggestion_id):
    """Remove a suggestion from the in-memory JSON database.

    Only the first suggestion whose ``uuid`` matches is removed.

    Args:
        suggestion_id: UUID of the suggestion to remove.

    Raises:
        ValueError: If no suggestion with the given UUID exists.
    """
    stored = self.data.get("suggestions", [])
    for position, entry in enumerate(stored):
        if entry.get("uuid") == suggestion_id:
            del stored[position]
            return
    raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
[docs]
def json_file_db_delete_suggestion(self, suggestion_id):
    """Remove a suggestion from the JSON file database.

    The file is loaded, the first suggestion with a matching ``uuid`` is
    dropped, and the whole document is written back atomically.

    Args:
        suggestion_id: UUID of the suggestion to remove.

    Raises:
        ValueError: If no suggestion with the given UUID exists.
    """
    with open(self.data_file_path, "r") as source:
        document = json.load(source)

    stored = document["map"].get("suggestions", [])
    for position, entry in enumerate(stored):
        if entry.get("uuid") == suggestion_id:
            del stored[position]
            break
    else:
        # Nothing is written back when the lookup fails.
        raise ValueError(f"Suggestion with uuid {suggestion_id} not found")

    document["map"]["suggestions"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def mongodb_db_delete_suggestion(self, suggestion_id):
    """Remove a suggestion document from MongoDB.

    Args:
        suggestion_id: Value matched against the ``uuid`` field.

    Raises:
        ValueError: If the delete removed no document.
    """
    selector = {"uuid": suggestion_id}
    outcome = self.db.suggestions.delete_one(selector)
    nothing_deleted = outcome.deleted_count == 0
    if nothing_deleted:
        raise ValueError(f"Suggestion with uuid {suggestion_id} not found")
[docs]
def google_json_db_delete_suggestion(self, suggestion_id):
    """Accept and ignore a suggestion deletion on the Google Cloud Storage backend.

    The backend is read-only, so the call deliberately does nothing.

    Args:
        suggestion_id: UUID of the suggestion (ignored).
    """
    return None
[docs]
def delete_suggestion(db, suggestion_id):
    """Run the backend-specific delete_suggestion function for ``db``.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.
        suggestion_id: UUID of the suggestion to delete.

    Returns:
        Whatever the ``<module_name>_delete_suggestion`` function returns.
    """
    handler = globals()["{}_delete_suggestion".format(db.module_name)]
    return handler(db, suggestion_id)
# ------------------------------------------------
# add_report
[docs]
def json_db_add_report(self, report_data):
    """Add a report to the in-memory JSON database.

    The ``reports`` list is created on first use.

    Args:
        report_data: Mapping describing the report; its ``uuid`` entry is
            used for duplicate detection.

    Raises:
        ValueError: If a stored report has the same ``uuid`` (a missing
            ``uuid`` compares as ``None`` on both sides).
    """
    reports = self.data.setdefault("reports", [])
    # Read the uuid with .get() once: the duplicate check tolerates a missing
    # key, so the error message must not index report_data["uuid"] directly
    # (that turned the documented ValueError into a KeyError).
    new_uuid = report_data.get("uuid")
    if any(existing.get("uuid") == new_uuid for existing in reports):
        raise ValueError(f"Report with uuid {new_uuid} already exists")
    reports.append(report_data)
[docs]
def json_file_db_add_report(self, report_data):
    """Add a report to the JSON file database.

    The file is loaded, the report appended to ``map.reports`` (created if
    absent), and the whole document is written back atomically.

    Args:
        report_data: Mapping describing the report; its ``uuid`` entry is
            used for duplicate detection.

    Raises:
        ValueError: If a stored report has the same ``uuid`` (a missing
            ``uuid`` compares as ``None`` on both sides).
    """
    with open(self.data_file_path, "r") as file:
        json_file = json.load(file)

    reports = json_file["map"].get("reports", [])
    # Read the uuid with .get() once: the duplicate check tolerates a missing
    # key, so the error message must not index report_data["uuid"] directly
    # (that turned the documented ValueError into a KeyError).
    new_uuid = report_data.get("uuid")
    if any(existing.get("uuid") == new_uuid for existing in reports):
        raise ValueError(f"Report with uuid {new_uuid} already exists")

    reports.append(report_data)
    json_file["map"]["reports"] = reports
    json_file_atomic_dump(json_file, self.data_file_path)
[docs]
def mongodb_db_add_report(self, report_data):
    """Add a report to MongoDB.

    Args:
        report_data: Mapping describing the report; its ``uuid`` entry is
            used for duplicate detection.

    Raises:
        ValueError: If the lookup by ``uuid`` finds an existing report.
    """
    # Read the uuid with .get() once: the lookup tolerates a missing key, so
    # the error message must not index report_data["uuid"] directly (that
    # turned the documented ValueError into a KeyError).
    # NOTE(review): a uuid of None presumably also matches stored documents
    # that lack the field - confirm against MongoDB null-query semantics.
    report_uuid = report_data.get("uuid")
    existing = self.db.reports.find_one({"uuid": report_uuid})
    if existing:
        raise ValueError(f"Report with uuid {report_uuid} already exists")
    self.db.reports.insert_one(report_data)
[docs]
def google_json_db_add_report(self, report_data):
    """Accept and discard a report on the Google Cloud Storage JSON backend.

    Temporary workaround: the report is not stored, so only the notifier
    ever sees it. Storing it would require writing back to Google Cloud
    Storage.

    Args:
        report_data: Report payload (ignored).
    """
    return None
[docs]
def add_report(db, report_data):
    """Run the backend-specific add_report function for ``db``.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.
        report_data: Report payload handed to the backend function.

    Returns:
        Whatever the ``<module_name>_add_report`` function returns.
    """
    handler = globals()["{}_add_report".format(db.module_name)]
    return handler(db, report_data)
# ------------------------------------------------
# get_reports
[docs]
def json_db_get_reports(self, query_params):
    """List reports from the in-memory JSON database.

    Args:
        query_params: Mapping that may carry ``status`` and/or ``priority``
            collections; a report is kept only if its value for each
            supplied field is contained in that collection.

    Returns:
        The stored list itself when no filter is supplied, otherwise a new
        filtered list.
    """
    selected = self.data.get("reports", [])
    for field in ("status", "priority"):
        allowed = query_params.get(field)
        if allowed:
            selected = [report for report in selected if report.get(field) in allowed]
    return selected
[docs]
def json_db_get_reports_paginated(self, query):
    """Return a paginated view of the in-memory JSON database's reports.

    All stored reports are handed, together with ``query``, to
    ``PaginationHelper.create_paginated_response``, which does the rest.

    Args:
        query: Query parameters forwarded unchanged to the pagination helper.

    Returns:
        The pagination helper's response.
    """
    return PaginationHelper.create_paginated_response(self.data.get("reports", []), query)
[docs]
def json_file_db_get_reports(self, query_params):
    """List reports from the JSON file database.

    Args:
        query_params: Mapping that may carry ``status`` and/or ``priority``
            collections; a report is kept only if its value for each
            supplied field is contained in that collection.

    Returns:
        List of report dicts read from ``map.reports`` in the data file.
    """
    with open(self.data_file_path, "r") as source:
        document = json.load(source)

    selected = document["map"].get("reports", [])
    for field in ("status", "priority"):
        allowed = query_params.get(field)
        if allowed:
            selected = [report for report in selected if report.get(field) in allowed]
    return selected
[docs]
def json_file_db_get_reports_paginated(self, query):
    """Return a paginated view of the JSON file database's reports.

    The data is loaded through ``FileIOHelper.get_data_from_file`` and its
    ``reports`` entry is handed, together with ``query``, to
    ``PaginationHelper.create_paginated_response``.

    Args:
        query: Query parameters forwarded unchanged to the pagination helper.

    Returns:
        The pagination helper's response.
    """
    file_data = FileIOHelper.get_data_from_file(self.data_file_path)
    return PaginationHelper.create_paginated_response(file_data.get("reports", []), query)
[docs]
def mongodb_db_get_reports(self, query_params):
    """List reports from MongoDB.

    Args:
        query_params: Mapping that may carry ``status`` and/or ``priority``
            collections; each supplied one becomes an ``$in`` condition.

    Returns:
        List of the documents yielded by ``find``, with ``_id`` projected out.
    """
    criteria = {}
    for field in ("status", "priority"):
        accepted = query_params.get(field)
        if accepted:
            criteria[field] = {"$in": accepted}
    cursor = self.db.reports.find(criteria, {"_id": 0})
    return list(cursor)
[docs]
def mongodb_db_get_reports_paginated(self, query):
    """Return one page of reports from MongoDB with pagination metadata.

    Args:
        query: Query mapping. Paging and sorting options are extracted by
            ``__parse_pagination_params``; ``status`` and ``priority``
            entries, when present, become ``$in`` filters.

    Returns:
        The structure built by ``__build_pagination_response`` from the page
        items, the total number of matching reports, and the paging values.
    """
    page, per_page, sort_by, sort_order = __parse_pagination_params(query)

    # Filter shared by the count and the aggregation.
    filters = {}
    for field in ("status", "priority"):
        wanted = query.get(field)
        if wanted:
            filters[field] = {"$in": wanted}

    # Count every match so the total is independent of the page size.
    total_count = self.db.reports.count_documents(filters)

    stages = [{"$match": filters}]
    if sort_by:
        direction = -1 if sort_order == "desc" else 1
        stages.append({"$sort": {sort_by: direction}})
    if per_page:
        # A falsy per_page means "no paging": skip/limit are left out entirely.
        stages += [{"$skip": (page - 1) * per_page}, {"$limit": per_page}]
    # Drop MongoDB's _id field from the returned documents.
    stages.append({"$project": {"_id": 0}})

    items = list(self.db.reports.aggregate(stages))
    return __build_pagination_response(items, total_count, page, per_page)
[docs]
def google_json_db_get_reports(self, query_params):
    """List reports on the Google Cloud Storage JSON backend.

    This backend is read-only and its blob holds no reports, so the result
    is always empty.

    Args:
        query_params: Filter parameters (ignored).

    Returns:
        A new empty list.
    """
    return list()
[docs]
def google_json_db_get_reports_paginated(self, query):
    """Return a paginated response over no reports (read-only backend).

    Args:
        query: Query parameters forwarded unchanged to the pagination helper.

    Returns:
        The response ``PaginationHelper.create_paginated_response`` builds
        for an empty list.
    """
    no_reports = []
    return PaginationHelper.create_paginated_response(no_reports, query)
[docs]
def get_reports(db):
    """Select the get_reports implementation for the database's backend.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.

    Returns:
        The module-level ``<module_name>_get_reports`` function (not called here).
    """
    backend_function_name = "{}_get_reports".format(db.module_name)
    return globals()[backend_function_name]
[docs]
def get_reports_paginated(db):
    """Select the get_reports_paginated implementation for the database's backend.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.

    Returns:
        The module-level ``<module_name>_get_reports_paginated`` function
        (not called here).
    """
    backend_function_name = "{}_get_reports_paginated".format(db.module_name)
    return globals()[backend_function_name]
# ------------------------------------------------
# get_report
[docs]
def json_db_get_report(self, report_id):
    """Find a report by UUID in the in-memory JSON database.

    Args:
        report_id: UUID of the requested report.

    Returns:
        The first stored report whose ``uuid`` matches, or ``None``.
    """
    for report in self.data.get("reports", []):
        if report.get("uuid") == report_id:
            return report
    return None
[docs]
def json_file_db_get_report(self, report_id):
    """Find a report by UUID in the JSON file database.

    Args:
        report_id: UUID of the requested report.

    Returns:
        The first report under ``map.reports`` whose ``uuid`` matches, or
        ``None``.
    """
    with open(self.data_file_path, "r") as source:
        document = json.load(source)

    for report in document["map"].get("reports", []):
        if report.get("uuid") == report_id:
            return report
    return None
[docs]
def mongodb_db_get_report(self, report_id):
    """Fetch one report document from MongoDB by its UUID.

    Args:
        report_id: Value matched against the ``uuid`` field.

    Returns:
        Whatever ``find_one`` yields for that UUID, with ``_id`` projected out.
    """
    uuid_filter = {"uuid": report_id}
    hide_object_id = {"_id": 0}
    return self.db.reports.find_one(uuid_filter, hide_object_id)
[docs]
def google_json_db_get_report(self, report_id):
    """Look up a report on the Google Cloud Storage JSON backend.

    This backend is read-only and its blob holds no reports, so there is
    never anything to find.

    Args:
        report_id: UUID of the requested report (ignored).

    Returns:
        Always ``None``.
    """
    nothing_stored = None
    return nothing_stored
[docs]
def get_report(db):
    """Select the get_report implementation for the database's backend.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.

    Returns:
        The module-level ``<module_name>_get_report`` function (not called here).
    """
    backend_function_name = "{}_get_report".format(db.module_name)
    return globals()[backend_function_name]
# ------------------------------------------------
# update_report
[docs]
def json_db_update_report(self, report_id, status=None, priority=None):
    """Change a report's status and/or priority in the in-memory JSON database.

    Only the first report whose ``uuid`` matches is modified, and only the
    fields given a truthy value are overwritten.

    Args:
        report_id: UUID of the report to change.
        status: New status; left untouched when falsy.
        priority: New priority; left untouched when falsy.

    Raises:
        ReportNotFoundError: If no report with the given UUID exists.
    """
    target = next(
        (report for report in self.data.get("reports", []) if report.get("uuid") == report_id),
        None,
    )
    if target is None:
        raise ReportNotFoundError(report_id)
    for field, value in (("status", status), ("priority", priority)):
        if value:
            target[field] = value
[docs]
def json_file_db_update_report(self, report_id, status=None, priority=None):
    """Change a report's status and/or priority in the JSON file database.

    The file is loaded, the first report with a matching ``uuid`` gets the
    truthy fields overwritten, and the whole document is written back
    atomically.

    Args:
        report_id: UUID of the report to change.
        status: New status; left untouched when falsy.
        priority: New priority; left untouched when falsy.

    Raises:
        ReportNotFoundError: If no report with the given UUID exists.
    """
    with open(self.data_file_path, "r") as source:
        document = json.load(source)

    stored = document["map"].get("reports", [])
    target = next((report for report in stored if report.get("uuid") == report_id), None)
    if target is None:
        # Nothing is written back when the lookup fails.
        raise ReportNotFoundError(report_id)

    for field, value in (("status", status), ("priority", priority)):
        if value:
            target[field] = value

    document["map"]["reports"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def mongodb_db_update_report(self, report_id, status=None, priority=None):
    """Update a report's status and/or priority in MongoDB.

    Args:
        report_id: Value matched against the ``uuid`` field.
        status: New status; left untouched when falsy.
        priority: New priority; left untouched when falsy.

    Raises:
        ReportNotFoundError: If no report with the given UUID exists, whether
            or not any field was supplied.
    """
    selector = {"uuid": report_id}

    update_doc = {}
    if status:
        update_doc["status"] = status
    if priority:
        update_doc["priority"] = priority

    if update_doc:
        result = self.db.reports.update_one(selector, {"$set": update_doc})
        found = result.matched_count != 0
    else:
        # Nothing to write, but the documented contract (and the JSON
        # backends) still report a missing report, so check existence
        # instead of silently succeeding.
        found = self.db.reports.find_one(selector, {"_id": 0}) is not None

    if not found:
        raise ReportNotFoundError(report_id)
[docs]
def google_json_db_update_report(self, report_id, status=None, priority=None):
    """Accept and ignore a report update on the Google Cloud Storage backend.

    The backend is read-only, so the call deliberately does nothing.

    Args:
        report_id: UUID of the report (ignored).
        status: Requested status (ignored).
        priority: Requested priority (ignored).
    """
    return None
[docs]
def update_report(db, report_id, status=None, priority=None):
    """Run the backend-specific update_report function for ``db``.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.
        report_id: UUID of the report to update.
        status: Optional new status, passed through positionally.
        priority: Optional new priority, passed through positionally.

    Returns:
        Whatever the ``<module_name>_update_report`` function returns.
    """
    handler = globals()["{}_update_report".format(db.module_name)]
    return handler(db, report_id, status, priority)
# ------------------------------------------------
# delete_report
[docs]
def json_db_delete_report(self, report_id):
    """Remove a report from the in-memory JSON database.

    Only the first report whose ``uuid`` matches is removed.

    Args:
        report_id: UUID of the report to remove.

    Raises:
        ReportNotFoundError: If no report with the given UUID exists.
    """
    stored = self.data.get("reports", [])
    for position, report in enumerate(stored):
        if report.get("uuid") == report_id:
            del stored[position]
            return
    raise ReportNotFoundError(report_id)
[docs]
def json_file_db_delete_report(self, report_id):
    """Remove a report from the JSON file database.

    The file is loaded, the first report with a matching ``uuid`` is
    dropped, and the whole document is written back atomically.

    Args:
        report_id: UUID of the report to remove.

    Raises:
        ReportNotFoundError: If no report with the given UUID exists.
    """
    with open(self.data_file_path, "r") as source:
        document = json.load(source)

    stored = document["map"].get("reports", [])
    for position, report in enumerate(stored):
        if report.get("uuid") == report_id:
            del stored[position]
            break
    else:
        # Nothing is written back when the lookup fails.
        raise ReportNotFoundError(report_id)

    document["map"]["reports"] = stored
    json_file_atomic_dump(document, self.data_file_path)
[docs]
def mongodb_db_delete_report(self, report_id):
    """Remove a report document from MongoDB.

    Args:
        report_id: Value matched against the ``uuid`` field.

    Raises:
        ReportNotFoundError: If the delete removed no document.
    """
    selector = {"uuid": report_id}
    outcome = self.db.reports.delete_one(selector)
    nothing_deleted = outcome.deleted_count == 0
    if nothing_deleted:
        raise ReportNotFoundError(report_id)
[docs]
def google_json_db_delete_report(self, report_id):
    """Accept and ignore a report deletion on the Google Cloud Storage backend.

    The backend is read-only, so the call deliberately does nothing.

    Args:
        report_id: UUID of the report (ignored).
    """
    return None
[docs]
def delete_report(db, report_id):
    """Run the backend-specific delete_report function for ``db``.

    Args:
        db: Database instance; its ``module_name`` prefixes the function name.
        report_id: UUID of the report to delete.

    Returns:
        Whatever the ``<module_name>_delete_report`` function returns.
    """
    handler = globals()["{}_delete_report".format(db.module_name)]
    return handler(db, report_id)
# TODO: the extension function below should be replaced with a single `extend` call that
# takes a db plugin, e.g. `db.extend(goodmap_db_plugin)`; the plugin would then be the
# place where all of these functions are organized.
[docs]
def extend_db_with_goodmap_queries(db, location_model):
    """Attach every goodmap query function to a database instance.

    Each operation for locations, categories, suggestions and reports is
    registered on ``db`` through ``db.extend(name, function)``. Three kinds
    of values are registered:

    * read accessors: the backend-specific function chosen by calling the
      selector with ``db`` (and ``location_model`` for location reads);
    * location writers: the dispatcher with ``location_model`` pre-bound;
    * other writers: the dispatcher itself, which picks the backend when called.

    Args:
        db: Database instance to extend with query functions.
        location_model: Model class handed to the location functions.

    Returns:
        The same ``db`` instance, after all registrations.
    """
    # Registration order matches the original sequence of db.extend calls.
    for name, selector in (
        ("get_issue_options", get_issue_options),
        ("get_data", get_data),
        ("get_visible_data", get_visible_data),
        ("get_meta_data", get_meta_data),
    ):
        db.extend(name, selector(db))

    # Location reads need the model as well as the backend.
    for name, selector in (
        ("get_locations", get_locations),
        ("get_locations_paginated", get_locations_paginated),
        ("get_location", get_location),
    ):
        db.extend(name, selector(db, location_model))

    # Location writes are dispatchers with the model pre-bound.
    for name, dispatcher in (
        ("add_location", add_location),
        ("update_location", update_location),
    ):
        db.extend(name, partial(dispatcher, location_model=location_model))
    db.extend("delete_location", delete_location)

    for name, selector in (
        ("get_categories", get_categories),
        ("get_category_data", get_category_data),
    ):
        db.extend(name, selector(db))

    db.extend("add_suggestion", add_suggestion)
    for name, selector in (
        ("get_suggestions", get_suggestions),
        ("get_suggestions_paginated", get_suggestions_paginated),
        ("get_suggestion", get_suggestion),
    ):
        db.extend(name, selector(db))
    for name, dispatcher in (
        ("update_suggestion", update_suggestion),
        ("delete_suggestion", delete_suggestion),
        ("add_report", add_report),
    ):
        db.extend(name, dispatcher)

    for name, selector in (
        ("get_reports", get_reports),
        ("get_reports_paginated", get_reports_paginated),
        ("get_report", get_report),
    ):
        db.extend(name, selector(db))
    for name, dispatcher in (
        ("update_report", update_report),
        ("delete_report", delete_report),
    ):
        db.extend(name, dispatcher)

    return db