Skip to content
小马哥 edited this page Jan 9, 2018 · 4 revisions

简介

Hprose 2.0 最大的亮点就是增加了推送功能的支持,而且这个功能的增加是在不修改现有通讯协议的方式下实现的,因此,这里的推送服务,即使不是 Hprose 2.0 的客户端或者服务器也可以使用。

当然,在旧版本的客户端调用推送服务,或者在旧版本的服务器上自己实现推送,需要多写一些代码。所以,如果你所使用的语言支持 Hprose 2.0,那么推荐直接使用 Hprose 2.0 的推送 API 来做推送,这样会极大的减少你的工作量。

下面我们来分别介绍一下客户端和服务器端增加的关于推送的 API。

客户端

客户端关于推送的方法只有两个,它们分别是:

Subscribe 方法

Subscribe(name string, id string, settings *InvokeSettings, callback interface{}) (err error)

Subscribe 方法的用处是订阅服务器端的推送服务。该方法有两种方式,一种是自动获取设置客户端 id,另一种是手动设置客户端 id

参数 name 是订阅的主题名,它实际上也是一个服务器端的方法,该方法与普通方法的区别是,它只有一个参数 id,该参数表示客户端的唯一编号,该方法的返回值即推送信息,当返回值为 nil 或者发生错误时,客户端会忽略并再次调用该 name 对应的方法。当该方法返回推送消息时,callback 回调函数会执行,并同时再次调用该 name 对应的方法。因此当没有推送消息时,该方法不应该立即返回值,而应该挂起等待,直到超时或者有推送消息时再返回结果。

当然,对于开发者来说,自己实现一个完善的推送方法还是有一定难度的。因此,Hprose 2.0 的服务器端已经提供了一整套的专门用于推送的 API,通过这些 API,可以方便的自动实现用于推送的服务方法。在后面介绍服务器端时,我们再介绍这部分内容。

参数 id 是客户端的唯一编号,如果设置为 "" 的话,客户端会使用自动编号机制,如果该自动编号未初始化,会自动调用一个名字为 # 的服务器端远程方法,之所以使用这个特殊的名字是为了防止跟用户发布的普通方法发生冲突。Hprose 2.0 服务器已经自动实现了该方法,但是用户也可以用自己的实现来替换它,它现在的默认实现是一个 UUID 字符串。当用户指定了 id 参数时,客户端会将它作为该 name 对应方法的参数值传给服务器端,但不会修改客户端的自动编号。如果自动编号获取失败,该方法将返回错误。

参数 callback 是用来处理推送消息的回调函数,如果该参数不是函数类型,该方法将返回错误。

参数 settings 是订阅选项,但与 Invoke 调用不同,Subscribe 方法仅有以下几个选项可以设置:

  • Timeout 字段
  • Failswitch 字段
  • JSONCompatible 字段
  • SetUserData 方法

Timeout 字段表示等待推送消息的超时时间,可以省略,省略时表示与客户端的 Timeout 默认设置相同。超时之后并不会产生异常,而是重新请求推送。因为服务器端推送本身也有超时机制,因此,Timeout 所设置的超时时间最好长于服务器端的超时时间,这样可以避免产生多余的通讯。如果用户要在服务器端自己实现推送方法,应当注意处理好同一个客户端对同一个推送方法可能会进行重复调用的问题。如果使用 Hprose 2.0 提供的推送 API,则不需要关心这一点。

Failswitch 字段表示当客户端与服务器端通讯中发生网络故障,是否自动切换服务器。默认值是 false,表示不切换。

JSONCompatible 字段表示如果回调方法中的参数类型为 interface{},而推送的信息中又包含 map 数据,当该字段设置为 true 时,map 将被反序列化为 map[string]interface{} 类型。默认是反序列化为 map[interface{}]interface{} 类型。

SetUserData 方法用于为中间件或过滤器的 context 参数设置一些初始化的用户数据,通常在推送中很少用的。

对于同一个推送主题,Subscribe 方法允许被多次调用,这样可以对同一个推送主题指定多个不同的回调方法。但通常没有必要也不推荐这样做。

Unsubscribe 方法

Unsubscribe(name string, id ...string)

该方法用于取消订阅推送主题。

如果 id 参数未指定,而且如果客户端自动编号未初始化,将取消该主题上的所有订阅,否则只取消该主题上与指定 id 或者自动编号对应的订阅。

通常来说,当你调用 Subscribe 方法时如果指定了 id 参数,那么当调用 Unsubscribe 方法时你也应该指定相同的 id 参数。当你调用 Subscribe 方法时没有指定 id 参数,那么当调用 Unsubscribe 方法时你也不需要指定 id 参数。

AutoID 方法

AutoID() (string, error)

该方法返回当前客户端在进行推送订阅时自动获取的唯一编号。如果自动编号没有初始化,则自动调用远程服务初始化。

ID 方法

ID() string

AutoID 类似,只是当自动编号没有初始化时,该方法返回空字符串。

IsSubscribed 方法

IsSubscribed(name string) bool

name 所对应的主题已被订阅时,返回 true,否则返回 false

SubscribedList 方法

SubscribedList() []string

返回已被订阅的主题的列表。返回的 []string 中的元素为已订阅的主题名称。

服务器端

服务器端提供了比较多的关于推送的 API,包括广播,多播和单播方式的推送,还有超时,心跳,推送事件等设置。

Timeout 字段

该字段用于设置推送空闲超时。默认值为 120 秒,即 2 分钟。

当服务器发布了推送主题后(后面会专门介绍推送),客户端会跟服务器端保持一个长连接,如果达到超时时间,仍然没有任何消息推送给客户端,则返回 nil,此时,如果客户端仍然在线的话,则会立即再次发送获取推送主题的请求。服务器端通过这个方式可以获知客户端是否还在线。

Heartbeat 字段

该字段用来设置推送的心跳检测间隔时间。该字段默认值为 3 秒钟。

当服务器端推送数据给客户端后,如果客户端在 Heartbeat 时间内没有取走推送数据,则服务器端认为客户端以掉线。对于以掉线的客户端,服务器端会清除为该客户端分配的内存空间,并将该客户端从推送列表中移除。

TimeoutHeartbeat 字段在检测客户端是否离线时是相互配合的,当服务器端没有向客户端推送任何消息时,服务器端需要至少 Timeout + Heartbeat 的时间才能检测到客户端以离线。当服务器端有向客户端推送消息时,则在推送消息之后经过 Heartbeat 时间可以检测到客户端以掉线。

TimeoutHeartbeat 设置的时间越短,检测到客户端离线的时间就越短。但是需要注意以下几个问题:

Timeout 时间越短,服务器端和客户端之间的用于检测是否掉线的通讯就越频繁,所以不应该将 Timeout 设置的过短,否则会严重增加服务器的负担。

因此,Timeout 的设置一般不应少于 30 秒。对于负载比较高的服务器,保持默认值就是一个不错的选项。

对于推送频繁的服务器来说,Heartbeat 时间越长,对于已经离线的客户端,在服务器端存储的离线消息就越多,这会严重的占用服务器端的内存,因此,不宜将 Heartbeat 的时间设置的过长。

如果 Heartbeat 的时间设置的过短,客户端可能会因为网络原因导致不能及时取走推送消息,这就会导致错误的离线判断,当错误离线判断发生后,会丢失一些推送消息。

因此,Heartbeat 的选择则应根据客户端的网络情况来决定,如果客户端都是来自局域网,并且客户端数量较少,设置为 1 秒甚至更短的时间也是可以的。而对于比较慢速且不太稳定的移动网络,设置为 5 秒或者 10 秒可能是一个比较合适的取值。对于普通的互联网客户端来说,保持默认值就可以了。

推送事件

服务器端有两个事件是跟推送有关的,它们是:

type subscribeEvent interface {
	OnSubscribe(topic string, id string, service Service)
}

type unsubscribeEvent interface {
	OnUnsubscribe(topic string, id string, service Service)
}

当编号为 id 的客户端订阅主题 topic 时,触发 OnSubscribe 事件。

当编号为 id 的客户端退订主题 topic 时,触发 OnUnsubscribe 事件。

这两个事件同样也是通过 Event 字段来设置。在后面的例子中,我们可以看到这两个事件如何设置。

Publish 方法

Publish(topic string, timeout time.Duration, heartbeat time.Duration) Service

该方法用于发布一个推送主题。这个推送的主题实际上是一个自动生成的远程服务方法。它的功能就是实现推送。

topic 为主题名。

这里 timeoutheartbeat 参数在前面的字段介绍里已经说明过了,这里不再重复。如果这两个参数设置 <= 0,表示使用服务器端对应字段的默认设置。

Publish 方法仅仅是告诉客户端,现在有一个叫做 topic 的推送主题可以订阅。

而要真正推送数据给客户端,则需要使用以下几个方法。

	Push(topic string, result interface{}, id ...string)
	Broadcast(topic string, result interface{}, callback func([]string))
	Multicast(topic string, ids []string, result interface{}, callback func([]string))
	Unicast(topic string, id string, result interface{}, callback func(bool))

广播

可以使用 Push 或者 Broadcast 方法实现广播功能。它们的区别在于,Broadcast 有一个回调函数,而 Push 没有。

Broadcast 的回调函数的参数是成功推送的客户端 id 列表。

一旦服务器启动,你可以在任何地方进行数据推送。比如在其它的服务方法中,在服务器事件中,甚至在服务器外的并行运行的函数中。例如:

time_push_server.go

package main

import (
	"fmt"
	"time"

	"github.com/hprose/hprose-golang/rpc"
)

type event struct{}

func (event) OnSubscribe(topic string, id string, service rpc.Service) {
	fmt.Println("client " + id + " subscribe topic: " + topic)
}

func (event) OnUnsubscribe(topic string, id string, service rpc.Service) {
	fmt.Println("client " + id + " unsubscribe topic: " + topic)
}

func main() {
	server := rpc.NewTCPServer("tcp4://0.0.0.0:2016/")
	server.Publish("time", 0, 0)
	server.Event = event{}
	var timer *time.Timer
	timer = time.AfterFunc(1*time.Second, func() {
		server.Broadcast("time", time.Now().String(), func(sended []string) {
			if len(sended) > 0 {
				fmt.Println(sended)
			}
		})
		timer.Reset(1 * time.Second)
	})
	server.Start()
}

time_push_client.go

package main

import (
	"fmt"

	"github.com/hprose/hprose-golang/rpc"
)
type event struct {}

func (e *event) OnError(name string, err error) {
    fmt.Printf("name: %s, err: %s\n", name, err.Error())
}

func main() {
	client := rpc.NewTCPClient("tcp4://127.0.0.1:2016/")
        client.SetEvent(&event{})
	count := 0
	done := make(chan struct{})
	client.Subscribe("time", "", nil, func(data string) {
		count++
		if count > 10 {
			client.Unsubscribe("time")
			done <- struct{}{}
		}
		fmt.Println(data)
	})
	<-done
}

运行上面两个程序,我们会看到如下结果:

服务器端输出

client 34c0d969-6445-468f-a733-8828455b3455 subscribe topic: time
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
[34c0d969-6445-468f-a733-8828455b3455]
client 34c0d969-6445-468f-a733-8828455b3455 unsubscribe topic: time

客户端输出

2016-10-29 13:47:37.403264657 +0800 CST
2016-10-29 13:47:38.406085093 +0800 CST
2016-10-29 13:47:39.410960974 +0800 CST
2016-10-29 13:47:40.412490658 +0800 CST
2016-10-29 13:47:41.413672273 +0800 CST
2016-10-29 13:47:42.414227648 +0800 CST
2016-10-29 13:47:43.415540827 +0800 CST
2016-10-29 13:47:44.415833625 +0800 CST
2016-10-29 13:47:45.419348921 +0800 CST
2016-10-29 13:47:46.420147822 +0800 CST
2016-10-29 13:47:47.421158194 +0800 CST

如果你同时运行两个或更多个客户端,会看到每个客户端都能收到推送信息。服务器端也会看到相应的输出。

有时候,你可能想在某个服务方法中推送数据给客户端,但是该服务方法可能在其它文件中定义。因此,你得不到 server 对象。那这时还能进行推送吗?

答案是:可以的。我们前面说过,在服务方法中我们可以得到一个 context 参数,这个 context 参数中就包含有一个 Clients 对象,这个对象上包含了所有跟推送有关的方法,这些方法跟 server 对象上的推送方法是完全一样的。

例如:

push_server.go

package main

import "github.com/hprose/hprose-golang/rpc"

func hello(name string, context *rpc.SocketContext) string {
	context.Clients().Push("ip", context.Conn.RemoteAddr().String())
	return "Hello " + name + "!"
}

func main() {
	server := rpc.NewTCPServer("tcp4://0.0.0.0:4321/")
	server.AddFunction("hello", hello)
	server.Publish("ip", 0, 0)
	server.Start()
}

push_client.go

package main

import (
	"fmt"

	"github.com/hprose/hprose-golang/rpc"
)

type HelloService struct {
	Hello func(string) (string, error)
}

func main() {
	client := rpc.NewClient("tcp://127.0.0.1:4321/")
	client.Subscribe("ip", "", nil, func(ip string) {
		fmt.Println(ip)
	})
	var helloService *HelloService
	client.UseService(&helloService)
	for i := 0; i < 10; i++ {
		fmt.Println(helloService.Hello("world"))
	}
}

然后分别运行服务器和客户端,会看到客户端有如下输出:

Hello world! <nil>
Hello world! <nil>
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>
127.0.0.1:54998
Hello world! <nil>

因为订阅是异步的,所以客户端可能要在几个同步调用之后,才能正式与服务器建立推送连接。所以我们看到客户端收到的 IP 地址是在几个同步调用之后,才会显示。

_注意:虽然上面的例子都是使用的 TCP 服务器和客户端,但是并不是说只有 Hprose 的 TCP 实现才支持推送服务,实际上 Hprose 的 HTTP 和 WebSocket 实现也支持推送。

多播

可以使用 Push 或者 Multicast 方法实现多播功能。它们的区别除了参数顺序不同以外,Multicast 方法可以接收回调函数,而 Push 没有回调。Multicast 方法的回调跟 Broadcast 方法的回调意义相同。

单播

可以使用 Push 或者 Unicast 方法实现单播功能。Unicast 方法可以接收回调函数,Push 没有回调。Unicast 方法的回调参数是一个bool 值,推送成功该值为 true,失败为 false

IDList 方法

IDList(topic string) []string

该方法用于获取当前在线的订阅了主题 topic 的所有客户端的 id 列表。

Exist 方法

Exist(topic string, id string) bool

该方法用于快速判断 id 是否在当前在线的订阅了主题 topic 的客户端列表中。

注意,客户端在线状态是针对主题的,同一个客户端可能针对一个主题处于在线状态,但是针对另一个主题却处于离线状态,这种情况是正常的。