import importlib.metadata
import logging
import uuid
import deprecation
import numpy
import pysupercluster
from flask import Blueprint, jsonify, make_response, request
from flask_babel import gettext
from platzky import FeatureFlagSet
from platzky.attachment import AttachmentProtocol
from platzky.config import AttachmentConfig, LanguagesMapping
from spectree import Response, SpecTree
from goodmap.api_models import (
CSRFTokenResponse,
ErrorResponse,
LocationReportRequest,
LocationReportResponse,
SuccessResponse,
VersionResponse,
)
from goodmap.clustering import (
map_clustering_data_to_proper_lazy_loading_object,
match_clusters_uuids,
)
from goodmap.exceptions import LocationValidationError
from goodmap.feature_flags import CategoriesHelp
from goodmap.formatter import prepare_pin
from goodmap.json_security import (
MAX_JSON_DEPTH_LOCATION,
JSONDepthError,
JSONSizeError,
safe_json_loads,
)
# SuperCluster configuration constants
MIN_ZOOM = 0
MAX_ZOOM = 16
CLUSTER_RADIUS = 200
CLUSTER_EXTENT = 512
# Report description validation constants
MAX_DESCRIPTION_LENGTH = 500
# Error message constants
ERROR_INVALID_REQUEST_DATA = "Invalid request data"
ERROR_INVALID_LOCATION_DATA = "Invalid location data"
ERROR_LOCATION_NOT_FOUND = "Location not found"
ERROR_INVALID_DESCRIPTION = "Invalid report description"
logger = logging.getLogger(__name__)
[docs]
@deprecation.deprecated(
deprecated_in="1.5.0",
removed_in="2.0.0",
details="Configure 'reported_issue_types' in the database instead. "
"The hardcoded fallback will be removed in a future release.",
)
def get_default_issue_options():
"""Return hardcoded fallback issue options for backward compatibility."""
return ["notHere", "overload", "broken", "other"]
[docs]
def make_tuple_translation(keys_to_translate):
return [(x, gettext(x)) for x in keys_to_translate]
[docs]
def get_or_none(data, *keys):
for key in keys:
if isinstance(data, dict):
data = data.get(key)
else:
return None
return data
[docs]
def get_locations_from_request(database, request_args):
"""
Shared helper to fetch locations from database based on request arguments.
Args:
database: Database instance
request_args: Request arguments (flask.request.args)
Returns:
List of locations as basic_info dicts
"""
query_params = request_args.to_dict(flat=False)
all_locations = database.get_locations(query_params)
return [x.basic_info() for x in all_locations]
[docs]
def core_pages(
database,
languages: LanguagesMapping,
notifier_function,
csrf_generator,
location_model,
photo_attachment_class: type[AttachmentProtocol],
photo_attachment_config: AttachmentConfig,
feature_flags: FeatureFlagSet,
field_renderers: dict[str, str],
) -> Blueprint:
core_api_blueprint = Blueprint("api", __name__, url_prefix="/api")
# Build photo error message from config
allowed_ext = ", ".join(sorted(photo_attachment_config.allowed_extensions or []))
max_size_mb = photo_attachment_config.max_size / (1024 * 1024)
error_invalid_photo = (
f"Invalid photo. Allowed formats: {allowed_ext}. Max size: {max_size_mb:.0f}MB."
)
# Initialize Spectree for API documentation and validation
def _clean_model_name(model: type) -> str:
return model.__name__
spec = SpecTree(
"flask",
title="Goodmap API",
version="0.1",
path="doc",
annotations=True,
naming_strategy=_clean_model_name, # Use clean model names without hash
)
@core_api_blueprint.route("/suggest-new-point", methods=["POST"])
@spec.validate(resp=Response(HTTP_200=SuccessResponse, HTTP_400=ErrorResponse))
def suggest_new_point():
"""Suggest new location for review.
Accepts location data either as JSON or multipart/form-data.
All fields are validated using Pydantic location model.
"""
import json as json_lib
try:
# Initialize photo attachment (only populated for multipart/form-data)
photo_attachment = None
# Handle both multipart/form-data (with file uploads) and JSON
if request.content_type and request.content_type.startswith("multipart/form-data"):
# Parse form data dynamically
suggested_location = {}
for key in request.form:
value = request.form[key]
# Try to parse as JSON for complex types (arrays, objects, position)
try:
# SECURITY: Use safe_json_loads with strict depth limit
# MAX_JSON_DEPTH_LOCATION=1: arrays/objects of primitives only
suggested_location[key] = safe_json_loads(
value, max_depth=MAX_JSON_DEPTH_LOCATION
)
except (JSONDepthError, JSONSizeError) as e:
# Log security event and return 400
logger.warning(
f"JSON parsing blocked for security: {e}",
extra={"field": key, "value_size": len(value)},
)
return make_response(
jsonify(
{
"message": (
"Invalid request: JSON payload too complex or too large"
),
"error": str(e),
}
),
400,
)
except ValueError: # JSONDecodeError inherits from ValueError
# If not JSON, use as-is (simple string values)
suggested_location[key] = value
# Extract and validate photo attachment if present
photo_file = request.files.get("photo")
if photo_file and photo_file.filename:
photo_content = photo_file.read()
photo_mime = photo_file.content_type or "application/octet-stream"
# Validate using configured Attachment class
try:
photo_attachment = photo_attachment_class(
photo_file.filename, photo_content, photo_mime
)
except ValueError as e:
logger.warning(
"Rejected photo: %s",
e,
extra={"photo_filename": photo_file.filename},
)
return make_response(jsonify({"message": error_invalid_photo}), 400)
else:
# Parse JSON data with security checks (depth/size protection)
raw_data = request.get_data(as_text=True)
if not raw_data:
logger.warning("Empty JSON body in suggest endpoint")
return make_response(jsonify({"message": ERROR_INVALID_REQUEST_DATA}), 400)
try:
suggested_location = safe_json_loads(
raw_data, max_depth=MAX_JSON_DEPTH_LOCATION
)
except (JSONDepthError, JSONSizeError) as e:
logger.warning(
f"JSON parsing blocked for security: {e}",
extra={"value_size": len(raw_data)},
)
return make_response(
jsonify(
{
"message": (
"Invalid request: JSON payload too complex or too large"
),
"error": str(e),
}
),
400,
)
except ValueError:
logger.warning("Invalid JSON in suggest endpoint")
return make_response(jsonify({"message": ERROR_INVALID_REQUEST_DATA}), 400)
if suggested_location is None:
logger.warning("Null JSON value in suggest endpoint")
return make_response(jsonify({"message": ERROR_INVALID_REQUEST_DATA}), 400)
suggested_location.update({"uuid": str(uuid.uuid4())})
location = location_model.model_validate(suggested_location)
database.add_suggestion(location.model_dump())
message = gettext("A new location has been suggested with details")
notifier_message = f"{message}: {json_lib.dumps(suggested_location, indent=2)}"
attachments = [photo_attachment] if photo_attachment else None
notifier_function(notifier_message, attachments=attachments)
except LocationValidationError as e:
# NOTE: validation_errors includes input values from the location model fields:
# - Core fields: position (lat/long), uuid, remark
# - Dynamic fields: categories and obligatory_fields configured per deployment
# These are geographic/categorical data, NOT PII (no email, phone, names of people).
# Safe to log for debugging. If PII fields are ever added to the location model,
# strip 'input' from validation_errors before logging.
logger.warning(
"Location validation failed in suggest endpoint: %s",
e.validation_errors,
extra={"errors": e.validation_errors},
)
return make_response(jsonify({"message": ERROR_INVALID_LOCATION_DATA}), 400)
except Exception:
logger.exception("Error in suggest location endpoint")
return make_response(
jsonify({"message": "An error occurred while processing your suggestion"}), 500
)
return make_response(jsonify({"message": "Location suggested"}), 200)
@core_api_blueprint.route("/report-location", methods=["POST"])
@spec.validate(
json=LocationReportRequest,
resp=Response(HTTP_200=LocationReportResponse, HTTP_400=ErrorResponse),
)
def report_location():
"""Report a problem with a location.
Allows users to report issues with existing locations,
such as incorrect information or closed establishments.
"""
try:
location_report = request.get_json()
description = location_report["description"]
# Validate description against configured issue options
issue_options = database.get_issue_options()
if not issue_options:
issue_options = get_default_issue_options()
if description not in issue_options:
if "other" not in issue_options:
return make_response(jsonify({"message": ERROR_INVALID_DESCRIPTION}), 400)
if len(description) > MAX_DESCRIPTION_LENGTH:
return make_response(jsonify({"message": ERROR_INVALID_DESCRIPTION}), 400)
report = {
"uuid": str(uuid.uuid4()),
"location_id": location_report["id"],
"description": description,
"status": "pending",
"priority": "medium",
}
database.add_report(report)
message = (
f"A location has been reported: '{location_report['id']}' "
f"with problem: {location_report['description']}"
)
notifier_function(message)
except Exception:
logger.exception("Error in report location endpoint")
error_message = gettext("Error sending notification")
return make_response(jsonify({"message": error_message}), 500)
return make_response(jsonify({"message": gettext("Location reported")}), 200)
@core_api_blueprint.route("/locations", methods=["GET"])
@spec.validate()
def get_locations():
"""Get list of locations with basic info.
Returns locations filtered by query parameters,
showing only uuid, position, and remark flag.
"""
locations = get_locations_from_request(database, request.args)
return jsonify(locations)
@core_api_blueprint.route("/locations-clustered", methods=["GET"])
@spec.validate(resp=Response(HTTP_400=ErrorResponse))
def get_locations_clustered():
"""Get clustered locations for map display.
Returns locations grouped into clusters based on zoom level,
optimized for rendering on interactive maps.
"""
try:
query_params = request.args.to_dict(flat=False)
zoom = int(query_params.get("zoom", [7])[0])
# Validate zoom level (aligned with SuperCluster min_zoom/max_zoom)
if not MIN_ZOOM <= zoom <= MAX_ZOOM:
return make_response(
jsonify({"message": f"Zoom must be between {MIN_ZOOM} and {MAX_ZOOM}"}),
400,
)
points = get_locations_from_request(database, request.args)
if not points:
return jsonify([])
points_numpy = numpy.array(
[(point["position"][0], point["position"][1]) for point in points]
)
index = pysupercluster.SuperCluster(
points_numpy,
min_zoom=MIN_ZOOM,
max_zoom=MAX_ZOOM,
radius=CLUSTER_RADIUS,
extent=CLUSTER_EXTENT,
)
clusters = index.getClusters(
top_left=(-180.0, 90.0),
bottom_right=(180.0, -90.0),
zoom=zoom,
)
clusters = match_clusters_uuids(points, clusters)
return jsonify(map_clustering_data_to_proper_lazy_loading_object(clusters))
except ValueError as e:
logger.warning("Invalid parameter in clustering request: %s", e)
return make_response(jsonify({"message": "Invalid parameters provided"}), 400)
except Exception as e:
logger.exception("Clustering operation failed: %s", e)
return make_response(jsonify({"message": "An error occurred during clustering"}), 500)
@core_api_blueprint.route("/location/<location_id>", methods=["GET"])
@spec.validate(resp=Response(HTTP_404=ErrorResponse))
def get_location(location_id):
"""Get detailed information for a single location.
Returns full location data including all custom fields,
formatted for display in the location details view.
"""
location = database.get_location(location_id)
if location is None:
logger.info(ERROR_LOCATION_NOT_FOUND, extra={"uuid": location_id})
return make_response(jsonify({"message": ERROR_LOCATION_NOT_FOUND}), 404)
visible_data = database.get_visible_data()
meta_data = database.get_meta_data()
formatted_data = prepare_pin(
location.model_dump(), visible_data, meta_data, field_renderers
)
return jsonify(formatted_data)
@core_api_blueprint.route("/version", methods=["GET"])
@spec.validate(resp=Response(HTTP_200=VersionResponse))
def get_version():
"""Get backend version information.
Returns the current version of the Goodmap backend.
"""
version_info = {"backend": importlib.metadata.version("goodmap")}
return jsonify(version_info)
@core_api_blueprint.route("/generate-csrf-token", methods=["GET"])
@spec.validate(resp=Response(HTTP_200=CSRFTokenResponse))
@deprecation.deprecated(
deprecated_in="1.1.8",
details="This endpoint for explicit CSRF token generation is deprecated. "
"CSRF protection remains active in the application.",
)
def generate_csrf_token():
"""Generate CSRF token (DEPRECATED).
This endpoint is deprecated and maintained only for backward compatibility.
CSRF protection remains active in the application.
"""
csrf_token = csrf_generator()
return {"csrf_token": csrf_token}
@core_api_blueprint.route("/categories", methods=["GET"])
@spec.validate()
def get_categories():
"""Get all available location categories.
Returns list of categories with optional help text
if CATEGORIES_HELP feature flag is enabled.
"""
raw_categories = database.get_categories()
categories = make_tuple_translation(raw_categories)
if CategoriesHelp not in feature_flags:
return jsonify(categories)
category_data = database.get_category_data()
categories_help = category_data.get("categories_help")
proper_categories_help = []
if categories_help is not None:
for option in categories_help:
proper_categories_help.append({option: gettext(f"categories_help_{option}")})
return jsonify({"categories": categories, "categories_help": proper_categories_help})
@core_api_blueprint.route("/categories-full", methods=["GET"])
@spec.validate()
def get_categories_full():
"""Get all categories with their subcategory options in a single request.
Returns combined category data to reduce API calls for filter panel loading.
This endpoint eliminates the need for multiple sequential requests.
"""
categories_data = database.get_category_data()
result = []
categories_options_help = categories_data.get("categories_options_help", {})
for key, options in categories_data["categories"].items():
category_entry = {
"key": key,
"name": gettext(key),
"options": make_tuple_translation(options),
}
if CategoriesHelp in feature_flags:
option_help_list = categories_options_help.get(key, [])
proper_options_help = []
for option in option_help_list:
proper_options_help.append(
{option: gettext(f"categories_options_help_{option}")}
)
category_entry["options_help"] = proper_options_help
result.append(category_entry)
response = {"categories": result}
if CategoriesHelp in feature_flags:
categories_help = categories_data.get("categories_help", [])
proper_categories_help = []
for option in categories_help:
proper_categories_help.append({option: gettext(f"categories_help_{option}")})
response["categories_help"] = proper_categories_help
return jsonify(response)
@core_api_blueprint.route("/languages", methods=["GET"])
@spec.validate()
def get_languages():
"""Get all available interface languages.
Returns list of supported languages for the application.
"""
return jsonify(languages)
@core_api_blueprint.route("/category/<category_type>", methods=["GET"])
@spec.validate()
def get_category_types(category_type):
"""Get all available options for a specific category.
Returns list of category options with optional help text
if CATEGORIES_HELP feature flag is enabled.
"""
category_data = database.get_category_data(category_type)
local_data = make_tuple_translation(category_data["categories"][category_type])
categories_options_help = get_or_none(
category_data, "categories_options_help", category_type
)
proper_categories_options_help = []
if categories_options_help is not None:
for option in categories_options_help:
proper_categories_options_help.append(
{option: gettext(f"categories_options_help_{option}")}
)
if CategoriesHelp not in feature_flags:
return jsonify(local_data)
return jsonify(
{
"categories_options": local_data,
"categories_options_help": proper_categories_options_help,
}
)
# Register Spectree with blueprint after all routes are defined
spec.register(core_api_blueprint)
@core_api_blueprint.route("/doc")
def api_doc_index():
"""Return links to available API documentation formats."""
html = """<!DOCTYPE html>
<html><head><title>API Documentation</title></head>
<body>
<h1>API Documentation</h1>
<ul>
<li><a href="/api/doc/swagger/">Swagger UI</a></li>
<li><a href="/api/doc/redoc/">ReDoc</a></li>
<li><a href="/api/doc/openapi.json">OpenAPI JSON</a></li>
</ul>
</body></html>"""
return html, 200, {"Content-Type": "text/html"}
return core_api_blueprint