+ bot + notifications
This commit is contained in:
@@ -4,10 +4,10 @@ import kotlinx.coroutines.reactive.awaitFirstOrNull
|
||||
import kotlinx.coroutines.reactor.awaitSingle
|
||||
import kotlinx.coroutines.reactor.awaitSingleOrNull
|
||||
import org.springframework.cache.annotation.Cacheable
|
||||
import org.springframework.security.core.context.ReactiveSecurityContextHolder
|
||||
import org.springframework.security.core.userdetails.UsernameNotFoundException
|
||||
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder
|
||||
import org.springframework.stereotype.Service
|
||||
import reactor.core.publisher.Mono
|
||||
import space.luminic.budgerapp.configs.AuthException
|
||||
import space.luminic.budgerapp.models.TokenStatus
|
||||
import space.luminic.budgerapp.models.User
|
||||
@@ -28,6 +28,16 @@ class AuthService(
|
||||
) {
|
||||
private val passwordEncoder = BCryptPasswordEncoder()
|
||||
|
||||
suspend fun getSecurityUser(): User {
|
||||
val securityContextHolder = ReactiveSecurityContextHolder.getContext().awaitSingleOrNull()
|
||||
?: throw AuthException("Authentication failed")
|
||||
val authentication = securityContextHolder.authentication
|
||||
|
||||
val username = authentication.name
|
||||
// Получаем пользователя по имени
|
||||
return userService.getByUsername(username)
|
||||
}
|
||||
|
||||
suspend fun login(username: String, password: String): String {
|
||||
val user = userRepository.findByUsername(username).awaitFirstOrNull()
|
||||
?: throw UsernameNotFoundException("Пользователь не найден")
|
||||
|
||||
308
src/main/kotlin/space/luminic/budgerapp/services/BotService.kt
Normal file
308
src/main/kotlin/space/luminic/budgerapp/services/BotService.kt
Normal file
@@ -0,0 +1,308 @@
|
||||
package space.luminic.budgerapp.services
|
||||
|
||||
import kotlinx.coroutines.reactive.awaitSingle
|
||||
import kotlinx.coroutines.reactor.awaitSingleOrNull
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.bson.Document
|
||||
import org.bson.types.ObjectId
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.data.mongodb.core.ReactiveMongoTemplate
|
||||
import org.springframework.data.mongodb.core.aggregation.Aggregation.*
|
||||
|
||||
import org.springframework.data.mongodb.core.query.Criteria
|
||||
import org.springframework.stereotype.Service
|
||||
import org.telegram.telegrambots.bots.TelegramLongPollingBot
|
||||
import org.telegram.telegrambots.meta.api.methods.send.SendMessage
|
||||
import org.telegram.telegrambots.meta.api.methods.updatingmessages.DeleteMessage
|
||||
import org.telegram.telegrambots.meta.api.methods.updatingmessages.EditMessageText
|
||||
import org.telegram.telegrambots.meta.api.objects.Update
|
||||
import org.telegram.telegrambots.meta.api.objects.replykeyboard.InlineKeyboardMarkup
|
||||
import org.telegram.telegrambots.meta.api.objects.replykeyboard.buttons.InlineKeyboardButton
|
||||
import org.telegram.telegrambots.meta.exceptions.TelegramApiException
|
||||
import space.luminic.budgerapp.configs.TelegramBotConfig
|
||||
import space.luminic.budgerapp.configs.TelegramBotProperties
|
||||
import space.luminic.budgerapp.models.*
|
||||
import space.luminic.budgerapp.repos.BotStatesRepo
|
||||
import java.time.LocalDate
|
||||
import java.time.LocalDateTime
|
||||
|
||||
@Service
|
||||
class BotService(
|
||||
private val telegramBotProperties: TelegramBotProperties,
|
||||
private val botStatesRepo: BotStatesRepo,
|
||||
private val reactiveMongoTemplate: ReactiveMongoTemplate,
|
||||
private val userService: UserService,
|
||||
private val categoriesService: CategoryService,
|
||||
private val financialService: FinancialService,
|
||||
private val spaceService: SpaceService,
|
||||
) : TelegramLongPollingBot(telegramBotProperties.token) {
|
||||
private val logger = LoggerFactory.getLogger(javaClass)
|
||||
|
||||
override fun getBotUsername(): String {
|
||||
return telegramBotProperties.username
|
||||
}
|
||||
|
||||
override fun onUpdateReceived(update: Update) = runBlocking {
|
||||
logger.info("Received message $update")
|
||||
try {
|
||||
if (update.hasCallbackQuery()) {
|
||||
processCallback(update)
|
||||
} else if (update.hasMessage()) {
|
||||
if (update.message.hasText()) {
|
||||
processMessage(update)
|
||||
} else if (update.message.hasPhoto()) {
|
||||
processPhoto(update)
|
||||
} else if (update.message.hasVideo()) {
|
||||
processVideo(update)
|
||||
}
|
||||
}
|
||||
} catch (e: TelegramBotException) {
|
||||
e.printStackTrace()
|
||||
logger.error(e.message)
|
||||
sendMessage(e.chatId.toString(), "${e.message}")
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun processCallback(update: Update) {
|
||||
val chatId = update.callbackQuery.message.chatId.toString()
|
||||
val tgUserId = update.callbackQuery.from.id
|
||||
val user = userService.getUserByTelegramId(tgUserId) ?: throw TelegramBotException(
|
||||
"User ${update.callbackQuery.from.userName} not found",
|
||||
chatId = update.callbackQuery.message.chatId
|
||||
)
|
||||
val state = getState(user.id!!)
|
||||
if (state != null) {
|
||||
when (state.data.first { it.chatId == chatId }.state) {
|
||||
BotStates.WAIT_CATEGORY ->
|
||||
if (update.callbackQuery.data.startsWith("category_")) {
|
||||
confirmTransaction(
|
||||
chatId,
|
||||
user = userService.getUserByTelegramId(tgUserId)!!,
|
||||
update
|
||||
)
|
||||
} else if (update.callbackQuery.data == "cancel") {
|
||||
finishState(chatId, user)
|
||||
val deleteMsg = DeleteMessage(chatId, update.callbackQuery.message.messageId)
|
||||
execute(deleteMsg)
|
||||
sendMessage(chatId, "Введите сумму и комментарий когда будете готовы.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun processMessage(update: Update) {
|
||||
val user = userService.getUserByTelegramId(update.message.from.id) ?: throw TelegramBotException(
|
||||
"Мы не знакомы",
|
||||
chatId = update.message.chatId,
|
||||
)
|
||||
getState(user.id!!)?.data?.find { it.chatId == update.message.chatId.toString() }?.let {
|
||||
if (it.state == BotStates.WAIT_CATEGORY) {
|
||||
throw TelegramBotException(
|
||||
"Уже есть открытый выбор категории",
|
||||
update.message.chatId
|
||||
)
|
||||
}
|
||||
}
|
||||
newExpense(
|
||||
update.message.chatId.toString(),
|
||||
user = user,
|
||||
text = update.message.text,
|
||||
)
|
||||
}
|
||||
|
||||
private fun processPhoto(update: Update) {
|
||||
|
||||
}
|
||||
|
||||
private fun processVideo(update: Update) {
|
||||
|
||||
}
|
||||
|
||||
private fun sendMessage(chatId: String, text: String) {
|
||||
val message = SendMessage(chatId, text)
|
||||
try {
|
||||
execute(message)
|
||||
} catch (e: TelegramApiException) {
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun newExpense(chatId: String, user: User, text: String) {
|
||||
|
||||
val splitText = text.split(" ")
|
||||
if (splitText.size < 2) {
|
||||
try {
|
||||
throw TelegramBotException("Сумма или комментарий не введены", chatId.toLong())
|
||||
|
||||
} catch (e: TelegramApiException) {
|
||||
e.printStackTrace()
|
||||
}
|
||||
} else {
|
||||
val sum = try {
|
||||
splitText[0].toInt()
|
||||
} catch (e: NumberFormatException) {
|
||||
throw TelegramBotException("Кажется первый параметр не цифра", chatId.toLong())
|
||||
}
|
||||
val textWOSum = splitText.drop(1)
|
||||
var comment = ""
|
||||
textWOSum.map { word ->
|
||||
comment += "$word "
|
||||
}
|
||||
|
||||
val categories =
|
||||
categoriesService.getCategories(
|
||||
"67af3c0f652da946a7dd9931",
|
||||
"EXPENSE",
|
||||
sortBy = "name",
|
||||
direction = "ASC"
|
||||
)
|
||||
.awaitSingle()
|
||||
val keyboard = InlineKeyboardMarkup()
|
||||
val buttonLines = mutableListOf<MutableList<InlineKeyboardButton>>()
|
||||
categories.map { category ->
|
||||
val btn = InlineKeyboardButton.builder().text("${category.icon} ${category.name}")
|
||||
.callbackData("category_${category.id}").build()
|
||||
|
||||
if (category.name.length >= 15) {
|
||||
// Если текст длинный, создаём отдельную строку для кнопки
|
||||
buttonLines.add(mutableListOf(btn))
|
||||
} else {
|
||||
var isAdded = false
|
||||
|
||||
// Пытаемся добавить кнопку в существующую строку
|
||||
for (line in buttonLines) {
|
||||
if (line.size < 2 && (line.isEmpty() || line[0].text.length < 14)) {
|
||||
line.add(btn)
|
||||
isAdded = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Если не нашли подходящую строку, создаём новую
|
||||
if (!isAdded) {
|
||||
buttonLines.add(mutableListOf(btn))
|
||||
} else {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
val backButton = InlineKeyboardButton.builder().text("Отмена").callbackData("cancel").build()
|
||||
|
||||
buttonLines.add(mutableListOf(backButton))
|
||||
keyboard.keyboard = buttonLines
|
||||
|
||||
val message = SendMessage()
|
||||
message.chatId = chatId
|
||||
val msg = "Выберите категорию"
|
||||
message.text = msg
|
||||
message.replyMarkup = keyboard
|
||||
val userState = BotUserState(user = user)
|
||||
val chatData =
|
||||
userState.data.find { it.chatId == chatId } ?: ChatData(chatId, state = BotStates.WAIT_CATEGORY)
|
||||
chatData.data["sum"] = sum.toString()
|
||||
chatData.data["comment"] = comment
|
||||
userState.data.add(chatData)
|
||||
try {
|
||||
execute(message)
|
||||
setState(userState)
|
||||
} catch (e: TelegramApiException) {
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun confirmTransaction(chatId: String, user: User, update: Update) {
|
||||
val state = getState(user.id!!)
|
||||
if (state == null) {
|
||||
sendMessage(chatId, "Не можем найти информацию о сумме и комментарии")
|
||||
return
|
||||
}
|
||||
val stateData = state.data.find { it.chatId == chatId }
|
||||
if (stateData == null) {
|
||||
sendMessage(chatId, "Не можем найти информацию о сумме и комментарии")
|
||||
return
|
||||
}
|
||||
|
||||
val category = categoriesService.getCategories(
|
||||
"67af3c0f652da946a7dd9931",
|
||||
"EXPENSE",
|
||||
sortBy = "name",
|
||||
direction = "ASC"
|
||||
)
|
||||
.awaitSingle()
|
||||
.first { it.id == update.callbackQuery.data.split("_")[1] }
|
||||
val space = spaceService.getSpace("67af3c0f652da946a7dd9931")
|
||||
val instantType = financialService.getTransactionTypes().first { it.code == "INSTANT" }
|
||||
val transaction = financialService.createTransaction(
|
||||
space,
|
||||
transaction = Transaction(
|
||||
space = space,
|
||||
type = instantType,
|
||||
user = user,
|
||||
category = category,
|
||||
comment = stateData.data["comment"]!!.trim(),
|
||||
date = LocalDate.now(),
|
||||
amount = stateData.data["sum"]!!.toDouble(),
|
||||
isDone = true,
|
||||
parentId = null,
|
||||
createdAt = LocalDateTime.now()
|
||||
),
|
||||
user = user
|
||||
)
|
||||
val editMsg = EditMessageText()
|
||||
editMsg.chatId = chatId
|
||||
editMsg.messageId = update.callbackQuery.message.messageId
|
||||
editMsg.text = "Успешно создали транзакцию c id ${transaction.id}"
|
||||
try {
|
||||
execute(editMsg)
|
||||
// execute(msg)
|
||||
finishState(chatId, user)
|
||||
} catch (e: TelegramApiException) {
|
||||
e.printStackTrace()
|
||||
logger.error(e.message)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private suspend fun getState(userId: String): BotUserState? {
|
||||
val lookup = lookup("users", "user.\$id", "_id", "userDetails")
|
||||
val unwind = unwind("userDetails")
|
||||
val match = match(Criteria.where("userDetails._id").`is`(ObjectId(userId)))
|
||||
|
||||
val aggregation = newAggregation(lookup, unwind, match)
|
||||
return reactiveMongoTemplate.aggregate(aggregation, "bot-user-states", Document::class.java)
|
||||
.next()
|
||||
.map { doc ->
|
||||
val dataList = doc.getList("data", Document::class.java)
|
||||
BotUserState(
|
||||
id = doc.getObjectId("_id").toString(),
|
||||
user = User(doc.get("userDetails", Document::class.java).getObjectId("_id").toString()),
|
||||
data = dataList.map {
|
||||
val data = it.get("data", Document::class.java)
|
||||
ChatData(
|
||||
chatId = it.getString("chatId"),
|
||||
state = BotStates.valueOf(it.getString("state")),
|
||||
data = (data.toMap().mapValues { it.value.toString() }.toMutableMap())
|
||||
)
|
||||
}.toMutableList(),
|
||||
)
|
||||
}
|
||||
.awaitSingleOrNull()
|
||||
}
|
||||
|
||||
private suspend fun setState(userState: BotUserState): BotUserState {
|
||||
val stateToSave = userState.user.id?.let { userId ->
|
||||
getState(userId)?.copy(data = userState.data)
|
||||
?: BotUserState(user = userState.user, data = userState.data)
|
||||
} ?: BotUserState(user = userState.user, data = userState.data)
|
||||
|
||||
return botStatesRepo.save(stateToSave).awaitSingle()
|
||||
}
|
||||
|
||||
private suspend fun finishState(chatId: String, user: User) {
|
||||
val state = getState(user.id!!)
|
||||
state?.data?.removeIf { it.chatId == chatId }
|
||||
state?.let { botStatesRepo.save(state).awaitSingle() }
|
||||
}
|
||||
|
||||
}
|
||||
@@ -17,10 +17,7 @@ import org.springframework.data.mongodb.core.query.isEqualTo
|
||||
import org.springframework.stereotype.Service
|
||||
import reactor.core.publisher.Mono
|
||||
import space.luminic.budgerapp.mappers.CategoryMapper
|
||||
import space.luminic.budgerapp.models.Category
|
||||
import space.luminic.budgerapp.models.CategoryType
|
||||
import space.luminic.budgerapp.models.NotFoundException
|
||||
import space.luminic.budgerapp.models.Space
|
||||
import space.luminic.budgerapp.models.*
|
||||
import space.luminic.budgerapp.repos.BudgetRepo
|
||||
import space.luminic.budgerapp.repos.CategoryRepo
|
||||
|
||||
@@ -133,7 +130,7 @@ class CategoryService(
|
||||
return categoryRepo.save(category).awaitSingle() // Сохраняем категорию, если тип не изменился
|
||||
}
|
||||
|
||||
suspend fun deleteCategory(space: Space, categoryId: String) {
|
||||
suspend fun deleteCategory(space: Space, categoryId: String, author: User) {
|
||||
findCategory(space, categoryId)
|
||||
val transactions = financialService.getTransactions(space.id!!, categoryId = categoryId).awaitSingle()
|
||||
if (transactions.isNotEmpty()) {
|
||||
@@ -154,7 +151,7 @@ class CategoryService(
|
||||
|
||||
transactions.map { transaction ->
|
||||
transaction.category = otherCategory
|
||||
financialService.editTransaction(transaction)
|
||||
financialService.editTransaction(transaction, author)
|
||||
}
|
||||
}
|
||||
val budgets = financialService.findProjectedBudgets(
|
||||
|
||||
@@ -1,11 +1,8 @@
|
||||
package space.luminic.budgerapp.services
|
||||
|
||||
import kotlinx.coroutines.async
|
||||
import kotlinx.coroutines.awaitAll
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.flow.map
|
||||
import kotlinx.coroutines.flow.toList
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.reactive.asFlow
|
||||
import kotlinx.coroutines.reactive.awaitFirstOrNull
|
||||
import kotlinx.coroutines.reactive.awaitSingle
|
||||
@@ -35,6 +32,7 @@ import space.luminic.budgerapp.repos.CategoryRepo
|
||||
import space.luminic.budgerapp.repos.TransactionRepo
|
||||
import space.luminic.budgerapp.repos.WarnRepo
|
||||
import java.time.*
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.time.temporal.TemporalAdjusters
|
||||
import java.util.*
|
||||
|
||||
@@ -48,7 +46,8 @@ class FinancialService(
|
||||
val reactiveMongoTemplate: ReactiveMongoTemplate,
|
||||
private val categoryRepo: CategoryRepo,
|
||||
val transactionsMapper: TransactionsMapper,
|
||||
val budgetMapper: BudgetMapper
|
||||
val budgetMapper: BudgetMapper,
|
||||
private val subscriptionService: SubscriptionService
|
||||
) {
|
||||
private val logger = LoggerFactory.getLogger(FinancialService::class.java)
|
||||
|
||||
@@ -846,25 +845,45 @@ class FinancialService(
|
||||
|
||||
}
|
||||
|
||||
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
|
||||
|
||||
suspend fun createTransaction(space: Space, transaction: Transaction): Transaction {
|
||||
val securityContextHolder = ReactiveSecurityContextHolder.getContext().awaitSingle()
|
||||
val user = userService.getByUserNameWoPass(securityContextHolder.authentication.name)
|
||||
if (space.users.none { it.id.toString() == user.id }) {
|
||||
suspend fun createTransaction(space: Space, transaction: Transaction, user: User? = null): Transaction {
|
||||
val author = user
|
||||
?: userService.getByUserNameWoPass(
|
||||
ReactiveSecurityContextHolder.getContext().awaitSingle().authentication.name
|
||||
)
|
||||
if (space.users.none { it.id.toString() == author.id }) {
|
||||
throw IllegalArgumentException("User does not have access to this Space")
|
||||
}
|
||||
// Привязываем space и user к транзакции
|
||||
transaction.user = user
|
||||
transaction.user = author
|
||||
transaction.space = space
|
||||
|
||||
val savedTransaction = transactionsRepo.save(transaction).awaitSingle()
|
||||
|
||||
updateBudgetOnCreate(savedTransaction)
|
||||
scope.launch {
|
||||
val dateFormatter = DateTimeFormatter.ofPattern("dd.MM.yyyy")
|
||||
val transactionType = if (transaction.type.code == "INSTANT") "текущую" else "плановую"
|
||||
|
||||
subscriptionService.sendToSpaceOwner(
|
||||
space.owner!!.id!!, PushMessage(
|
||||
title = "Новая транзакция в пространстве ${space.name}!",
|
||||
body = "Пользователь ${author.username} создал $transactionType транзакцию на сумму ${transaction.amount.toInt()} с комментарием ${transaction.comment} с датой ${
|
||||
dateFormatter.format(
|
||||
transaction.date
|
||||
)
|
||||
}",
|
||||
url = "https://luminic.space/"
|
||||
)
|
||||
)
|
||||
}
|
||||
return savedTransaction
|
||||
}
|
||||
|
||||
|
||||
@CacheEvict(cacheNames = ["transactions", "budgets"], allEntries = true)
|
||||
suspend fun editTransaction(transaction: Transaction): Transaction {
|
||||
suspend fun editTransaction(transaction: Transaction, author: User): Transaction {
|
||||
val oldStateOfTransaction = getTransactionById(transaction.id!!)
|
||||
val changed = compareSumDateDoneIsChanged(oldStateOfTransaction, transaction)
|
||||
if (!changed) {
|
||||
@@ -878,8 +897,57 @@ class FinancialService(
|
||||
transaction
|
||||
)
|
||||
}
|
||||
val space = transaction.space
|
||||
val savedTransaction = transactionsRepo.save(transaction).awaitSingle()
|
||||
updateBudgetOnEdit(oldStateOfTransaction, savedTransaction, amountDifference)
|
||||
scope.launch {
|
||||
var whatChanged = "nothing"
|
||||
val dateFormatter = DateTimeFormatter.ofPattern("dd.MM.yyyy")
|
||||
val transactionType = if (transaction.type.code == "INSTANT") "текущую" else "плановую"
|
||||
|
||||
val sb = StringBuilder()
|
||||
if (oldStateOfTransaction.amount != transaction.amount) {
|
||||
sb.append("${oldStateOfTransaction.amount} → ${transaction.amount}\n")
|
||||
whatChanged = "main"
|
||||
}
|
||||
if (oldStateOfTransaction.comment != transaction.comment) {
|
||||
sb.append("${oldStateOfTransaction.comment} → ${transaction.comment}\n")
|
||||
whatChanged = "main"
|
||||
|
||||
}
|
||||
if (oldStateOfTransaction.date != transaction.date) {
|
||||
sb.append("${dateFormatter.format(oldStateOfTransaction.date)} → ${dateFormatter.format(transaction.date)}\n")
|
||||
whatChanged = "main"
|
||||
}
|
||||
if (!oldStateOfTransaction.isDone && transaction.isDone) {
|
||||
whatChanged = "done_true"
|
||||
}
|
||||
if (oldStateOfTransaction.isDone && !transaction.isDone) {
|
||||
whatChanged = "done_false"
|
||||
}
|
||||
|
||||
val body: String = when (whatChanged) {
|
||||
"main" -> {
|
||||
"Пользователь ${author.username} изменил $transactionType транзакцию:\n$sb"
|
||||
}
|
||||
"done_true" -> {
|
||||
"Пользователь ${author.username} выполнил ${transaction.comment} с суммой ${transaction.amount.toInt()}"
|
||||
}
|
||||
"done_false" -> {
|
||||
"Пользователь ${author.username} отменил выполнение ${transaction.comment} с суммой ${transaction.amount.toInt()}"
|
||||
}
|
||||
else -> "Изменения не обнаружены, но что то точно изменилось"
|
||||
}
|
||||
|
||||
subscriptionService.sendToSpaceOwner(
|
||||
space?.owner!!.id!!, PushMessage(
|
||||
title = "Новое действие в пространстве ${space.name}!",
|
||||
body = body,
|
||||
url = "https://luminic.space/"
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
return savedTransaction
|
||||
}
|
||||
|
||||
|
||||
@@ -37,14 +37,9 @@ class SpaceService(
|
||||
private val tagRepo: TagRepo
|
||||
) {
|
||||
|
||||
suspend fun isValidRequest(spaceId: String): Space {
|
||||
val securityContextHolder = ReactiveSecurityContextHolder.getContext().awaitSingleOrNull()
|
||||
?: throw AuthException("Authentication failed")
|
||||
val authentication = securityContextHolder.authentication
|
||||
|
||||
val username = authentication.name
|
||||
// Получаем пользователя по имени
|
||||
val user = userService.getByUsername(username)
|
||||
|
||||
suspend fun isValidRequest(spaceId: String, user: User): Space {
|
||||
val space = getSpace(spaceId)
|
||||
|
||||
// Проверяем доступ пользователя к пространству
|
||||
|
||||
@@ -3,13 +3,16 @@ package space.luminic.budgerapp.services
|
||||
|
||||
import com.interaso.webpush.VapidKeys
|
||||
import com.interaso.webpush.WebPushService
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.reactive.awaitSingle
|
||||
import kotlinx.serialization.encodeToString
|
||||
import kotlinx.serialization.json.Json
|
||||
import org.bson.types.ObjectId
|
||||
import org.slf4j.LoggerFactory
|
||||
import org.springframework.dao.DuplicateKeyException
|
||||
import org.springframework.stereotype.Service
|
||||
import reactor.core.publisher.Mono
|
||||
import space.luminic.budgerapp.models.PushMessage
|
||||
import space.luminic.budgerapp.models.Subscription
|
||||
import space.luminic.budgerapp.models.SubscriptionDTO
|
||||
@@ -36,38 +39,50 @@ class SubscriptionService(private val subscriptionRepo: SubscriptionRepo) {
|
||||
vapidKeys = VapidKeys.fromUncompressedBytes(VAPID_PUBLIC_KEY, VAPID_PRIVATE_KEY)
|
||||
)
|
||||
|
||||
fun sendNotification(endpoint: String, p256dh: String, auth: String, payload: PushMessage): Mono<Void> {
|
||||
return Mono.fromRunnable<Void> {
|
||||
|
||||
suspend fun sendToSpaceOwner(ownerId: String, message: PushMessage) = coroutineScope {
|
||||
val ownerTokens = subscriptionRepo.findByUserIdAndIsActive(ObjectId(ownerId)).collectList().awaitSingle()
|
||||
|
||||
ownerTokens.forEach { token ->
|
||||
launch(Dispatchers.IO) { // Теперь мы точно в корутин скоупе
|
||||
try {
|
||||
sendNotification(token.endpoint, token.p256dh, token.auth, message)
|
||||
} catch (e: Exception) {
|
||||
logger.error("Ошибка при отправке уведомления: ${e.message}", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
suspend fun sendNotification(endpoint: String, p256dh: String, auth: String, payload: PushMessage) {
|
||||
try {
|
||||
pushService.send(
|
||||
payload = Json.encodeToString(payload),
|
||||
endpoint = endpoint,
|
||||
p256dh = p256dh,
|
||||
auth = auth
|
||||
)
|
||||
logger.info("Уведомление успешно отправлено на endpoint: $endpoint")
|
||||
|
||||
} catch (e: Exception) {
|
||||
logger.error("Ошибка при отправке уведомления на endpoint $endpoint: ${e.message}")
|
||||
throw e
|
||||
}
|
||||
.doOnSuccess {
|
||||
logger.info("Уведомление успешно отправлено на endpoint: $endpoint")
|
||||
}
|
||||
.doOnError { e ->
|
||||
logger.error("Ошибка при отправке уведомления на endpoint $endpoint: ${e.message}")
|
||||
}
|
||||
.onErrorResume { e ->
|
||||
Mono.error(e) // Пробрасываем ошибку дальше, если нужна обработка выше
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fun sendToAll(payload: PushMessage): Mono<List<String>> {
|
||||
return subscriptionRepo.findAll()
|
||||
.flatMap { sub ->
|
||||
suspend fun sendToAll(payload: PushMessage) {
|
||||
|
||||
subscriptionRepo.findAll().collectList().awaitSingle().forEach { sub ->
|
||||
|
||||
try {
|
||||
sendNotification(sub.endpoint, sub.p256dh, sub.auth, payload)
|
||||
.then(Mono.just("${sub.user?.username} at endpoint ${sub.endpoint}"))
|
||||
.onErrorResume { e ->
|
||||
sub.isActive = false
|
||||
subscriptionRepo.save(sub).then(Mono.empty())
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
sub.isActive = false
|
||||
subscriptionRepo.save(sub).awaitSingle()
|
||||
}
|
||||
.collectList() // Собираем результаты в список
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -29,8 +29,12 @@ class UserService(val userRepo: UserRepo) {
|
||||
.switchIfEmpty(Mono.error(Exception("User not found"))) // Обрабатываем случай, когда пользователь не найден
|
||||
}
|
||||
|
||||
suspend fun getUserByTelegramId(telegramId: Long): User? {
|
||||
return userRepo.findByTgId(telegramId.toString()).awaitSingleOrNull()
|
||||
}
|
||||
|
||||
@Cacheable("users", key = "#username")
|
||||
|
||||
@Cacheable("users", key = "#username")
|
||||
suspend fun getByUserNameWoPass(username: String): User {
|
||||
return userRepo.findByUsernameWOPassword(username).awaitSingleOrNull()
|
||||
?: throw NotFoundException("User with username: $username not found")
|
||||
|
||||
Reference in New Issue
Block a user