package space.luminic.budgerapp.services import com.mongodb.DBRef import org.bson.BsonNull import org.bson.Document import org.bson.types.ObjectId import org.slf4j.LoggerFactory import org.springframework.cache.annotation.CacheEvict import org.springframework.cache.annotation.Cacheable import org.springframework.data.domain.Sort import org.springframework.data.domain.Sort.Direction import org.springframework.data.mongodb.core.ReactiveMongoTemplate import org.springframework.data.mongodb.core.aggregation.Aggregation.* import org.springframework.data.mongodb.core.aggregation.DateOperators.DateToString import org.springframework.data.mongodb.core.aggregation.SortOperation import org.springframework.data.mongodb.core.query.Criteria import org.springframework.data.mongodb.core.query.Query import org.springframework.security.core.context.ReactiveSecurityContextHolder import org.springframework.stereotype.Service import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers import space.luminic.budgerapp.mappers.BudgetMapper import space.luminic.budgerapp.mappers.TransactionsMapper import space.luminic.budgerapp.models.* import space.luminic.budgerapp.repos.BudgetRepo import space.luminic.budgerapp.repos.CategoryRepo import space.luminic.budgerapp.repos.TransactionRepo import space.luminic.budgerapp.repos.WarnRepo import java.time.* import java.time.temporal.TemporalAdjusters import java.util.* @Service class FinancialService( val budgetRepo: BudgetRepo, val warnRepo: WarnRepo, val transactionsRepo: TransactionRepo, val recurrentService: RecurrentService, val userService: UserService, val reactiveMongoTemplate: ReactiveMongoTemplate, private val categoryRepo: CategoryRepo, val transactionsMapper: TransactionsMapper, val budgetMapper: BudgetMapper ) { private val logger = LoggerFactory.getLogger(FinancialService::class.java) fun updateBudgetOnCreate(transaction: Transaction): Mono { return findProjectedBudget( transaction.space!!.id!!, budgetId = null, transaction.date, transaction.date ).flatMap { budget -> val budgetCategory = budget.categories.firstOrNull { it.category.id == transaction.category.id } if (transaction.category.type.code == "INCOME") { return@flatMap budgetRepo.save(budget) } else if (budgetCategory == null) { return@flatMap Mono.error(RuntimeException("Budget category not found in the budget")) } return@flatMap getBudgetSumsByCategory(transaction.category.id!!, budget).flatMap { sums -> budgetCategory.currentPlanned = sums.getDouble("plannedAmount") ?: 0.0 budgetCategory.currentSpent = sums.getDouble("instantAmount") ?: 0.0 // При совпадении бюджетов разница просто корректирует лимит if (transaction.type.code == "PLANNED") { budgetCategory.currentLimit += transaction.amount } logger.info("updateBudgetOnCreate end") budgetRepo.save(budget).then() } // }.then() // Возвращаем корректный Mono } fun updateBudgetOnEdit( oldTransaction: Transaction, newTransaction: Transaction, difference: Double ): Mono { logger.info("updateBudgetOnEdit start ") return Mono.zip( findProjectedBudget( newTransaction.space!!.id!!, budgetId = null, oldTransaction.date, oldTransaction.date ).map { logger.info("got old budget") it }.switchIfEmpty(Mono.error(BudgetNotFoundException("Old budget cannot be null"))), findProjectedBudget( newTransaction.space!!.id!!, budgetId = null, newTransaction.date, newTransaction.date ).map { logger.info("got new budget") it }.switchIfEmpty(Mono.error(BudgetNotFoundException("New budget cannot be null"))) ).flatMap { tuple -> val oldBudget = tuple.t1 val newBudget = tuple.t2 val isSameBudget = oldBudget.id == newBudget.id if (isSameBudget) { // Если бюджеты совпадают — обновляем соответствующую категорию в новом (едином) бюджете. val budgetCategory = if (newTransaction.category.type.code == "EXPENSE") newBudget.categories.firstOrNull { it.category.id == newTransaction.category.id } else newBudget.incomeCategories.firstOrNull { it.category.id == newTransaction.category.id } if (budgetCategory == null) { return@flatMap Mono.error(RuntimeException("Budget category not found in the budget")) } return@flatMap getBudgetSumsByCategory(newTransaction.category.id!!, newBudget).flatMap { sums -> budgetCategory.currentPlanned = sums.getDouble("plannedAmount") ?: 0.0 budgetCategory.currentSpent = sums.getDouble("instantAmount") ?: 0.0 // При совпадении бюджетов разница просто корректирует лимит if (newTransaction.type.code == "PLANNED") { budgetCategory.currentLimit += difference } logger.info("updateBudgetOnEdit end") budgetRepo.save(newBudget).then() } } else { // Если бюджеты различаются — отдельно обновляем категории в старом и новом бюджетах. val oldBudgetCategory = oldBudget.categories.firstOrNull { it.category.id == oldTransaction.category.id } val newBudgetCategory = newBudget.categories.firstOrNull { it.category.id == newTransaction.category.id } val oldUpdate: Mono = if (oldBudgetCategory == null) { Mono.error(RuntimeException("Old budget category not found")) } else { getBudgetSumsByCategory(oldTransaction.category.id!!, oldBudget).flatMap { sums -> oldBudgetCategory.currentPlanned = sums.getDouble("plannedAmount") ?: 0.0 oldBudgetCategory.currentSpent = sums.getDouble("instantAmount") ?: 0.0 // В старом бюджете вычитаем разницу, так как транзакция перемещается if (oldTransaction.type.code == "PLANNED") { oldBudgetCategory.currentLimit -= difference } budgetRepo.save(oldBudget).then() } } val newUpdate: Mono = if (newBudgetCategory == null) { Mono.error(RuntimeException("New budget category not found")) } else { getBudgetSumsByCategory(newTransaction.category.id!!, newBudget).flatMap { sums -> newBudgetCategory.currentPlanned = sums.getDouble("plannedAmount") ?: 0.0 newBudgetCategory.currentSpent = sums.getDouble("instantAmount") ?: 0.0 // В новом бюджете прибавляем разницу if (newTransaction.type.code == "PLANNED") { newBudgetCategory.currentLimit += difference } budgetRepo.save(newBudget).then() } } logger.info("updateBudgetOnEdit end") return@flatMap Mono.`when`(oldUpdate, newUpdate).then() } } } fun updateBudgetOnDelete(transaction: Transaction): Mono { return findProjectedBudget( transaction.space!!.id!!, budgetId = null, transaction.date, transaction.date ).flatMap { budget -> getBudgetCategories(transaction.space?.id!!, budget.dateFrom, budget.dateTo).flatMap { categories -> val updatedCategoriesMono: Mono> = when (transaction.type.code) { "PLANNED" -> Flux.fromIterable(budget.categories).map { category -> if (category.category.id == transaction.category.id) { categories[category.category.id]?.let { data -> category.currentSpent = data["instantAmount"] ?: 0.0 category.currentPlanned = data["plannedAmount"] ?: 0.0 category.currentLimit -= transaction.amount } } category }.collectList() "INSTANT" -> Flux.fromIterable(budget.categories).map { category -> if (category.category.id == transaction.category.id) { categories[category.category.id]?.let { data -> category.currentSpent = data["instantAmount"] ?: 0.0 category.currentPlanned = data["plannedAmount"] ?: 0.0 } } category }.collectList() else -> Mono.just(budget.categories) } updatedCategoriesMono.flatMap { updated -> budget.categories = updated.toMutableList() budgetRepo.save(budget).then() // Гарантируем завершение } } }.then() // Возвращаем корректный Mono } fun getBudgets(spaceId: String, sortSetting: SortSetting? = null): Mono> { val sort = sortSetting?.let { Sort.by(it.order, it.by) } ?: Sort.by(Direction.DESC, "dateFrom") return findProjectedBudgets(ObjectId(spaceId), sort) } fun findProjectedBudgets(spaceId: ObjectId, sortRequested: Sort? = null): Mono> { val lookupCategories = lookup("categories", "categories.category.\$id", "_id", "categoriesDetails") val lookupIncomeCategories = lookup("categories", "incomeCategories.category.\$id", "_id", "incomeCategoriesDetails") val lookupSpace = lookup("spaces", "space.\$id", "_id", "spaceDetails") val unwindSpace = unwind("spaceDetails") val matchStage = match(Criteria.where("spaceDetails._id").`is`(spaceId)) // matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId))) val projectStage = project("_id", "name", "dateFrom", "dateTo") // Оставляем только нужные поля val sort = sortRequested?.let { sort(it) } ?: sort( Sort.by(Direction.DESC, "date").and(Sort.by(Direction.DESC, "createdAt")) ) val aggregation = newAggregation(lookupCategories, lookupIncomeCategories, lookupSpace, unwindSpace, matchStage, sort) return reactiveMongoTemplate.aggregate(aggregation, "budgets", Document::class.java).collectList().map { docs -> docs.map { doc -> budgetMapper.fromDocument(doc) }.toMutableList() } } fun findProjectedBudget( spaceId: String, budgetId: String? = null, dateFrom: LocalDate? = null, dateTo: LocalDate? = null ): Mono { val lookupCategories = lookup("categories", "categories.category.\$id", "_id", "categoriesDetails") val unwindCategories = unwind("categoriesDetails") val lookupIncomeCategories = lookup("categories", "incomeCategories.category.\$id", "_id", "incomeCategoriesDetails") val unwindIncomeCategories = unwind("incomeCategoriesDetails") val lookupSpace = lookup("spaces", "space.\$id", "_id", "spaceDetails") val unwindSpace = unwind("spaceDetails") val matchCriteria = mutableListOf() budgetId?.let { matchCriteria.add(Criteria.where("_id").`is`(ObjectId(it))) } dateFrom?.let { matchCriteria.add(Criteria.where("dateFrom").lte(dateTo!!)) } dateTo?.let { matchCriteria.add(Criteria.where("dateTo").gte(it)) } matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId))) val matchStage = match(Criteria().andOperator(*matchCriteria.toTypedArray())) val aggregation = newAggregation(lookupCategories, lookupIncomeCategories, lookupSpace, unwindSpace, matchStage) return reactiveMongoTemplate.aggregate(aggregation, "budgets", Document::class.java).next().map { doc -> budgetMapper.fromDocument(doc) } } // @Cacheable("budgets", key = "#id") fun getBudget(spaceId: String, id: String): Mono { return findProjectedBudget( spaceId, id ).switchIfEmpty(Mono.error(IllegalArgumentException("Budget not found for spaceId: $spaceId and budgetId: $id"))) .flatMap { budget -> // Если доступ есть, продолжаем процесс val budgetDTO = BudgetDTO( budget.id, budget.space, budget.name, budget.dateFrom, budget.dateTo, budget.createdAt, categories = budget.categories, incomeCategories = budget.incomeCategories, ) logger.info("Fetching categories and transactions") val categoriesMono = getBudgetCategories(spaceId, budgetDTO.dateFrom, budgetDTO.dateTo) val transactionsMono = getTransactionsByTypes(spaceId, budgetDTO.dateFrom, budgetDTO.dateTo) Mono.zip(categoriesMono, transactionsMono).flatMap { tuple -> val categories = tuple.t1 val transactions = tuple.t2 Flux.fromIterable(budgetDTO.categories).map { category -> categories[category.category.id]?.let { data -> category.currentSpent = data["instantAmount"] ?: 0.0 category.currentPlanned = data["plannedAmount"] ?: 0.0 } category }.collectList().map { updatedCategories -> budgetDTO.categories = updatedCategories budgetDTO.plannedExpenses = transactions["plannedExpenses"] as MutableList budgetDTO.plannedIncomes = transactions["plannedIncomes"] as MutableList budgetDTO.transactions = transactions["instantTransactions"] as MutableList budgetDTO } } }.doOnError { error -> logger.error("Error fetching budget: ${error.message}", error) }.switchIfEmpty(Mono.error(BudgetNotFoundException("Budget not found with id: $id"))) } // fun regenBudgets(): Mono { // return budgetRepo.findAll() // .flatMap { budget -> // spaceService.getSpace("67af3c0f652da946a7dd9931") // .map { space -> // budget.space = space // budget // } // .flatMap { updatedBudget -> budgetRepo.save(updatedBudget) } // } // .then() // } // fun regenTransactions(): Mono { // return transactionsRepo.findAll().flatMap { transaction -> // spaceService.getSpace("67af3c0f652da946a7dd9931") // .map { space -> // transaction.space = space // transaction // } // .flatMap { updatedTransaction -> transactionsRepo.save(updatedTransaction) } // } // .then() // } fun regenCats(): Mono { return categoryRepo.findBySpaceId(ObjectId("67b352b13384483a1c2282ed")).flatMap { cat -> // if (cat.space?.id == "67b352b13384483a1c2282ed") { categoryRepo.deleteById(cat.id!!) // Возвращаем `Mono` // } else { // Mono.empty() // Если не удаляем, возвращаем пустой `Mono` // } }.then() // Убедимся, что все операции завершены } fun createCategory(space: Space, category: Category): Mono { category.space = space return categoryRepo.save(category) .flatMap { savedCategory -> findProjectedBudgets(ObjectId(space.id)) .flatMapMany { Flux.fromIterable(it) } // Преобразуем List в Flux .flatMap { budget -> when (savedCategory.type.code) { "INCOME" -> budget.incomeCategories.add(BudgetCategory(0.0, 0.0, 0.0, savedCategory)) "EXPENSE" -> budget.categories.add(BudgetCategory(0.0, 0.0, 0.0, savedCategory)) } budgetRepo.save(budget) // Сохраняем каждый обновленный бюджет } .then(Mono.just(savedCategory)) // Возвращаем сохраненную категорию после обработки всех бюджетов } } fun createBudget(space: Space, budget: Budget, createRecurrent: Boolean): Mono { return Mono.zip(getBudgetByDate(budget.dateFrom, space.id!!).map { Optional.ofNullable(it) } .switchIfEmpty(Mono.just(Optional.empty())), getBudgetByDate(budget.dateTo, space.id!!).map { Optional.ofNullable(it) } .switchIfEmpty(Mono.just(Optional.empty()))).flatMap { tuple -> val startBudget = tuple.t1.orElse(null) val endBudget = tuple.t2.orElse(null) // Проверяем, пересекаются ли бюджеты по датам if (startBudget != null || endBudget != null) { return@flatMap Mono.error(IllegalArgumentException("Бюджет с теми же датами найден")) } // Получаем Space по spaceId // Присваиваем Space бюджету budget.space = space // Если createRecurrent=true, создаем рекуррентные транзакции val recurrentsCreation = if (createRecurrent) { recurrentService.createRecurrentsForBudget(space, budget) } else { Mono.empty() } // Создаем бюджет после возможного создания рекуррентных транзакций recurrentsCreation.then( getCategoryTransactionPipeline( space.id!!, budget.dateFrom, budget.dateTo ).flatMap { categories -> budget.categories = categories budgetRepo.save(budget) }.publishOn(Schedulers.boundedElastic()).doOnNext { savedBudget -> // Выполнение updateBudgetWarns в фоне updateBudgetWarns(budget = savedBudget).doOnError { error -> // Логируем ошибку, если произошла logger.error("Error during updateBudgetWarns: ${error.message}") }.subscribe() }).then( getCategoryTransactionPipeline( space.id!!, budget.dateFrom, budget.dateTo, "INCOME" ).flatMap { categories -> budget.incomeCategories = categories budgetRepo.save(budget) }.publishOn(Schedulers.boundedElastic()).doOnNext { savedBudget -> // Выполнение updateBudgetWarns в фоне updateBudgetWarns(budget = savedBudget).doOnError { error -> // Логируем ошибку, если произошла logger.error("Error during updateBudgetWarns: ${error.message}") }.subscribe() }) } } fun getBudgetByDate(date: LocalDate, spaceId: String): Mono { return budgetRepo.findByDateFromLessThanEqualAndDateToGreaterThanEqualAndSpace(date, date, ObjectId(spaceId)) .switchIfEmpty(Mono.empty()) } fun getBudgetCategories(id: String): Mono> { return budgetRepo.findById(id).flatMap { budget -> val lookup = lookup("categories", "category.\$id", "_id", "categoryDetailed") val unwind = unwind("categoryDetailed") val projectDouble = project("categoryDetailed", "amount", "date").andExpression("{ \$toDouble: \"\$amount\" }") .`as`("amount") val match = match(Criteria.where("date").gte(budget.dateFrom).lt(budget.dateTo)) val group = group("categoryDetailed").sum("amount").`as`("currentSpent") val project = project("currentSpent").and("_id").`as`("category") val sort = sort(Sort.by(Sort.Order.asc("_id"))) val aggregation = newAggregation(lookup, unwind, projectDouble, match, group, project, sort) reactiveMongoTemplate.aggregate(aggregation, "transactions", BudgetCategory::class.java) .collectList() // Преобразование результата в список } } // fun getBudgetTransactionsByType(budgetId: String): Mono>> { // return budgetRepo.findById(budgetId).flatMap { it -> // getTransactionsByTypes(it.dateFrom, it.dateTo) // } // } fun deleteBudget(spaceId: String, budgetId: String): Mono { return findProjectedBudget( spaceId, budgetId ).switchIfEmpty(Mono.error(IllegalArgumentException("Budget not found for spaceId: $spaceId and budgetId: $budgetId"))) .flatMap { budget -> getTransactionsToDelete(spaceId, budget.dateFrom, budget.dateTo).flatMapMany { transactions -> Flux.fromIterable(transactions).flatMap { transaction -> deleteTransaction(transaction) } }.then( budgetRepo.deleteById(budget.id!!) ) } } fun setCategoryLimit(spaceId: String, budgetId: String, catId: String, limit: Double): Mono { return findProjectedBudget(spaceId, budgetId = budgetId).flatMap { budget -> val catEdit = budget.categories.firstOrNull { it.category.id == catId } ?: return@flatMap Mono.error( Exception("Category not found in the budget") ) calcTransactionsSum(spaceId, budget, catId, "PLANNED").flatMap { catPlanned -> if (catPlanned > limit) { Mono.error(Exception("Limit can't be less than planned expenses on category. Current planned value: $catPlanned")) } else { catEdit.currentLimit = limit budgetRepo.save(budget).flatMap { updateBudgetWarns(it).thenReturn(catEdit) } } } } } fun getWarns(budgetId: String, isHide: Boolean? = null): Mono> { return warnRepo.findAllByBudgetIdAndIsHide(budgetId, isHide == true).collectList() } fun hideWarn(warnId: String): Mono { return warnRepo.findById(warnId) // Ищем предупреждение .flatMap { warn -> warn.isHide = true // Обновляем поле warnRepo.save(warn) // Сохраняем изменённое предупреждение } } fun updateBudgetWarns(budget: Budget? = null): Mono> { logger.info("STARTED WARNS UPDATE") val finalBudgetMono = budget?.let { Mono.just(it) } ?: return Mono.just(emptyList()) return finalBudgetMono.flatMap { finalBudget -> if (finalBudget.categories.isEmpty()) { logger.info("No categories found for budget ${finalBudget.id}") return@flatMap Mono.just(emptyList()) } val averageSumsMono = getAverageSpendingByCategory() val averageIncomeMono = getAverageIncome() val currentBudgetIncomeMono = calcTransactionsSum( budget.space!!.id!!, finalBudget, transactionType = "PLANNED", categoryType = "INCOME" ) val plannedIncomeMono = calcTransactionsSum( budget.space!!.id!!, finalBudget, categoryType = "INCOME", transactionType = "PLANNED" ) val plannedSavingMono = calcTransactionsSum( budget.space!!.id!!, finalBudget, categoryId = "675850148198643f121e466a", transactionType = "PLANNED" ) Mono.zip( averageSumsMono, averageIncomeMono, currentBudgetIncomeMono, plannedIncomeMono, plannedSavingMono ).flatMap { tuple -> val averageSums = tuple.t1 val averageIncome = tuple.t2 val currentBudgetIncome = tuple.t3 val plannedIncome = tuple.t4 val plannedSaving = tuple.t5 Flux.fromIterable(finalBudget.categories).flatMap { category -> processCategoryWarnings( category, finalBudget, averageSums, averageIncome, currentBudgetIncome, plannedIncome, plannedSaving ) }.collectList().flatMap { warns -> warnRepo.saveAll(warns.filterNotNull()).collectList() }.doOnSuccess { logger.info("ENDED WARNS UPDATE") } .map { it.sortedByDescending { warn -> warn.serenity.sort } } } }.doOnError { error -> logger.error("Error updating budget warns: ${error.message}", error) }.onErrorResume { Mono.just(emptyList()) // Возвращаем пустой список в случае ошибки } } private fun processCategoryWarnings( category: BudgetCategory, finalBudget: Budget, averageSums: Map, averageIncome: Double, currentBudgetIncome: Double, plannedIncome: Double, plannedSaving: Double ): Flux { val warnsForCategory = mutableListOf>() val averageSum = averageSums[category.category.id] ?: 0.0 val categorySpentRatioInAvgIncome = if (averageIncome > 0.0) averageSum / averageIncome else 0.0 val projectedAvailableSum = currentBudgetIncome * categorySpentRatioInAvgIncome val contextAtAvg = "category${category.category.id}atbudget${finalBudget.id}lessavg" val lowSavingContext = "savingValueLess10atBudget${finalBudget.id}" if (averageSum > category.currentLimit) { val warnMono = warnRepo.findWarnByContext(contextAtAvg).switchIfEmpty( Mono.just( Warn( serenity = WarnSerenity.MAIN, message = PushMessage( title = "Внимание на ${category.category.name}!", body = "Лимит меньше средних трат (Среднее: ${averageSum.toInt()} ₽ Текущий лимит: ${category.currentLimit.toInt()} ₽)." + "\nСредняя доля данной категории в доходах: ${(categorySpentRatioInAvgIncome * 100).toInt()}%." + "\nПроецируется на текущие поступления: ${projectedAvailableSum.toInt()} ₽", icon = category.category.icon ), budgetId = finalBudget.id!!, context = contextAtAvg, isHide = false ) ) ) warnsForCategory.add(warnMono) } else { warnRepo.findWarnByContext(contextAtAvg).flatMap { warnRepo.delete(it).then(Mono.empty()) } } if (category.category.id == "675850148198643f121e466a") { val savingRatio = if (plannedIncome > 0.0) category.currentLimit / plannedIncome else 0.0 if (savingRatio < 0.1) { val warnMono = warnRepo.findWarnByContext(lowSavingContext).switchIfEmpty( Mono.just( Warn( serenity = WarnSerenity.IMPORTANT, message = PushMessage( title = "Доля сбережений очень мала!", body = "Текущие плановые сбережения равны ${plannedSaving.toInt()} (${ (savingRatio * 100).toInt() }%)! Исправьте!", icon = category.category.icon ), budgetId = finalBudget.id!!, context = lowSavingContext, isHide = false ) ) ) warnsForCategory.add(warnMono) } else { warnRepo.findWarnByContext(lowSavingContext).flatMap { warnRepo.delete(it).then(Mono.empty()) } } } return Flux.fromIterable(warnsForCategory).flatMap { it } } fun getTransactions( spaceId: String, dateFrom: LocalDate? = null, dateTo: LocalDate? = null, transactionType: String? = null, isDone: Boolean? = null, categoryId: String? = null, categoryType: String? = null, userId: String? = null, parentId: String? = null, isChild: Boolean? = null, sortSetting: SortSetting? = null, limit: Int? = null, offset: Int? = null, ): Mono> { val matchCriteria = mutableListOf() // Добавляем фильтры matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId))) dateFrom?.let { matchCriteria.add(Criteria.where("date").gte(it)) } dateTo?.let { matchCriteria.add(Criteria.where("date").lt(it)) } transactionType?.let { matchCriteria.add(Criteria.where("type.code").`is`(it)) } isDone?.let { matchCriteria.add(Criteria.where("isDone").`is`(it)) } categoryId?.let { matchCriteria.add(Criteria.where("categoryDetails._id").`is`(it)) } categoryType?.let { matchCriteria.add( Criteria.where("categoryDetails.type.code").`is`(it) ) } userId?.let { matchCriteria.add(Criteria.where("userDetails._id").`is`(ObjectId(it))) } parentId?.let { matchCriteria.add(Criteria.where("parentId").`is`(it)) } isChild?.let { matchCriteria.add(Criteria.where("parentId").exists(it)) } // Сборка агрегации val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails") val unwindCategory = unwind("categoryDetails") val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails") val unwindSpace = unwind("spaceDetails") val lookupUsers = lookup("users", "user.\$id", "_id", "userDetails") val unwindUser = unwind("userDetails") val match = match(Criteria().andOperator(*matchCriteria.toTypedArray())) var sort = sort(Sort.by(Direction.DESC, "date").and(Sort.by(Direction.DESC, "createdAt"))) sortSetting?.let { sort = sort(Sort.by(it.order, it.by).and(Sort.by(Direction.ASC, "createdAt"))) } val aggregationBuilder = mutableListOf( lookup, unwindCategory, lookupSpaces, unwindSpace, lookupUsers, unwindUser, match.takeIf { matchCriteria.isNotEmpty() }, sort, offset?.let { skip(it.toLong()) }, limit?.let { limit(it.toLong()) }).filterNotNull() val aggregation = newAggregation(aggregationBuilder) return reactiveMongoTemplate.aggregate( aggregation, "transactions", Document::class.java ).collectList().map { docs -> val test = docs.map { doc -> transactionsMapper.fromDocument(doc) }.toMutableList() test } } fun getTransactionByParentId( parentId: String ): Mono { // Сборка агрегации val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails") val unwindCategory = unwind("categoryDetails") val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails") val unwindSpace = unwind("spaceDetails") val lookupUsers = lookup("users", "user.\$id", "_id", "userDetails") val unwindUser = unwind("userDetails") val match = match(Criteria.where("parentId").`is`(parentId)) val aggregationBuilder = mutableListOf( lookup, unwindCategory, lookupSpaces, unwindSpace, lookupUsers, unwindUser, match ) val aggregation = newAggregation(aggregationBuilder) return reactiveMongoTemplate.aggregate( aggregation, "transactions", Document::class.java ).map { doc -> transactionsMapper.fromDocument(doc) }.next() } fun getTransactionsToDelete(spaceId: String, dateFrom: LocalDate, dateTo: LocalDate): Mono> { val matchCriteria = mutableListOf() val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails") val unwindSpace = unwind("spaceDetails") val lookupCategory = lookup("categories", "category.\$id", "_id", "categoryDetails") val unwindCategory = unwind("categoryDetails") matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId))) matchCriteria.add(Criteria.where("date").gte(dateFrom)) matchCriteria.add(Criteria.where("date").lt(dateTo)) matchCriteria.add( Criteria().orOperator( Criteria.where("type.code").`is`("PLANNED"), Criteria.where("parentId").exists(true) ) ) val match = match(Criteria().andOperator(*matchCriteria.toTypedArray())) val project = project("_id", "type", "comment", "date", "amount", "isDone", "categoryDetails") val aggregationBuilder = mutableListOf( lookupSpaces, unwindSpace, lookupCategory, unwindCategory, match, project ) val aggregation = newAggregation(aggregationBuilder) return reactiveMongoTemplate.aggregate(aggregation, "transactions", Document::class.java) .collectList() // Собирать все результаты в список .map { docs -> docs.map { doc -> transactionsMapper.fromDocument(doc) } } } fun getTransactionById(id: String): Mono { val lookupSpaces = lookup("spaces", "space.\$id", "_id", "spaceDetails") val unwindSpaces = unwind("spaceDetails") val lookupCategory = lookup("categories", "category.\$id", "_id", "categoryDetails") val unwindCategory = unwind("categoryDetails") val lookupUser = lookup("users", "user.\$id", "_id", "userDetails") val unwindUser = unwind("userDetails") val matchCriteria = mutableListOf() matchCriteria.add(Criteria.where("_id").`is`(ObjectId(id))) val match = match(Criteria().andOperator(*matchCriteria.toTypedArray())) // val project = project("_iid", "type", "comment", "date", "amount", "isDone") val aggregationBuilder = mutableListOf( lookupSpaces, unwindSpaces, lookupCategory, unwindCategory, lookupUser, unwindUser, match ) val aggregation = newAggregation(aggregationBuilder) return reactiveMongoTemplate.aggregate(aggregation, "transactions", Document::class.java).next().map { doc -> transactionsMapper.fromDocument(doc) } } fun createTransaction(space: Space, transaction: Transaction): Mono { return ReactiveSecurityContextHolder.getContext().map { it.authentication }.flatMap { authentication -> val username = authentication.name userService.getByUsername(username) .switchIfEmpty(Mono.error(IllegalArgumentException("User not found for username: $username"))) .flatMap { user -> if (space.users.none { it.id.toString() == user.id }) { return@flatMap Mono.error(IllegalArgumentException("User does not have access to this Space")) } // Привязываем space и user к транзакции transaction.user = user transaction.space = space transactionsRepo.save(transaction).flatMap { savedTransaction -> updateBudgetOnCreate(savedTransaction).thenReturn(savedTransaction) // Ждём выполнения updateBudgetOnCreate перед возвратом } } } } @CacheEvict(cacheNames = ["transactions", "budgets"], allEntries = true) fun editTransaction(transaction: Transaction): Mono { return getTransactionById(transaction.id!!).flatMap { oldStateOfTransaction -> val changed = compareSumDateDoneIsChanged(oldStateOfTransaction, transaction) if (!changed) { return@flatMap transactionsRepo.save(transaction) // Сохраняем, если изменений нет } val amountDifference = transaction.amount - oldStateOfTransaction.amount // Обработка дочерней транзакции handleChildTransaction( oldStateOfTransaction, transaction ).then(transactionsRepo.save(transaction)) // Сохраняем основную транзакцию .flatMap { savedTransaction -> updateBudgetOnEdit(oldStateOfTransaction, savedTransaction, amountDifference).thenReturn( savedTransaction ) // Ждем выполнения updateBudgetOnEdit и возвращаем транзакцию } }.switchIfEmpty( Mono.error(IllegalArgumentException("Transaction not found with id: ${transaction.id}")) ) } private fun handleChildTransaction( oldTransaction: Transaction, newTransaction: Transaction ): Mono { return getTransactionByParentId( newTransaction.id!! ).flatMap { childTransaction -> logger.info("Updating child: $childTransaction") val updatedChild = childTransaction.copy( amount = newTransaction.amount, category = newTransaction.category, comment = newTransaction.comment, user = newTransaction.user ) transactionsRepo.save(updatedChild).then() }.switchIfEmpty( Mono.defer { if (!oldTransaction.isDone && newTransaction.isDone) { val newChildTransaction = newTransaction.copy( id = null, type = TransactionType("INSTANT", "Текущие"), parentId = newTransaction.id ) return@defer transactionsRepo.save(newChildTransaction).flatMap { updateBudgetOnCreate(it) } .then() } else { return@defer Mono.empty() // Используем Mono.empty() для пустой операции } }).then( Mono.defer { if (oldTransaction.isDone && !newTransaction.isDone) { getTransactionByParentId(newTransaction.id!!) .flatMap { child -> deleteTransaction(child) }.then() } else { return@defer Mono.empty() // Вернем пустую операцию, если условия не выполняются } }) } fun compareSumDateDoneIsChanged(t1: Transaction, t2: Transaction): Boolean { return if (t1.amount != t2.amount) { true } else if (t1.date != t2.date) { true } else if (t1.isDone != t2.isDone) { true } else if (t1.category.id != t2.category.id) { return true } else { return false } } fun deleteTransaction(transaction: Transaction): Mono { return transactionsRepo.deleteById(transaction.id!!) // Удаляем транзакцию .then(Mono.defer { updateBudgetOnDelete(transaction) }) .then() // Завершаем Mono, так как нам не нужно возвращать результат } // @CacheEvict(cacheNames = ["transactions", "childTransactions"], allEntries = true) // fun setTransactionDone(transaction: Transaction): Transaction { // val oldStateTransaction = transactionsRepo.findById(transaction.id!!) // .orElseThrow { RuntimeException("Transaction ${transaction.id} not found") } // // if (transaction.isDone) { // if (oldStateTransaction.isDone) { // throw RuntimeException("Transaction ${transaction.id} is already done") // } // // // Создание дочерней транзакции // val childTransaction = transaction.copy( // id = null, // type = TransactionType("INSTANT", "Текущие"), // parentId = transaction.id // ) // createTransaction(childTransaction) // } else { // // Удаление дочерней транзакции, если она существует // transactionsRepo.findByParentId(transaction.id!!).getOrNull()?.let { // deleteTransaction(it.id!!) // } ?: logger.warn("Child transaction of parent ${transaction.id} not found") // } // // return editTransaction(transaction) // } @Cacheable("childTransactions", key = "#parentId") fun getChildTransaction(parentId: String): Mono { return transactionsRepo.findByParentId(parentId) } // fun getTransactionByOldId(id: Int): Transaction? { // return transactionsRepo.findByOldId(id).getOrNull() // } // fun transferTransactions(): Mono { // var transactions = transactionsRepoSQl.getTransactions() // return transactionsRepo.saveAll(transactions).then() // } // fun calcTransactionsSum( spaceId: String, budget: Budget, categoryId: String? = null, categoryType: String? = null, transactionType: String? = null, isDone: Boolean? = null ): Mono { val matchCriteria = mutableListOf() // Добавляем фильтры matchCriteria.add(Criteria.where("spaceDetails._id").`is`(ObjectId(spaceId))) matchCriteria.add(Criteria.where("date").gte(budget.dateFrom)) matchCriteria.add(Criteria.where("date").lt(budget.dateTo)) categoryId?.let { matchCriteria.add(Criteria.where("category.\$id").`is`(ObjectId(it))) } categoryType?.let { matchCriteria.add(Criteria.where("categoryDetails.type.code").`is`(it)) } transactionType?.let { matchCriteria.add(Criteria.where("type.code").`is`(it)) } isDone?.let { matchCriteria.add(Criteria.where("isDone").`is`(it)) } // Сборка агрегации val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails") val unwind = unwind("categoryDetails") val lookupSpace = lookup("space", "space.\$id", "_id", "spaceDetails") val unwindSpace = unwind("spaceDetails") val match = match(Criteria().andOperator(*matchCriteria.toTypedArray())) val project = project("category").andExpression("{ \$toDouble: \"\$amount\" }").`as`("amount") val group = group(categoryId ?: "all").sum("amount").`as`("totalSum") val projectSum = project("totalSum") val aggregation = newAggregation(lookup, unwind, lookupSpace, unwindSpace, match, project, group, projectSum) return reactiveMongoTemplate.aggregate(aggregation, "transactions", Map::class.java).map { result -> val totalSum = result["totalSum"] if (totalSum is Double) { totalSum } else { 0.0 } }.reduce(0.0) { acc, sum -> acc + sum } // Суммируем значения, если несколько результатов } // @Cacheable("transactions") fun getAverageSpendingByCategory(): Mono> { val firstDateOfMonth = LocalDate.now().with(TemporalAdjusters.firstDayOfMonth()) val lookup = lookup("categories", "category.\$id", "_id", "categoryDetails") val unwind = unwind("categoryDetails") val match = match( Criteria.where("categoryDetails.type.code").`is`("EXPENSE").and("type.code").`is`("INSTANT").and("date") .lt(firstDateOfMonth) ) val projectDate = project("_id", "category", "amount", "categoryDetails").and(DateToString.dateOf("date").toString("%Y-%m")) .`as`("month").andExpression("{ \$toDouble: \"\$amount\" }").`as`("amount") val groupByMonthAndCategory = group("month", "category.\$id").sum("amount").`as`("sum") val groupByCategory = group("_id.id").avg("sum").`as`("averageAmount") val project = project().and("_id").`as`("category").and("averageAmount").`as`("avgAmount") val sort = sort(Sort.by(Sort.Order.asc("_id"))) val aggregation = newAggregation( lookup, unwind, match, projectDate, groupByMonthAndCategory, groupByCategory, project, sort ) return reactiveMongoTemplate.aggregate(aggregation, "transactions", Map::class.java).collectList() .map { results -> results.associate { result -> val category = result["category"]?.toString() ?: "Unknown" val avgAmount = (result["avgAmount"] as? Double) ?: 0.0 category to avgAmount } }.defaultIfEmpty(emptyMap()) // Возвращаем пустую карту, если результатов нет } @Cacheable("transactionTypes") fun getTransactionTypes(): List { val types = mutableListOf() types.add(TransactionType("PLANNED", "Плановые")) types.add(TransactionType("INSTANT", "Текущие")) return types } fun getAverageIncome(): Mono { val lookup = lookup("categories", "category.\$id", "_id", "detailedCategory") val unwind = unwind("detailedCategory") val match = match( Criteria.where("detailedCategory.type.code").`is`("INCOME").and("type.code").`is`("INSTANT").and("isDone") .`is`(true) ) val project = project("_id", "category", "detailedCategory").and(DateToString.dateOf("date").toString("%Y-%m")) .`as`("month").andExpression("{ \$toDouble: \"\$amount\" }").`as`("amount") val groupByMonth = group("month").sum("amount").`as`("sum") val groupForAverage = group("avgIncomeByMonth").avg("sum").`as`("averageAmount") val aggregation = newAggregation(lookup, unwind, match, project, groupByMonth, groupForAverage) return reactiveMongoTemplate.aggregate(aggregation, "transactions", Map::class.java) .singleOrEmpty() // Ожидаем только один результат .map { result -> result["averageAmount"] as? Double ?: 0.0 }.defaultIfEmpty(0.0) // Если результат пустой, возвращаем 0.0 } fun getTransactionsByTypes( spaceId: String, dateFrom: LocalDate, dateTo: LocalDate ): Mono>> { val pipeline = listOf( Document( "\$lookup", Document("from", "categories").append("localField", "category.\$id").append("foreignField", "_id") .append("as", "categoryDetailed") ), Document( "\$lookup", Document("from", "spaces").append("localField", "space.\$id").append("foreignField", "_id") .append("as", "spaceDetailed") ), Document( "\$lookup", Document("from", "users").append("localField", "user.\$id").append("foreignField", "_id") .append("as", "userDetailed") ), Document( "\$unwind", Document("path", "\$categoryDetailed").append("preserveNullAndEmptyArrays", true) ), Document( "\$unwind", Document("path", "\$spaceDetailed").append("preserveNullAndEmptyArrays", true) ), Document( "\$unwind", Document("path", "\$userDetailed").append("preserveNullAndEmptyArrays", true) ), Document( "\$match", Document( "\$and", listOf( Document("spaceDetailed._id", ObjectId(spaceId)), Document( "date", Document( "\$gte", Date.from( LocalDateTime.of(dateFrom, LocalTime.MIN).atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ), Document( "date", Document( "\$lt", Date.from( LocalDateTime.of(dateTo, LocalTime.MAX).atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ) ) ) ), Document( "\$facet", Document( "plannedExpenses", listOf( Document( "\$match", Document("type.code", "PLANNED").append("categoryDetailed.type.code", "EXPENSE") ), Document("\$sort", Document("date", 1).append("_id", 1)) ) ).append( "plannedIncomes", listOf( Document( "\$match", Document("type.code", "PLANNED").append("categoryDetailed.type.code", "INCOME") ), Document("\$sort", Document("date", 1).append("_id", 1)) ) ).append( "instantTransactions", listOf( Document("\$match", Document("type.code", "INSTANT")), Document("\$sort", Document("date", 1).append("_id", 1)) ) ) ) ) // getCategoriesExplainReactive(pipeline) // .doOnNext { explainResult -> // logger.info("Explain Result: ${explainResult.toJson()}") // } // .subscribe() // Этот вызов лучше оставить только для отладки return reactiveMongoTemplate.getCollection("transactions") .flatMapMany { it.aggregate(pipeline, Document::class.java) } .single() // Получаем только первый результат агрегации .flatMap { aggregationResult -> Mono.zip( extractTransactions(aggregationResult, "plannedExpenses"), extractTransactions(aggregationResult, "plannedIncomes"), extractTransactions(aggregationResult, "instantTransactions") ).map { tuple -> val plannedExpenses = tuple.t1 val plannedIncomes = tuple.t2 val instantTransactions = tuple.t3 mapOf( "plannedExpenses" to plannedExpenses, "plannedIncomes" to plannedIncomes, "instantTransactions" to instantTransactions ) } } } private fun extractTransactions(aggregationResult: Document, key: String): Mono> { val resultTransactions = aggregationResult[key] as? List ?: emptyList() return Flux.fromIterable(resultTransactions).map { documentToTransactionMapper(it) }.collectList() } private fun documentToTransactionMapper(document: Document): Transaction { val transactionType = document["type"] as Document var user: User? val userDocument = document["userDetailed"] as Document user = User( id = (userDocument["_id"] as ObjectId).toString(), username = userDocument["username"] as String, firstName = userDocument["firstName"] as String, tgId = userDocument["tgId"] as String?, tgUserName = userDocument["tgUserName"]?.let { it as String }, password = null, isActive = userDocument["isActive"] as Boolean, regDate = (userDocument["regDate"] as Date).toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), createdAt = (userDocument["createdAt"] as Date).toInstant().atZone(ZoneId.systemDefault()) .toLocalDateTime(), roles = userDocument["roles"] as ArrayList, ) val categoryDocument = document["categoryDetailed"] as Document val categoryTypeDocument = categoryDocument["type"] as Document val category = Category( id = (categoryDocument["_id"] as ObjectId).toString(), type = CategoryType(categoryTypeDocument["code"] as String, categoryTypeDocument["name"] as String), name = categoryDocument["name"] as String, description = categoryDocument["description"] as String, icon = categoryDocument["icon"] as String ) return Transaction( (document["_id"] as ObjectId).toString(), null, TransactionType( transactionType["code"] as String, transactionType["name"] as String ), user = user, category = category, comment = document["comment"] as String, date = (document["date"] as Date).toInstant().atZone(ZoneId.systemDefault()).toLocalDate(), amount = document["amount"] as Double, isDone = document["isDone"] as Boolean, parentId = if (document["parentId"] != null) document["parentId"] as String else null, createdAt = LocalDateTime.ofInstant((document["createdAt"] as Date).toInstant(), ZoneOffset.UTC), ) } fun getBudgetSumsByCategory(categoryId: String, budget: Budget): Mono { logger.info("getting budget sums for category $categoryId") val pipeline = listOf( Document( "\$match", Document("category.\$id", ObjectId(categoryId)).append( "date", Document( "\$gte", Date.from( LocalDateTime.of(budget.dateFrom, LocalTime.MIN).atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ).append( "\$lt", Date.from( LocalDateTime.of(budget.dateTo, LocalTime.MIN).atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ) ), Document( "\$group", Document("_id", BsonNull()).append( "plannedAmount", Document( "\$sum", Document( "\$cond", listOf(Document("\$eq", listOf("\$type.code", "PLANNED")), "\$amount", 0.0) ) ) ).append( "instantAmount", Document( "\$sum", Document( "\$cond", listOf(Document("\$eq", listOf("\$type.code", "INSTANT")), "\$amount", 0.0) ) ) ) ), Document( "\$project", Document("_id", 0).append("plannedAmount", 1).append("instantAmount", 1) ) ) return reactiveMongoTemplate.getCollection("transactions") // Исправлено на transactions .flatMapMany { it.aggregate(pipeline) }.map { logger.info("getting budget sums for category $categoryId end") it }.next() // Берём первый документ, а не весь список } fun getBudgetCategories( spaceId: String, dateFrom: LocalDate, dateTo: LocalDate ): Mono>> { val pipeline = listOf( Document( "\$lookup", Document("from", "spaces").append("localField", "space.\$id").append("foreignField", "_id") .append("as", "spaceDetailed") ), Document( "\$unwind", Document("path", "\$spaceDetailed").append("preserveNullAndEmptyArrays", true) ), Document( "\$match", Document( Document("spaceDetailed._id", ObjectId(spaceId)) ) ), Document( "\$lookup", Document("from", "transactions").append( "let", Document("categoryId", "\$_id") ).append( "pipeline", listOf( Document( "\$match", Document( "\$expr", Document( "\$and", listOf( Document("\$eq", listOf("\$category.\$id", "\$\$categoryId")), Document( "\$gte", listOf( "\$date", Date.from( LocalDateTime.of(dateFrom, LocalTime.MIN) .atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ), ) ), Document( "\$lte", listOf( "\$date", Date.from( LocalDateTime.of(dateTo, LocalTime.MIN) .atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ) ) ) ) ), Document( "\$group", Document("_id", "\$type.code").append( "totalAmount", Document("\$sum", "\$amount") ) ) ) ).append("as", "transactionSums") ), Document( "\$project", Document("_id", 1L).append("transactionSums", 1L) ), Document( "\$project", Document("_id", 1L).append( "plannedAmount", Document( "\$arrayElemAt", listOf( Document( "\$filter", Document("input", "\$transactionSums").append("as", "sum").append( "cond", Document("\$eq", listOf("\$\$sum._id", "PLANNED")) ) ), 0L ) ) ).append( "instantAmount", Document( "\$arrayElemAt", listOf( Document( "\$filter", Document("input", "\$transactionSums").append("as", "sum").append( "cond", Document("\$eq", listOf("\$\$sum._id", "INSTANT")) ) ), 0L ) ) ) ), Document( "\$addFields", Document( "plannedAmount", Document("\$ifNull", listOf("\$plannedAmount.totalAmount", 0.0)) ).append( "instantAmount", Document("\$ifNull", listOf("\$instantAmount.totalAmount", 0.0)) ) ) ) // Анализ плана выполнения (вывод для отладки) // getCategoriesExplainReactive(pipeline) // .doOnNext { explainResult -> // logger.info("Explain Result: ${explainResult.toJson()}") // } // .subscribe() // Этот вызов лучше оставить только для отладки // return reactiveMongoTemplate.getCollection("categories").flatMapMany { it.aggregate(pipeline) }.collectList() .flatMap { result -> val categories = result.associate { document -> val id = document["_id"].toString() val values = mapOf( "plannedAmount" to (document["plannedAmount"] as Double? ?: 0.0), "instantAmount" to (document["instantAmount"] as Double? ?: 0.0) ) id to values } Mono.just(categories) } } fun getCategoryTransactionPipeline( spaceId: String, dateFrom: LocalDate, dateTo: LocalDate, catType: String? = "EXPENSE" ): Mono> { val pipeline = listOf( Document( "\$lookup", Document("from", "spaces").append("localField", "space.\$id").append("foreignField", "_id") .append("as", "spaceDetailed") ), Document( "\$unwind", Document("path", "\$spaceDetailed").append("preserveNullAndEmptyArrays", true) ), Document( "\$match", Document( Document("spaceDetailed._id", ObjectId(spaceId)) ) ), Document("\$match", Document("type.code", catType)), Document( "\$lookup", Document("from", "transactions").append( "let", Document("categoryId", "\$_id") ).append( "pipeline", listOf( Document( "\$match", Document( "\$expr", Document( "\$and", listOf( Document("\$eq", listOf("\$category.\$id", "\$\$categoryId")), Document( "\$gte", listOf( "\$date", Date.from( LocalDateTime.of(dateFrom, LocalTime.MIN) .atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ), Document( "\$lt", listOf( "\$date", Date.from( LocalDateTime.of(dateTo, LocalTime.MIN) .atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ) ) ) ) ), Document( "\$group", Document("_id", "\$type.code").append( "totalAmount", Document("\$sum", "\$amount") ) ) ) ).append("as", "transactionSums") ), Document( "\$project", Document("_id", 1L).append("type", 1L).append("name", 1L).append("description", 1L).append("icon", 1L) .append( "plannedAmount", Document( "\$arrayElemAt", listOf( Document( "\$filter", Document("input", "\$transactionSums").append("as", "sum").append( "cond", Document("\$eq", listOf("\$\$sum._id", "PLANNED")) ) ), 0.0 ) ) ).append( "instantAmount", Document( "\$arrayElemAt", listOf( Document( "\$filter", Document("input", "\$transactionSums").append("as", "sum").append( "cond", Document("\$eq", listOf("\$\$sum._id", "INSTANT")) ) ), 0.0 ) ) ) ), Document( "\$addFields", Document( "plannedAmount", Document("\$ifNull", listOf("\$plannedAmount.totalAmount", 0.0)) ).append( "instantAmount", Document("\$ifNull", listOf("\$instantAmount.totalAmount", 0.0)) ) ) ) return reactiveMongoTemplate.getCollection("categories") .flatMapMany { it.aggregate(pipeline, Document::class.java) }.map { document -> val catType = document["type"] as Document BudgetCategory( currentSpent = document["instantAmount"] as Double, currentLimit = document["plannedAmount"] as Double, currentPlanned = document["plannedAmount"] as Double, category = Category( document["_id"].toString(), type = CategoryType(catType["code"] as String, catType["name"] as String), name = document["name"] as String, description = document["description"] as String, icon = document["icon"] as String ) ) }.collectList().map { it.toMutableList() } } fun getCategorySumsPipeline(dateFrom: LocalDate, dateTo: LocalDate): Mono> { val pipeline = listOf( Document( "\$lookup", Document("from", "categories").append("localField", "category.\$id").append("foreignField", "_id") .append("as", "categoryDetails") ), Document("\$unwind", "\$categoryDetails"), Document( "\$match", Document( "date", Document( "\$gte", Date.from( LocalDateTime.of(dateFrom, LocalTime.MIN).atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ).append( "\$lte", LocalDateTime.of(dateTo, LocalTime.MIN).atZone(ZoneId.systemDefault()) .withZoneSameInstant(ZoneOffset.UTC).toInstant() ) ) ), Document( "\$group", Document( "_id", Document("categoryId", "\$categoryDetails._id").append("categoryName", "\$categoryDetails.name") .append("year", Document("\$year", "\$date")).append("month", Document("\$month", "\$date")) ).append("totalAmount", Document("\$sum", "\$amount")) ), Document( "\$group", Document("_id", "\$_id.categoryId").append("categoryName", Document("\$first", "\$_id.categoryName")) .append( "monthlyData", Document( "\$push", Document( "month", Document( "\$concat", listOf( Document("\$toString", "\$_id.year"), "-", Document( "\$cond", listOf( Document("\$lt", listOf("\$_id.month", 10L)), Document( "\$concat", listOf( "0", Document("\$toString", "\$_id.month") ) ), Document("\$toString", "\$_id.month") ) ) ) ) ).append("totalAmount", "\$totalAmount") ) ) ), Document( "\$addFields", Document( "completeMonthlyData", Document( "\$map", Document("input", Document("\$range", listOf(0L, 6L))).append("as", "offset").append( "in", Document( "month", Document( "\$dateToString", Document("format", "%Y-%m").append( "date", Document( "\$dateAdd", Document("startDate", Date(1754006400000L)).append("unit", "month") .append( "amount", Document("\$multiply", listOf("\$\$offset", 1L)) ) ) ) ) ).append( "totalAmount", Document( "\$let", Document( "vars", Document( "matched", Document( "\$arrayElemAt", listOf( Document( "\$filter", Document("input", "\$monthlyData").append("as", "data") .append( "cond", Document( "\$eq", listOf( "\$\$data.month", Document( "\$dateToString", Document("format", "%Y-%m").append( "date", Document( "\$dateAdd", Document( "startDate", Date( 1733011200000L ) ).append( "unit", "month" ).append( "amount", Document( "\$multiply", listOf( "\$\$offset", 1L ) ) ) ) ) ) ) ) ) ), 0L ) ) ) ).append( "in", Document("\$ifNull", listOf("\$\$matched.totalAmount", 0L)) ) ) ) ) ) ) ), Document( "\$project", Document("_id", 0L).append("categoryId", "\$_id").append("categoryName", "\$categoryName") .append("monthlyData", "\$completeMonthlyData") ) ) return reactiveMongoTemplate.getCollection("transactions").flatMapMany { it.aggregate(pipeline) }.map { it["categoryId"] = it["categoryId"].toString() it }.collectList() } fun getCategorySummaries(spaceId: String, dateFrom: LocalDate): Mono> { val sixMonthsAgo = Date.from( LocalDateTime.of(dateFrom, LocalTime.MIN).atZone(ZoneId.systemDefault()).withZoneSameInstant(ZoneOffset.UTC) .toInstant() ) // Пример даты, можно заменить на вычисляемую val aggregation = listOf( // 1. Фильтр за последние 6 месяцев Document( "\$lookup", Document("from", "spaces").append("localField", "space.\$id").append("foreignField", "_id") .append("as", "spaceInfo") ), // 4. Распаковываем массив категорий Document("\$unwind", "\$spaceInfo"), Document("\$match", Document("spaceInfo._id", ObjectId(spaceId))), Document( "\$match", Document("date", Document("\$gte", sixMonthsAgo).append("\$lt", Date())).append("type.code", "INSTANT") ), // 2. Группируем по категории + (год, месяц) Document( "\$group", Document( "_id", Document("category", "\$category.\$id").append("year", Document("\$year", "\$date")) .append("month", Document("\$month", "\$date")) ).append("totalAmount", Document("\$sum", "\$amount")) ), // 3. Подтягиваем информацию о категории Document( "\$lookup", Document("from", "categories").append("localField", "_id.category").append("foreignField", "_id") .append("as", "categoryInfo") ), // 4. Распаковываем массив категорий Document("\$unwind", "\$categoryInfo"), // 5. Фильтруем по типу категории (EXPENSE) Document("\$match", Document("categoryInfo.type.code", "EXPENSE")), // 6. Группируем обратно по категории, собирая все (год, месяц, total) Document( "\$group", Document("_id", "\$_id.category").append("categoryName", Document("\$first", "\$categoryInfo.name")) .append("categoryType", Document("\$first", "\$categoryInfo.type.code")) .append("categoryIcon", Document("\$first", "\$categoryInfo.icon")).append( "monthlySums", Document( "\$push", Document("year", "\$_id.year").append("month", "\$_id.month") .append("total", "\$totalAmount") ) ) ), // 7. Формируем единый массив из 6 элементов: // - каждый элемент = {year, month, total}, // - если нет записей за месяц, ставим total=0 Document( "\$project", Document("categoryName", 1).append("categoryType", 1).append("categoryIcon", 1).append( "monthlySums", Document( "\$map", Document("input", Document("\$range", listOf(0, 6))).append("as", "i").append( "in", Document( "\$let", Document( "vars", Document( "subDate", Document( "\$dateSubtract", Document("startDate", Date()).append("unit", "month") .append("amount", "$\$i") ) ) ).append( "in", Document("year", Document("\$year", "$\$subDate")).append( "month", Document("\$month", "$\$subDate") ).append( "total", Document( "\$ifNull", listOf( Document( "\$getField", Document("field", "total").append( "input", Document( "\$arrayElemAt", listOf( Document( "\$filter", Document( "input", "\$monthlySums" ).append("as", "ms").append( "cond", Document( "\$and", listOf( Document( "\$eq", listOf( "$\$ms.year", Document( "\$year", "$\$subDate" ) ) ), Document( "\$eq", listOf( "$\$ms.month", Document( "\$month", "$\$subDate" ) ) ) ) ) ) ), 0.0 ) ) ) ), 0.0 ) ) ) ) ) ) ) ) ), // 8. Сортируем результат по имени категории Document("\$sort", Document("categoryName", 1)) ) // Выполняем агрегацию return reactiveMongoTemplate.getCollection("transactions").flatMapMany { it.aggregate(aggregation) } .map { document -> // Преобразуем _id в строку document["_id"] = document["_id"].toString() // Получаем monthlySums и приводим к изменяемому списку val monthlySums = (document["monthlySums"] as? List<*>)?.map { monthlySum -> if (monthlySum is Document) { // Создаем копию Document, чтобы избежать изменений в исходном списке Document(monthlySum).apply { // Добавляем поле date val date = LocalDate.of(getInteger("year"), getInteger("month"), 1) this["date"] = date } } else { monthlySum } }?.toMutableList() // Сортируем monthlySums по полю date val sortedMonthlySums = monthlySums?.sortedBy { (it as? Document)?.get("date") as? LocalDate } // Рассчитываем разницу между текущим и предыдущим месяцем var previousMonthSum = 0.0 sortedMonthlySums?.forEach { monthlySum -> if (monthlySum is Document) { val currentMonthSum = monthlySum.getDouble("total") ?: 0.0 // Рассчитываем разницу в процентах val difference = if (previousMonthSum != 0.0 && currentMonthSum != 0.0) { (((currentMonthSum - previousMonthSum) / previousMonthSum) * 100).toInt() } else { 0 } // Добавляем поле difference monthlySum["difference"] = difference // Обновляем previousMonthSum для следующей итерации previousMonthSum = currentMonthSum } } // Обновляем документ с отсортированными и обновленными monthlySums document["monthlySums"] = sortedMonthlySums document }.collectList() } }