Skip to content

Commit

Permalink
Reverse websocket client support /api and /event routes
Browse files Browse the repository at this point in the history
  • Loading branch information
yyuueexxiinngg committed Aug 4, 2020
1 parent 5e8d10a commit de3d929
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 68 deletions.
28 changes: 23 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,38 @@ debug: false
reverseHost: 127.0.0.1
# 反向Websocket端口
reversePort: 8080
# 访问口令, 默认为空, 即不设置Token
accessToken: ""
# 反向Websocket路径
reversePath: /ws
# 反向Websocket Api路径 尚未实现
# reverseApiPath: /ws/
# 反向Websocket Event路径 尚未实现
# reverseEventPath: /ws/
# 可选, 反向Websocket Api路径, 默认为reversePath
reverseApiPath: /api
# 可选, 反向Websocket Event路径, 默认为reversePath
reverseEventPath: /event
# 是否使用Universal客户端 默认为true
useUniversal: true
# 反向 WebSocket 客户端断线重连间隔,单位毫秒
reconnectInterval: 3000
- enable: true # 这里是第二个连接, 相当于CQHTTP分身版
postMessageFormat: string
reverseHost: 127.0.0.1
reversePort: 9222
reversePath: /ws
useUniversal: false
reconnectInterval: 3000
# 可选,正向Websocket服务器
ws:
# 可选,是否启用正向Websocket服务器,默认不启用
enable: true
# 可选,上报消息格式,string 为字符串格式,array 为数组格式, 默认为string
postMessageFormat: string
# 可选,访问口令, 默认为空, 即不设置Token
accessToken: ""
# 监听主机
wsHost: "0.0.0.0"
# 监听端口
wsPort: 8080

'0987654321': # 这里是第二个QQ Bot的配置
ws_reverse:
- enable: true
Expand All @@ -63,8 +81,8 @@ debug: false
- [x] 反向Websocket客户端
- [x] HTTP上报服务
- [x] Websocket服务端
- [ ] HTTP API
- [ ] Websocket服务端
## 已经支持的CQHTTP API
Expand Down
15 changes: 13 additions & 2 deletions src/main/kotlin/tech/mihoyo/mirai/util/CQMessgeParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,11 @@ private suspend fun convertToMiraiMessage(
image = withContext(Dispatchers.IO) { contact!!.uploadImage(bis) }
}
startsWith("http") -> {
image = withContext(Dispatchers.IO) { contact!!.uploadImage(URL(args["file"]!!)) }
image = try {
withContext(Dispatchers.IO) { contact!!.uploadImage(URL(args["file"]!!)) }
} catch (e: Exception) {
null
}
}
else -> {
var fileIdOrPath = args["file"]!!
Expand Down Expand Up @@ -181,13 +185,20 @@ private suspend fun convertToMiraiMessage(
}
}
} else if (args.containsKey("url")) {
image = withContext(Dispatchers.IO) { contact!!.uploadImage(URL(args["url"]!!)) }
image = withContext(Dispatchers.IO) { contact!!.uploadImage(URL(args["url"])) }
}
if (image != null) {
if (args["type"] == "flash") {
return image!!.flash()
}
return image as Image
} else {
val imageUrl = when {
args.containsKey("file") && args["file"]!!.startsWith("http") -> args["file"]
args.containsKey("url") -> args["url"]
else -> null
}
return PlainText("Bot发了一张图片, 但是插件获取不到, 它心累了不想尝试" + if (imageUrl != null) ", 并给出了原图链接: $imageUrl" else "")
}
}
"share" -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import io.ktor.client.features.websocket.WebSockets
import io.ktor.http.cio.websocket.readText
import io.ktor.util.KtorExperimentalAPI
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import net.mamoe.mirai.Bot
import net.mamoe.mirai.console.plugins.description

import net.mamoe.mirai.event.Listener
import net.mamoe.mirai.event.events.BotEvent
Expand All @@ -22,14 +23,14 @@ import net.mamoe.mirai.event.subscribeAlways
import net.mamoe.mirai.message.TempMessageEvent
import net.mamoe.mirai.utils.currentTimeMillis
import tech.mihoyo.mirai.BotSession
import tech.mihoyo.mirai.PluginBase
import tech.mihoyo.mirai.data.common.*
import tech.mihoyo.mirai.util.logger
import tech.mihoyo.mirai.util.toJson
import java.io.EOFException
import java.io.IOException
import java.net.ConnectException

@ExperimentalCoroutinesApi
@KtorExperimentalAPI
class WebSocketReverseClient(
val session: BotSession
Expand All @@ -42,111 +43,138 @@ class WebSocketReverseClient(
init {
serviceConfig = session.config.getConfigSectionList("ws_reverse").map { WebSocketReverseServiceConfig(it) }
serviceConfig.forEach {
logger.debug("Host: ${it.reverseHost}, Port: ${it.reversePort}, Enable: ${it.enable}")
logger.debug("Host: ${it.reverseHost}, Port: ${it.reversePort}, Enable: ${it.enable}, Use Universal: ${it.useUniversal}")
if (it.enable) {
val httpClientKey = "${it.reverseHost}:${it.reversePort}"
if (!httpClients.containsKey(httpClientKey)) {
httpClients[httpClientKey] = HttpClient {
install(WebSockets)
GlobalScope.launch {
if (it.useUniversal) {
startGeneralWebsocketClient(session.bot, it, "Universal")
} else {
startGeneralWebsocketClient(session.bot, it, "Api")
startGeneralWebsocketClient(session.bot, it, "Event")
}
GlobalScope.launch {
startWebsocketClient(session.bot, it)
}
} else {
logger.error("已有相同主机及端口的实例存在. Host: ${it.reverseHost}, Port: ${it.reversePort}")
}
}
}
}

@KtorExperimentalAPI
@ExperimentalCoroutinesApi
private suspend fun startWebsocketClient(bot: Bot, config: WebSocketReverseServiceConfig) {
val httpClientKey = "${config.reverseHost}:${config.reversePort}"
val isRawMessage = config.postMessageFormat != "array"
private suspend fun startGeneralWebsocketClient(
bot: Bot,
config: WebSocketReverseServiceConfig,
clientType: String
) {
val httpClientKey = "${config.reverseHost}:${config.reversePort}-Client-$clientType"

httpClients[httpClientKey] = HttpClient {
install(WebSockets)
}

logger.debug("$httpClientKey 开始启动")
val path = when (clientType) {
"Api" -> config.reverseApiPath
"Event" -> config.reverseEventPath
else -> config.reversePath
}

try {
logger.debug("$httpClientKey 开始启动")
httpClients[httpClientKey]!!.ws(
host = config.reverseHost,
port = config.reversePort,
path = config.reversePath,
path = path,
request = {
header("User-Agent", "CQHttp/4.15.0")
header("X-Self-ID", bot.id.toString())
header("X-Client-Role", "Universal")
header("X-Client-Role", clientType)
config.accessToken?.let {
header(
"Authorization",
"Token ${config.accessToken}"
)
if (it != "") {
header(
"Authorization",
"Token ${config.accessToken}"
)
}
}
}
) {
// 用来检测Websocket连接是否关闭
websocketSessionByHost[httpClientKey] = this
// 构建事件监听器
if (!subscriptionByHost.containsKey(httpClientKey)) {
// 通知服务方链接建立
send(Frame.Text(CQMetaEventDTO(bot.id, "connect", currentTimeMillis).toJson()))
subscriptionByHost[httpClientKey] = bot.subscribeAlways {
// 保存Event以便在WebsocketSession Block中使用
if (this.bot.id == session.botId) {
val event = this
when (event) {
is TempMessageEvent -> session.cqApiImpl.cachedTempContact[event.sender.id] =
event.group.id
is NewFriendRequestEvent -> session.cqApiImpl.cacheRequestQueue.add(event)
is MemberJoinRequestEvent -> session.cqApiImpl.cacheRequestQueue.add(event)
}
event.toCQDTO(isRawMessage = isRawMessage).takeIf { it !is CQIgnoreEventDTO }?.apply {
send(Frame.Text(this.toJson()))
}
}
}

logger.debug("$httpClientKey Websocket Client启动完毕")
startWebsocketConnectivityCheck(bot, config)

incoming.consumeEach {
when (it) {
is Frame.Text -> {
handleWebSocketActions(outgoing, session.cqApiImpl, it.readText())
}
else -> logger.warning("Unsupported incomeing frame")
when (clientType) {
"Api" -> listenApi(incoming, outgoing)
"Event" -> listenEvent(httpClientKey, config, outgoing)
"Universal" -> {
listenEvent(httpClientKey, config, outgoing)
listenApi(incoming, outgoing)
}
}

logger.debug("$httpClientKey Websocket Client启动完毕")
startWebsocketConnectivityCheck(bot, config, clientType)
} else {
logger.warning("Websocket session alredy exist, $httpClientKey")
}
}
} catch (e: Exception) {
when (e) {
is ConnectException -> {
logger.warning("Websocket连接出错, 请检查服务器是否开启并确认正确监听端口, 将在${config.reconnectInterval / 1000}秒后重试连接, Host: $httpClientKey")
logger.warning("Websocket连接出错, 请检查服务器是否开启并确认正确监听端口, 将在${config.reconnectInterval / 1000}秒后重试连接, Host: $httpClientKey Path: $path")
delay(config.reconnectInterval)
startWebsocketClient(bot, config)
startGeneralWebsocketClient(session.bot, config, clientType)
}
is EOFException -> {
logger.warning("Websocket连接出错, 服务器返回数据不正确, 请检查Websocket服务器是否配置正确, 将在${config.reconnectInterval / 1000}秒后重试连接, Host: $httpClientKey")
logger.warning("Websocket连接出错, 服务器返回数据不正确, 请检查Websocket服务器是否配置正确, 将在${config.reconnectInterval / 1000}秒后重试连接, Host: $httpClientKey Path: $path")
delay(config.reconnectInterval)
startWebsocketClient(bot, config)
startGeneralWebsocketClient(session.bot, config, clientType)
}
is IOException -> {
logger.warning("Websocket连接出错, 可能被服务器关闭, 将在${config.reconnectInterval / 1000}秒后重试连接, Host: $httpClientKey")
logger.warning("Websocket连接出错, 可能被服务器关闭, 将在${config.reconnectInterval / 1000}秒后重试连接, Host: $httpClientKey Path: $path")
delay(config.reconnectInterval)
startWebsocketClient(bot, config)
startGeneralWebsocketClient(session.bot, config, clientType)
}
is CancellationException -> logger.info("Websocket连接关闭中, Host: $httpClientKey")
is CancellationException -> logger.info("Websocket连接关闭中, Host: $httpClientKey Path: $path")
else -> logger.warning("Websocket连接出错, 未知错误, 放弃重试连接, 请检查配置正确后重启mirai " + e.message + e.javaClass.name)
}
}
}

private suspend fun listenApi(incoming: ReceiveChannel<Frame>, outgoing: SendChannel<Frame>) {
incoming.consumeEach {
when (it) {
is Frame.Text -> {
handleWebSocketActions(outgoing, session.cqApiImpl, it.readText())
}
else -> logger.warning("Unsupported incomeing frame")
}
}
}

private suspend fun listenEvent(
httpClientKey: String,
config: WebSocketReverseServiceConfig,
outgoing: SendChannel<Frame>
) {
val isRawMessage = config.postMessageFormat != "array"
subscriptionByHost[httpClientKey] = session.bot.subscribeAlways {
// 保存Event以便在WebsocketSession Block中使用
if (this.bot.id == session.botId) {
val event = this
when (event) {
is TempMessageEvent -> session.cqApiImpl.cachedTempContact[event.sender.id] =
event.group.id
is NewFriendRequestEvent -> session.cqApiImpl.cacheRequestQueue.add(event)
is MemberJoinRequestEvent -> session.cqApiImpl.cacheRequestQueue.add(event)
}
event.toCQDTO(isRawMessage = isRawMessage).takeIf { it !is CQIgnoreEventDTO }?.apply {
outgoing.send(Frame.Text(this.toJson()))
}
}
}
}

@KtorExperimentalAPI
@ExperimentalCoroutinesApi
private fun startWebsocketConnectivityCheck(bot: Bot, config: WebSocketReverseServiceConfig) {
private fun startWebsocketConnectivityCheck(bot: Bot, config: WebSocketReverseServiceConfig, clientType: String) {
GlobalScope.launch {
val httpClientKey = "${config.reverseHost}:${config.reversePort}"
if (httpClients.containsKey(httpClientKey)) {
Expand All @@ -173,12 +201,12 @@ class WebSocketReverseClient(
httpClients[httpClientKey] = HttpClient {
install(WebSockets)
}
startWebsocketClient(bot, config)
startGeneralWebsocketClient(bot, config, clientType)
break
}
delay(5000)
}
startWebsocketConnectivityCheck(bot, config)
startWebsocketConnectivityCheck(bot, config, clientType)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ class WebSocketReverseServiceConfig(serviceConfig: ConfigSection) {
/**
* 反向Websocket Api路径 尚未实现
*/
// val reverseApiPath: String by botConfig.withDefault { "/ws" }
val reverseApiPath: String by serviceConfig.withDefault { reversePath }

/**
* 反向Websocket Event路径 尚未实现
*/
// val reverseEventPath: String by botConfig.withDefault { "/ws" }
val reverseEventPath: String by serviceConfig.withDefault { reversePath }

/**
* 是否使用Universal客户端
*/
val useUniversal: Boolean by serviceConfig.withDefault { true }

/**
* 反向 WebSocket 客户端断线重连间隔
Expand Down

0 comments on commit de3d929

Please sign in to comment.