diff --git a/bookings/bookings/doctype/item_booking/item_booking.py b/bookings/bookings/doctype/item_booking/item_booking.py
new file mode 100644
index 0000000000000000000000000000000000000000..b0eb9a35af48f464cdcfd0946bb08f21a21cdf31
--- /dev/null
+++ b/bookings/bookings/doctype/item_booking/item_booking.py
@@ -0,0 +1,1985 @@
+# Copyright (c) 2025, Dokos SAS and contributors
+# For license information, please see license.txt
+
+import calendar
+import datetime
+import json
+from collections.abc import Iterable
+from datetime import timedelta
+from functools import cached_property
+from typing import TYPE_CHECKING, TypedDict
+
+import frappe
+from erpnext.accounts.party import get_party_account_currency
+from erpnext.controllers.website_list_for_contact import get_customers_suppliers
+from erpnext.setup.utils import get_exchange_rate
+from frappe import _
+from frappe.desk.calendar import process_recurring_events
+from frappe.integrations.doctype.google_calendar.google_calendar import (
+ format_date_according_to_google_calendar,
+ get_google_calendar_object,
+ get_timezone_naive_datetime,
+)
+from frappe.model.document import Document
+from frappe.model.mapper import get_mapped_doc
+from frappe.utils import (
+ add_days,
+ cint,
+ date_diff,
+ flt,
+ get_datetime,
+ get_time,
+ getdate,
+ is_desk,
+ now,
+ now_datetime,
+ sbool,
+ time_diff_in_minutes,
+)
+from frappe.utils.deprecations import deprecated
+from googleapiclient.errors import HttpError
+
+from bookings.bookings.doctype.booking_credit.booking_credit import get_booking_credit_types_for_item
+from bookings.bookings.doctype.item_booking_calendar.item_booking_calendar import (
+ ItemBookingExceptionEngine,
+)
+from bookings.bookings.utils.customer import get_linked_customers
+
if TYPE_CHECKING:
    # Everything in this branch exists only for static type checkers; at runtime
    # these names are undefined, so annotations that use them are written as strings.
    from collections.abc import Callable
    from typing import TypeVar

    from erpnext.selling.doctype.quotation.quotation import Quotation

    class schedule_line_t(TypedDict):
        # One line of a booking calendar as consumed by ItemBookingAvailabilities:
        # `day` is compared with `calendar.day_name` entries and the two times are
        # parsed with `get_time`.
        start_time: str
        end_time: str
        day: str
        whole: bool | None  # Ignore the duration of the UOM and fill the full schedule line with a single slot.

    class item_calendar_t(TypedDict):
        # Shape of the calendar description read through `self.item_calendar`.
        type: str
        calendar: list[schedule_line_t]
        name: str | None
        minimum_duration_in_seconds: int | None  # Minimal duration of a slot in seconds.

    class prepared_schedule_line_t(TypedDict):
        # A schedule line resolved to concrete datetimes for one date.
        # `start` may be later than `base_start` when the line has already begun.
        start: datetime.datetime
        end: datetime.datetime
        whole: bool | None
        base_start: datetime.datetime

    T = TypeVar("T")
+
+
def util_split_list(items: "list[T]", condition: "Callable[[T], bool]") -> "tuple[list[T], list[T]]":
    """
    Partition ``items`` in a single pass according to ``condition``.

    Returns a ``(rejected, accepted)`` tuple: the first list holds the items for
    which ``condition(item)`` is falsy, the second those for which it is truthy.
    The relative order of the items is preserved in both lists.
    """
    rejected: list = []
    accepted: list = []
    for element in items:
        if condition(element):
            accepted.append(element)
        else:
            rejected.append(element)
    return rejected, accepted
+
+
class BookingException(frappe.ValidationError):
    """Validation error that can be raised differently for desk and website contexts.

    Subclasses override ``throw_desk`` and/or ``throw_website``; callers use ``throw``,
    which picks one of the two based on ``is_desk()``.
    """

    @classmethod
    def throw(cls, *args, **kwargs):
        """Dispatch to ``throw_desk`` or ``throw_website`` depending on ``is_desk()``."""
        handler = cls.throw_desk if is_desk() else cls.throw_website
        handler(*args, **kwargs)

    @classmethod
    def throw_website(cls, *args, **kwargs):
        """Raise this exception class with the given arguments (website variant)."""
        raise cls(*args, **kwargs)

    @classmethod
    def throw_desk(cls, *args, **kwargs):
        """Raise this exception class with the given arguments (desk variant)."""
        raise cls(*args, **kwargs)
+
+
class ExceptionBookingOverlap(BookingException):
    """Raised when a booking overlaps an existing booking or subscription."""

    @classmethod
    def overlaps_to_html(cls, overlaps: list):
        """Render the overlapping documents as an HTML list of links to their forms.

        Each entry of ``overlaps`` is read through ``.get("doctype")`` and
        ``.get("name")``. A (doctype, name) pair that appears several times (for
        example several instances of one recurring booking) is listed only once.
        """
        from frappe.utils import get_link_to_form

        # NOTE(review): the markup is a line break, a translated "Existing Bookings"
        # heading, and a bullet list of links; confirm the exact tags against the
        # templates that display this message.
        shown = set()
        entries = []
        for overlap in overlaps:
            key = (overlap.get("doctype"), overlap.get("name"))
            if key in shown:
                continue
            shown.add(key)
            entries.append(f"<li>{get_link_to_form(*key)}</li>")

        heading = "<p>" + _("Existing Bookings") + "</p>"
        return "<br>" + heading + "<ul>" + "".join(entries) + "</ul>"

    @classmethod
    def throw_website(cls, doc: "ItemBooking", overlaps: list):
        """Raise with a short message that does not list the conflicting documents."""
        frappe.throw(_("This slot is no longer bookable."), exc=cls)

    @classmethod
    def throw_desk(cls, doc: "ItemBooking", overlaps: list):
        """Raise with an explanatory message followed by the list of conflicting documents."""
        msg = _(
            "An existing item booking or subscription for this item is overlapping with this document. Please change its dates to save it, or change your settings in Venue Settings."
        )

        msg = msg + cls.overlaps_to_html(overlaps)
        frappe.throw(msg, exc=cls)
+
+
class ExceptionTooManyBookings(ExceptionBookingOverlap):
    """Raised when the allowed number of simultaneous bookings for an item is reached."""

    @classmethod
    def throw_website(cls, doc: "ItemBooking", overlaps: list):
        """Raise with a short message; the overlapping documents are not listed."""
        message = _("This slot is no longer bookable.")
        frappe.throw(message, exc=cls)

    @classmethod
    def throw_desk(cls, doc: "ItemBooking", overlaps: list):
        """Raise with the limit message followed by the HTML list of overlapping documents."""
        headline = _("The maximum number of simultaneous bookings allowed for this item has been reached.")
        details = cls.overlaps_to_html(overlaps)
        frappe.throw(headline + details, exc=cls)
+
class ItemBooking(Document):
    """Document controller for the "Item Booking" doctype.

    Validates a booking (linked item, title, dates, overlaps), keeps child bookings
    of product bundles in sync, and exposes helpers to cancel or shorten a booking.
    """

    # begin: auto-generated types
    # This code is auto-generated. Do not modify anything in this block.

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        from frappe.types import DF

        all_day: DF.Check
        amended_from: DF.Link | None
        booking_resource: DF.Link
        color: DF.Color | None
        deduct_booking_credits: DF.Check
        ends_on: DF.Datetime
        event: DF.Link | None
        google_calendar: DF.Link | None
        google_calendar_event_id: DF.SmallText | None
        google_calendar_id: DF.Data | None
        item: DF.Link | None
        naming_series: DF.Literal["STO-BOOK-.YYYY.-"]
        notes: DF.SmallText | None
        parent_item_booking: DF.Link | None
        party_name: DF.DynamicLink | None
        party_type: DF.Link | None
        repeat_this_event: DF.Check
        repeat_till: DF.Date | None
        rrule: DF.SmallText | None
        starts_on: DF.Datetime
        status: DF.Literal["In cart", "Not confirmed", "Confirmed", "Cancelled"]
        sync_with_google_calendar: DF.Check
        title: DF.Data | None
        uom: DF.Link | None
        user: DF.Link | None
    # end: auto-generated types

    def before_insert(self):
        """Clear the Google Calendar fields of a child booking before it is inserted."""
        if self.parent_item_booking:
            self.google_calendar = self.google_calendar_id = None

    def validate(self):
        """Fill in derived fields, check the date range and reject forbidden overlaps.

        Raises:
            frappe.ValidationError: if the booking starts after it ends, or (through
                ``check_overlaps`` / ``validate_linked_item``) when the booking is not allowed.
        """
        self.validate_linked_item()
        self.set_title()

        if self.sync_with_google_calendar and not self.google_calendar:
            self.google_calendar = frappe.db.get_value("Item", self.item, "google_calendar")

        if self.google_calendar and not self.google_calendar_id:
            self.google_calendar_id = frappe.db.get_value(
                "Google Calendar", self.google_calendar, "google_calendar_id"
            )

        # The rule may arrive as a list; only its first entry is kept.
        # (Comparing the list itself with an integer would raise a TypeError.)
        if isinstance(self.rrule, list) and self.rrule:
            self.rrule = self.rrule[0]

        if get_datetime(self.starts_on) > get_datetime(self.ends_on):
            frappe.throw(_("Please make sure the end time is greater than the start time"))

        if not self.color:
            self.color = frappe.db.get_value("Item", self.item, "calendar_color")

        if not (self.party_type and self.party_name) and self.user:
            self.party_type, self.party_name = get_corresponding_party(self.user)

        self.check_overlaps()

    def validate_linked_item(self):
        """Warn (desk) or fail (website) when the linked item has booking disabled."""
        if not self.item:
            return

        item_doc = frappe.get_value("Item", self.item, ["enable_item_booking"], as_dict=True)
        if not item_doc:
            return  # Link validation will catch this

        from frappe.utils import get_link_to_form

        item_link = get_link_to_form("Item", self.item)

        if not item_doc["enable_item_booking"]:
            msg = _("Booking is not enabled for this item.")
            msg = _("{0}: {1}").format(item_link, msg)
            if is_desk():
                frappe.msgprint(msg)
            else:
                frappe.throw(msg)

    def check_overlaps(self):
        """Reject or signal the booking when it overlaps existing bookings/subscriptions.

        Recurring bookings returned by the overlap queries are expanded into their
        instances and only the instances that really intersect this booking are kept.
        Depending on Venue Settings and on whether the request comes from the desk,
        an overlap raises ``ExceptionBookingOverlap`` / ``ExceptionTooManyBookings``
        or only publishes a ``booking_overlap`` realtime event.
        """
        overlapping_bookings = self.get_overlapping_bookings()
        overlapping_subscriptions = self.get_overlapping_subscriptions()
        overlaps = overlapping_bookings + overlapping_subscriptions

        # Separate the recurring documents from the one-off ones: recurring ones
        # must be expanded into instances before they can be compared.
        non_repeating_overlaps, repeating_overlaps = util_split_list(
            overlaps, lambda x: x.get("repeat_this_event", False)
        )

        overlaps = non_repeating_overlaps
        self_start = get_datetime(self.starts_on)
        self_end = get_datetime(self.ends_on)
        for rep in repeating_overlaps:
            # Transform the recurring event into a list of instances
            # Note that the starting timestamp for the processing is the 00:00 of the day of the current booking (self)
            # NOTE: Only events that start on the same day as the current booking are considered for overlap.
            # TODO: Handle the case where the recurring event starts 1 day or more before the current booking.
            # Example: A recurring event that starts at 23:00 on the day before the current booking.

            time_window = (self_start.date(), self.ends_on)
            recurring = list(process_recurring_events(rep, *time_window, "starts_on", "ends_on", "rrule"))

            # If the recurring event does not have any instances in the time window, at least keep the original event.
            if not recurring:
                recurring = [rep]

            def filt(other: dict):
                """Filter out instances of the recurring event that do not overlap with the current event."""
                other_start = get_datetime(other.get("starts_on"))
                other_end = get_datetime(other.get("ends_on"))
                return (other_start < self_end) and (other_end > self_start)

            recurring = [other for other in recurring if filt(other)]
            if recurring:
                overlaps.extend(recurring)

        # Process the overlaps, which is a list of existing documents that overlap with the current booking.
        if not overlaps:
            # It is always possible to book an item if there are no overlaps.
            return

        # Get the number of simultaneous bookings allowed for this item.
        simultaneous_bookings_allowed = 0
        if frappe.db.get_single_value("Venue Settings", "enable_simultaneous_booking"):
            # If simultaneous bookings are enabled, get the number of simultaneous bookings allowed for this item.
            simultaneous_bookings_allowed = cint(
                frappe.db.get_value("Item", self.item, "simultaneous_bookings_allowed")  # type: ignore
            )

        # Get if overlaps are disallowed for desk users
        no_overlap_per_item = frappe.db.get_single_value("Venue Settings", "no_overlap_per_item")

        # At this point, there are overlaps, so we need to check if there are too many bookings.

        if no_overlap_per_item or not is_desk():
            if simultaneous_bookings_allowed <= 0:
                # Overlaps are not allowed, and there is already a booking/subscription.
                ExceptionBookingOverlap.throw(self, overlaps)
            elif len(overlaps) >= simultaneous_bookings_allowed:
                # There would be too many bookings if we allowed this one.
                ExceptionTooManyBookings.throw(self, overlaps)
        else:
            if len(overlaps) >= simultaneous_bookings_allowed:
                # NOTE: This always run for simultaneous_bookings_allowed=0
                frappe.publish_realtime("booking_overlap")

    def get_overlapping_bookings(self):
        """Return the non-cancelled bookings of the same item that may overlap this one.

        Recurring bookings that started before the end of this booking and whose
        ``repeat_till`` is after its start are included as candidates; the caller is
        expected to expand them. Each row is a dict with a ``doctype`` key added.
        """
        from pypika import Criterion
        from pypika import functions as fn

        if not self.item:
            return []

        IB = frappe.qb.DocType("Item Booking")
        # https://stackoverflow.com/questions/13390333/two-rectangles-intersection
        item_name = self.item if isinstance(self.item, str) else self.item.name
        query = (
            frappe.qb.select(IB.name, IB.repeat_this_event, IB.rrule, IB.starts_on, IB.ends_on)
            .from_(IB)
            .where(
                Criterion.all(
                    [
                        IB.name != self.name,
                        IB.item == item_name,
                        IB.status != "Cancelled",
                    ]
                )
            )
            .where(
                Criterion.any(
                    [
                        (IB.starts_on < self.ends_on) & (IB.ends_on > self.starts_on),
                        # Check overlaps with recurring events.
                        # TODO: Check overlaps with other events when this booking (self) is recurring.
                        (IB.starts_on < self.ends_on)
                        & (IB.repeat_this_event == 1)
                        & (fn.Coalesce(IB.repeat_till, "9999-01-01") > self.starts_on),
                    ]
                )
            )
        )
        return [{"doctype": "Item Booking", **booking} for booking in query.run(as_dict=True)]

    def get_overlapping_subscriptions(self):
        """Return at most one subscription whose plan books this item during this booking.

        The period of a plan line falls back to the subscription dates, then to an
        open-ended range, when its own dates are empty.
        """
        from pypika import Criterion
        from pypika import functions as fn

        if not self.item:
            return []

        Subscription = frappe.qb.DocType("Subscription")
        SubscriptionPlanDetail = frappe.qb.DocType("Subscription Plan Detail")
        start_field = fn.Coalesce(SubscriptionPlanDetail.from_date, Subscription.start, "0000-00-00")
        end_field = fn.Coalesce(SubscriptionPlanDetail.to_date, Subscription.cancellation_date, "9999-01-01")
        item_name = self.item if isinstance(self.item, str) else self.item.name
        query = (
            frappe.qb.select(Subscription.name)
            .from_(Subscription)
            .join(SubscriptionPlanDetail)
            .on(
                Criterion.all(
                    [
                        Subscription.name == SubscriptionPlanDetail.parent,
                        SubscriptionPlanDetail.parenttype == "Subscription",
                    ]
                )
            )
            .where(
                Criterion.all(
                    [
                        start_field < self.ends_on,
                        end_field > self.starts_on,
                        SubscriptionPlanDetail.booked_item == item_name,
                    ]
                )
            )
            .limit(1)
        )
        return [{"doctype": "Subscription", "name": sub[0]} for sub in query.run()]

    def set_title(self):
        """Build the title from the booked resource and the user or party.

        The title is only regenerated when the title field is hidden or empty.
        """
        if self.meta.get_field("title").hidden or not self.title:
            self.title = self.booking_resource or ""
            if self.user:
                user_name = frappe.db.get_value("User", self.user, "full_name")
                self.title += " - " + (user_name or self.user)

            elif self.party_name and self.party_type:
                # Parenthesised on purpose: `+` binds tighter than `or`, so the
                # fallback must apply to the party title, not to the concatenation.
                party_title = frappe.get_doc(self.party_type, self.party_name).get_title()
                self.title += " - " + (party_title or "")

    def set_status(self, status):
        """Persist ``status``, propagate it to child bookings and sync Google Calendar.

        A "Cancelled" status deletes the calendar event; any other status updates it.
        """
        self.db_set("status", status, update_modified=True, notify=True)

        gcalendar_method = (
            delete_event_in_google_calendar if status == "Cancelled" else update_event_in_google_calendar
        )
        gcalendar_method(self)

        for child in frappe.get_all(
            "Item Booking", filters=dict(parent_item_booking=self.name), pluck="name"
        ):
            child = frappe.get_doc("Item Booking", child)
            child.set_status(status)
            # NOTE(review): child.set_status() above already calls gcalendar_method on
            # the child, so this call repeats it; confirm whether that is intended.
            gcalendar_method(child)

    def on_update(self):
        """Keep the child bookings aligned with this booking after each save."""
        self.synchronize_child_bookings()

    def synchronize_child_bookings(self):
        """Create or update the child bookings linked to this booking.

        When the booked item is a Product Bundle, one child booking per bundled unit
        is created (or the existing ones are updated). Otherwise, existing children
        are simply refreshed from this document.
        """

        def update_child(item, childname=None):
            # Copy every non-null, non-internal value of this booking onto the child,
            # then point the child at its own item and at this parent.
            child = frappe.get_doc("Item Booking", childname) if childname else frappe.new_doc("Item Booking")
            child.update(
                {
                    key: value
                    for key, value in frappe.copy_doc(self).as_dict().items()
                    if (value is not None and not key.startswith("__"))
                }
            )
            child.item = item
            child.parent_item_booking = self.name
            child.save()

        if frappe.db.exists("Product Bundle", dict(new_item_code=self.item)):
            doc = frappe.get_doc("Product Bundle", dict(new_item_code=self.item))
            for item in doc.items:
                childnames = frappe.db.get_all(
                    "Item Booking", dict(item=item.item_code, parent_item_booking=self.name), pluck="name"
                )
                for childname in childnames:
                    update_child(item.item_code, childname)

                if not childnames:
                    for _value in range(int(item.qty)):
                        update_child(item.item_code)

        elif frappe.db.exists("Item Booking", dict(parent_item_booking=self.name)):
            for child in frappe.get_all(
                "Item Booking", filters=dict(parent_item_booking=self.name), fields=["name", "item"]
            ):
                update_child(child.item, child.name)

    def credits_have_been_deducted(self):
        """Return True when a submitted Booking Credit Usage references this booking."""
        return bool(
            frappe.db.get_all(
                "Booking Credit Usage", filters={"item_booking": self.name, "docstatus": 1}, limit=1
            )
        )

    def get_deducted_credits(self):
        """Return the summed quantity of submitted Booking Credit Usage rows for this booking."""
        return sum(
            frappe.db.get_all(
                "Booking Credit Usage", filters={"item_booking": self.name, "docstatus": 1}, pluck="quantity"
            )
        )

    def cancel_appointment(self, ignore_links=False):
        """Cancel this booking if Venue Settings allow it and the delay is respected.

        Args:
            ignore_links: when truthy, sets ``flags.ignore_links`` on the document.

        Raises:
            frappe.ValidationError: if cancellation is disabled, the booking is already
                cancelled, or the booking starts sooner than the cancellation delay.
        """
        if ignore_links:
            self.flags.ignore_links = True

        # Check that the user is allowed to cancel the appointment
        allow_event_cancellation = frappe.db.get_single_value("Venue Settings", "allow_event_cancellation")
        if not allow_event_cancellation:
            return frappe.throw("Item Booking cancellation is not allowed")

        if self.status == "Cancelled":
            return frappe.throw("Item Booking already cancelled")

        # Cancellation delay is in seconds
        cancellation_delay = cint(frappe.db.get_single_value("Venue Settings", "cancellation_delay"))
        if time_diff_in_minutes(self.starts_on, now_datetime()) < (cancellation_delay / 60):
            formatted_duration = frappe.format(cancellation_delay, "Duration")
            return frappe.throw(
                _("Cancellation is only possible up to {0} before the start of the booking").format(
                    formatted_duration
                )
            )

        self.set_status("Cancelled")

    @frappe.whitelist()
    def end_now(self):
        """Set ``ends_on`` to the current time when the booking is in progress.

        The document is not saved here. Raises a validation error when the current
        time is outside the booking's start/end range.
        """
        new_end = now_datetime()
        old_start = get_datetime(self.starts_on)
        old_end = get_datetime(self.ends_on)
        assert old_start and old_end  # for typechecking

        if old_start <= new_end <= old_end:
            self.ends_on = new_end
            return
        else:
            frappe.throw(
                _("Date must be between {0} and {1}").format(
                    self.get_formatted("starts_on"),
                    self.get_formatted("ends_on"),
                )
            )

    def has_website_permission(self, ptype, user, verbose=False):
        """Grant website access for "read" only, based on ``has_booking_permission``."""
        if ptype == "read":
            # Read-only, user has to use the website to cancel the bookings.
            return has_booking_permission(self, ptype=ptype, user=user, raise_exception=False)
        return False
+
+
def get_list_context(context=None):
    """Populate ``context`` in place with the settings of the website bookings list.

    Reads the cancellation settings from Venue Settings and renders the header
    action and footer templates. ``context`` must support ``update``.
    """
    can_cancel = frappe.db.get_single_value("Venue Settings", "allow_event_cancellation")

    # The stored delay is divided by 60 before being handed to the templates;
    # it is forced to 0 when cancellation is disabled.
    if can_cancel:
        delay = cint(frappe.db.get_single_value("Venue Settings", "cancellation_delay")) / 60
    else:
        delay = 0

    header_action = frappe.render_template(
        "templates/includes/item_booking/item_booking_list_action.html",
        {"can_cancel": can_cancel, "cancellation_delay": delay},
    )
    list_footer = frappe.render_template("templates/includes/item_booking/item_booking_list_footer.html", {})

    context.update(
        {
            "show_sidebar": True,
            "show_search": True,
            "no_breadcrumbs": True,
            "title": _("Bookings"),
            "get_list": get_bookings_list,
            "row_template": "templates/includes/item_booking/item_booking_row.html",
            "can_cancel": can_cancel,
            "cancellation_delay": delay,
            "header_action": header_action,
            "list_footer": list_footer,
        }
    )
+
+
def get_bookings_list(doctype, txt, filters, limit_start, limit_page_length=20, order_by=None):
    """Return the bookings shown to the current website user.

    A booking is listed when it belongs to the session user or to one of the
    customers linked to that user. Guests get an empty list. The ``order_by``
    argument is accepted but results are always ordered by ``starts_on desc``.
    """
    from frappe.www.list import get_list

    session_user = frappe.session.user
    if session_user == "Guest":
        return []

    customers = set()

    # Customer linked to the user's contact, if any.
    contact_name = frappe.db.get_value("Contact", {"user": session_user}, "name")
    if contact_name:
        linked_customer = frappe.get_doc("Contact", contact_name).get_link_for("Customer")
        if linked_customer:
            customers.add(linked_customer)

    # Customers resolved by the portal helper for this user.
    portal_customers, _suppliers = get_customers_suppliers("Customer", session_user)
    customers.update(portal_customers or [])

    or_filters = [["user", "=", session_user]]
    if customers:
        or_filters.append(["party_name", "in", customers])

    return get_list(
        doctype,
        txt,
        filters,
        limit_start,
        limit_page_length,
        ignore_permissions=True,
        or_filters=or_filters,
        order_by="starts_on desc",
    )
+
+
@frappe.whitelist(allow_guest=True)
def get_bookings_list_for_map(start, end):
    """Return the session user's bookings between ``start`` and ``end`` as calendar events.

    Each event is a dict with the keys ``start``, ``end``, ``title``, ``status``,
    ``id``, ``allDay``, ``item_name``, ``backgroundColor`` and ``borderColor``.
    """
    bookings = _get_events(getdate(start), getdate(end), item=None, user=frappe.session.user)

    status_colors = {
        "Cancelled": "#ff4d4d",
        "Confirmed": "#6195ff",
        "In cart": "#b67890",
    }

    def background_for(booking):
        # Bookings that already ended are greyed out regardless of their status.
        if booking.ends_on < frappe.utils.now_datetime():
            return "darkgrey"
        return status_colors.get(booking.status, "#ff7846")

    events = []
    for booking in bookings:
        title = booking.get("title", booking.get("booking_resource", booking.name))
        all_day = booking.get("all_day", booking.get("allDay", False))
        events.append(
            dict(
                start=booking.starts_on,
                end=booking.ends_on,
                title=title,
                status=booking.status,
                id=booking.name,
                allDay=all_day,
                item_name=booking.get("booking_resource"),
                backgroundColor=background_for(booking),
                borderColor="darkgrey",
            )
        )
    return events
+
+
@frappe.whitelist()
def update_linked_transaction(transaction_type, line_item, item_booking):
    """Link ``item_booking`` to a line of a transaction.

    The caller must pass ``has_booking_permission`` for the booking; the value is
    written to the ``item_booking`` field of the "<transaction_type> Item" row.
    """
    has_booking_permission(item_booking, raise_exception=True)
    child_doctype = f"{transaction_type} Item"
    return frappe.db.set_value(child_doctype, line_item, "item_booking", item_booking)
+
+
@frappe.whitelist()
def get_transactions_items(transaction_type, transactions):
    """Return the item rows of every listed transaction, in order.

    ``transactions`` is parsed with ``frappe.parse_json``. Read permission on
    ``transaction_type`` is required, otherwise a permission error is raised.
    """
    frappe.has_permission(transaction_type, "read", throw=True)
    names = frappe.parse_json(transactions)
    return [row for name in names for row in frappe.get_doc(transaction_type, name).items]
+
+
def has_booking_permission(doc: ItemBooking | str, ptype="write", user="", raise_exception=False):
    """Tell whether ``user`` may access a booking.

    Access is granted when the user has the regular document permission, is the
    user of the booking, or is linked to the customer set as the booking's party.

    Args:
        doc: the booking document, or its name.
        ptype: permission type passed to the regular permission check.
        user: user to check; defaults to the session user when empty.
        raise_exception: raise instead of returning False.

    Returns:
        True when access is granted, False otherwise.

    Raises:
        frappe.PermissionError: when access is denied and ``raise_exception`` is set.
    """
    from frappe.permissions import has_permission

    user = user or frappe.session.user
    if isinstance(doc, str):
        doc = frappe.get_doc("Item Booking", doc)  # type: ignore

    def is_allowed() -> bool:
        if has_permission(doc.doctype, ptype, doc=doc, print_logs=False, user=user):
            return True
        if doc.user == user:
            return True
        customers, _suppliers = get_customers_suppliers("Customer", user)
        return any(doc.party_name == customer for customer in customers or [])

    # Evaluate the checks once, then decide between returning and raising.
    allowed = is_allowed()
    if raise_exception and not allowed:
        frappe.throw("Not allowed", frappe.PermissionError)
    return allowed
+
+
@frappe.whitelist()
def cancel_appointment(id, force=False, render_row=False):
    """Cancel the booking ``id`` on behalf of the current user.

    Args:
        id: name of the Item Booking.
        force: when true, links are ignored during cancellation.
        render_row: when true, return the re-rendered list row for the booking.

    Raises:
        frappe.PermissionError: if the user has no access to the booking.
    """
    # Flags sent over HTTP arrive as strings; "false" and "0" must not count as true.
    force = sbool(force)
    render_row = sbool(render_row)

    booking: ItemBooking = frappe.get_doc("Item Booking", id)  # type: ignore
    has_booking_permission(booking, raise_exception=True)
    booking.flags.ignore_permissions = True
    booking.cancel_appointment(ignore_links=force)
    booking.save()
    if render_row:
        return do_render_row(booking)
+
+
@frappe.whitelist()
def end_booking_now(id, render_row=False):
    """End the booking ``id`` at the current time and save it.

    Args:
        id: name of the Item Booking.
        render_row: when true, return the re-rendered list row for the booking.

    Raises:
        frappe.PermissionError: if the user has no access to the booking.
    """
    # Flags sent over HTTP arrive as strings; "false" and "0" must not count as true.
    render_row = sbool(render_row)

    booking: ItemBooking = frappe.get_doc("Item Booking", id)  # type: ignore
    has_booking_permission(booking, raise_exception=True)
    booking.flags.ignore_permissions = True
    booking.end_now()
    booking.save()
    if render_row:
        return do_render_row(booking)
+
+
def do_render_row(doc: ItemBooking):
    """Render the website list row of ``doc`` using the list context of this doctype."""
    row_context = dict(doc=doc)
    get_list_context(row_context)
    template_path = row_context["row_template"]
    return frappe.render_template(template_path, row_context)
+
+
@frappe.whitelist(allow_guest=True)
def get_item_uoms(item_code):
    """Return the distinct conversion UOMs of an item together with its sales UOM."""
    conversion_uoms = frappe.get_all(
        "UOM Conversion Detail",
        filters={"parent": item_code},
        fields=["distinct uom"],
        order_by="idx desc",
        as_list=1,
    )
    sales_uom = frappe.get_cached_value("Item", item_code, "sales_uom")
    return {"uoms": conversion_uoms, "sales_uom": sales_uom}
+
+
@frappe.whitelist()
@deprecated
def book_new_slot(**kwargs):
    """Deprecated: create an Item Booking from keyword arguments.

    Every call is recorded in the error log as a deprecation notice. Any exception
    raised while creating the booking is swallowed; it is logged only when the
    session user is not a System User, and the function then returns None.
    """
    frappe.log_error(
        "book_new_slot is deprecated: use webshop.webshop.shopping_cart.booking.book_new_slot instead"
    )
    try:
        sync_with_google = kwargs.get("sync_with_google_calendar") or frappe.db.get_single_value(
            "Venue Settings", "sync_with_google_calendar"
        )
        values = {
            "doctype": "Item Booking",
            "item": kwargs.get("item"),
            "starts_on": kwargs.get("start"),
            "ends_on": kwargs.get("end"),
            "user": kwargs.get("user"),
            "status": kwargs.get("status") or "In cart",
            "event": kwargs.get("event"),
            "all_day": kwargs.get("all_day") or 0,
            "uom": kwargs.get("uom"),
            "sync_with_google_calendar": sync_with_google,
            "deduct_booking_credits": sbool(kwargs.get("with_credits")),
        }
        return frappe.get_doc(values).insert(ignore_permissions=True)
    except Exception:
        user_type = frappe.db.get_value("User", frappe.session.user, "user_type")
        if user_type != "System User":
            frappe.log_error(_("New item booking error"))
+
+
@frappe.whitelist()
def book_new_slot_from_event(**kwargs):
    """Create and insert an Item Booking from keyword arguments (Desk Users only).

    Unlike ``book_new_slot``, errors raised during insertion are propagated.
    """
    frappe.only_for("Desk User")

    sync_with_google = kwargs.get("sync_with_google_calendar") or frappe.db.get_single_value(
        "Venue Settings", "sync_with_google_calendar"
    )
    values = {
        "item": kwargs.get("item"),
        "starts_on": kwargs.get("start"),
        "ends_on": kwargs.get("end"),
        "user": kwargs.get("user"),
        "status": kwargs.get("status") or "In cart",
        "event": kwargs.get("event"),
        "all_day": kwargs.get("all_day") or 0,
        "uom": kwargs.get("uom"),
        "sync_with_google_calendar": sync_with_google,
        "deduct_booking_credits": sbool(kwargs.get("with_credits")),
    }

    booking = frappe.new_doc("Item Booking")
    booking.update(values)
    booking.insert(ignore_permissions=True)
    return booking
+
+
@frappe.whitelist()
def remove_booked_slot(name):
    """Delete a booking and detach it from the quotations and sales orders that use it.

    A linked transaction that only contains this booking is deleted; otherwise only
    the matching lines are removed. On a timestamp mismatch the whole operation is
    attempted again.
    """
    has_booking_permission(name, raise_exception=True)
    try:
        for parent_doctype in ("Quotation", "Sales Order"):
            rows = frappe.get_all(
                f"{parent_doctype} Item",
                filters={"item_booking": name, "parenttype": parent_doctype},
                fields=["name", "parent"],
            )
            for row in rows:
                transaction = frappe.get_doc(parent_doctype, row.get("parent"))
                if len(transaction.items) <= 1:
                    # Nothing else in the transaction: remove it entirely.
                    frappe.delete_doc(parent_doctype, transaction.name, ignore_permissions=True, force=True)
                    continue

                transaction.items = [line for line in transaction.items if line.item_booking != name]
                transaction.flags.ignore_permissions = True
                transaction.save()

        return frappe.delete_doc("Item Booking", name, ignore_permissions=True, force=True)
    except frappe.TimestampMismatchError:
        frappe.get_doc("Item Booking", name).reload()
        remove_booked_slot(name)
+
+
+# TODO: refactor with a class and add an option to get monthly availabilities
@frappe.whitelist(allow_guest=True)
def get_availabilities(item: str, start, end, uom: str | None = None, user: str | None = None, limit=0):
    """Return the bookable slots of ``item`` between ``start`` and ``end``.

    Thin entry point around ``ItemBookingAvailabilities.get_available_slots``.
    """
    availabilities = ItemBookingAvailabilities(
        item=item,
        start=start,
        end=end,
        uom=uom,
        user=user,
        limit=limit,
    )
    return availabilities.get_available_slots()
+
+
def _parse_date_or_datetime(s: str):
    """Convert a date or datetime value into a naive ``datetime``.

    - a string ending with "Z" is read as UTC and converted to the system timezone;
    - a string of at most 10 characters is parsed as ``YYYY-MM-DD``;
    - anything else is handed to ``get_datetime``.
    """
    from frappe.utils.data import convert_utc_to_system_timezone

    if not isinstance(s, str):
        return get_datetime(s)

    if s.endswith("Z"):
        utc_value = get_datetime(s.rstrip("Z"))
        local_value = convert_utc_to_system_timezone(utc_value)
        return local_value.replace(tzinfo=None)

    if len(s) <= 10:
        return datetime.datetime.strptime(s, "%Y-%m-%d")

    return get_datetime(s)
+
+
+class ItemBookingAvailabilities:
+ def __init__(
+ self,
+ *,
+ item: str,
+ start: str | datetime.datetime | None,
+ end: str | datetime.datetime | None,
+ uom: str | None = None,
+ user: str | None = None,
+ limit: int = 0, # does NOT work well with exceptions
+ **kwargs,
+ ):
+ self.item = item
+ self.start = start
+ self.end = end
+ self.limit = int(limit)
+ self.init = _parse_date_or_datetime(self.start)
+ self.finish = _parse_date_or_datetime(self.end)
+ self.user = user or frappe.session.user
+
+ if user == "*" and not frappe.has_permission("Item Booking", "read"):
+ user = frappe.session.user
+
+ self.item_doc = frappe.db.get_value(
+ "Item",
+ self.item,
+ ["name", "sales_uom", "enable_item_booking", "simultaneous_bookings_allowed"],
+ as_dict=True,
+ )
+
+ self.uom = uom or self.item_doc.sales_uom or self.item_doc.stock_uom
+ self.duration = get_uom_in_minutes(self.uom)
+
+ if self.item_doc.enable_item_booking and self.duration == 0:
+ if not self.uom:
+ frappe.throw(_("UOM is not set for Item {0}").format(self.item))
+ frappe.throw(_("UOM {0} is not supported").format(self.uom))
+
+ def get_available_slots(self):
+ if not self.item_doc.enable_item_booking or not self.duration:
+ return []
+ if not self.init or not self.finish:
+ return []
+
+ output = []
+
+ seen: set[str] = set()
+ for dt in daterange_including_start(self.init, self.finish):
+ # For each day, get the available slots
+ for slot in self._check_availability(dt):
+ slot_id = slot.get("id")
+ if (not slot_id) or (slot_id not in seen):
+ # Deduplicate events that span multiple days
+ seen.add(slot_id)
+ output.append(slot)
+ if self.limit and len(output) >= self.limit:
+ break
+ if self.limit and len(output) >= self.limit:
+ break
+
+ # filter out exclusions
+ if cal_name := self.item_calendar.get("name"):
+ eng = ItemBookingExceptionEngine(cal_name)
+
+ for op in eng.get_operations():
+ if op.dt_start > self.finish or op.dt_end < self.init:
+ continue
+ if op.type == "+":
+ sch: prepared_schedule_line_t = {
+ "start": op.dt_start,
+ "base_start": op.dt_start,
+ "end": op.dt_end,
+ "whole": False,
+ }
+ slots = self._get_availability_from_schedule([sch])
+ slots = eng.filter_keep_in_range(self.init, self.finish, slots)
+ output.extend(slots)
+ elif op.type == "-":
+ output = eng.filter_from_op(op, output)
+
+ if self.limit:
+ output = output[: self.limit]
+ return output
+
+ @cached_property
+ def item_calendar(self):
+ return get_item_calendar(self.item, self.uom)
+
+ @cached_property
+ def minimum_duration_in_seconds(self):
+ return frappe.cint(self.item_calendar.get("minimum_duration_in_seconds") or 1)
+
+ def _check_availability(self, date):
+ date = getdate(date)
+ if not date:
+ return []
+
+ schedules = self.generate_schedules_for_date(date)
+ yield from self._get_availability_from_schedule(schedules)
+
+ def generate_schedules_for_date(self, date: datetime.date):
+ day = calendar.day_name[date.weekday()]
+
+ dt_now = now_datetime()
+ if cal := self.item_calendar["calendar"]:
+ schedule_for_the_day = filter(lambda x: x.get("day") == day, cal)
+ for line in schedule_for_the_day:
+ line_start, line_end = self.get_schedule_line_as_datetime_tuple(date, line)
+
+ if dt_now >= line_end:
+ continue # The line already ended, no slot can be booked
+
+ start = line_start
+ if dt_now > line_start:
+ # The line already started, some slots need to be skipped
+ new_start = self._round_datetime_in_slot(dt_now, line_start)
+ if new_start < line_end:
+ start = new_start
+ else:
+ start = line_end
+
+ whole = line.get("whole", None)
+ sch: "prepared_schedule_line_t" = {
+ "start": start,
+ "end": line_end,
+ "whole": whole,
+ "base_start": line_start,
+ }
+ yield sch
+
+ def get_schedule_line_as_datetime_tuple(self, date: datetime.date, line):
+ day_start = datetime.datetime.combine(date, get_time(line.start_time))
+ if line.end_time <= line.start_time:
+ # When the end time is before the start time, it means that the schedule ends the next day.
+ date = add_days(date, 1) # type: ignore
+ day_end = datetime.datetime.combine(date, get_time(line.end_time))
+ return (day_start, day_end)
+
+ def _round_datetime_in_slot(
+ self,
+ dt: datetime.datetime,
+ slot_start: datetime.datetime,
+ interval_in_minutes: int | None = None,
+ ):
+ from math import ceil, floor
+
+ assert (
+ dt >= slot_start
+ ), "_round_datetime_in_slot: Datetime to round should be after the beginning of the slot."
+ interval_in_minutes = interval_in_minutes or int(
+ datetime.timedelta(minutes=cint(self.duration)).total_seconds() / 60
+ )
+ if not interval_in_minutes:
+ return dt
+
+ offset = dt - slot_start # How far into the slot is the current time?
+ offset_in_minutes = offset.total_seconds() / 60
+
+ if self.minimum_duration_in_seconds:
+ rounded_offset_in_minutes = interval_in_minutes * floor(offset_in_minutes / interval_in_minutes)
+ new_dt = slot_start + datetime.timedelta(minutes=rounded_offset_in_minutes)
+ if (dt - new_dt).total_seconds() > self.minimum_duration_in_seconds:
+ return new_dt # There is enough time in this slot
+
+ # Round the offset to the nearest interval
+ rounded_offset_in_minutes = interval_in_minutes * ceil(offset_in_minutes / interval_in_minutes)
+ new_dt = slot_start + datetime.timedelta(minutes=rounded_offset_in_minutes)
+ return new_dt
+
+ def _get_availability_from_schedule(self, schedules: "Iterable[prepared_schedule_line_t]"):
+ for line in schedules:
+ # We try to get all the events between line.base_start (start of day) and line.end (end of day).
+ # The base_start is the specified start date in the booking calendar, regardless of the current time.
+ booked_items = _get_events(line.get("base_start"), line.get("end"), item=self.item_doc)
+ scheduled_items = []
+ for event in booked_items:
+ e_start = get_datetime(event.get("starts_on"))
+ e_end = get_datetime(event.get("ends_on"))
+ # Only keep booked events that overlap the schedule slot
+ if e_start and e_end and (e_start < line.get("end")) and (e_end > line.get("base_start")):
+ scheduled_items.append(event)
+
+ yield from self._find_available_slot(line, scheduled_items)
+
    def _has_whole_slot_overlap(
        self,
        line_start: datetime.datetime,
        line_end: datetime.datetime,
        current_schedule: list[tuple[datetime.datetime, datetime.datetime]],
    ):
        """Tell whether a slot covering the whole line would exceed the allowed overlaps.

        Args:
            line_start: start of the candidate slot.
            line_end: end of the candidate slot.
            current_schedule: ``(start, end)`` pairs of the entries already scheduled.

        Returns:
            True as soon as the count of overlapping entries (plus the candidate slot
            itself) reaches the item's ``simultaneous_bookings_allowed`` (1 when unset),
            False otherwise.
        """
        simultaneous_bookings_allowed = self.item_doc.get("simultaneous_bookings_allowed") or 1
        n_overlaps = 1  # Offset by 1 because we take into account the future slot that can be booked
        for start, end in current_schedule:
            # Inclusive comparison: entries that merely touch the slot count as overlapping.
            if line_start <= end and start <= line_end:
                # For each overlapping item
                n_overlaps += 1
                # NOTE(review): with the offset above, `>=` reports an overlap once
                # (allowed - 1) entries exist; confirm `>` was not intended.
                if n_overlaps >= simultaneous_bookings_allowed:
                    return True
        return False
+
+ @cached_property
+ def simultaneous_booking_allowed(self):
+ return frappe.db.get_single_value("Venue Settings", "enable_simultaneous_booking")
+
    def _find_available_slot(self, line: "prepared_schedule_line_t", scheduled_items):
        """Build the list of slots to display for one schedule line.

        The result mixes the free slots of the line with the bookings of the
        requested user (every booking when ``self.user`` is "*"), the latter being
        flagged with a status of "selected", "confirmed" or "blocked".

        Args:
            line: the prepared schedule line to fill.
            scheduled_items: existing bookings overlapping the line.

        Returns:
            A list of slot dicts as produced by ``self.get_available_dict``.
        """
        slots = []
        output = []

        # Captured before `scheduled_items` is possibly replaced below: the generator
        # keeps iterating over the list that was passed in.
        if self.user == "*":
            user_scheduled_items = scheduled_items
        else:
            user_scheduled_items = (x for x in scheduled_items if x.get("user") == self.user)

        simultaneous_booking_allowed = self.simultaneous_booking_allowed  # TODO: Remove global setting
        if simultaneous_booking_allowed:
            scheduled_items = self.check_simultaneaous_bookings(scheduled_items)

        slots.extend(
            self._get_all_slots(
                line,
                simultaneous_booking_allowed,
                scheduled_items,
            )
        )

        # Nothing booked and nothing found: compute the slots of the line without bookings.
        if not slots and not scheduled_items:
            slots.extend(self._get_all_slots(line, simultaneous_booking_allowed))

        for slot in slots:
            out = self.get_available_dict(slot)
            try:
                count = get_booking_count(self.item, out.get("start"), out.get("end"))
                out.remaining = count.get("remaining")
            except Exception:
                # Best effort: a slot is still returned when its remaining count cannot be computed.
                pass
            output.append(out)

        for scheduled_item in user_scheduled_items:
            added = False

            if scheduled_item.get("status") == "In cart":
                status = "selected"
            elif scheduled_item.get("status") == "Confirmed":
                status = "confirmed"
            else:
                status = "blocked"

            # Mark every free slot that has exactly the same start and end as the booking.
            for out in output:
                if (
                    out.get("start") == scheduled_item.get("starts_on").isoformat()
                    and out.get("end") == scheduled_item.get("ends_on").isoformat()
                ):
                    out.id = scheduled_item.get("name")
                    out.status = status
                    out.number += 1
                    added = True

                    # if getdate(out.get("start")) == getdate(scheduled_item.get("starts_on")) or getdate(out.get("end")) == getdate(scheduled_item.get("ends_on")):
                    # out.color = "red"

            # The booking matches no free slot: add it as its own entry.
            if not added:
                out = self.get_available_dict(scheduled_item, status)
                out.allDay = scheduled_item.get("all_day")
                # out.color = "green"
                out.number += 1
                output.append(out)

        return output
+
+ def check_simultaneaous_bookings(self, scheduled_items):
+ import itertools
+ from operator import itemgetter
+
+ simultaneous_bookings = self.item_doc.get("simultaneous_bookings_allowed") or 1
+ sorted_schedule = sorted(scheduled_items, key=itemgetter("starts_on")) # always sort
+ for _key, group in itertools.groupby(sorted_schedule, key=lambda x: x["starts_on"]):
+ grouped_sch = [x.get("name") for x in list(group)]
+ if len(grouped_sch) == simultaneous_bookings:
+ scheduled_items = [x for x in scheduled_items if x.get("name") not in grouped_sch[:-1]]
+ elif len(grouped_sch) < simultaneous_bookings:
+ scheduled_items = [x for x in scheduled_items if x.get("name") not in grouped_sch]
+
+ return scheduled_items
+
+ def _get_all_slots(
+ self, line: "prepared_schedule_line_t", simultaneous_booking_allowed, scheduled_items=None
+ ):
+ """Compute the bookable slots of one schedule line, given the bookings already present.
+
+ `line` must carry datetime `start` and `end` values. For a line flagged `whole`,
+ the result is either a single slot covering the whole line or an empty list (line
+ shorter than `self.minimum_duration_in_seconds`, or `_has_whole_slot_overlap`
+ reports an overlap). Otherwise the gaps left between the booked intervals are cut
+ into consecutive slots of `self.duration` minutes.
+
+ Returns a list of dicts with `starts_on` and `ends_on` keys.
+ """
+ line_start = line.get("start")
+ line_end = line.get("end")
+
+ assert isinstance(line_start, datetime.datetime)
+ assert isinstance(line_end, datetime.datetime)
+
+ # Compute existing items
+ if not scheduled_items:
+ scheduled_items = []
+
+ # Busy intervals as (start, end) tuples, restricted to bookings that start before the end of the line.
+ current_schedule = []
+ for scheduled_item in scheduled_items:
+ sch_start = get_datetime(scheduled_item.get("starts_on"))
+ sch_end = get_datetime(scheduled_item.get("ends_on"))
+ if not sch_start or not sch_end:
+ continue # For some reason this fails
+ if sch_start < line_start:
+ # Ok, but the scheduled item begins before the current slot, trim it.
+ current_schedule.append((line_start, sch_end))
+ elif sch_start < line_end:
+ # Ok, the scheduled item starts before the end of the slot, keep it.
+ current_schedule.append((sch_start, sch_end))
+
+ if line.get("whole"):
+ # Too small to fit a slot
+ if (line_end - line_start).total_seconds() < self.minimum_duration_in_seconds:
+ return []
+
+ if self._has_whole_slot_overlap(line_start, line_end, current_schedule):
+ return []
+
+ return [{"starts_on": line_start, "ends_on": line_end}]
+
+ # NOTE(review): a duration of 0 gives an empty interval and the `while` loops below
+ # would never advance — presumably the duration is validated elsewhere, confirm.
+ interval = datetime.timedelta(minutes=cint(self.duration))
+
+ # Zero-length sentinels marking both ends of the line.
+ slots = sorted([(line_start, line_start), (line_end, line_end)])
+
+ if simultaneous_booking_allowed:
+ # Start times of the regular grid, i.e. the slots of a line without any booking.
+ vanilla_start_times = []
+ for start, end in ((slots[i][0], slots[i + 1][0]) for i in range(len(slots) - 1)):
+ while start + interval <= end:
+ vanilla_start_times.append(start)
+ start += interval
+
+ if current_schedule:
+ # Insert the busy intervals between the sentinels, after filtering them through `reduced`.
+ sorted_schedule = list(reduced(sorted(current_schedule, key=lambda x: x[0])))
+ slots = sorted([(line_start, line_start), *sorted_schedule, (line_end, line_end)])
+
+ # Each gap runs from the end of one entry to the start of the next one.
+ free_slots = []
+ for start, end in ((slots[i][1], slots[i + 1][0]) for i in range(len(slots) - 1)):
+ while start + interval <= end:
+ if simultaneous_booking_allowed:
+ if start not in vanilla_start_times:
+ # Off the regular grid: jump to the first grid time at or after start + interval, if any.
+ vanilla_start = [x for x in vanilla_start_times if start + interval <= x]
+ if vanilla_start:
+ start = vanilla_start[0]
+ free_slots.append({"starts_on": start, "ends_on": start + interval})
+ start += interval
+
+ return free_slots
+
+ def get_available_dict(self, slot, status=None):
+ """
+ Status can be:
+ - available
+ - selected
+ """
+ return frappe._dict(
+ {
+ "start": slot.get("starts_on").isoformat(),
+ "end": slot.get("ends_on").isoformat(),
+ "id": slot.get("name") or frappe.generate_hash(length=8),
+ "status": status or "available",
+ "number": 0,
+ "total_available": self.item_doc.get("simultaneous_bookings_allowed"),
+ # "display": "background",
+ # "color": None,
+ # "allDay": 1,
+ }
+ )
+
+
+@frappe.whitelist()
+def get_events_for_calendar(doctype, start, end, field_map, filters=None, fields=None):
+ assert doctype in (
+ None,
+ "",
+ "Item Booking",
+ ), "get_events_for_calendar: expected the doctype to be Item Booking"
+ # Note: we ignore the doctype because we return Item Booking and Subscription objects
+ if isinstance(field_map, str):
+ field_map: dict = frappe.parse_json(field_map)
+
+ if fields and isinstance(fields, str):
+ fields: list = frappe.parse_json(fields)
+
+ fields = fields or [] # default value
+
+ for f in field_map.values():
+ dt = doctype
+ doc_meta = frappe.get_meta(dt)
+ if doc_meta.has_field(f):
+ fields.append(f)
+
+ if filters and isinstance(filters, str):
+ filters: dict | list = frappe.parse_json(filters)
+ if isinstance(filters, list):
+ # Normalize the filters to [table, field, operator, value]
+ for i, f in enumerate(filters):
+ if len(f) >= 4:
+ f = [f[0], f[1], f[2], f[3]]
+ elif len(f) == 3:
+ f = [doctype, f[0], f[1], f[2]]
+ filters[i] = f
+
+ events: list = _get_events(start, end, item=None, user=None, filters=filters, fields=fields)
+ return events
+
+
+def _get_events(
+ start, end, item=None, user=None, filters: list | dict | None = None, fields: list | None = None
+):
+ """Return the bookings and booking subscriptions that intersect the period `start`..`end`.
+
+ Cancelled Item Bookings are excluded. `item` (a name or an object with a `name`)
+ restricts the result to one item; `user` restricts it to the bookings of that user
+ or of the customers returned by `get_customers_suppliers`. `filters` and `fields`
+ only apply to the Item Booking query, not to the subscriptions.
+
+ Recurring bookings are expanded with `process_recurring_events`. An empty list is
+ returned for the "Guest" user.
+ """
+ from pypika import Criterion
+ from pypika import functions as fn
+
+ if user == "Guest":
+ return []
+
+ assert (not fields) or isinstance(
+ fields, (list | tuple | set)
+ ), "`fields` parameters must be a list, tuple, set, or None"
+ filters = filters or []
+
+ IB = frappe.qb.DocType("Item Booking")
+ # Built as a set so that fields requested twice are only selected once.
+ all_fields = list(
+ {
+ "starts_on",
+ "ends_on",
+ "all_day",
+ IB.item.as_("item_name"),
+ "name",
+ "repeat_this_event",
+ "rrule",
+ "user",
+ "status",
+ *(fields or []),
+ }
+ )
+
+ # Bookings overlapping the period...
+ time_condition_1 = (IB.starts_on < end) & (IB.ends_on > start)
+ # ...or repeating bookings started before its end and still repeating after its start
+ # (a missing `repeat_till` is treated as a far-future date).
+ time_condition_2 = (
+ (IB.starts_on < end)
+ & (IB.repeat_this_event == 1)
+ & (fn.Coalesce(IB.repeat_till, "3000-01-01") > start)
+ )
+
+ extra_conditions = []
+ if item:
+ item_name = item if isinstance(item, str) else item.name
+ extra_conditions.append(IB.item == item_name)
+ if user:
+ # NOTE: `_` is rebound locally here and no longer refers to the translation function.
+ customers, _ = get_customers_suppliers("Customer", user)
+ cond = IB.user == user
+ if customers:
+ cond |= IB.party_name.isin(customers)
+ extra_conditions.append(cond)
+
+ query = (
+ frappe.qb.get_query("Item Booking", filters=filters)
+ .select(*all_fields)
+ .where(IB.status != "Cancelled")
+ .where(time_condition_1 | time_condition_2)
+ .where(Criterion.all(extra_conditions))
+ )
+ events = query.run(as_dict=1)
+
+ # Note: do not forward the fields/filters arguments to _get_subscriptions_as_events
+ subscriptions_as_events = _get_subscriptions_as_events(
+ start, end, item=item, user=user, fields=None, filters=None
+ )
+ events += subscriptions_as_events
+
+ result = []
+
+ # Repeating events are replaced by the entries produced by `process_recurring_events`.
+ for event in events:
+ if event.get("repeat_this_event") == 1:
+ result.extend(process_recurring_events(event, start, end, "starts_on", "ends_on", "rrule"))
+ else:
+ result.append(event)
+
+ return result
+
+
+def _get_subscriptions_as_events(start, end, item=None, user=None, fields=None, filters=None):
+ subscriptions = _get_booking_subscriptions_between(
+ start, end, item=item, user=user, fields=fields, filters=filters
+ )
+ display_subscriptions = bool(frappe.db.get_single_value("Venue Settings", "display_subscriptions_in_calendar"))
+
+ events = []
+ for sub in subscriptions:
+ qty = sub["qty"]
+ booked_item = sub["booked_item"]
+ customer = sub["customer"]
+
+ title = booked_item
+ if qty > 1:
+ title = f"{qty} × {title}"
+ if customer:
+ title += " - " + customer
+
+ title = frappe._("{0}: {1}").format(
+ frappe._("Subscription"),
+ title,
+ )
+
+ events.append(
+ frappe._dict(
+ {
+ **sub,
+ "starts_on": get_datetime(sub["start"]),
+ "ends_on": get_datetime(sub["end"]),
+ "item_name": booked_item,
+ "title": title,
+ "name": sub["name"],
+ "doctype": "Subscription",
+ # "repeat_this_event": 1,
+ # "rrule": "RRULE:FREQ=HOURLY",
+ # "user": sub["_customers"][0] if sub["_customers"] and len(sub["_customers"]) > 0,
+ "status": "Confirmed",
+ "all_day": 1,
+ "startEditable": False,
+ "durationEditable": False,
+ "display": 'auto' if display_subscriptions else 'none'
+ }
+ )
+ )
+ return events
+
+
+def _get_booking_subscriptions_between(
+ after_date,
+ before_date,
+ item: str | None = None,
+ user: str | None = None,
+ fields: list | None = None,
+ filters: list | dict | None = None,
+):
+ from pypika import Criterion, Field
+ from pypika import functions as fn
+
+ Subscription = frappe.qb.DocType("Subscription")
+ SubscriptionPlanDetail = frappe.qb.DocType("Subscription Plan Detail")
+
+ doc_meta = frappe.get_meta("Subscription")
+ all_fields = []
+ if fields:
+ fields.extend(fieldname for fieldname in fields if doc_meta.has_field(fieldname))
+ for d in doc_meta.fields:
+ if d.fieldtype == "Color":
+ all_fields.append(Subscription.field(d.fieldname).as_("color"))
+ break
+
+ start_field = fn.Coalesce(SubscriptionPlanDetail.from_date, Subscription.start, "0000-00-00")
+ end_field = fn.Coalesce(SubscriptionPlanDetail.to_date, Subscription.cancellation_date, "9999-01-01")
+
+ item_field: Field = SubscriptionPlanDetail.booked_item
+
+ all_filters = [
+ start_field < before_date,
+ end_field > after_date,
+ ]
+ if item:
+ item_name = item if isinstance(item, str) else item.name
+ all_filters.append(item_field == item_name) # Must book this exact item
+ else:
+ all_filters.append(item_field.isnotnull()) # Must be a booking subscription
+
+ if user:
+ customers, _ = get_customers_suppliers("Customer", user)
+ if customers:
+ all_filters.append(Subscription.customer.isin(customers))
+ else:
+ return []
+
+ all_fields.extend(
+ (
+ Subscription.name.as_("name"),
+ SubscriptionPlanDetail.name.as_("plan_detail_name"),
+ SubscriptionPlanDetail.qty,
+ start_field.as_("start"),
+ end_field.as_("end"),
+ item_field.as_("booked_item"),
+ Subscription.customer.as_("customer"),
+ )
+ )
+
+ query = (
+ frappe.qb.get_query("Subscription", filters=(filters or []))
+ .select(*all_fields)
+ .join(SubscriptionPlanDetail)
+ .on(
+ (Subscription.name == SubscriptionPlanDetail.parent)
+ & (SubscriptionPlanDetail.parenttype == "Subscription")
+ ) # NOTE: Plans are present in both Subscription and Subscription template
+ .where(Criterion.all(all_filters))
+ )
+
+ subscriptions = query.run(as_dict=True)
+
+ # Update subscriptions to strip the time component in the datetime
+ for s in subscriptions:
+ if abs(round(s["qty"]) - s["qty"]) > 1e-6:
+ raise ValueError("Non integer quantity of booked slots.")
+ s["start"] = getdate(s["start"])
+ s["end"] = getdate(s["end"])
+ s["qty"] = int(s["qty"])
+ s["color"] = s.get("color", "#77bbff")
+
+ return subscriptions
+
+
+def get_item_calendar(item: str | None = None, uom: str | None = None) -> "item_calendar_t":
+ # Override the calendar with a custom one
+ for hook in frappe.get_hooks("get_item_booking_calendar"):
+ calendar = frappe.call(hook, item=item, uom=uom)
+ if not calendar:
+ continue
+ if isinstance(calendar, str):
+ calendar = frappe.get_doc("Item Booking Calendar", calendar)
+ if calendar:
+ lines = calendar.get("booking_calendar")
+ lines = list(map(lambda x: x.as_dict(), lines))
+ return {
+ "type": str(calendar.get("type")),
+ "calendar": lines, # type: ignore
+ "name": str(calendar.name),
+ "minimum_duration_in_seconds": calendar.get("minimum_duration_in_seconds"),
+ }
+
+ if item and not uom:
+ uom = frappe.get_cached_value("Item", item, "sales_uom")
+
+ item = item or ""
+ uom = uom or ""
+
+ for filters in [
+ dict(item=item, uom=uom),
+ dict(item=item, uom=""),
+ dict(item="", uom=uom),
+ dict(item="", uom=""),
+ ]:
+ filtered_calendars = frappe.get_all(
+ "Item Booking Calendar",
+ fields=["name", "item", "uom"],
+ filters=filters,
+ limit=1,
+ )
+ if filtered_calendars:
+ return {
+ "type": "Daily",
+ "calendar": frappe.get_all(
+ "Item Booking Calendars",
+ filters={"parent": filtered_calendars[0].name, "parenttype": "Item Booking Calendar"},
+ fields=["start_time", "end_time", "day", "whole"],
+ ),
+ "name": filtered_calendars[0].name,
+ }
+
+ return {"type": "Daily", "calendar": [], "name": None}
+
+
+def get_uom_in_minutes(uom=None, minute_uom=None):
+ minute_uom = minute_uom or frappe.db.get_single_value("Venue Settings", "minute_uom")
+ if uom == minute_uom:
+ return 1
+
+ return frappe.db.get_value("UOM Conversion Factor", dict(from_uom=uom, to_uom=minute_uom), "value") or 0
+
+
+def get_sales_qty(item, start, end):
+ minute_uom = frappe.db.get_single_value("Venue Settings", "minute_uom")
+ sales_uom = frappe.get_cached_value("Item", item, "sales_uom") or frappe.get_cached_value(
+ "Item", item, "stock_uom"
+ )
+ duration = time_diff_in_minutes(end, start)
+
+ if sales_uom == minute_uom:
+ return duration
+
+ conversion_factor = (
+ frappe.db.get_value("UOM Conversion Factor", dict(from_uom=sales_uom, to_uom=minute_uom), "value")
+ or 1
+ )
+
+ return flt(duration) / flt(conversion_factor)
+
+
+def daterange(start_date, end_date):
+ if start_date < get_datetime(now()):
+ start_date = datetime.datetime.now().replace(
+ hour=0, minute=0, second=0, microsecond=0
+ ) + datetime.timedelta(days=1)
+ for n in range(int((end_date - start_date).days)):
+ yield start_date + timedelta(n)
+
+
+def daterange_including_start(start_date, end_date):
+ """Yield one datetime per day, from the start date up to but not including `end_date`.
+
+ A `start_date` that is already in the past is replaced by the current time, and the
+ start is truncated to midnight, so that today is part of the range.
+ """
+ # NOTE(review): the flattened indentation does not show whether the truncation to midnight
+ # applies to every start date or only to the fallback above — confirm against the original.
+ if start_date < get_datetime(now()):
+ start_date = datetime.datetime.now()
+ start_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
+ # Number of whole days between both dates; `n` is the offset in days.
+ for n in range(int((end_date - start_date).days)):
+ yield start_date + timedelta(n)
+
+
+def reduced(timeseries):
+ prev = datetime.datetime.min
+ for start, end in timeseries:
+ if end > prev:
+ prev = end
+ yield start, end
+
+
+def delete_linked_item_bookings(doc, method):
+ for item in doc.items:
+ if item.item_booking:
+ frappe.delete_doc("Item Booking", item.item_booking, ignore_permissions=True, force=True)
+
+
+def confirm_linked_item_bookings(doc, method):
+ confirm_after_payment = cint(
+ frappe.db.get_single_value("Venue Settings", "confirm_booking_after_payment")
+ )
+ for item in doc.items:
+ if item.item_booking:
+ slot = frappe.get_doc("Item Booking", item.item_booking)
+ slot.flags.ignore_permissions = True
+ slot.set_status("Not confirmed" if confirm_after_payment else "Confirmed")
+
+
+def clear_draft_bookings():
+ drafts = frappe.get_all("Item Booking", filters={"status": "In cart"}, fields=["name", "modified"])
+ if not drafts:
+ return
+
+ clearing_duration = frappe.db.get_value("Venue Settings", None, "clear_item_booking_draft_duration")
+
+ if cint(clearing_duration) <= 0:
+ return
+
+ for draft in drafts:
+ if now_datetime() > draft.get("modified") + datetime.timedelta(minutes=cint(clearing_duration)):
+ remove_booked_slot(draft.get("name"))
+
+
+@frappe.whitelist()
+def make_quotation(source_name, target_doc=None):
+ """Map the Item Booking `source_name` to a Quotation and return the mapped document.
+
+ The booking's `party_type` is mapped to `quotation_to`, its `status` is not copied,
+ and the booked item is added as a quotation line.
+ """
+ def set_missing_values(source, target):
+ # Post-processing run by `get_mapped_doc`: currency, exchange rate, item line and totals.
+ quotation: "Quotation" = frappe.get_doc(target)
+ quotation.order_type = "Maintenance"
+ company_currency = frappe.get_cached_value("Company", quotation.company, "default_currency")
+
+ if quotation.quotation_to == "Customer" and quotation.party_name:
+ party_account_currency = get_party_account_currency(
+ "Customer", quotation.party_name, quotation.company
+ )
+ else:
+ party_account_currency = company_currency
+
+ quotation.currency = party_account_currency or company_currency
+
+ # No conversion is needed when the quotation is in the company currency.
+ if company_currency == quotation.currency:
+ exchange_rate = 1
+ else:
+ exchange_rate = get_exchange_rate(
+ quotation.currency, company_currency, quotation.transaction_date, args="for_selling"
+ )
+
+ quotation.conversion_rate = exchange_rate
+
+ # add item
+ # NOTE(review): `get_sales_qty` falls back to the stock UOM when the item has no sales UOM,
+ # while "uom" below is only read from `sales_uom` — confirm both stay consistent.
+ quotation.append(
+ "items",
+ {
+ "item_code": source.item,
+ "qty": get_sales_qty(source.item, source.starts_on, source.ends_on),
+ "uom": frappe.get_cached_value("Item", source.item, "sales_uom"),
+ "item_booking": source.name,
+ },
+ )
+
+ quotation.run_method("set_missing_values")
+ quotation.run_method("set_other_charges")
+ quotation.run_method("calculate_taxes_and_totals")
+
+ doclist = get_mapped_doc(
+ "Item Booking",
+ source_name,
+ {
+ "Item Booking": {
+ "doctype": "Quotation",
+ "field_map": {"party_type": "quotation_to"},
+ "field_no_map": ["status"],
+ }
+ },
+ target_doc,
+ set_missing_values,
+ )
+
+ return doclist
+
+
+@frappe.whitelist()
+def make_sales_order(source_name, target_doc=None):
+ """Map the Item Booking `source_name` to a Sales Order and return the mapped document.
+
+ The booking's `party_name` is mapped to `customer`, and the booked item is added
+ as an order line together with the company's default sales taxes.
+ """
+ def set_missing_values(source, target):
+ # Post-processing run by `get_mapped_doc`: currency, exchange rate, item line, taxes and totals.
+ from erpnext.controllers.accounts_controller import get_default_taxes_and_charges
+
+ sales_order = frappe.get_doc(target)
+ sales_order.order_type = "Maintenance"
+ company_currency = frappe.get_cached_value("Company", sales_order.company, "default_currency")
+
+ party_account_currency = get_party_account_currency(
+ "Customer", sales_order.customer, sales_order.company
+ )
+
+ sales_order.currency = party_account_currency or company_currency
+
+ # No conversion is needed when the order is in the company currency.
+ if company_currency == sales_order.currency:
+ exchange_rate = 1
+ else:
+ exchange_rate = get_exchange_rate(
+ sales_order.currency, company_currency, sales_order.transaction_date, args="for_selling"
+ )
+
+ sales_order.conversion_rate = exchange_rate
+
+ # add item
+ # NOTE(review): `get_sales_qty` falls back to the stock UOM when the item has no sales UOM,
+ # while "uom" below is only read from `sales_uom` — confirm both stay consistent.
+ sales_order.append(
+ "items",
+ {
+ "item_code": source.item,
+ "qty": get_sales_qty(source.item, source.starts_on, source.ends_on),
+ "uom": frappe.get_cached_value("Item", source.item, "sales_uom"),
+ "item_booking": source.name,
+ },
+ )
+
+ # get default taxes
+ taxes = get_default_taxes_and_charges("Sales Taxes and Charges Template", company=sales_order.company)
+ if taxes.get("taxes"):
+ sales_order.update(taxes)
+
+ sales_order.run_method("set_missing_values")
+ sales_order.run_method("calculate_taxes_and_totals")
+
+ doclist = get_mapped_doc(
+ "Item Booking",
+ source_name,
+ {"Item Booking": {"doctype": "Sales Order", "field_map": {"party_name": "customer"}}},
+ target_doc,
+ set_missing_values,
+ )
+
+ return doclist
+
+
+@frappe.whitelist()
+def make_booking_credit_usage(source_name, target_doc=None):
+ def set_missing_values(source, target):
+ result = get_booking_credit_types_for_item(source.item, source.uom)
+ if result:
+ target.booking_credit_type = result[0]
+
+ doclist = get_mapped_doc(
+ "Item Booking",
+ source_name,
+ {"Item Booking": {"doctype": "Booking Credit Usage", "field_map": {"party_name": "customer"}}},
+ target_doc,
+ set_missing_values,
+ )
+
+ return doclist
+
+
+def register_google_calendar_item(doc, method, *args, **kwargs):
+ if method == "on_trash":
+ for item_name in frappe.get_all("Item", filters={"google_calendar": doc.name}):
+ frappe.get_doc("Item", item_name.name).update({"google_calendar": ""}).save()
+ return
+
+ if doc.reference_document != "Item Booking":
+ return
+ elif doc.booking_item_code:
+ frappe.get_doc("Item", doc.booking_item_code).update({"google_calendar": doc.name}).save()
+ doc.booking_item_code = "" # The field is just there to make it easier to setup
+
+
+def get_calendar_item(account):
+ return frappe.db.get_value(
+ "Item", dict(google_calendar=account.name, disabled=0), ["item_code", "calendar_color"]
+ )
+
+
+def insert_event_to_calendar(account, event, recurrence=None):
+ """
+ Inserts event in Dokos Calendar during Sync
+ """
+ start = event.get("start")
+ end = event.get("end")
+
+ if item_values := get_calendar_item(account):
+ item, color = item_values
+ else:
+ frappe.throw(_("Item not found for Google Calendar {0}").format(repr(account.name)))
+
+ calendar_event = {
+ "doctype": "Item Booking",
+ "item": item,
+ "color": color,
+ "notes": event.get("description"),
+ "sync_with_google_calendar": 1,
+ "google_calendar": account.name,
+ "google_calendar_id": account.google_calendar_id,
+ "google_calendar_event_id": event.get("id"),
+ "rrule": recurrence,
+ "starts_on": get_datetime(start.get("date"))
+ if start.get("date")
+ else get_timezone_naive_datetime(start),
+ "ends_on": get_datetime(end.get("date")) if end.get("date") else get_timezone_naive_datetime(end),
+ "all_day": 1 if start.get("date") else 0,
+ "repeat_this_event": 1 if recurrence else 0,
+ "status": "Confirmed",
+ }
+ doc = frappe.get_doc(calendar_event)
+ doc.flags.pulled_from_google_calendar = True
+ doc.insert(ignore_permissions=True)
+
+
+def update_event_in_calendar(account, event, recurrence=None):
+ """
+ Updates Event in Dokos Calendar if any existing Google Calendar Event is updated
+ """
+ start = event.get("start")
+ end = event.get("end")
+
+ calendar_event = frappe.get_doc("Item Booking", {"google_calendar_event_id": event.get("id")})
+ item, _ = get_calendar_item(account)
+
+ updated_event = {
+ "item": item,
+ "notes": event.get("description"),
+ "rrule": recurrence,
+ "starts_on": get_datetime(start.get("date"))
+ if start.get("date")
+ else get_timezone_naive_datetime(start),
+ "ends_on": get_datetime(end.get("date")) if end.get("date") else get_timezone_naive_datetime(end),
+ "all_day": 1 if start.get("date") else 0,
+ "repeat_this_event": 1 if recurrence else 0,
+ "status": "Confirmed",
+ }
+
+ update = False
+ for field in updated_event:
+ if field == "rrule" and recurrence:
+ update = calendar_event.get(field) is None or (
+ set(calendar_event.get(field).split(";")) != set(updated_event.get(field).split(";"))
+ )
+ else:
+ update = str(calendar_event.get(field)) != str(updated_event.get(field))
+ if update:
+ break
+
+ if update:
+ calendar_event.update(updated_event)
+ calendar_event.flags.pulled_from_google_calendar = True
+ calendar_event.save()
+
+
+def cancel_event_in_calendar(account, event):
+ # If any synced Google Calendar Event is cancelled, then close the Event
+ add_comment = False
+
+ if booking_event := frappe.db.exists(
+ "Item Booking",
+ {"google_calendar_id": account.google_calendar_id, "google_calendar_event_id": event.get("id")},
+ ):
+ booking = frappe.get_doc("Item Booking", booking_event)
+
+ try:
+ booking.flags.pulled_from_google_calendar = True
+ booking.delete()
+ add_comment = False
+ except frappe.LinkExistsError:
+ # Try to delete event, but only if it has no links
+ add_comment = True
+
+ if add_comment:
+ frappe.get_doc(
+ {
+ "doctype": "Comment",
+ "comment_type": "Info",
+ "reference_doctype": "Item Booking",
+ "reference_name": booking.get("name"),
+ "content": " {}".format(_("- Event deleted from Google Calendar.")),
+ }
+ ).insert(ignore_permissions=True)
+
+
+def insert_event_in_google_calendar(doc, method=None):
+ """
+ Insert Events in Google Calendar if sync_with_google_calendar is checked.
+ """
+ if (
+ not doc.sync_with_google_calendar
+ or doc.flags.pulled_from_google_calendar
+ or not frappe.db.exists("Google Calendar", {"name": doc.google_calendar, "push_to_google_calendar": 1})
+ ):
+ return
+
+ google_calendar, account = get_google_calendar_object(doc.google_calendar)
+
+ event = {
+ "summary": doc.title,
+ "description": doc.notes,
+ "recurrence": [doc.rrule] if doc.repeat_this_event and doc.rrule else [],
+ }
+ event.update(
+ format_date_according_to_google_calendar(
+ doc.get("all_day", 0), get_datetime(doc.starts_on), get_datetime(doc.ends_on)
+ )
+ )
+
+ try:
+ event = google_calendar.events().insert(calendarId=doc.google_calendar_id, body=event).execute()
+ doc.db_set("google_calendar_event_id", event.get("id"), update_modified=False)
+ frappe.publish_realtime(
+ "event_synced", {"message": _("Event Synced with Google Calendar.")}, user=frappe.session.user
+ )
+ except HttpError as err:
+ frappe.msgprint(f'{_("Google Error")}: {json.loads(err.content)["error"]["message"]}')
+ frappe.throw(
+ _("Google Calendar - Could not insert event in Google Calendar {0}, error code {1}.").format(
+ account.name, err.resp.status
+ )
+ )
+
+
+def update_event_in_google_calendar(doc, method=None):
+ """
+ Updates Events in Google Calendar if any existing event is modified in Dokos Calendar
+
+ Nothing is pushed when the booking was pulled from Google Calendar, is being inserted,
+ is not synced, or when its Google Calendar does not have `push_to_google_calendar`
+ enabled. A synced booking without Google event id is inserted instead of updated.
+ A Google API error is shown to the user and then thrown.
+ """
+ # Workaround to avoid triggering update when Event is being inserted since
+ # creation and modified are same when inserting doc
+ if (
+ doc.flags.pulled_from_google_calendar
+ or doc.modified == doc.creation
+ or not doc.sync_with_google_calendar
+ or not frappe.db.exists("Google Calendar", {"name": doc.google_calendar, "push_to_google_calendar": 1})
+ ):
+ return
+
+ if doc.sync_with_google_calendar and not doc.google_calendar_event_id:
+ # If sync_with_google_calendar is checked later, then insert the event rather than updating it.
+ return insert_event_in_google_calendar(doc)
+
+ google_calendar, dummy = get_google_calendar_object(doc.google_calendar)
+
+ try:
+ # Fetch the remote event first, then overwrite the synced fields only.
+ event = (
+ google_calendar.events()
+ .get(calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id)
+ .execute()
+ )
+ event["summary"] = doc.title
+ event["description"] = doc.notes
+ event["recurrence"] = [doc.rrule] if doc.repeat_this_event and doc.rrule else []
+ event["status"] = "cancelled" if doc.status == "Cancelled" else "confirmed"
+ event.update(
+ format_date_according_to_google_calendar(
+ doc.get("all_day", 0), get_datetime(doc.starts_on), get_datetime(doc.ends_on)
+ )
+ )
+
+ google_calendar.events().update(
+ calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id, body=event
+ ).execute()
+ frappe.publish_realtime(
+ "event_synced", {"message": _("Event Synced with Google Calendar.")}, user=frappe.session.user
+ )
+ except HttpError as err:
+ # NOTE(review): assumes the error body is JSON with an "error.message" entry — confirm.
+ frappe.msgprint(f'{_("Google Error")}: {json.loads(err.content)["error"]["message"]}')
+ frappe.throw(
+ _("Google Calendar - Could not update Event {0} in Google Calendar, error code {1}.").format(
+ doc.name, err.resp.status
+ )
+ )
+
+
+def delete_event_in_google_calendar(doc, method=None):
+ """
+ Delete Events from Google Calendar if Item Booking is deleted.
+
+ The remote event is not removed but updated with the "cancelled" status and without
+ recurrence. Nothing is pushed when the booking has no Google event id, was pulled
+ from Google Calendar, is not synced, or when its Google Calendar does not have
+ `push_to_google_calendar` enabled.
+ """
+
+ if (
+ not doc.google_calendar_event_id
+ or doc.flags.pulled_from_google_calendar
+ or not doc.sync_with_google_calendar
+ or not frappe.db.exists("Google Calendar", {"name": doc.google_calendar, "push_to_google_calendar": 1})
+ ):
+ return
+
+ google_calendar, account = get_google_calendar_object(doc.google_calendar)
+
+ try:
+ event = (
+ google_calendar.events()
+ .get(calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id)
+ .execute()
+ )
+ event["recurrence"] = None
+ event["status"] = "cancelled"
+
+ google_calendar.events().update(
+ calendarId=doc.google_calendar_id, eventId=doc.google_calendar_event_id, body=event
+ ).execute()
+ except HttpError as err:
+ # A Google API error is only reported with `msgprint`, nothing is thrown here.
+ # NOTE(review): assumes the error body is JSON with an "error.message" entry — confirm.
+ frappe.msgprint(f'{_("Google Error")}: {json.loads(err.content)["error"]["message"]}')
+ frappe.msgprint(
+ _("Google Calendar - Could not delete Event {0} from Google Calendar, error code {1}.").format(
+ doc.name, err.resp.status
+ )
+ )
+
+
+@frappe.whitelist()
+def get_corresponding_party(user):
+ customers, leads = get_linked_customers(user)
+ party_type = party_name = None
+ if customers:
+ party_type = "Customer"
+ party_name = customers[0]
+
+ elif leads:
+ party_type = "Lead"
+ party_name = leads[0]
+
+ return party_type, party_name
+
+
+def move_booking_with_event(doc, method):
+ doc_before_save = doc.get_doc_before_save()
+ if doc_before_save and getdate(doc_before_save.starts_on) != getdate(doc.starts_on):
+ days = date_diff(doc.starts_on, doc_before_save.starts_on)
+ bookings = frappe.get_all(
+ "Item Booking", filters={"event": doc.name}, fields=["name", "starts_on", "ends_on"]
+ )
+
+ for booking in bookings:
+ doc = frappe.get_doc("Item Booking", booking.name)
+ doc.starts_on = add_days(booking.starts_on, days)
+ doc.ends_on = add_days(booking.ends_on, days)
+ doc.flags.ignore_permissions = True
+ doc.save()
+
+
+@frappe.whitelist()
+def get_booking_count(item=None, starts_on=None, ends_on=None):
+ if not item:
+ return
+
+ if not starts_on:
+ starts_on = now_datetime()
+ if not ends_on:
+ ends_on = starts_on
+
+ starts_on, ends_on = get_datetime(starts_on), get_datetime(ends_on)
+
+ simultaneous_bookings_enabled = frappe.db.get_single_value(
+ "Venue Settings", "enable_simultaneous_booking"
+ )
+ item_doc = frappe.get_doc("Item", item)
+
+ if simultaneous_bookings_enabled:
+ capacity = cint(item_doc.get("simultaneous_bookings_allowed")) # type: ignore
+ else:
+ capacity = 1
+
+ events = _get_events(starts_on, ends_on, item=item_doc)
+ current = len(events)
+
+ return {"capacity": capacity, "current": current, "remaining": capacity - current}
+
+
+# def get_simultaneous_bookings(scheduled_items, timeslot, simultaneous_bookings=None):
+# import itertools
+# from operator import itemgetter
+# count = 0
+# if cint(simultaneous_bookings) > 1:
+# sorted_schedule = sorted(scheduled_items, key=itemgetter("starts_on"))
+# for key, group in itertools.groupby(sorted_schedule, key=lambda x: x["starts_on"]):
+# group_count = 0
+# for slot in group:
+# if get_datetime(timeslot[1]) > slot.get("starts_on") and get_datetime(timeslot[0]) < slot.get(
+# "ends_on"
+# ):
+# group_count += 1
+# count = max(count, group_count)
+# return count
diff --git a/bookings/bookings/doctype/venue_settings/venue_settings.json b/bookings/bookings/doctype/venue_settings/venue_settings.json
new file mode 100644
index 0000000000000000000000000000000000000000..7e777b33810fe99f7eae66e31a69606a9585e73d
--- /dev/null
+++ b/bookings/bookings/doctype/venue_settings/venue_settings.json
@@ -0,0 +1,241 @@
+{
+ "actions": [],
+ "creation": "2020-08-03 08:10:42.541814",
+ "doctype": "DocType",
+ "editable_grid": 1,
+ "engine": "InnoDB",
+ "field_order": [
+ "item_booking_section",
+ "clear_item_booking_draft_duration",
+ "confirm_booking_after_payment",
+ "column_break_4",
+ "enable_simultaneous_booking",
+ "no_overlap_per_item",
+ "sync_with_google_calendar",
+ "allow_event_cancellation",
+ "display_subscriptions_in_calendar",
+ "cancellation_delay",
+ "role_allowed_to_skip_cart",
+ "event_registration_section",
+ "registration_item_code",
+ "multi_venue_section",
+ "enable_multi_companies",
+ "cart_settings_overrides",
+ "unit_of_measure_tab",
+ "short_bookings_section",
+ "minute_uom",
+ "venue_units_of_measure",
+ "section_break_20",
+ "month_uom",
+ "column_break_uuvm",
+ "week_uom",
+ "column_break_jmrs",
+ "day_uom",
+ "section_break_lnrr",
+ "venue_long_uoms"
+ ],
+ "fields": [
+ {
+ "fieldname": "item_booking_section",
+ "fieldtype": "Section Break",
+ "label": "Item Booking"
+ },
+ {
+ "default": "15",
+ "description": "Minimum 3 minutes.<br>Set 0 to disable this functionality.",
+ "fieldname": "clear_item_booking_draft_duration",
+ "fieldtype": "Int",
+ "label": "Clear drafts after x minutes"
+ },
+ {
+ "default": "0",
+ "description": "If not set, the booking will be considered confirmed when the order is placed",
+ "fieldname": "confirm_booking_after_payment",
+ "fieldtype": "Check",
+ "label": "Confirm booking after payment"
+ },
+ {
+ "fieldname": "column_break_4",
+ "fieldtype": "Column Break"
+ },
+ {
+ "default": "0",
+ "fieldname": "enable_simultaneous_booking",
+ "fieldtype": "Check",
+ "label": "Enable simultaneous booking"
+ },
+ {
+ "default": "0",
+ "fieldname": "no_overlap_per_item",
+ "fieldtype": "Check",
+ "label": "Do not allow overlapping bookings for the same item on desk"
+ },
+ {
+ "default": "0",
+ "description": "Applicable for shopping cart bookings",
+ "fieldname": "sync_with_google_calendar",
+ "fieldtype": "Check",
+ "label": "Automatically synchronize with Google Calendar"
+ },
+ {
+ "default": "0",
+ "fieldname": "allow_event_cancellation",
+ "fieldtype": "Check",
+ "label": "Allow cancellation of item bookings on the portal"
+ },
+ {
+ "default": "86400",
+ "depends_on": "eval:doc.allow_event_cancellation",
+ "description": "Users will not be able to cancel later than this delay before the appointment",
+ "fieldname": "cancellation_delay",
+ "fieldtype": "Duration",
+ "label": "Cancellation delay"
+ },
+ {
+ "description": "Users with this role will be able to make bookings without having to validate a shopping cart and create a sales order",
+ "fieldname": "role_allowed_to_skip_cart",
+ "fieldtype": "Link",
+ "label": "Role allowed to skip shopping cart",
+ "options": "Role"
+ },
+ {
+ "fieldname": "event_registration_section",
+ "fieldtype": "Section Break",
+ "label": "Event Registration"
+ },
+ {
+ "fieldname": "registration_item_code",
+ "fieldtype": "Link",
+ "label": "Billed Item for registration",
+ "options": "Item"
+ },
+ {
+ "fieldname": "multi_venue_section",
+ "fieldtype": "Tab Break",
+ "label": "Multi-venue mode"
+ },
+ {
+ "default": "0",
+ "fieldname": "enable_multi_companies",
+ "fieldtype": "Check",
+ "label": "Enable"
+ },
+ {
+ "depends_on": "enable_multi_companies",
+ "fieldname": "cart_settings_overrides",
+ "fieldtype": "Table",
+ "label": "Allowed companies",
+ "mandatory_depends_on": "enable_multi_companies",
+ "options": "Venue Cart Settings"
+ },
+ {
+ "fieldname": "unit_of_measure_tab",
+ "fieldtype": "Tab Break",
+ "label": "Units of Measure"
+ },
+ {
+ "fieldname": "short_bookings_section",
+ "fieldtype": "Section Break",
+ "label": "Short Bookings"
+ },
+ {
+ "description": "Used for slots calculation.<br>Please add an UOM conversion factor between each UOM used in booked items and this UOM.",
+ "fieldname": "minute_uom",
+ "fieldtype": "Link",
+ "label": "Minute UOM",
+ "mandatory_depends_on": "eval:doc.venue_units_of_measure?.length",
+ "options": "UOM"
+ },
+ {
+ "description": "Example: For the unit Half-Day set a duration of 4 hours.",
+ "fieldname": "venue_units_of_measure",
+ "fieldtype": "Table",
+ "label": "Conversion Table",
+ "options": "Venue Units of Measure"
+ },
+ {
+ "fieldname": "section_break_20",
+ "fieldtype": "Section Break",
+ "hide_border": 1,
+ "label": "Long Bookings"
+ },
+ {
+ "fieldname": "month_uom",
+ "fieldtype": "Link",
+ "label": "Month UOM",
+ "options": "UOM"
+ },
+ {
+ "fieldname": "column_break_uuvm",
+ "fieldtype": "Column Break"
+ },
+ {
+ "fieldname": "week_uom",
+ "fieldtype": "Link",
+ "label": "Week UOM",
+ "options": "UOM"
+ },
+ {
+ "fieldname": "column_break_jmrs",
+ "fieldtype": "Column Break"
+ },
+ {
+ "fieldname": "day_uom",
+ "fieldtype": "Link",
+ "label": "Day UOM",
+ "options": "UOM"
+ },
+ {
+ "fieldname": "section_break_lnrr",
+ "fieldtype": "Section Break"
+ },
+ {
+ "description": "Example: For the unit '3 month' set a quantity of 3 and a target unit of Month.",
+ "fieldname": "venue_long_uoms",
+ "fieldtype": "Table",
+ "label": "Conversion Table",
+ "options": "Venue UOM Conversion"
+ },
+ {
+ "default": "1",
+ "description": "If checked, the subscriptions linked to a resource will be displayed on the desk calendar",
+ "fieldname": "display_subscriptions_in_calendar",
+ "fieldtype": "Check",
+ "label": "Display subscriptions in the booking calendar"
+ }
+ ],
+ "grid_page_length": 50,
+ "issingle": 1,
+ "links": [],
+ "modified": "2025-03-25 10:57:38.144654",
+ "modified_by": "Administrator",
+ "module": "Bookings",
+ "name": "Venue Settings",
+ "owner": "Administrator",
+ "permissions": [
+ {
+ "create": 1,
+ "delete": 1,
+ "email": 1,
+ "print": 1,
+ "read": 1,
+ "role": "System Manager",
+ "share": 1,
+ "write": 1
+ },
+ {
+ "create": 1,
+ "delete": 1,
+ "email": 1,
+ "print": 1,
+ "read": 1,
+ "role": "Venue Manager",
+ "share": 1,
+ "write": 1
+ }
+ ],
+ "row_format": "Dynamic",
+ "sort_field": "modified",
+ "sort_order": "DESC",
+ "states": []
+}
diff --git a/bookings/bookings/doctype/venue_settings/venue_settings.py b/bookings/bookings/doctype/venue_settings/venue_settings.py
new file mode 100644
index 0000000000000000000000000000000000000000..031fdbccf6696259ac35bbb2c9a0df23ab8cfec5
--- /dev/null
+++ b/bookings/bookings/doctype/venue_settings/venue_settings.py
@@ -0,0 +1,434 @@
+# Copyright (c) 2025, Dokos SAS and contributors
+# For license information, please see license.txt
+
+import datetime
+from dataclasses import dataclass
+from typing import Literal
+from urllib.parse import unquote
+
+import frappe
+from frappe import _
+from frappe.model.document import Document
+from frappe.utils import cint
+
+
+@dataclass
+class venue_uom_info_t(frappe._dict):
+ """Description of one booking unit of measure and of the base unit it converts to."""
+ # NOTE(review): @dataclass is applied to a frappe._dict subclass; presumably the
+ # generated __init__ stores the fields as dict items — confirm.
+ # "long" or "short"; VenueSettings.get_uom_infos uses "short" only for the minute UOM.
+ selector: Literal["long", "short"]
+ # Kind of base unit this UOM is expressed in.
+ target_type: Literal["Minute", "Day", "Week", "Month", "Year"]
+ # Presumably the number of `to_uom` units in one `from_uom` — confirm.
+ value: int | float
+ # UOM being described.
+ from_uom: str
+ # Base UOM that `from_uom` converts to.
+ to_uom: str
+
+class VenueSettings(Document):
+ """Controller of the "Venue Settings" doctype.
+
+ Covers the multi-company ("multi-venue") cart overrides and the UOM
+ conversions derived from the conversion tables of the settings.
+ """
+ # begin: auto-generated types
+ # This code is auto-generated. Do not modify anything in this block.
+
+ from typing import TYPE_CHECKING
+
+ if TYPE_CHECKING:
+ from bookings.bookings.doctype.venue_cart_settings.venue_cart_settings import VenueCartSettings
+ from bookings.bookings.doctype.venue_units_of_measure.venue_units_of_measure import VenueUnitsofMeasure
+ from bookings.bookings.doctype.venue_uom_conversion.venue_uom_conversion import VenueUOMConversion
+ from frappe.types import DF
+
+ allow_event_cancellation: DF.Check
+ cancellation_delay: DF.Duration | None
+ cart_settings_overrides: DF.Table[VenueCartSettings]
+ clear_item_booking_draft_duration: DF.Int
+ confirm_booking_after_payment: DF.Check
+ day_uom: DF.Link | None
+ display_subscriptions_in_calendar: DF.Check
+ enable_multi_companies: DF.Check
+ enable_simultaneous_booking: DF.Check
+ minute_uom: DF.Link | None
+ month_uom: DF.Link | None
+ no_overlap_per_item: DF.Check
+ registration_item_code: DF.Link | None
+ role_allowed_to_skip_cart: DF.Link | None
+ sync_with_google_calendar: DF.Check
+ venue_long_uoms: DF.Table[VenueUOMConversion]
+ venue_units_of_measure: DF.Table[VenueUnitsofMeasure]
+ week_uom: DF.Link | None
+ # end: auto-generated types
+
+ def onload(self):
+ """Expose the naming series options of Quotation through `__onload`."""
+ # see: webshop_settings.py
+ self.get("__onload").quotation_series = frappe.get_meta("Quotation").get_options("naming_series")
+
+ def validate(self):
+ """Reject duplicated companies in the cart settings overrides and require at
+ least one company when multi-company mode is enabled."""
+ # check that all selected companies are unique in the cart_settings_overrides,
+ # even if disabled to avoid mistakes
+ unique_companies = set()
+ for override in self.cart_settings_overrides:
+ if override.company in unique_companies:
+ frappe.throw(
+ frappe._("Company {0} is used more than once in the cart settings overrides").format(
+ override.company
+ )
+ )
+ unique_companies.add(override.company)
+
+ if self.enable_multi_companies and not unique_companies:
+ frappe.throw(frappe._("You must select at least one company in the cart settings overrides"))
+
+ def configure_uom_conversions(self):
+ """Create or update the UOM conversions described by the short and long
+ conversion tables."""
+ if self.minute_uom:
+ for row in self.venue_units_of_measure:
+ # presumably row.duration is in seconds, so the factor is a number of
+ # minutes — confirm against the Venue Units of Measure doctype
+ self.venue_upsert_uom_conversion(
+ row.unit_of_measure, self.minute_uom, cint(row.duration) / 60
+ )
+
+ self.venue_configure_long_uom_conversions()
+
+ def venue_upsert_uom_conversion(self, from_uom: str, to_uom: str, value: int | float):
+ """Create or update the "UOM Conversion Factor" from `from_uom` to `to_uom`.
+
+ Does nothing when either UOM is empty, when `value` is not strictly
+ positive, or when the existing conversion already holds `value`.
+ """
+ if not from_uom or not to_uom or value <= 0:
+ return
+
+ conversion = frappe.db.get_value(
+ "UOM Conversion Factor",
+ filters={"from_uom": from_uom, "to_uom": to_uom},
+ fieldname=["name", "value"],
+ as_dict=True,
+ for_update=True,
+ )
+ if conversion and conversion.value == value:
+ return
+
+ if conversion:
+ # NOTE(review): get_doc receives the whole {name, value} dict, not
+ # conversion.name; presumably it is interpreted as filters — confirm.
+ conversion = frappe.get_doc("UOM Conversion Factor", conversion)
+ else:
+ conversion = frappe.new_doc("UOM Conversion Factor")
+
+ conversion.category = self.venue_make_time_category()
+ conversion.from_uom = from_uom
+ conversion.to_uom = to_uom
+ conversion.value = value
+ conversion.save(ignore_permissions=True)
+
+ def get_uom_aliases(self):
+ """Map each target type name to the UOM configured for it; "Year" has none."""
+ return {
+ "Minute": self.minute_uom,
+ "Day": self.day_uom,
+ "Week": self.week_uom,
+ "Month": self.month_uom,
+ "Year": None,
+ }
+
+ def venue_configure_long_uom_conversions(self):
+ """Upsert a conversion for each long conversion row whose target type has a
+ configured UOM."""
+ uom_aliases = self.get_uom_aliases()
+ for row in self.venue_long_uoms:
+ to_uom = uom_aliases[row.target_type]
+ if to_uom:
+ self.venue_upsert_uom_conversion(row.from_uom, to_uom, row.value)
+
+ def venue_make_time_category(self):
+ """Return the "Time" UOM Category, looked up by its plain then its translated
+ name, inserting it under the translated name when neither exists."""
+ category = frappe.db.exists("UOM Category", "Time")
+ if not category:
+ category = frappe.db.exists("UOM Category", _("Time"))
+
+ if not category:
+ category_doc = frappe.new_doc("UOM Category")
+ category_doc.category_name = _("Time")
+ category_doc.insert(ignore_permissions=True)
+ category = category_doc.name
+
+ return category
+
+ def get_uom_infos(self) -> dict[str, venue_uom_info_t]:
+ """Return the `venue_uom_info_t` descriptions of the booking UOMs, keyed by UOM."""
+ uom_aliases = self.get_uom_aliases()
+ uom_infos = {}
+ # Rows of the long conversion table; rows whose target type has no
+ # configured UOM are skipped.
+ for long_uom in self.venue_long_uoms:
+ base_uom = uom_aliases[long_uom.target_type]
+ if not base_uom:
+ continue
+ uom_infos[long_uom.from_uom] = venue_uom_info_t(
+ selector="long",
+ target_type=long_uom.target_type,
+ value=long_uom.value,
+ from_uom=long_uom.from_uom,
+ to_uom=base_uom,
+ )
+ # The configured base UOMs map to themselves with a value of 1.
+ # NOTE(review): no `year_uom` field is declared in the types block of this
+ # class, so "Year" presumably never matches here — confirm.
+ for target_type in ("Month", "Week", "Day", "Year"):
+ if uom := getattr(self, f"{target_type.lower()}_uom", None):
+ uom_infos[uom] = venue_uom_info_t(
+ selector="long",
+ target_type=target_type,
+ value=1,
+ from_uom=uom,
+ to_uom=uom,
+ )
+ # The minute UOM is the only "short" entry; the lookup of the other
+ # UOMs convertible to minutes is commented out below.
+ if minute_uom := self.minute_uom:
+ uom_infos[minute_uom] = venue_uom_info_t(
+ selector="short",
+ target_type="Minute",
+ value=1,
+ from_uom=minute_uom,
+ to_uom=minute_uom,
+ )
+ # conversions = frappe.get_all(
+ # "UOM Conversion Factor",
+ # filters={"to_uom": minute_uom},
+ # fields=["from_uom", "to_uom", "value"],
+ # order_by="modified desc",
+ # )
+ # for conv in conversions:
+ # if conv.from_uom not in uom_infos:
+ # uom_infos[conv.from_uom] = venue_uom_info_t(
+ # selector="short",
+ # target_type="Minute",
+ # value=conv.value,
+ # from_uom=conv.from_uom,
+ # to_uom=minute_uom,
+ # )
+ return uom_infos
+
+ def clear_cache(self):
+ """Delete the cached "venue_settings_enable_multi_companies" value, then run
+ the parent implementation."""
+ frappe.cache.delete_value("venue_settings_enable_multi_companies")
+ return super().clear_cache()
+
+ def on_update(self):
+ """Create or delete the multi-company custom fields when
+ `enable_multi_companies` changed (or when there is no previous version of
+ the document), and configure the UOM conversions."""
+ old_doc = self.get_doc_before_save()
+ did_change = False
+ if old_doc:
+ did_change = old_doc.enable_multi_companies != self.enable_multi_companies
+ else:
+ did_change = True
+ if did_change:
+ if self.enable_multi_companies:
+ multicompany_create_custom_fields(self)
+ else:
+ multicompany_delete_custom_fields(self)
+
+ self.configure_uom_conversions()
+
+ ## Type hints for fields
+ # NOTE(review): both fields are already annotated, with other types, in the
+ # auto-generated types block of this class.
+ enable_multi_companies: bool
+ cart_settings_overrides: dict
+
+
+ ## Helpers
+ def multicompany_is_company_allowed(self, company):
+ """Tell whether `company` may be selected: always when multi-company mode is
+ disabled, otherwise only when it has a row in the cart settings overrides."""
+ if not self.enable_multi_companies:
+ return True # all companies are allowed if the feature is disabled
+
+ for override in self.cart_settings_overrides:
+ if override.company == company:
+ # return override.enabled
+ return True
+
+ return False
+
+ def multicompany_get_allowed_companies(self) -> list:
+ """List the company of every row of the cart settings overrides."""
+ return [override.company for override in self.cart_settings_overrides]
+
+ def multicompany_get_dropdown(self, selected_company: str | None = None) -> list:
+ """Build one label/value/selected entry per cart settings override;
+ `selected_company` defaults to the current company."""
+ selected_company = selected_company or self.multicompany_get_current_company()
+ return [
+ {
+ "label": override.get("_label") or override.company,
+ "value": override.company,
+ "selected": override.company == selected_company,
+ }
+ for override in self.cart_settings_overrides
+ ]
+
+ def multicompany_get_current_company(self):
+ """Return the company given by `multicompany_read_cookie` when multi-company
+ mode is enabled and that company is allowed; otherwise return None implicitly."""
+ if self.enable_multi_companies:
+ if company := multicompany_read_cookie(self):
+ if self.multicompany_is_company_allowed(company):
+ return company
+
+ def multicompany_get_item_filter(self):
+ """Return a "Venue Selected Company" filter for the current company.
+
+ Presumably NOT_ALLOWED (a filter on the empty company) is returned when
+ multi-company mode is enabled but no company is selected, and None when
+ the mode is disabled — confirm.
+ """
+ NOT_ALLOWED = ["Venue Selected Company", "company", "=", ""]
+ if self.enable_multi_companies:
+ if company := self.multicompany_get_current_company():
+ return ["Venue Selected Company", "company", "=", company]
+ return NOT_ALLOWED
+ return None
+
+ def multicompany_get_item_filter_for_company(self, for_company=None):
+ """Return a "Venue Selected Company" filter for `for_company`.
+
+ Presumably NOT_ALLOWED (a filter on the empty company) is returned when
+ multi-company mode is enabled but the company is not allowed, and None
+ when the mode is disabled — confirm.
+ """
+ NOT_ALLOWED = ["Venue Selected Company", "company", "=", ""]
+ if self.enable_multi_companies:
+ if self.multicompany_is_company_allowed(for_company):
+ return ["Venue Selected Company", "company", "=", for_company]
+ return NOT_ALLOWED
+ return None
+
+MULTICOMPANY_COOKIE_NAME = "company"
+MULTICOMPANY_FLAG_NAME = "multicompany_current_company"
+MULTICOMPANY_CONTEXT_DROPDOWN = "multicompany_dropdown"
+MULTICOMPANY_CONTEXT_CURRENT_COMPANY = "multicompany_current"
+
+
+def multicompany_read_cookie(venue_settings=None):
+    """Return the selected company; thin alias of `multicompany_read_and_update_cookie`."""
+    selected_company = multicompany_read_and_update_cookie(venue_settings)
+    return selected_company
+
+
+def multicompany_read_and_update_cookie(venue_settings: "VenueSettings | None" = None):
+ """Resolve the selected company and keep the cookie and the `frappe.flags` entry in sync.
+
+ The `selected_company` query parameter is looked at before the cookie named
+ by MULTICOMPANY_COOKIE_NAME. Returns the company, or None when multi-company
+ mode is disabled or no allowed company is selected.
+
+ Raises frappe.Redirect (to "/") when the query parameter names a company
+ that is not allowed.
+ """
+ # 0 is the "nothing stored yet" sentinel: None is itself a value that
+ # multicompany_clear_cookie stores in the flag.
+ cached = frappe.flags.get(MULTICOMPANY_FLAG_NAME, 0)
+ if cached != 0:
+ return cached
+
+ venue_settings: "VenueSettings" = venue_settings or frappe.get_single("Venue Settings")
+
+ if not venue_settings.enable_multi_companies:
+ multicompany_clear_cookie() # clear the cookie + set cache
+ return None
+
+ # Read the selected company from the query string parameters
+ # to overwrite the cookie "company" (MULTICOMPANY_COOKIE_NAME)
+ # if valid.
+ from_query = frappe.form_dict.get("selected_company", None)
+ if from_query:
+ is_valid = venue_settings.multicompany_is_company_allowed(from_query)
+ if is_valid:
+ multicompany_write_cookie(from_query) # overwrite the cookie
+ return from_query
+ else:
+ frappe.local.flags.redirect_location = "/"
+ raise frappe.Redirect
+ elif from_query == "":
+ # front-end wants to clear of the cookie
+ multicompany_clear_cookie() # clear the cookie + set cache
+ return None
+
+ # No query parameter: fall back to the cookie of the request.
+ try:
+ from_cookie = frappe.request.cookies.get(MULTICOMPANY_COOKIE_NAME, None)
+ from_cookie = unquote(from_cookie) if from_cookie else None
+ if venue_settings.multicompany_is_company_allowed(from_cookie):
+ multicompany_write_cookie(from_cookie) # refresh the cookie
+ return from_cookie
+ except RuntimeError:
+ # frappe.request is not available in some contexts
+ multicompany_clear_cookie() # clear the cookie
+ return None
+
+ multicompany_clear_cookie() # fallback: clear the cookie
+ return None
+
+
+def multicompany_write_cookie(value):
+    """Store `value` in `frappe.flags` and in the MULTICOMPANY_COOKIE_NAME cookie.
+
+    The cookie is only written when `frappe.local` has a `cookie_manager`;
+    it expires 14 days after the current local time.
+    """
+    frappe.flags[MULTICOMPANY_FLAG_NAME] = value
+    if not hasattr(frappe.local, "cookie_manager"):
+        return
+
+    expires_at = datetime.datetime.now() + datetime.timedelta(days=14)
+    frappe.local.cookie_manager.set_cookie(
+        MULTICOMPANY_COOKIE_NAME,
+        value,
+        expires=expires_at,
+    )
+
+
+def multicompany_clear_cookie():
+    """Store None in `frappe.flags` and delete the MULTICOMPANY_COOKIE_NAME cookie.
+
+    The cookie is only deleted when `frappe.local` has a `cookie_manager`.
+    """
+    frappe.flags[MULTICOMPANY_FLAG_NAME] = None
+    if not hasattr(frappe.local, "cookie_manager"):
+        return
+
+    frappe.local.cookie_manager.delete_cookie(MULTICOMPANY_COOKIE_NAME)
+
+
+## Custom fields for multi company
+def get_custom_fields_for_multicompany():
+    """Return the custom field definitions of the multi-company mode, keyed by doctype.
+
+    "Item Group" and "Website Item" each get a "Section Break" followed by a
+    "Table MultiSelect" field with "Venue Selected Company" as options.
+    """
+    # hint for translation
+    # frappe._('Show only for selected companies')
+    # frappe._('Multi-venue mode')
+
+    # doctype -> (field the section is inserted after, depends_on of the section)
+    placements = {
+        "Item Group": ("website_specifications", "show_in_website"),
+        "Website Item": ("brand", "published"),
+    }
+
+    return {
+        doctype: [
+            {
+                "insert_after": anchor_field,
+                "fieldname": "_section_break_multicompany",
+                "fieldtype": "Section Break",
+                "label": "Multi-venue mode",
+                "collapsible": 0,
+                "depends_on": visibility_condition,
+            },
+            {
+                "insert_after": "_section_break_multicompany",
+                "fieldname": "only_companies",
+                "fieldtype": "Table MultiSelect",
+                "label": "Show only for the following companies",
+                "options": "Venue Selected Company",
+            },
+        ]
+        for doctype, (anchor_field, visibility_condition) in placements.items()
+    }
+
+def multicompany_create_custom_fields(venue_settings = None):
+ """Create the custom fields returned by `get_custom_fields_for_multicompany`.
+
+ `venue_settings` defaults to the "Venue Settings" single document.
+ NOTE(review): presumably the `enable_multi_companies` guard applies whether
+ or not `venue_settings` was passed in — confirm.
+ """
+ if not venue_settings:
+ venue_settings = frappe.get_single("Venue Settings")
+ # Nothing is created unless multi-company mode is enabled on the settings.
+ if not getattr(venue_settings, "enable_multi_companies", False):
+ return
+
+ from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
+ custom_fields = get_custom_fields_for_multicompany()
+ create_custom_fields(custom_fields)
+
+def multicompany_delete_custom_fields(venue_settings):
+    """Delete the "Custom Field" documents of the multi-company mode.
+
+    Every field listed by `get_custom_fields_for_multicompany` is looked up by
+    doctype and fieldname and deleted when found. `venue_settings` is not used
+    by this function.
+    """
+    for doctype, field_defs in get_custom_fields_for_multicompany().items():
+        for field_def in field_defs:
+            lookup = {"dt": doctype, "fieldname": field_def["fieldname"]}
+            custom_field_name = frappe.db.get_value("Custom Field", lookup)
+            if not custom_field_name:
+                continue
+
+            frappe.delete_doc("Custom Field", custom_field_name)
+
+
+
+@frappe.whitelist()
+def create_role_profile_fields():
+ """Create a "Role Profile" link custom field named `role_profile_name` on the
+ Customer and Subscription doctypes.
+
+ NOTE(review): this whitelisted function performs no role or permission check
+ of its own — confirm that this is intended.
+ """
+ from frappe.custom.doctype.custom_field.custom_field import create_custom_fields
+
+ # For translations
+ # (the translated results are discarded; the field definitions below use the
+ # untranslated strings)
+ __ = _("All users associated with this customer will be attributed this role profile")
+ __ = _("Role Profile")
+
+ custom_fields = {}
+ # doctype -> existing field after which the new field is inserted
+ for dt, insert_after in {
+ "Customer": "customer_primary_contact",
+ "Subscription": "contact_person",
+ }.items():
+ df = dict(
+ doctype=dt,
+ fieldname="role_profile_name",
+ label="Role Profile",
+ fieldtype="Link",
+ insert_after=insert_after,
+ options="Role Profile",
+ description="All users associated with this customer will be attributed this role profile",
+ )
+ custom_fields[dt] = [df]
+
+ create_custom_fields(custom_fields)
+
+
+@frappe.whitelist()
+def get_duration_for_uom(uom, minute_uom):
+    """Return `get_uom_in_minutes(uom, minute_uom)` multiplied by 60.
+
+    Presumably this is the duration of `uom` in seconds — confirm against
+    `get_uom_in_minutes`.
+    """
+    # NOTE(review): function-scope import, presumably to avoid a circular
+    # import with the item_booking module — confirm.
+    from bookings.bookings.doctype.item_booking.item_booking import get_uom_in_minutes
+
+    minutes = get_uom_in_minutes(uom, minute_uom)
+    return minutes * 60
+
+
+@frappe.whitelist()
+def simultaneous_booking_enabled():
+    """Tell whether `enable_simultaneous_booking` is set in Venue Settings."""
+    enabled = frappe.db.get_single_value("Venue Settings", "enable_simultaneous_booking")
+    return bool(enabled)
+
+
+@frappe.whitelist()
+def get_booking_uoms(*args):
+    """Return the sorted, de-duplicated booking UOMs, each wrapped in a one-item list.
+
+    The result contains the minute/day/week/month UOMs set in Venue Settings
+    plus every `from_uom` of a "UOM Conversion Factor" targeting one of them.
+    Positional arguments are accepted and ignored.
+    """
+    settings = frappe.get_single("Venue Settings")
+    base_fields = ("minute_uom", "day_uom", "week_uom", "month_uom")
+    base_uoms: list[str] = [uom for uom in map(settings.get, base_fields) if uom]
+
+    convertible_uoms = frappe.get_all(
+        "UOM Conversion Factor", filters={"to_uom": ("in", base_uoms)}, pluck="from_uom"
+    )
+
+    unique_uoms = set(base_uoms).union(convertible_uoms)
+    return [[uom] for uom in sorted(unique_uoms)]