package space.luminic.budgerapp.services import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.reactive.asFlow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactor.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull import org.bson.BsonNull import org.bson.Document import org.bson.types.ObjectId import org.slf4j.LoggerFactory import org.springframework.cache.annotation.CacheEvict import org.springframework.cache.annotation.Cacheable import org.springframework.data.domain.Sort import org.springframework.data.domain.Sort.Direction import org.springframework.data.mongodb.core.ReactiveMongoTemplate import org.springframework.data.mongodb.core.aggregation.Aggregation.* import org.springframework.data.mongodb.core.aggregation.DateOperators.DateToString import org.springframework.data.mongodb.core.query.Criteria import org.springframework.security.core.context.ReactiveSecurityContextHolder import org.springframework.stereotype.Service import reactor.core.publisher.Flux import reactor.core.publisher.Mono import space.luminic.budgerapp.mappers.BudgetMapper import space.luminic.budgerapp.mappers.TransactionsMapper import space.luminic.budgerapp.models.* import space.luminic.budgerapp.repos.BudgetRepo import space.luminic.budgerapp.repos.CategoryRepo import space.luminic.budgerapp.repos.TransactionRepo import space.luminic.budgerapp.repos.WarnRepo import java.time.* import java.time.temporal.TemporalAdjusters import java.util.* @Service class FinancialService( val budgetRepo: BudgetRepo, val warnRepo: WarnRepo, val transactionsRepo: TransactionRepo, val recurrentService: RecurrentService, val userService: UserService, val reactiveMongoTemplate: ReactiveMongoTemplate, private val categoryRepo: 
CategoryRepo,
    val transactionsMapper: TransactionsMapper,
    val budgetMapper: BudgetMapper
) {

    private val logger = LoggerFactory.getLogger(FinancialService::class.java)

    /**
     * Refreshes the budget that covers [transaction]'s date after the transaction was created.
     *
     * The matching budget category gets its planned/spent sums re-read from
     * `getBudgetSumsByCategory`; a `PLANNED` transaction additionally raises the category limit
     * by the transaction amount. The budget is saved once at the end.
     *
     * @throws NotFoundException when the budget has no entry for the transaction's category
     */
    suspend fun updateBudgetOnCreate(transaction: Transaction) {
        val budget = findProjectedBudget(
            transaction.space!!.id!!, budgetId = null, transaction.date, transaction.date
        )
        // Expense categories are checked first (as before); income categories live in a separate
        // list, so fall back to it instead of failing for every income transaction.
        val categoryId = transaction.category.id
        val budgetCategory = budget.categories.firstOrNull { it.category.id == categoryId }
            ?: budget.incomeCategories.firstOrNull { it.category.id == categoryId }
            ?: throw NotFoundException("Budget category not found in the budget")

        val categorySums = getBudgetSumsByCategory(transaction.category.id!!, budget)
        budgetCategory.currentPlanned = categorySums.getDouble("plannedAmount")
        budgetCategory.currentSpent = categorySums.getDouble("instantAmount")
        // A planned transaction widens the category limit by its amount.
        if (transaction.type.code == "PLANNED") {
            budgetCategory.currentLimit += transaction.amount
        }
        logger.info("updateBudgetOnCreate end")
        budgetRepo.save(budget).awaitSingle()
    }

    /**
     * Refreshes budget data after a transaction was edited.
     *
     * Looks up the budgets covering the old and the new transaction date in parallel and
     * delegates to [updateSameBudget] or [updateDifferentBudgets] depending on whether both
     * dates fall into the same budget.
     *
     * @param difference new amount minus old amount
     */
    suspend fun updateBudgetOnEdit(
        oldTransaction: Transaction,
        newTransaction: Transaction,
        difference: Double
    ) = coroutineScope {
        logger.info("updateBudgetOnEdit start")
        // Both lookups use the space of the new transaction.
        val oldBudgetDeffer = async {
            findProjectedBudget(
                newTransaction.space?.id!!, budgetId = null, oldTransaction.date, oldTransaction.date
            )
        }
        val newBudgetDeffer = async {
            findProjectedBudget(
                newTransaction.space?.id!!, budgetId = null, newTransaction.date, newTransaction.date
            )
        }
        val (oldBudget, newBudget) = awaitAll(oldBudgetDeffer, newBudgetDeffer)
        if (oldBudget.id == newBudget.id) {
            updateSameBudget(oldTransaction, newTransaction, difference, newBudget)
        } else {
            updateDifferentBudgets(oldTransaction, newTransaction, difference, oldBudget, newBudget)
        }
    }

    /**
     * Handles an edit where the old and new transaction dates belong to the same [budget].
     *
     * When the category changed, the old and the new category are updated one after the other:
     * both updates mutate and save the same [Budget] instance, so running them concurrently
     * could persist a snapshot that misses the other update.
     */
    private suspend fun updateSameBudget(
        oldTransaction: Transaction,
        newTransaction: Transaction,
        difference: Double,
        budget: Budget
    ) {
        val oldCategory = findBudgetCategory(oldTransaction, budget)
        val newCategory = findBudgetCategory(newTransaction, budget)
        if (oldCategory.category.id == newCategory.category.id) {
            updateBudgetCategory(newTransaction, budget, difference)
            return
        }
        // NOTE(review): leftover debug output; consider replacing with a meaningful message.
        logger.info("hui" + oldTransaction.category.id + " " + (-oldTransaction.amount).toString())
        val updateOldBudgetCategory = updateBudgetCategory(
            oldTransaction, budget, -difference,
            isCategoryChanged = true,
            isOldCategory = true
        )
        val updateNewBudgetCategory = updateBudgetCategory(
            newTransaction, budget, difference, true
        )
        logger.info(updateOldBudgetCategory.toString())
        logger.info(updateNewBudgetCategory.toString())
    }

    /**
     * Handles an edit that moved the transaction from [oldBudget] to [newBudget].
     *
     * The old budget is updated first, then the new one. The old-category flags are only set
     * when the category changed as well.
     */
    private suspend fun updateDifferentBudgets(
        oldTransaction: Transaction,
        newTransaction: Transaction,
        difference: Double,
        oldBudget: Budget,
        newBudget: Budget
    ) {
        val oldCategory = findBudgetCategory(oldTransaction, oldBudget)
        val newCategory = findBudgetCategory(newTransaction, newBudget)
        val categoryChanged = oldCategory.category.id != newCategory.category.id

        // NOTE(review): the new budget's limit is adjusted by `difference`, not by the full
        // amount of the moved transaction — confirm this is the intended accounting.
        updateBudgetCategory(
            oldTransaction, oldBudget, -difference,
            isCategoryChanged = categoryChanged,
            isOldCategory = categoryChanged
        )
        updateBudgetCategory(newTransaction, newBudget, difference)
    }

    /**
     * Returns the budget entry for the transaction's category, creating it via
     * [addCategoryToBudget] when the budget does not contain it yet.
     *
     * `EXPENSE` categories are searched in `budget.categories`, all others in
     * `budget.incomeCategories`.
     */
    private suspend fun findBudgetCategory(transaction: Transaction, budget: Budget): BudgetCategory {
        val pool =
            if (transaction.category.type.code == "EXPENSE") budget.categories else budget.incomeCategories
        return pool.firstOrNull { it.category.id == transaction.category.id }
            ?: addCategoryToBudget(transaction.category, budget)
    }

    /**
     * Adds a new entry for the category to the budget (expense or income list, by category
     * type), initialised from the category's current sums, saves the budget and returns the entry.
     */
    private suspend fun addCategoryToBudget(category: Category,
budget: Budget): BudgetCategory {
        val sums = getBudgetSumsByCategory(category.id!!, budget)
        val categoryBudget = BudgetCategory(
            currentSpent = sums.getDouble("instantAmount"),
            currentPlanned = sums.getDouble("plannedAmount"),
            currentLimit = sums.getDouble("plannedAmount"),
            category = category
        )
        if (category.type.code == "EXPENSE") {
            budget.categories.add(categoryBudget)
        } else {
            budget.incomeCategories.add(categoryBudget)
        }
        budgetRepo.save(budget).awaitSingle()
        return categoryBudget
    }

    /**
     * Re-reads planned/spent sums for the transaction's category inside [budget], adjusts the
     * limit for `PLANNED` transactions and saves the budget.
     *
     * Limit adjustment for `PLANNED` transactions:
     * - category unchanged: `+ difference`
     * - category changed, old category: `- transaction.amount`
     * - category changed, new category: `+ transaction.amount`
     *
     * @return always `1.0`
     * @throws NotFoundException when the budget has no entry for the category
     */
    private suspend fun updateBudgetCategory(
        transaction: Transaction,
        budget: Budget,
        difference: Double,
        isCategoryChanged: Boolean = false,
        isOldCategory: Boolean = false
    ): Double {
        val sums = getBudgetSumsByCategory(transaction.category.id!!, budget)
        // Income categories are kept in a separate list; without the fallback every edit of an
        // income transaction failed here even though the entry exists.
        val categoryId = transaction.category.id
        val categoryBudget = budget.categories.firstOrNull { it.category.id == categoryId }
            ?: budget.incomeCategories.firstOrNull { it.category.id == categoryId }
            ?: throw NotFoundException("Not found category in budget")

        categoryBudget.currentPlanned = sums.getDouble("plannedAmount")
        categoryBudget.currentSpent = sums.getDouble("instantAmount")
        if (transaction.type.code == "PLANNED") {
            when {
                !isCategoryChanged -> categoryBudget.currentLimit += difference
                isOldCategory -> categoryBudget.currentLimit -= transaction.amount
                else -> categoryBudget.currentLimit += transaction.amount
            }
        }
        budgetRepo.save(budget).awaitSingle()
        return 1.0
    }

    /**
     * Refreshes the budget covering [transaction]'s date after the transaction was deleted.
     *
     * For `PLANNED` and `INSTANT` transactions the matching expense category gets its sums
     * re-read; for `PLANNED` the limit is additionally reduced by the transaction amount.
     * Other transaction types leave the categories untouched. The budget is saved in all cases.
     *
     * @throws NotFoundException when no category sums are returned for the budget period
     */
    suspend fun updateBudgetOnDelete(transaction: Transaction) {
        // Find the budget by date
        val budget =
            findProjectedBudget(transaction.space!!.id!!, budgetId = null, transaction.date, transaction.date)

        // Load the per-category sums of the budget period
        val budgetCategories = getBudgetCategories(transaction.space?.id!!, budget.dateFrom, budget.dateTo)
            ?: throw NotFoundException("Budget category not found in the budget")

        val typeCode = transaction.type.code
        if (typeCode == "PLANNED" || typeCode == "INSTANT") {
            // NOTE(review): only `budget.categories` is refreshed here; income categories are
            // not touched — confirm whether that is intended.
            budget.categories
                .filter { it.category.id == transaction.category.id }
                .forEach { category ->
                    budgetCategories[category.category.id]?.let { data ->
                        category.currentSpent = data["instantAmount"] ?: 0.0
                        category.currentPlanned = data["plannedAmount"] ?: 0.0
                        if (typeCode == "PLANNED") {
                            category.currentLimit -= transaction.amount // Adjust the limit by the expense
                        }
                    }
                }
        }

        // Store the (possibly updated) categories back on the budget and persist it
        budget.categories = budget.categories.toMutableList()
        budgetRepo.save(budget).awaitSingle()
    }

    /**
     * Lists the budgets of a space, sorted by [sortSetting] or by `dateFrom` descending when no
     * sort setting is given.
     */
    fun getBudgets(spaceId: String, sortSetting: SortSetting? = null): Mono<MutableList<Budget>> {
        val sort = sortSetting?.let { Sort.by(it.order, it.by) } ?: Sort.by(Direction.DESC, "dateFrom")
        return findProjectedBudgets(ObjectId(spaceId), sortRequested = sort)
    }

    /**
     * Loads the budgets of a space through an aggregation, keeping only [projectKeys] in each
     * resulting document.
     */
    fun findProjectedBudgets(
        spaceId: ObjectId,
        projectKeys: Array<String> = arrayOf("_id", "name", "dateFrom", "dateTo"),
        sortRequested: Sort?
= null
    ): Mono<MutableList<Budget>> {
        val lookupCategories = lookup("categories", "categories.category.\$id", "_id", "categoriesDetails")
        val lookupIncomeCategories =
            lookup("categories", "incomeCategories.category.\$id", "_id", "incomeCategoriesDetails")
        val lookupSpace = lookup("spaces", "space.\$id", "_id", "spaceDetails")
        val unwindSpace = unwind("spaceDetails")
        val matchStage = match(Criteria.where("spaceDetails._id").`is`(spaceId))
        // matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId)))
        val projectStage = project(*projectKeys) // Keep only the requested fields
        val sort = sortRequested?.let { sort(it) } ?: sort(
            Sort.by(Direction.DESC, "date").and(Sort.by(Direction.DESC, "createdAt"))
        )
        val aggregation = newAggregation(
            lookupCategories,
            lookupIncomeCategories,
            lookupSpace,
            unwindSpace,
            matchStage,
            projectStage,
            sort
        )

        return reactiveMongoTemplate.aggregate(aggregation, "budgets", Document::class.java).collectList().map { docs ->
            docs.map { doc ->
                budgetMapper.fromDocument(doc)
            }.toMutableList()
        }
    }

    /**
     * Loads a single budget of a space, optionally restricted by id and/or date bounds.
     *
     * Date filter: `budget.dateFrom <= dateTo` (applied when [dateFrom] is given) and
     * `budget.dateTo >= dateTo` (applied when [dateTo] is given). When only [dateFrom] is
     * supplied it is used as the upper bound for `budget.dateFrom` instead of failing on a
     * missing [dateTo].
     *
     * @throws NotFoundException when no budget matches
     */
    suspend fun findProjectedBudget(
        spaceId: String,
        budgetId: String? = null,
        dateFrom: LocalDate? = null,
        dateTo: LocalDate? = null
    ): Budget {
        val lookupCategories = lookup("categories", "categories.category.\$id", "_id", "categoriesDetails")
        // val unwindCategories = unwind("categoriesDetails")
        val lookupIncomeCategories =
            lookup("categories", "incomeCategories.category.\$id", "_id", "incomeCategoriesDetails")
        // val unwindIncomeCategories = unwind("incomeCategoriesDetails")
        val lookupSpace = lookup("spaces", "space.\$id", "_id", "spaceDetails")
        val unwindSpace = unwind("spaceDetails")

        val matchCriteria = mutableListOf<Criteria>()
        budgetId?.let { matchCriteria.add(Criteria.where("_id").`is`(ObjectId(it))) }
        // Previously `dateTo!!`: crashed with an NPE when dateFrom was given without dateTo.
        // NOTE(review): all visible callers pass the same date for both bounds — confirm whether
        // `dateFrom` itself was meant as the bound here.
        dateFrom?.let { start -> matchCriteria.add(Criteria.where("dateFrom").lte(dateTo ?: start)) }
        dateTo?.let { matchCriteria.add(Criteria.where("dateTo").gte(it)) }
        matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId)))
        val matchStage = match(Criteria().andOperator(*matchCriteria.toTypedArray()))

        val aggregation =
            newAggregation(lookupCategories, lookupIncomeCategories, lookupSpace, unwindSpace, matchStage)

        return reactiveMongoTemplate.aggregate(aggregation, "budgets", Document::class.java).next()
            .map { doc -> budgetMapper.fromDocument(doc) }
            .awaitSingleOrNull()
            ?: throw NotFoundException("Budget not found")
    }

    //    @Cacheable("budgets", key = "#id")
    /**
     * Builds the detailed view of a budget: the budget itself, its expense categories with
     * refreshed planned/spent sums, and the period's transactions split into planned expenses,
     * planned incomes and instant transactions.
     *
     * Category sums and transactions are fetched concurrently.
     *
     * @throws NotFoundException when the budget, the category sums or the transactions are missing
     */
    suspend fun getBudget(spaceId: String, id: String): BudgetDTO = coroutineScope {
        val budget = findProjectedBudget(spaceId, id)

        // If access is granted, continue
        val budgetDTO = BudgetDTO(
            budget.id,
            budget.space,
            budget.name,
            budget.dateFrom,
            budget.dateTo,
            budget.createdAt,
            categories = budget.categories,
            incomeCategories = budget.incomeCategories,
        )

        logger.info("Fetching categories and transactions")
        val categoriesDeffer = async { getBudgetCategories(spaceId, budgetDTO.dateFrom, budgetDTO.dateTo) }
        val transactionsDeffer = async { getTransactionsByTypes(spaceId, budgetDTO.dateFrom, budgetDTO.dateTo) }

        val categories = categoriesDeffer.await() ?: throw NotFoundException("Categories in null")
        // The message used to say "Categories" here as well, which hid the real cause.
        val transactions = transactionsDeffer.await() ?: throw NotFoundException("Transactions is null")

        // Refresh sums on every category that has data for the period
        budgetDTO.categories.forEach { category ->
            categories[category.category.id]?.let { data ->
                category.currentSpent = data["instantAmount"] ?: 0.0
                category.currentPlanned = data["plannedAmount"] ?: 0.0
            }
        }
        budgetDTO.categories = budgetDTO.categories.toMutableList()

        // NOTE(review): element type assumed to be Transaction — confirm against BudgetDTO.
        budgetDTO.plannedExpenses = transactions["plannedExpenses"] as MutableList<Transaction>
        budgetDTO.plannedIncomes = transactions["plannedIncomes"] as MutableList<Transaction>
        budgetDTO.transactions = transactions["instantTransactions"] as MutableList<Transaction>

        budgetDTO
    }

    //    fun regenBudgets(): Mono {
    //        return budgetRepo.findAll()
    //            .flatMap { budget ->
    //                spaceService.getSpace("67af3c0f652da946a7dd9931")
    //                    .map { space ->
    //                        budget.space = space
    //                        budget
    //                    }
    //                    .flatMap { updatedBudget -> budgetRepo.save(updatedBudget) }
    //            }
    //            .then()
    //    }

    //    fun regenTransactions(): Mono {
    //        return transactionsRepo.findAll().flatMap { transaction ->
    //            spaceService.getSpace("67af3c0f652da946a7dd9931")
    //                .map { space ->
    //                    transaction.space = space
    //                    transaction
    //                }
    //                .flatMap { updatedTransaction -> transactionsRepo.save(updatedTransaction) }
    //        }
    //            .then()
    //    }

    // NOTE(review): deletes every category of one hard-coded space id; looks like a one-off
    // maintenance helper — confirm it should remain in the service.
    fun regenCats(): Mono<Void> {
        return categoryRepo.findBySpaceId(ObjectId("67b352b13384483a1c2282ed")).flatMap { cat ->
            // if (cat.space?.id == "67b352b13384483a1c2282ed") {
            categoryRepo.deleteById(cat.id!!)
// Returns a `Mono<Void>`
            // } else {
            //     Mono.empty() // When nothing is deleted, return an empty `Mono<Void>`
            // }
        }.then() // Make sure all operations have completed
    }

    /**
     * Saves [category] for [space] and appends an empty (all-zero) entry for it to every budget
     * of the space: to `incomeCategories` for `INCOME`, to `categories` for `EXPENSE`.
     *
     * @return the saved category, emitted after all budgets were saved
     */
    fun createCategory(space: Space, category: Category): Mono<Category> {
        category.space = space
        return categoryRepo.save(category)
            .flatMap { savedCategory ->
                findProjectedBudgets(
                    ObjectId(space.id),
                    projectKeys = arrayOf(
                        "_id",
                        "name",
                        "dateFrom",
                        "dateTo",
                        "space",
                        "spaceDetails",
                        "categories",
                        "categoriesDetails",
                        "incomeCategories",
                        "incomeCategoriesDetails"
                    )
                )
                    .flatMapMany { Flux.fromIterable(it) } // Turn the list of budgets into a Flux
                    .flatMap { budget ->
                        when (savedCategory.type.code) {
                            "INCOME" -> budget.incomeCategories.add(BudgetCategory(0.0, 0.0, 0.0, savedCategory))
                            "EXPENSE" -> budget.categories.add(BudgetCategory(0.0, 0.0, 0.0, savedCategory))
                        }
                        budgetRepo.save(budget) // Save every updated budget
                    }
                    .then(Mono.just(savedCategory)) // Emit the saved category once all budgets are processed
            }
    }

    /**
     * Creates a budget for [space].
     *
     * Rejects the request when an existing budget already covers the new budget's start or end
     * date. Expense and income category entries are computed concurrently, the budget is saved
     * once with both lists, and the budget warnings are refreshed afterwards (best effort:
     * failures are logged and do not fail the creation).
     *
     * @throws IllegalArgumentException when a budget covering `dateFrom` or `dateTo` exists
     */
    suspend fun createBudget(space: Space, budget: Budget, createRecurrent: Boolean): Budget = coroutineScope {
        val startBudgetDeferred = async { getBudgetByDate(budget.dateFrom, space.id!!) }
        val endBudgetDeferred = async { getBudgetByDate(budget.dateTo, space.id!!) }
        val (startBudget, endBudget) = awaitAll(startBudgetDeferred, endBudgetDeferred)

        if (startBudget != null || endBudget != null) {
            throw IllegalArgumentException("Бюджет с теми же датами найден")
        }

        budget.space = space

        if (createRecurrent) {
            // NOTE(review): result is ignored; if this returns a Mono it is never subscribed — confirm.
            recurrentService.createRecurrentsForBudget(space, budget)
        }

        val categoriesDeferred =
            async { getCategoryTransactionPipeline(space.id!!, budget.dateFrom, budget.dateTo) }
        val incomeCategoriesDeferred =
            async { getCategoryTransactionPipeline(space.id!!, budget.dateFrom, budget.dateTo, "INCOME") }

        budget.categories = categoriesDeferred.await().toMutableList()
        budget.incomeCategories = incomeCategoriesDeferred.await().toMutableList()
        val savedBudget = budgetRepo.save(budget).awaitSingle()

        // updateBudgetWarns returns a cold Mono: it has to be awaited (subscribed), otherwise
        // the warnings are never recalculated.
        try {
            updateBudgetWarns(savedBudget).awaitSingleOrNull()
        } catch (cancellation: java.util.concurrent.CancellationException) {
            throw cancellation
        } catch (error: Exception) {
            logger.error("Error during updateBudgetWarns: ${error.message}")
        }

        savedBudget
    }

    /** Returns the budget of the space whose period contains [date], or `null` when none does. */
    suspend fun getBudgetByDate(date: LocalDate, spaceId: String): Budget?
{
        return budgetRepo.findByDateFromLessThanEqualAndDateToGreaterThanEqualAndSpace(date, date, ObjectId(spaceId))
            .awaitSingleOrNull()
    }

    /**
     * Aggregates the transactions dated within the budget's period (`dateFrom` inclusive,
     * `dateTo` exclusive) by category and returns one [BudgetCategory] per category with the
     * summed amount as `currentSpent`.
     */
    fun getBudgetCategories(id: String): Mono<List<BudgetCategory>> {
        return budgetRepo.findById(id).flatMap { budget ->
            val lookup = lookup("categories", "category.\$id", "_id", "categoryDetailed")
            val unwind = unwind("categoryDetailed")
            val projectDouble =
                project("categoryDetailed", "amount", "date").andExpression("{ \$toDouble: \"\$amount\" }")
                    .`as`("amount")
            val match = match(Criteria.where("date").gte(budget.dateFrom).lt(budget.dateTo))
            val group = group("categoryDetailed").sum("amount").`as`("currentSpent")
            val project = project("currentSpent").and("_id").`as`("category")
            val sort = sort(Sort.by(Sort.Order.asc("_id")))

            val aggregation = newAggregation(lookup, unwind, projectDouble, match, group, project, sort)

            reactiveMongoTemplate.aggregate(aggregation, "transactions", BudgetCategory::class.java)
                .collectList() // Collect the result into a list
        }
    }

    //    fun getBudgetTransactionsByType(budgetId: String): Mono>> {
    //        return budgetRepo.findById(budgetId).flatMap { it ->
    //            getTransactionsByTypes(it.dateFrom, it.dateTo)
    //        }
    //    }

    /**
     * Deletes a budget together with the transactions returned by [getTransactionsToDelete]
     * for the budget's period.
     *
     * @throws NotFoundException when the budget does not exist in the space
     */
    suspend fun deleteBudget(spaceId: String, budgetId: String) {
        val budget = findProjectedBudget(spaceId, budgetId)
        getTransactionsToDelete(spaceId, budget.dateFrom, budget.dateTo).forEach { deleteTransaction(it) }
        // deleteById returns a cold Mono; without awaiting it the budget was never removed.
        budgetRepo.deleteById(budget.id!!).awaitSingleOrNull()
    }

    /**
     * Sets the limit of an expense category inside a budget.
     *
     * The new limit must not be lower than the sum of `PLANNED` transactions of that category
     * in the budget period. After the change the budget warnings are refreshed and the budget
     * is saved.
     *
     * @return the updated category entry
     * @throws NotFoundException when the budget or the category entry is missing
     * @throws IllegalArgumentException when [limit] is below the planned sum
     */
    suspend fun setCategoryLimit(spaceId: String, budgetId: String, catId: String, limit: Double): BudgetCategory =
        coroutineScope {
            val budget = findProjectedBudget(spaceId, budgetId = budgetId)
            val categoryToEdit = budget.categories.firstOrNull { it.category.id == catId }
                ?: throw NotFoundException("Category not found in the budget")

            // "PLANNED" is a transaction type; it used to be passed positionally and ended up
            // in `categoryType`, so the query never matched and the check never fired.
            val transactionSumsByCategory =
                calcTransactionsSum(spaceId, budget, categoryId = catId, transactionType = "PLANNED").awaitSingle()

            if (transactionSumsByCategory > limit) {
                throw IllegalArgumentException("Limit can't be less than planned expenses on category. Current planned value: $transactionSumsByCategory")
            } else {
                categoryToEdit.currentLimit = limit
                // The Mono has to be awaited to actually run the warnings update.
                launch { updateBudgetWarns(budget).awaitSingleOrNull() }
                budgetRepo.save(budget).awaitSingle()
                categoryToEdit
            }
        }

    /**
     * Lists the warnings of a budget. A `null` [isHide] is treated as `false`, i.e. only
     * warnings that are not hidden are requested.
     */
    fun getWarns(budgetId: String, isHide: Boolean? = null): Mono<List<Warn>> {
        return warnRepo.findAllByBudgetIdAndIsHide(budgetId, isHide == true).collectList()
    }

    /** Marks the warning as hidden and saves it; completes empty when the warning is not found. */
    fun hideWarn(warnId: String): Mono<Warn> {
        return warnRepo.findById(warnId) // Look up the warning
            .flatMap { warn ->
                warn.isHide = true // Update the flag
                warnRepo.save(warn) // Save the changed warning
            }
    }

    /**
     * Recalculates and stores the warnings of [budget]. Emits an empty list when [budget] is
     * `null`, has no categories, or when any step fails (the error is logged).
     *
     * The returned Mono is cold: nothing happens until it is subscribed/awaited.
     */
    fun updateBudgetWarns(budget: Budget? = null): Mono<List<Warn>> {
        logger.info("STARTED WARNS UPDATE")

        val finalBudgetMono = budget?.let { Mono.just(it) }
            ?: return Mono.just(emptyList())

        return finalBudgetMono.flatMap { finalBudget ->
            if (finalBudget.categories.isEmpty()) {
                logger.info("No categories found for budget ${finalBudget.id}")
                return@flatMap Mono.just(emptyList())
            }

            val averageSumsMono = getAverageSpendingByCategory()
            val averageIncomeMono = getAverageIncome()
            val currentBudgetIncomeMono = calcTransactionsSum(
                budget.space!!.id!!, finalBudget, transactionType = "PLANNED", categoryType = "INCOME"
            )
            val plannedIncomeMono = calcTransactionsSum(
                budget.space!!.id!!, finalBudget, categoryType = "INCOME", transactionType = "PLANNED"
            )
            val plannedSavingMono = calcTransactionsSum(
                budget.space!!.id!!,
                finalBudget,
                categoryId = "675850148198643f121e466a",
                transactionType = "PLANNED"
            )

            Mono.zip(
                averageSumsMono,
                averageIncomeMono,
                currentBudgetIncomeMono,
                plannedIncomeMono,
                plannedSavingMono
            ).flatMap { tuple ->
                val averageSums = tuple.t1
                val averageIncome = tuple.t2
                val currentBudgetIncome = tuple.t3
                val plannedIncome = tuple.t4
                val plannedSaving = tuple.t5

                Flux.fromIterable(finalBudget.categories).flatMap { category ->
                    processCategoryWarnings(
                        category,
                        finalBudget,
                        averageSums,
                        averageIncome,
                        currentBudgetIncome,
                        plannedIncome,
                        plannedSaving
                    )
                }.collectList().flatMap { warns ->
                    warnRepo.saveAll(warns.filterNotNull()).collectList()
}.doOnSuccess { logger.info("ENDED WARNS UPDATE") }
                    .map { it.sortedByDescending { warn -> warn.serenity.sort } }
            }
        }.doOnError { error ->
            logger.error("Error updating budget warns: ${error.message}", error)
        }.onErrorResume {
            Mono.just(emptyList()) // Emit an empty list on error
        }
    }

    /**
     * Produces the warnings that apply to one budget category.
     *
     * - "Limit below average": when the average spending of the category exceeds its current
     *   limit, the stored warning for that context is reused or a new one is created.
     *   Otherwise a stored warning for that context is deleted.
     * - "Low savings share": only for the category with the hard-coded savings id; applies when
     *   the limit is below 10% of the planned income. Otherwise a stored warning is deleted.
     *
     * The deletions are part of the returned Flux (they complete without emitting), so they run
     * when the Flux is subscribed.
     */
    private fun processCategoryWarnings(
        category: BudgetCategory,
        finalBudget: Budget,
        averageSums: Map<String, Double>,
        averageIncome: Double,
        currentBudgetIncome: Double,
        plannedIncome: Double,
        plannedSaving: Double
    ): Flux<Warn> {
        val warnsForCategory = mutableListOf<Mono<Warn>>()

        // Deletes the stored warning for the context, if any; never emits a value.
        // Previously these Monos were built but never subscribed, so stale warnings stayed forever.
        fun removeStaleWarn(context: String): Mono<Warn> =
            warnRepo.findWarnByContext(context).flatMap { stale ->
                warnRepo.delete(stale).then(Mono.empty<Warn>())
            }

        val averageSum = averageSums[category.category.id] ?: 0.0
        val categorySpentRatioInAvgIncome = if (averageIncome > 0.0) averageSum / averageIncome else 0.0
        val projectedAvailableSum = currentBudgetIncome * categorySpentRatioInAvgIncome
        val contextAtAvg = "category${category.category.id}atbudget${finalBudget.id}lessavg"
        val lowSavingContext = "savingValueLess10atBudget${finalBudget.id}"

        if (averageSum > category.currentLimit) {
            val warnMono = warnRepo.findWarnByContext(contextAtAvg).switchIfEmpty(
                Mono.just(
                    Warn(
                        serenity = WarnSerenity.MAIN,
                        message = PushMessage(
                            title = "Внимание на ${category.category.name}!",
                            body = "Лимит меньше средних трат (Среднее: ${averageSum.toInt()} ₽ Текущий лимит: ${category.currentLimit.toInt()} ₽)." +
                                "\nСредняя доля данной категории в доходах: ${(categorySpentRatioInAvgIncome * 100).toInt()}%." +
                                "\nПроецируется на текущие поступления: ${projectedAvailableSum.toInt()} ₽",
                            icon = category.category.icon
                        ),
                        budgetId = finalBudget.id!!,
                        context = contextAtAvg,
                        isHide = false
                    )
                )
            )
            warnsForCategory.add(warnMono)
        } else {
            warnsForCategory.add(removeStaleWarn(contextAtAvg))
        }

        // NOTE(review): hard-coded id, presumably the "savings" category — confirm.
        if (category.category.id == "675850148198643f121e466a") {
            val savingRatio = if (plannedIncome > 0.0) category.currentLimit / plannedIncome else 0.0
            if (savingRatio < 0.1) {
                val warnMono = warnRepo.findWarnByContext(lowSavingContext).switchIfEmpty(
                    Mono.just(
                        Warn(
                            serenity = WarnSerenity.IMPORTANT,
                            message = PushMessage(
                                title = "Доля сбережений очень мала!",
                                body = "Текущие плановые сбережения равны ${plannedSaving.toInt()} (${(savingRatio * 100).toInt()}%)! Исправьте!",
                                icon = category.category.icon
                            ),
                            budgetId = finalBudget.id!!,
                            context = lowSavingContext,
                            isHide = false
                        )
                    )
                )
                warnsForCategory.add(warnMono)
            } else {
                warnsForCategory.add(removeStaleWarn(lowSavingContext))
            }
        }

        return Flux.fromIterable(warnsForCategory).flatMap { it }
    }

    /**
     * Queries the transactions of a space with optional filters, sorting and paging.
     *
     * Default order is `date` descending, then `createdAt` descending; with [sortSetting] the
     * requested field/order is used, then `createdAt` ascending. The date range is
     * `dateFrom` inclusive, `dateTo` exclusive.
     */
    fun getTransactions(
        spaceId: String,
        dateFrom: LocalDate? = null,
        dateTo: LocalDate? = null,
        transactionType: String? = null,
        isDone: Boolean? = null,
        categoryId: String? = null,
        categoryType: String? = null,
        userId: String? = null,
        parentId: String? = null,
        isChild: Boolean? = null,
        sortSetting: SortSetting? = null,
        limit: Int? = null,
        offset: Int?
= null,
    ): Mono<MutableList<Transaction>> {
        val matchCriteria = mutableListOf<Criteria>()

        // Filters
        matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId)))
        dateFrom?.let { matchCriteria.add(Criteria.where("date").gte(it)) }
        dateTo?.let { matchCriteria.add(Criteria.where("date").lt(it)) }
        transactionType?.let { matchCriteria.add(Criteria.where("type.code").`is`(it)) }
        isDone?.let { matchCriteria.add(Criteria.where("isDone").`is`(it)) }
        // The looked-up `_id` is compared as an ObjectId, like the space and user ids below;
        // comparing it with the raw string never matched.
        categoryId?.let { matchCriteria.add(Criteria.where("categoryDetails._id").`is`(ObjectId(it))) }
        categoryType?.let { matchCriteria.add(Criteria.where("categoryDetails.type.code").`is`(it)) }
        userId?.let { matchCriteria.add(Criteria.where("userDetails._id").`is`(ObjectId(it))) }
        parentId?.let { matchCriteria.add(Criteria.where("parentId").`is`(it)) }
        isChild?.let { matchCriteria.add(Criteria.where("parentId").exists(it)) }

        // Aggregation stages
        val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails")
        val unwindCategory = unwind("categoryDetails")
        val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails")
        val unwindSpace = unwind("spaceDetails")
        val lookupUsers = lookup("users", "user.\$id", "_id", "userDetails")
        val unwindUser = unwind("userDetails")

        // The space filter is always present, so the match stage is never empty.
        val match = match(Criteria().andOperator(*matchCriteria.toTypedArray()))

        val sortStage = sortSetting
            ?.let { sort(Sort.by(it.order, it.by).and(Sort.by(Direction.ASC, "createdAt"))) }
            ?: sort(Sort.by(Direction.DESC, "date").and(Sort.by(Direction.DESC, "createdAt")))

        val stages = listOfNotNull(
            lookup,
            unwindCategory,
            lookupSpaces,
            unwindSpace,
            lookupUsers,
            unwindUser,
            match,
            sortStage,
            offset?.let { skip(it.toLong()) },
            limit?.let { limit(it.toLong()) }
        )

        val aggregation = newAggregation(stages)

        return reactiveMongoTemplate.aggregate(aggregation, "transactions", Document::class.java)
            .collectList()
            .map { docs -> docs.map { doc -> transactionsMapper.fromDocument(doc) }.toMutableList() }
    }

    /**
     * Loads the first transaction whose `parentId` equals [parentId].
     *
     * @throws NotFoundException when no such transaction exists
     */
    suspend fun getTransactionByParentId(parentId: String): Transaction {
        // Aggregation stages
        val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails")
        val unwindCategory = unwind("categoryDetails")
        val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails")
        val unwindSpace = unwind("spaceDetails")
        val lookupUsers = lookup("users", "user.\$id", "_id", "userDetails")
        val unwindUser = unwind("userDetails")

        val match = match(Criteria.where("parentId").`is`(parentId))

        val aggregationBuilder = mutableListOf(
            lookup,
            unwindCategory,
            lookupSpaces,
            unwindSpace,
            lookupUsers,
            unwindUser,
            match
        )

        val aggregation = newAggregation(aggregationBuilder)

        return reactiveMongoTemplate.aggregate(aggregation, "transactions", Document::class.java)
            .map { doc -> transactionsMapper.fromDocument(doc) }
            .awaitFirstOrNull()
            ?: throw NotFoundException("Child transaction with parent id $parentId not found")
    }

    /**
     * Returns the transactions of a space dated within `[dateFrom, dateTo)` that are either
     * `PLANNED` or have a `parentId`.
     *
     * NOTE(review): the projection drops `spaceDetails`/`userDetails`; callers pass the result
     * to `deleteTransaction`, which later dereferences `transaction.space!!` — verify the mapper
     * still populates `space` from the projected document.
     */
    suspend fun getTransactionsToDelete(
        spaceId: String,
        dateFrom: LocalDate,
        dateTo: LocalDate
    ): List<Transaction> {
        val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails")
        val unwindSpace = unwind("spaceDetails")
        val lookupCategory = lookup("categories", "category.\$id", "_id", "categoryDetails")
        val unwindCategory = unwind("categoryDetails")

        val matchCriteria = mutableListOf<Criteria>()
        matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId)))
        matchCriteria.add(Criteria.where("date").gte(dateFrom))
        matchCriteria.add(Criteria.where("date").lt(dateTo))
        matchCriteria.add(
            Criteria().orOperator(
                Criteria.where("type.code").`is`("PLANNED"),
                Criteria.where("parentId").exists(true)
            )
        )
        val match = match(Criteria().andOperator(*matchCriteria.toTypedArray()))
        val project = project("_id", "type", "comment", "date", "amount", "isDone", "categoryDetails")

        val aggregationBuilder = mutableListOf(
            lookupSpaces, unwindSpace, lookupCategory, unwindCategory, match, project
        )
        val aggregation = newAggregation(aggregationBuilder)

        return reactiveMongoTemplate.aggregate(aggregation, "transactions", Document::class.java)
            .collectList() // Gather all results into a list
            .map { docs -> docs.map { doc -> transactionsMapper.fromDocument(doc) } }
            .awaitSingle()
    }

    /**
     * Loads a transaction by id together with its space, category and user.
     *
     * @throws NotFoundException when the transaction does not exist
     */
    suspend fun getTransactionById(id: String): Transaction {
        val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails")
        val unwindSpaces = unwind("spaceDetails")
        val lookupCategory = lookup("categories", "category.\$id", "_id", "categoryDetails")
        val unwindCategory = unwind("categoryDetails")
        val lookupUser = lookup("users", "user.\$id", "_id", "userDetails")
        val unwindUser = unwind("userDetails")

        val matchCriteria = mutableListOf<Criteria>()
        matchCriteria.add(Criteria.where("_id").`is`(ObjectId(id)))
        val match = match(Criteria().andOperator(*matchCriteria.toTypedArray()))
        // val project = project("_iid", "type", "comment", "date", "amount", "isDone")

        val aggregationBuilder = mutableListOf(
            lookupSpaces, unwindSpaces, lookupCategory, unwindCategory, lookupUser, unwindUser, match
        )
        val aggregation = newAggregation(aggregationBuilder)

        return reactiveMongoTemplate.aggregate(aggregation, "transactions", Document::class.java).next()
            .map { doc -> transactionsMapper.fromDocument(doc) }
            .awaitSingleOrNull()
            ?: throw NotFoundException("Transaction with id $id not found")
    }

    /**
     * Creates a transaction in [space] on behalf of the authenticated user and updates the
     * affected budget.
     *
     * @throws IllegalArgumentException when the authenticated user is not a member of the space
     */
    suspend fun createTransaction(space: Space, transaction: Transaction): Transaction {
        val securityContextHolder = ReactiveSecurityContextHolder.getContext().awaitSingle()
        val user = userService.getByUserNameWoPass(securityContextHolder.authentication.name)
        if (space.users.none { it.id.toString() == user.id }) {
            throw IllegalArgumentException("User does not have access to this Space")
        }
        // Attach space and user to the transaction
        transaction.user = user
        transaction.space = space

        val savedTransaction = transactionsRepo.save(transaction).awaitSingle()
        updateBudgetOnCreate(savedTransaction)
        return savedTransaction
    }

    @CacheEvict(cacheNames = ["transactions", "budgets"], allEntries = true)
suspend fun editTransaction(transaction: Transaction): Transaction {
        val oldStateOfTransaction = getTransactionById(transaction.id!!)
        val changed = compareSumDateDoneIsChanged(oldStateOfTransaction, transaction)
        if (!changed) {
            // Nothing budget-relevant changed: just persist the transaction.
            return transactionsRepo.save(transaction).awaitSingle()
        }
        val amountDifference = transaction.amount - oldStateOfTransaction.amount

        // A planned transaction that was already done has a child transaction to keep in sync.
        if (oldStateOfTransaction.isDone && oldStateOfTransaction.type.code == "PLANNED") {
            handleChildTransaction(oldStateOfTransaction, transaction)
        }

        val savedTransaction = transactionsRepo.save(transaction).awaitSingle()
        updateBudgetOnEdit(oldStateOfTransaction, savedTransaction, amountDifference)
        return savedTransaction
    }

    /**
     * Keeps the child of a planned transaction in sync with the edited parent.
     *
     * - parent switched from done to not done: the child is deleted (it is no longer saved
     *   right before being deleted);
     * - otherwise the child takes over amount, category, comment and user of the parent.
     *
     * NOTE(review): the "became done" branch cannot be reached from [editTransaction], which
     * only calls this when the old state was already done — confirm where the child is created.
     *
     * @throws NotFoundException when the parent has no child transaction
     */
    private suspend fun handleChildTransaction(oldTransaction: Transaction, newTransaction: Transaction) {
        val childTransaction = getTransactionByParentId(newTransaction.id!!)
        logger.info("Updating child: $childTransaction")

        if (oldTransaction.isDone && !newTransaction.isDone) {
            deleteTransaction(childTransaction)
            return
        }

        val updatedChild = childTransaction.copy(
            amount = newTransaction.amount,
            category = newTransaction.category,
            comment = newTransaction.comment,
            user = newTransaction.user
        )
        transactionsRepo.save(updatedChild).awaitSingle()

        if (!oldTransaction.isDone && newTransaction.isDone) {
            val newChildTransaction = newTransaction.copy(
                id = null,
                type = TransactionType("INSTANT", "Текущие"),
                parentId = newTransaction.id
            )
            transactionsRepo.save(newChildTransaction).awaitSingle()
            updateBudgetOnCreate(newChildTransaction)
        }
    }

    /** Returns `true` when amount, date, done flag or category differ between [t1] and [t2]. */
    fun compareSumDateDoneIsChanged(t1: Transaction, t2: Transaction): Boolean =
        t1.amount != t2.amount ||
            t1.date != t2.date ||
            t1.isDone != t2.isDone ||
            t1.category.id != t2.category.id

    /**
     * Deletes the transaction and then refreshes the affected budget. The budget refresh runs
     * in a child coroutine of this scope, so the function returns only after it has finished.
     */
    suspend fun deleteTransaction(transaction: Transaction) = coroutineScope {
        transactionsRepo.deleteById(transaction.id!!).awaitFirstOrNull()
        launch { updateBudgetOnDelete(transaction) }
    }

    //
// @CacheEvict(cacheNames = ["transactions", "childTransactions"], allEntries = true)
// fun setTransactionDone(transaction: Transaction): Transaction {
//     val oldStateTransaction = transactionsRepo.findById(transaction.id!!)
//         .orElseThrow { RuntimeException("Transaction ${transaction.id} not found") }
//
//     if (transaction.isDone) {
//         if (oldStateTransaction.isDone) {
//             throw RuntimeException("Transaction ${transaction.id} is already done")
//         }
//
//         // Create the child transaction
//         val childTransaction = transaction.copy(
//             id = null,
//             type = TransactionType("INSTANT", "Текущие"),
//             parentId = transaction.id
//         )
//         createTransaction(childTransaction)
//     } else {
//         // Delete the child transaction if it exists
//         transactionsRepo.findByParentId(transaction.id!!).getOrNull()?.let {
//             deleteTransaction(it.id!!)
//         } ?: logger.warn("Child transaction of parent ${transaction.id} not found")
//     }
//
//     return editTransaction(transaction)
// }

// @Cacheable("childTransactions", key = "#parentId")
// fun getChildTransaction(parentId: String): Mono {
//     return transactionsRepo.findByParentId(parentId)
// }

// fun getTransactionByOldId(id: Int): Transaction? {
//     return transactionsRepo.findByOldId(id).getOrNull()
// }

// fun transferTransactions(): Mono {
//     var transactions = transactionsRepoSQl.getTransactions()
//     return transactionsRepo.saveAll(transactions).then()
// }
//

/**
 * Sums the amounts of the transactions of a space that fall into the budget period
 * (`budget.dateFrom` inclusive, `budget.dateTo` exclusive), optionally narrowed by
 * category id, category type code, transaction type code and done flag.
 *
 * @return a [Mono] emitting the total, or `0.0` when nothing matches.
 */
fun calcTransactionsSum(
    spaceId: String,
    budget: Budget,
    categoryId: String? = null,
    categoryType: String? = null,
    transactionType: String? = null,
    isDone: Boolean? = null
): Mono<Double> {
    // Mandatory filters first, then the optional ones that were actually supplied.
    val matchCriteria = listOfNotNull(
        Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId)),
        Criteria.where("date").gte(budget.dateFrom),
        Criteria.where("date").lt(budget.dateTo),
        categoryId?.let { Criteria.where("category.\$id").`is`(ObjectId(it)) },
        categoryType?.let { Criteria.where("categoryDetails.type.code").`is`(it) },
        transactionType?.let { Criteria.where("type.code").`is`(it) },
        isDone?.let { Criteria.where("isDone").`is`(it) }
    )

    val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails")
    val unwind = unwind("categoryDetails")
    // The collection is "spaces" (as in every other pipeline of this service); looking up
    // "space" produced an empty array, so the unwind dropped every document.
    val lookupSpace = lookup("spaces", "space.\$id", "_id", "spaceDetails")
    val unwindSpace = unwind("spaceDetails")
    val match = match(Criteria().andOperator(*matchCriteria.toTypedArray()))
    val project = project("category").andExpression("{ \$toDouble: \"\$amount\" }").`as`("amount")
    val group = group(categoryId ?: "all").sum("amount").`as`("totalSum")
    val projectSum = project("totalSum")

    val aggregation =
        newAggregation(lookup, unwind, lookupSpace, unwindSpace, match, project, group, projectSum)

    return reactiveMongoTemplate.aggregate(aggregation, "transactions", Map::class.java)
        .map { result -> (result["totalSum"] as? Number)?.toDouble() ?: 0.0 }
        .reduce(0.0) { acc, sum -> acc + sum } // Add up the values if there are several results
}

// @Cacheable("transactions")
/**
 * Computes, per category, the average monthly sum of INSTANT transactions in EXPENSE
 * categories dated before the first day of the current month.
 *
 * @return a [Mono] emitting a map from the grouped category key (as a string) to the
 *   average amount; an empty map when there are no results.
 */
fun getAverageSpendingByCategory(): Mono<Map<String, Double>> {
    val firstDateOfMonth = LocalDate.now().with(TemporalAdjusters.firstDayOfMonth())

    val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails")
    val unwind = unwind("categoryDetails")
    val match = match(
        Criteria.where("categoryDetails.type.code").`is`("EXPENSE")
            .and("type.code").`is`("INSTANT")
            .and("date").lt(firstDateOfMonth)
    )
    val projectDate = project("_id", "category", "amount", "categoryDetails")
        .and(DateToString.dateOf("date").toString("%Y-%m")).`as`("month")
        .andExpression("{ \$toDouble: \"\$amount\" }").`as`("amount")
    val groupByMonthAndCategory = group("month", "category.\$id").sum("amount").`as`("sum")
    val groupByCategory = group("_id.id").avg("sum").`as`("averageAmount")
    val project = project().and("_id").`as`("category").and("averageAmount").`as`("avgAmount")
    val sort = sort(Sort.by(Sort.Order.asc("_id")))

    val aggregation = newAggregation(
        lookup, unwind, match, projectDate, groupByMonthAndCategory, groupByCategory, project, sort
    )

    return reactiveMongoTemplate.aggregate(aggregation, "transactions", Map::class.java)
        .collectList()
        .map { results ->
            results.associate { result ->
                val category = result["category"]?.toString() ?: "Unknown"
                val avgAmount = (result["avgAmount"] as? Double) ?: 0.0
                category to avgAmount
            }
        }
        .defaultIfEmpty(emptyMap()) // Emit an empty map when there are no results
}

/** Returns the two supported transaction types: PLANNED and INSTANT. */
@Cacheable("transactionTypes")
fun getTransactionTypes(): List<TransactionType> = listOf(
    TransactionType("PLANNED", "Плановые"),
    TransactionType("INSTANT", "Текущие")
)

/**
 * Computes the average monthly sum of done INSTANT transactions in INCOME categories.
 *
 * @return a [Mono] emitting the average, or `0.0` when there are no results.
 */
fun getAverageIncome(): Mono<Double> {
    val lookup = lookup("categories", "category.\$id", "_id", "detailedCategory")
    val unwind = unwind("detailedCategory")
    val match = match(
        Criteria.where("detailedCategory.type.code").`is`("INCOME")
            .and("type.code").`is`("INSTANT")
            .and("isDone").`is`(true)
    )
    val project = project("_id", "category", "detailedCategory")
        .and(DateToString.dateOf("date").toString("%Y-%m")).`as`("month")
        .andExpression("{ \$toDouble: \"\$amount\" }").`as`("amount")
    val groupByMonth = group("month").sum("amount").`as`("sum")
    val groupForAverage = group("avgIncomeByMonth").avg("sum").`as`("averageAmount")

    val aggregation = newAggregation(lookup, unwind, match, project, groupByMonth, groupForAverage)

    return reactiveMongoTemplate.aggregate(aggregation, "transactions", Map::class.java)
        .singleOrEmpty() // Only a single result is expected
        .map { result -> result["averageAmount"] as? Double ?: 0.0 }
        .defaultIfEmpty(0.0) // Fall back to 0.0 when the result is empty
}

/**
 * Loads the transactions of a space dated from the start of [dateFrom] up to the end of
 * [dateTo] (both in the system default zone) and splits them with a `$facet` stage.
 *
 * @return a map with the keys `plannedExpenses`, `plannedIncomes` and
 *   `instantTransactions`, each sorted by date and then by `_id`.
 */
suspend fun getTransactionsByTypes(
    spaceId: String,
    dateFrom: LocalDate,
    dateTo: LocalDate
): Map<String, List<Transaction>>? {
    val sortByDateAndId = Document("\$sort", Document("date", 1).append("_id", 1))

    val pipeline = listOf(
        bsonLookup("categories", "category.\$id", "categoryDetailed"),
        bsonLookup("spaces", "space.\$id", "spaceDetailed"),
        bsonLookup("users", "user.\$id", "userDetailed"),
        bsonUnwindKeepingEmpty("categoryDetailed"),
        bsonUnwindKeepingEmpty("spaceDetailed"),
        bsonUnwindKeepingEmpty("userDetailed"),
        Document(
            "\$match",
            Document(
                "\$and",
                listOf(
                    Document("spaceDetailed._id", ObjectId(spaceId)),
                    Document("date", Document("\$gte", localDateToDate(dateFrom))),
                    Document("date", Document("\$lt", localDateToDate(dateTo, LocalTime.MAX)))
                )
            )
        ),
        Document(
            "\$facet",
            Document(
                "plannedExpenses",
                listOf(
                    Document(
                        "\$match",
                        Document("type.code", "PLANNED").append("categoryDetailed.type.code", "EXPENSE")
                    ),
                    sortByDateAndId
                )
            ).append(
                "plannedIncomes",
                listOf(
                    Document(
                        "\$match",
                        Document("type.code", "PLANNED").append("categoryDetailed.type.code", "INCOME")
                    ),
                    sortByDateAndId
                )
            ).append(
                "instantTransactions",
                listOf(
                    Document("\$match", Document("type.code", "INSTANT")),
                    sortByDateAndId
                )
            )
        )
    )

    val collection = reactiveMongoTemplate.getCollection("transactions").awaitSingle()
    val aggregationResult = collection.aggregate(pipeline, Document::class.java).awaitSingle()

    // Mapping the facets is plain in-memory work, so it is done sequentially.
    return mapOf(
        "plannedExpenses" to extractTransactions(aggregationResult, "plannedExpenses"),
        "plannedIncomes" to extractTransactions(aggregationResult, "plannedIncomes"),
        "instantTransactions" to extractTransactions(aggregationResult, "instantTransactions")
    )
}

/** Maps the documents stored under [key] of a facet result; a missing key gives an empty list. */
private suspend fun extractTransactions(aggregationResult: Document, key: String): List<Transaction> =
    (aggregationResult[key] as? List<*>)
        .orEmpty()
        .filterIsInstance<Document>()
        .map { documentToTransactionMapper(it) }

/**
 * Builds a [Transaction] from a document produced by [getTransactionsByTypes].
 *
 * `userDetailed` is unwound with `preserveNullAndEmptyArrays`, so it may be absent; the
 * transaction then gets a `null` user instead of failing on the cast. The space is not mapped.
 */
@Suppress("UNCHECKED_CAST")
private fun documentToTransactionMapper(document: Document): Transaction {
    val transactionType = document["type"] as Document

    val user: User? = (document["userDetailed"] as? Document)?.let { userDocument ->
        User(
            id = (userDocument["_id"] as ObjectId).toString(),
            username = userDocument["username"] as String,
            firstName = userDocument["firstName"] as String,
            tgId = userDocument["tgId"] as String?,
            tgUserName = userDocument["tgUserName"]?.let { it as String },
            password = null,
            isActive = userDocument["isActive"] as Boolean,
            regDate = (userDocument["regDate"] as Date).toInstant().atZone(ZoneId.systemDefault())
                .toLocalDate(),
            createdAt = (userDocument["createdAt"] as Date).toInstant().atZone(ZoneId.systemDefault())
                .toLocalDateTime(),
            // NOTE(review): the element type was missing in the reviewed text; String is
            // assumed here — confirm against the User model.
            roles = userDocument["roles"] as ArrayList<String>,
        )
    }

    val categoryDocument = document["categoryDetailed"] as Document
    val categoryTypeDocument = categoryDocument["type"] as Document
    val category = Category(
        id = (categoryDocument["_id"] as ObjectId).toString(),
        type = CategoryType(categoryTypeDocument["code"] as String, categoryTypeDocument["name"] as String),
        name = categoryDocument["name"] as String,
        description = categoryDocument["description"] as String,
        icon = categoryDocument["icon"] as String
    )

    return Transaction(
        (document["_id"] as ObjectId).toString(),
        null,
        TransactionType(
            transactionType["code"] as String,
            transactionType["name"] as String
        ),
        user = user,
        category = category,
        comment = document["comment"] as String,
        date = (document["date"] as Date).toInstant().atZone(ZoneId.systemDefault()).toLocalDate(),
        // Accept any numeric BSON type instead of failing when the amount is not a double.
        amount = (document["amount"] as Number).toDouble(),
        isDone = document["isDone"] as Boolean,
        parentId = document["parentId"] as String?,
        createdAt = LocalDateTime.ofInstant((document["createdAt"] as Date).toInstant(), ZoneOffset.UTC),
    )
}

/**
 * Sums PLANNED and INSTANT transaction amounts of one category within the budget period
 * (`budget.dateFrom` inclusive, `budget.dateTo` exclusive).
 *
 * @return a document with `plannedAmount` and `instantAmount`; both are `0.0` when no
 *   transaction matches.
 */
suspend fun getBudgetSumsByCategory(categoryId: String, budget: Budget): Document {
    logger.info("getting budget sums for category $categoryId")

    fun sumOfType(typeCode: String) = Document(
        "\$sum",
        Document("\$cond", listOf(Document("\$eq", listOf("\$type.code", typeCode)), "\$amount", 0.0))
    )

    val pipeline = listOf(
        Document(
            "\$match",
            Document("category.\$id", ObjectId(categoryId)).append(
                "date",
                Document("\$gte", localDateToDate(budget.dateFrom))
                    .append("\$lt", localDateToDate(budget.dateTo))
            )
        ),
        Document(
            "\$group",
            Document("_id", BsonNull())
                .append("plannedAmount", sumOfType("PLANNED"))
                .append("instantAmount", sumOfType("INSTANT"))
        ),
        Document(
            "\$project",
            Document("_id", 0).append("plannedAmount", 1).append("instantAmount", 1)
        )
    )

    val collection = reactiveMongoTemplate.getCollection("transactions").awaitSingle()
    val result = collection.aggregate(pipeline).awaitFirstOrNull()
    return result ?: Document("plannedAmount", 0.0).append("instantAmount", 0.0)
}

/**
 * For every category of a space, sums its PLANNED and INSTANT transactions dated between
 * [dateFrom] and [dateTo].
 *
 * NOTE(review): the upper bound here is inclusive (`$lte`), while
 * [getCategoryTransactionPipeline] and [getBudgetSumsByCategory] use an exclusive `$lt`.
 * The original operator is kept; confirm which bound is intended.
 *
 * @return a map from category id to a map with the keys `plannedAmount` and `instantAmount`.
 */
suspend fun getBudgetCategories(
    spaceId: String,
    dateFrom: LocalDate,
    dateTo: LocalDate
): Map<String, Map<String, Double>>? {
    val pipeline = categorySpaceFilterStages(spaceId) + listOf(
        transactionSumsLookup(dateFrom, dateTo, "\$lte"),
        Document(
            "\$project",
            Document("_id", 1L).append("transactionSums", 1L)
        ),
        Document(
            "\$project",
            Document("_id", 1L)
                .append("plannedAmount", firstSumOfType("PLANNED"))
                .append("instantAmount", firstSumOfType("INSTANT"))
        ),
        amountDefaultsStage()
    )

    val collection = reactiveMongoTemplate.getCollection("categories").awaitSingle()
    return collection.aggregate(pipeline, Document::class.java)
        .asFlow()
        .map { document ->
            val id = document["_id"].toString()
            val values = mapOf(
                "plannedAmount" to ((document["plannedAmount"] as? Number)?.toDouble() ?: 0.0),
                "instantAmount" to ((document["instantAmount"] as? Number)?.toDouble() ?: 0.0)
            )
            // Emit id and values as a pair so they can be collected into a map.
            id to values
        }
        .toList()
        .toMap()
}

/**
 * Builds a [BudgetCategory] for every category of a space whose type code equals [catType],
 * using the sums of its PLANNED and INSTANT transactions dated from [dateFrom] (inclusive)
 * to [dateTo] (exclusive).
 *
 * The planned sum is used for both `currentLimit` and `currentPlanned`; the instant sum
 * becomes `currentSpent`.
 */
suspend fun getCategoryTransactionPipeline(
    spaceId: String,
    dateFrom: LocalDate,
    dateTo: LocalDate,
    catType: String? = "EXPENSE"
): List<BudgetCategory> {
    val pipeline = categorySpaceFilterStages(spaceId) + listOf(
        Document("\$match", Document("type.code", catType)),
        transactionSumsLookup(dateFrom, dateTo, "\$lt"),
        Document(
            "\$project",
            Document("_id", 1L).append("type", 1L).append("name", 1L).append("description", 1L)
                .append("icon", 1L)
                .append("plannedAmount", firstSumOfType("PLANNED"))
                .append("instantAmount", firstSumOfType("INSTANT"))
        ),
        amountDefaultsStage()
    )

    val collection = reactiveMongoTemplate.getCollection("categories").awaitSingle()
    return collection.aggregate(pipeline, Document::class.java)
        .asFlow() // Convert the publisher into a Flow
        .map { document ->
            val categoryType = document["type"] as Document
            // $sum may return an integral type, so convert through Number.
            val plannedAmount = (document["plannedAmount"] as Number).toDouble()
            val instantAmount = (document["instantAmount"] as Number).toDouble()
            BudgetCategory(
                currentSpent = instantAmount,
                currentLimit = plannedAmount,
                currentPlanned = plannedAmount,
                category = Category(
                    id = document["_id"].toString(),
                    type = CategoryType(categoryType["code"] as String, categoryType["name"] as String),
                    name = document["name"] as String,
                    description = document["description"] as String,
                    icon = document["icon"] as String
                )
            )
        }
        .toList()
}

// fun getCategorySumsPipeline(dateFrom: LocalDate, dateTo: LocalDate): Mono> {
// val pipeline = listOf(
// Document(
// "\$lookup",
// Document("from", "categories").append("localField", "category.\$id")
// .append("foreignField", "_id")
// .append("as", "categoryDetails")
// ), Document("\$unwind", "\$categoryDetails"), Document(
// "\$match", Document(
// "date", Document(
// "\$gte", Date.from(
// LocalDateTime.of(dateFrom, LocalTime.MIN).atZone(ZoneId.systemDefault())
// .withZoneSameInstant(ZoneOffset.UTC).toInstant()
// )
// ).append(
// "\$lte",
// LocalDateTime.of(dateTo, LocalTime.MIN).atZone(ZoneId.systemDefault())
// .withZoneSameInstant(ZoneOffset.UTC).toInstant()
// )
// )
// ), Document(
// "\$group", Document(
// "_id",
// Document("categoryId", "\$categoryDetails._id").append(
// "categoryName",
// "\$categoryDetails.name"
// )
// .append("year", Document("\$year", "\$date"))
// .append("month", Document("\$month", "\$date"))
// ).append("totalAmount", Document("\$sum", "\$amount"))
// ), Document(
// "\$group",
// Document("_id", "\$_id.categoryId").append(
// "categoryName",
// Document("\$first", "\$_id.categoryName")
// )
// .append(
// "monthlyData", Document(
// "\$push", Document(
// "month", Document(
// "\$concat", listOf(
// Document("\$toString", "\$_id.year"), "-", Document(
// "\$cond", listOf(
// Document("\$lt", listOf("\$_id.month", 10L)), Document(
// "\$concat", listOf(
// "0", Document("\$toString", "\$_id.month")
// )
// ), Document("\$toString", "\$_id.month")
// )
// )
// )
// )
// ).append("totalAmount", "\$totalAmount")
// )
// )
// ), Document(
// "\$addFields", Document(
// "completeMonthlyData", Document(
// "\$map",
// Document("input", Document("\$range", listOf(0L, 6L))).append("as", "offset").append(
// "in", Document(
// "month", Document(
// "\$dateToString", Document("format", "%Y-%m").append(
// "date", Document(
// "\$dateAdd",
// Document("startDate", Date(1754006400000L)).append("unit", "month")
// .append(
// "amount", Document("\$multiply", listOf("\$\$offset", 1L))
// )
// )
// )
// )
// ).append(
// "totalAmount", Document(
// "\$let", Document(
// "vars", Document(
// "matched", Document(
// "\$arrayElemAt", listOf(
// Document(
// "\$filter",
// Document("input", "\$monthlyData").append("as", "data")
// .append(
// "cond", Document(
// "\$eq", listOf(
// "\$\$data.month", Document(
// "\$dateToString",
// Document("format", "%Y-%m").append(
// "date", Document(
// "\$dateAdd", Document(
// "startDate", Date(
// 1733011200000L
// )
// ).append(
// "unit", "month"
// ).append(
// "amount",
// Document(
// "\$multiply",
// listOf(
// "\$\$offset",
// 1L
// )
// )
// )
// )
// )
// )
// )
// )
// )
// ), 0L
// )
// )
// )
// ).append(
// "in", Document("\$ifNull", listOf("\$\$matched.totalAmount", 0L))
// )
// )
// )
// )
// )
// )
// ), Document(
// "\$project",
// Document("_id", 0L).append("categoryId", "\$_id").append("categoryName", "\$categoryName")
// .append("monthlyData", "\$completeMonthlyData")
// )
// )
//
// return reactiveMongoTemplate.getCollection("transactions").flatMapMany { it.aggregate(pipeline) }.map {
// it["categoryId"] = it["categoryId"].toString()
// it
// }.collectList()
// }

/**
 * Builds per-category monthly spending summaries for a space.
 *
 * Only INSTANT transactions in EXPENSE categories dated from the start of [dateFrom] until
 * now are aggregated. Each returned document carries `categoryName`, `categoryType`,
 * `categoryIcon` and `monthlySums`: six entries (the current month and the five before it)
 * with `year`, `month`, `total`, a `date` (first day of that month) and `difference`, the
 * integer percentage change against the previous entry (0 when either total is 0).
 */
suspend fun getCategorySummaries(spaceId: String, dateFrom: LocalDate): List<Document> {
    val periodStart = localDateToDate(dateFrom)
    val now = Date()

    val aggregation = listOf(
        // 1. Keep only INSTANT transactions of the requested period. This runs before the
        //    lookup so that the lookup only touches candidate documents.
        Document(
            "\$match",
            Document("date", Document("\$gte", periodStart).append("\$lt", now))
                .append("type.code", "INSTANT")
        ),
        // 2. Resolve the space and keep only transactions of the requested space.
        Document(
            "\$lookup",
            Document("from", "spaces").append("localField", "space.\$id").append("foreignField", "_id")
                .append("as", "spaceInfo")
        ),
        Document("\$unwind", "\$spaceInfo"),
        Document("\$match", Document("spaceInfo._id", ObjectId(spaceId))),
        // 3. Group by category and (year, month).
        Document(
            "\$group",
            Document(
                "_id",
                Document("category", "\$category.\$id")
                    .append("year", Document("\$year", "\$date"))
                    .append("month", Document("\$month", "\$date"))
            ).append("totalAmount", Document("\$sum", "\$amount"))
        ),
        // 4. Pull in the category document.
        Document(
            "\$lookup",
            Document("from", "categories").append("localField", "_id.category")
                .append("foreignField", "_id")
                .append("as", "categoryInfo")
        ),
        Document("\$unwind", "\$categoryInfo"),
        // 5. Keep only EXPENSE categories.
        Document("\$match", Document("categoryInfo.type.code", "EXPENSE")),
        // 6. Group back by category, collecting every (year, month, total).
        Document(
            "\$group",
            Document("_id", "\$_id.category")
                .append("categoryName", Document("\$first", "\$categoryInfo.name"))
                .append("categoryType", Document("\$first", "\$categoryInfo.type.code"))
                .append("categoryIcon", Document("\$first", "\$categoryInfo.icon"))
                .append(
                    "monthlySums",
                    Document(
                        "\$push",
                        Document("year", "\$_id.year").append("month", "\$_id.month")
                            .append("total", "\$totalAmount")
                    )
                )
        ),
        // 7. Build a fixed array of 6 elements {year, month, total}; months without
        //    records get total = 0.
        Document(
            "\$project",
            Document("categoryName", 1).append("categoryType", 1).append("categoryIcon", 1).append(
                "monthlySums",
                Document(
                    "\$map",
                    Document("input", Document("\$range", listOf(0, 6))).append("as", "i").append(
                        "in",
                        Document(
                            "\$let",
                            Document(
                                "vars",
                                Document(
                                    "subDate",
                                    Document(
                                        "\$dateSubtract",
                                        Document("startDate", now).append("unit", "month")
                                            .append("amount", "\$\$i")
                                    )
                                )
                            ).append(
                                "in",
                                Document("year", Document("\$year", "\$\$subDate"))
                                    .append("month", Document("\$month", "\$\$subDate"))
                                    .append(
                                        "total",
                                        Document(
                                            "\$ifNull",
                                            listOf(
                                                Document(
                                                    "\$getField",
                                                    Document("field", "total").append(
                                                        "input",
                                                        Document(
                                                            "\$arrayElemAt",
                                                            listOf(
                                                                Document(
                                                                    "\$filter",
                                                                    Document("input", "\$monthlySums")
                                                                        .append("as", "ms")
                                                                        .append(
                                                                            "cond",
                                                                            Document(
                                                                                "\$and",
                                                                                listOf(
                                                                                    Document(
                                                                                        "\$eq",
                                                                                        listOf(
                                                                                            "\$\$ms.year",
                                                                                            Document("\$year", "\$\$subDate")
                                                                                        )
                                                                                    ),
                                                                                    Document(
                                                                                        "\$eq",
                                                                                        listOf(
                                                                                            "\$\$ms.month",
                                                                                            Document("\$month", "\$\$subDate")
                                                                                        )
                                                                                    )
                                                                                )
                                                                            )
                                                                        )
                                                                ),
                                                                0
                                                            )
                                                        )
                                                    )
                                                ),
                                                0.0
                                            )
                                        )
                                    )
                            )
                        )
                    )
                )
            )
        ),
        // 8. Sort the result by category name.
        Document("\$sort", Document("categoryName", 1))
    )

    return reactiveMongoTemplate.getCollection("transactions")
        .flatMapMany { it.aggregate(aggregation) }
        .map { document -> withMonthlyDifferences(document) }
        .collectList()
        .awaitSingle()
}

/**
 * Post-processes one summary document: stringifies `_id`, adds a `date` to every monthly
 * entry, sorts the entries by that date and adds the percentage `difference` to each.
 */
private fun withMonthlyDifferences(document: Document): Document {
    document["_id"] = document["_id"].toString()

    val sortedMonthlySums = (document["monthlySums"] as? List<*>)
        ?.map { monthlySum ->
            if (monthlySum is Document) {
                // Work on a copy so the list returned by the driver is not modified.
                Document(monthlySum).apply {
                    this["date"] = LocalDate.of(getInteger("year"), getInteger("month"), 1)
                }
            } else {
                monthlySum
            }
        }
        ?.sortedBy { (it as? Document)?.get("date") as? LocalDate }

    var previousMonthSum = 0.0
    sortedMonthlySums?.forEach { monthlySum ->
        if (monthlySum is Document) {
            // $sum may return an integral type, so convert through Number.
            val currentMonthSum = (monthlySum["total"] as? Number)?.toDouble() ?: 0.0
            val difference = if (previousMonthSum != 0.0 && currentMonthSum != 0.0) {
                (((currentMonthSum - previousMonthSum) / previousMonthSum) * 100).toInt()
            } else {
                0
            }
            monthlySum["difference"] = difference
            previousMonthSum = currentMonthSum
        }
    }

    document["monthlySums"] = sortedMonthlySums
    return document
}

/** Converts [date] at [time] in the system default zone into a [Date] for BSON filters. */
private fun localDateToDate(date: LocalDate, time: LocalTime = LocalTime.MIN): Date =
    Date.from(LocalDateTime.of(date, time).atZone(ZoneId.systemDefault()).toInstant())

/** Builds a `$lookup` stage joining [localField] to `_id` of collection [from] as [alias]. */
private fun bsonLookup(from: String, localField: String, alias: String): Document =
    Document(
        "\$lookup",
        Document("from", from).append("localField", localField).append("foreignField", "_id")
            .append("as", alias)
    )

/** Builds an `$unwind` stage for [field] that keeps documents whose array is empty or missing. */
private fun bsonUnwindKeepingEmpty(field: String): Document =
    Document("\$unwind", Document("path", "\$$field").append("preserveNullAndEmptyArrays", true))

/** Stages that resolve a category's space and keep only categories of [spaceId]. */
private fun categorySpaceFilterStages(spaceId: String): List<Document> = listOf(
    bsonLookup("spaces", "space.\$id", "spaceDetailed"),
    bsonUnwindKeepingEmpty("spaceDetailed"),
    Document("\$match", Document("spaceDetailed._id", ObjectId(spaceId)))
)

/**
 * Builds the `$lookup` stage that attaches `transactionSums` to a category: its
 * transactions dated from [dateFrom] up to [dateTo], grouped by transaction type code.
 *
 * @param upperBoundOperator comparison applied to [dateTo], `"$lt"` or `"$lte"`.
 */
private fun transactionSumsLookup(
    dateFrom: LocalDate,
    dateTo: LocalDate,
    upperBoundOperator: String
): Document {
    val matchStage = Document(
        "\$match",
        Document(
            "\$expr",
            Document(
                "\$and",
                listOf(
                    Document("\$eq", listOf("\$category.\$id", "\$\$categoryId")),
                    Document("\$gte", listOf("\$date", localDateToDate(dateFrom))),
                    Document(upperBoundOperator, listOf("\$date", localDateToDate(dateTo)))
                )
            )
        )
    )
    val groupStage = Document(
        "\$group",
        Document("_id", "\$type.code").append("totalAmount", Document("\$sum", "\$amount"))
    )
    return Document(
        "\$lookup",
        Document("from", "transactions")
            .append("let", Document("categoryId", "\$_id"))
            .append("pipeline", listOf(matchStage, groupStage))
            .append("as", "transactionSums")
    )
}

/** Expression selecting the first `transactionSums` entry whose `_id` equals [typeCode]. */
private fun firstSumOfType(typeCode: String): Document =
    Document(
        "\$arrayElemAt",
        listOf(
            Document(
                "\$filter",
                Document("input", "\$transactionSums").append("as", "sum").append(
                    "cond",
                    Document("\$eq", listOf("\$\$sum._id", typeCode))
                )
            ),
            0
        )
    )

/** `$addFields` stage that flattens the selected sums to numbers, defaulting to 0.0. */
private fun amountDefaultsStage(): Document =
    Document(
        "\$addFields",
        Document(
            "plannedAmount",
            Document("\$ifNull", listOf("\$plannedAmount.totalAmount", 0.0))
        ).append(
            "instantAmount",
            Document("\$ifNull", listOf("\$instantAmount.totalAmount", 0.0))
        )
    )
}