workbunny/webman-push-server
🐇 Webman plugin for push server implementation. 🐇
┌─────────────┐ 2 | 3
┌───> | Push-server | ─── ─ · ─
| └─────────────┘ 1 | 4 ··· n
| Hash | register ↑
| | PUB | SUB
┌────────────────────┐ ──┘ ┌──────────────┐ <────┘
| webman-push-server | ──────> | Redis-server |
└────────────────────┘ ──┐ └──────────────┘ <────┐
| | PUB | SUB
| Hash | register ↓
| ┌────────────┐ 2 | 3
└────> | API-server | ─── ─ · ─
└────────────┘ 1 | 4 ··· n
配置信息及对应功能在代码注释中均有解释,详见对应代码注释;
|-- config
|-- plugin
|-- webman-push-server
|-- app.php # 主配置信息
|-- bootstrap.php # 自动加载
|-- command.php # 支持命令
|-- log.php # 日志配置
|-- middlewares.php # 基础中间件
|-- process.php # 启动进程
|-- redis.php # redis配置
|-- route.php # APIs路由信息
|-- registrar.php # 分布式服务注册器配置
composer require workbunny/webman-push-server
method | url | 描述 |
---|---|---|
POST | /apps/[app_id]/events | 对应的pusher文档地址 |
POST | /apps/[app_id]/batch_events | 对应的pusher文档地址 |
GET | /apps/[app_id]/channels | 对应的pusher文档地址 |
GET | /apps/[app_id]/channels/[channel_name] | 对应的pusher文档地址 |
POST | /apps/[app_id]/users/[user_id]/terminate_connections | 对应的pusher文档地址 |
GET | /apps/[app_id]/channels/[channel_name]/users | 对应的pusher文档地址 |
<script src="https://github.com/workbunny/webman-push-server/raw/main/plugin/workbunny/webman-push-server/push.js"> </script>
TIps:每 new 一个 Push 会创建一个连接。
// 建立连接
var connection = new Push({
url: 'ws://127.0.0.1:8001', // websocket地址
app_key: '<app_key>', // 在config/plugin/workbunny/webman-push-server/app.php里配置
});
TIps:频道和事件可以是任意符合约定前缀的字符串,不需要服务端预先配置。
// 建立连接
var connection = new Push({
url: 'ws://127.0.0.1:8001', // websocket地址
app_key: '<app_key>', // 在config/plugin/workbunny/webman-push-server/app.php里配置
});
// 监听 public-test 公共频道
var user_channel = connection.subscribe('public-test');
// 当 public-test 频道有message事件的消息回调
user_channel.on('message', function(data) {
// data里是消息内容
console.log(data);
});
// 取消监听 public-test 频道
connection.unsubscribe('public-test')
// 取消所有频道的监听
connection.unsubscribeAll()
Tips:您需要先实现用于鉴权的接口服务
Tips:样例鉴权接口详见 config/plugin/workbunny/webman-push-server/route.php
// 订阅发生前,浏览器会发起一个ajax鉴权请求(ajax地址为new Push时auth参数配置的地址),开发者可以在这里判断,当前用户是否有权限监听这个频道。这样就保证了订阅的安全性。
var connection = new Push({
url: 'ws://127.0.0.1:8001', // websocket地址
app_key: '<app_key>',
auth: 'http://127.0.0.1:8002/subscribe/auth' // 该接口是样例接口,请根据源码自行实现业务
});
// 监听 private-test 私有频道
var user_channel = connection.subscribe('private-test');
// 当 private-test 频道有message事件的消息回调
user_channel.on('message', function(data) {
// data里是消息内容
console.log(data);
});
// 取消监听 private-test 频道
connection.unsubscribe('private-test')
// 取消所有频道的监听
connection.unsubscribeAll()
Tips:样例鉴权接口详见 config/plugin/workbunny/webman-push-server/route.php
// 方法一
// 订阅发生前,浏览器会发起一个ajax鉴权请求(ajax地址为new Push时auth参数配置的地址),开发者可以在这里判断,当前用户是否有权限监听这个频道。这样就保证了订阅的安全性。
var connection = new Push({
url: 'ws://127.0.0.1:8001', // websocket地址
app_key: '<app_key>',
auth: 'http://127.0.0.1:8002/subscribe/auth' // 该接口是样例接口,请根据源码自行实现业务
});
// 方法二
// 先通过接口查询获得用户信息,组装成如下
var channel_data = {
user_id: '100',
user_info: "{\'name\':\'John\',\'sex\':\'man\'}"
}
// 订阅发生前,浏览器会发起一个ajax鉴权请求(ajax地址为new Push时auth参数配置的地址),开发者可以在这里判断,当前用户是否有权限监听这个频道。这样就保证了订阅的安全性。
var connection = new Push({
url: 'ws://127.0.0.1:8001', // websocket地址
app_key: '<app_key>',
auth: 'http://127.0.0.1:8002/subscribe/auth', // 该接口是样例接口,请根据源码自行实现业务
channel_data: channel_data
});
// 监听 presence-test 状态频道
var user_channel = connection.subscribe('presence-test');
// 当 presence-test 频道有message事件的消息回调
user_channel.on('message', function(data) {
// data里是消息内容
console.log(data);
});
// 取消监听 presence-test 频道
connection.unsubscribe('presence-test')
// 取消所有频道的监听
connection.unsubscribeAll()
// 以上省略
// 私有频道
var user_channel = connection.subscribe('private-user-1');
user_channel.on('client-message', function (data) {
//
});
user_channel.trigger('client-message', {form_uid:2, content:"hello"});
// 状态频道
var user_channel = connection.subscribe('presence-user-1');
user_channel.on('client-message', function (data) {
//
});
user_channel.trigger('client-message', {form_uid:2, content:"hello"});
server {
# .... 这里省略了其它配置 ...
location /app
{
proxy_pass http://127.0.0.1:3131;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header X-Real-IP $remote_addr;
}
}
var connection = new Push({
url: 'wss://example.com',
app_key: '<app_key>'
});
wss开头,不写端口,必须使用ssl证书对应的域名连接
use Workbunny\WebmanPushServer\WsClient;
use Workerman\Connection\AsyncTcpConnection;
use Workbunny\WebmanPushServer\EVENT_SUBSCRIBE;
use Workbunny\WebmanPushServer\EVENT_SUBSCRIPTION_SUCCEEDED;
// 创建连接
$client = WsClient::instance('127.0.0.1:8001', [
'app_key' => 'workbunny',
'heartbeat' => 60,
'auth' => 'http://127.0.0.1:8002/subscribe/auth',
'channel_data' => [] // channel_data
'query' => [], // query
'context_option' => []
])
// 建立连接
$client->connect();
// 关闭连接
$client->disconnect();
use Workbunny\WebmanPushServer\WsClient;
use Workerman\Connection\AsyncTcpConnection;
// 创建连接
$client = WsClient::instance('127.0.0.1:8001', [
'app_key' => 'workbunny',
'heartbeat' => 60,
'auth' => 'http://127.0.0.1:8002/subscribe/auth',
'channel_data' => [] // channel_data
'query' => [], // query
'context_option' => []
])
// 订阅一个私有通道,订阅成功后会执行回调函数
$client->subscribe('private-test', function (AsyncTcpConnection $connection, array $data) {
// 订阅成功后打印
dump($data);
});
// 订阅一个私有通道,不注册订阅成功后的回调
$client->subscribe('private-test');
// 取消订阅一个私有通道
$client->unsubscribe('private-test', function (AsyncTcpConnection $connection, array $data) {
// 取消订阅成功后打印
dump($data);
});
// 取消订阅一个私有通道,不注册订阅成功后的回调
$client->unsubscribe('private-test');
// 取消全部订阅
$client->unsubscribeAll();
// 向 private-test 通道发送 client-test 事件消息
$client->trigger('private-test', 'client-test', [
'message' => 'hello workbunny!'
]);
// 向 presence-test 通道发送 client-test 事件消息
$client->trigger('presence-test', 'client-test', [
'message' => 'hello workbunny!'
]);
// 事件不带 client- 前缀会抛出RuntimeException
try {
$client->trigger('presence-test', 'test', [
'message' => 'hello workbunny!'
]);
} catch (RuntimeException $exception){
dump($exception);
}
use Workerman\Connection\AsyncTcpConnection;
// 注册关注private-test通道的client-test事件
$client->eventOn('private-test', 'client-test', function(AsyncTcpConnection $connection, array $data) {
// 打印事件数据
dump($data);
});
// 取消关注private-test通道的client-test事件
$client->eventOff('private-test', 'client-test');
// 获取所有注册事件回调
$client->getEvents();
// 获取客户端id,当连接创建前该方法返回null
$client->getSocketId();
// 获取已订阅通道,订阅触发前该方法返回空数组
$client->getChannels();
// 发布消息
$client->publish();
// 更多详见 WsClient.php
或者使用\Workbunny\WebmanPushServer\ApiClient 【建议使用】
composer require workbunny/webman-push-server
使用pusher提供的api客户端 【不建议使用,客户端请求没有使用keep-alive】
composer require pusher/pusher-php-server
use Workbunny\WebmanPushServer\ApiClient;
try {
$pusher = new ApiClient(
'APP_KEY',
'APP_SECRET',
'APP_ID',
[
'host' =>"http://127.0.0.1:8001",
'timeout' => 60,
'keep-alive' => true
]
);
$pusher->trigger(
// 频道(channel)支持多个通道
["private-d"],
// 事件
"client-a",
// 消息体
[
'message' => 'hello workbunny!'
],
// query
[]
);
} catch (GuzzleException|ApiErrorException|PusherException $e) {
dump($e);
}
在一些服务器监控场景下,我们需要获取全量的往来信息,包括客户端的消息和服务端的回执等
<?php declare(strict_types=1);
namespace YourNamespace;
use Workbunny\WebmanPushServer\Traits\ChannelMethods;
class PushServerMiddleware
{
use ChannelMethods;
/** @inheritDoc */
public static function _subscribeResponse(string $type, array $data): void
{
// TODO
}
}
客户端与服务端的任何通讯消息会触达_subscribeResponse方法,请在_subscribeResponse方法中实现对应业务逻辑,入日志等;
在项目config/process.php或config/plugin/workbunny/webman-push-server/process.php中添加配置
// push-server-middleware
'push-server-middleware' => [
'handler' => YourNamespace\PushServerMiddleware::class,
'count' => 1,
],
我们在使用过程中可能需要为push-server的onMessage做一些安全性考虑或者数据过滤和拦截的功能,那么消息中间件非常适合该场景
// push server root middlewares
'push-server' => [
// 以拦截非websocket消息举例
function (Closure $next, TcpConnection $connection, $data): void
{
// 拦截非websocket服务消息
if (!$connection->protocol instanceof \Workerman\Protocols\Websocket) {
$connection->close('Not Websocket');
return;
}
$next($connection, $data);
}
],
我们在使用过程中,可能需要自定义事件响应客户端的消息,那么我们可以创建一个自定义响应类
<?php declare(strict_types=1);
namespace Tests\Examples;
use Workbunny\WebmanPushServer\Events\AbstractEvent;
use Workerman\Connection\TcpConnection;
class OtherEvent extends AbstractEvent
{
/**
* @inheritDoc
*/
public function response(TcpConnection $connection, array $request): void
{
// todo
}
}
\Workbunny\WebmanPushServer\Events\AbstractEvent::register('other', \Tests\Examples\OtherEvent::class);
内部广播默认存在client事件和server事件,push-server默认只会响应该两种事件,如果我们需要对其他额外的内部事件进行处理时可使用该方案
<?php declare(strict_types=1);
namespace Tests\Examples;
use Workbunny\WebmanPushServer\PublishTypes\AbstractPublishType;
class OtherType extends AbstractPublishType
{
/** @inheritDoc */
public static function response(array $data): void
{
static::verify($data, [
['appKey', 'is_string', true],
['channel', 'is_string', true],
['event', 'is_string', true],
['socketId', 'is_string', false]
]);
// todo
}
}
\Workbunny\WebmanPushServer\PublishTypes\AbstractPublishType::register('other', \Tests\Examples\OtherType::class);
\Workbunny\WebmanPushServer\PushServer::publish('other', [
'a' => 'a'
])
在一些场景下,我们可能需要对push-server进行二次开发,那么我们可以使用组合式拓展开发,以实现对push-server的拓展