windows环境下TP5.1使用think-worker(Workerman/GatewayWorker)

来源:blog.csdn.net 时间:2021-05-07 22:22
 

 

 

测试环境 windows10;PHP7.2;TP5.1;

这里只介绍如何使用TP集成的workerman扩展库think-worker,原生workerman的使用请参考官方文档

TP5.1集成了workerman,使用composer require topthink/think-worker=2.0.*安装即可。
TP5.1只能安装think-worker2.0版本,最新的think-worker3.0版本是给TP6.0用的,但依赖安装workerman的版本是最新的。

虽然集成了,但是在windows下使用还是有许多问题,比如直接运行命令php think woker:gateway会报错GatewayWorker Not Support On Windows. windows解决方案 ,Linux下可以直接运行(应该吧~)。
官方的使用文档也不够详细,只列举了workerworker:server两种运行方式的简单示列。但是大部分使用workerman都是奔着GatewayWorker去的,毕竟自己用workerman完全搭建还是需要技术和时间的。

单纯的使用workerman,直接运行php think workerphp think worker:server就可以,调试也非常简单,TP官方文档有说明就不赘述了,重点是gatewayworker。

首先是解决如何运行gatewayworker

根据workerman的文档,windows下不能在同一个php文件中运行多个worker,所以需要修改tinkphp的命令行
新建自定义命令行文件application\common\command\Workerman.php

<?php

namespace app\common\command;

use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Config;
use Workerman\Worker;

/**
 * Worker 命令行
 */
class Workerman extends Command
{
    protected function configure()
    {
        $this->setName('workerman')
            ->addArgument('service', Argument::OPTIONAL, 'workerman service: gateway|register|businessworker', null)
            ->addOption('host', 'H', Option::VALUE_OPTIONAL, 'the host of workerman server', null)
            ->addOption('port', 'P', Option::VALUE_OPTIONAL, 'the port of workerman server', null)
            ->addOption('daemon', 'd', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')
            ->setDescription('workerman Server for ThinkPHP');
    }

    public function execute(Input $input, Output $output)
    {
        $service = $input->getArgument('service');

        $option = Config::pull('gateway_worker');

        if ($input->hasOption('host')) {
            $host = $input->getOption('host');
        } else {
            $host = !empty($option['host']) ? $option['host'] : '0.0.0.0';
        }

        if ($input->hasOption('port')) {
            $port = $input->getOption('port');
        } else {
            $port = !empty($option['port']) ? $option['port'] : '2347';
        }

        $registerAddress = !empty($option['registerAddress']) ? $option['registerAddress'] : '127.0.0.1:1236';
        switch ($service) {
            case 'register':
                $this->register($registerAddress);
            break;
            case 'businessworker':
                $this->businessWorker($registerAddress, isset($option['businessWorker']) ? $option['businessWorker'] : []);
            break;
            case 'gateway':
                $this->gateway($registerAddress, $host, $port, $option);
                break;
            default:
                $output->writeln("<error>Invalid argument action:{$service}, Expected gateway|register|businessworker .</error>");
                exit(1);
                break;
        }

        Worker::runAll();
    }

    /**
     * 启动register
     * @access public
     * @param  string   $registerAddress
     * @return void
     */
    public function register($registerAddress)
    {
        // 初始化register
        new Register('text://' . $registerAddress);
    }

    /**
     * 启动businessWorker
     * @access public
     * @param  string   $registerAddress registerAddress
     * @param  array    $option 参数
     * @return void
     */
    public function businessWorker($registerAddress, $option = [])
    {
        // 初始化 bussinessWorker 进程
        $worker = new BusinessWorker();

        $this->option($worker, $option);

        $worker->registerAddress = $registerAddress;
    }

    /**
     * 启动gateway
     * @access public
     * @param  string   $registerAddress registerAddress
     * @param  string   $host 服务地址
     * @param  integer  $port 监听端口
     * @param  array    $option 参数
     * @return void
     */
    public function gateway($registerAddress, $host, $port, $option = [])
    {
        // 初始化 gateway 进程
        if (!empty($option['socket'])) {
            $socket = $option['socket'];
            unset($option['socket']);
        } else {
            $protocol = !empty($option['protocol']) ? $option['protocol'] : 'websocket';
            $socket   = $protocol . '://' . $host . ':' . $port;
            unset($option['host'], $option['port'], $option['protocol']);
        }

        $gateway = new Gateway($socket, isset($option['context']) ? $option['context'] : []);

        // 以下设置参数都可以在配置文件中重新定义覆盖
        $gateway->name                 = 'Gateway';
        $gateway->count                = 4;
        $gateway->lanIp                = '127.0.0.1';
        $gateway->startPort            = 2000;
        $gateway->pingInterval         = 30;
        $gateway->pingNotResponseLimit = 0;
        $gateway->pingData             = '{"type":"ping"}';
        $gateway->registerAddress      = $registerAddress;

        // 全局静态属性设置
        foreach ($option as $name => $val) {
            if (in_array($name, ['stdoutFile', 'daemonize', 'pidFile', 'logFile'])) {
                Worker::${$name} = $val;
                unset($option[$name]);
            }
        }

        $this->option($gateway, $option);
    }

    /**
     * 设置参数
     * @access protected
     * @param  Worker   $worker Worker对象
     * @param  array    $option 参数
     * @return void
     */
    protected function option($worker, array $option = [])
    {
        // 设置参数
        if (!empty($option)) {
            foreach ($option as $key => $val) {
                $worker->$key = $val;
            }
        }
    }
}

 

application\command.php命令行参数配置文件中添加

return [
    'workerman' => '\\app\\common\\command\\Workerman',
];
 
  • 1
  • 2
  • 3

打开三个cmd命令窗口,分别运行
php think workerman register
php think workerman businessworker
php think workerman gateway

运行结果
在这里插入图片描述

调试gatewayworker程序

添加Events监听事件文件application\workerman\Events.php,这里偷懒直接复制了官方的Events文件,自己写的话,方法没写全运行时会报错退出,所以干脆直接全部复制,修改一下命名空间即可。

<?php

namespace app\workerman;

use GatewayWorker\Lib\Gateway;
use think\worker\Application;
use Workerman\Worker;

/**
 * Worker 命令行服务类
 */
class Events
{
    /**
     * onWorkerStart 事件回调
     * 当businessWorker进程启动时触发。每个进程生命周期内都只会触发一次
     *
     * @access public
     * @param  \Workerman\Worker    $businessWorker
     * @return void
     */
    public static function onWorkerStart(Worker $businessWorker)
    {
        $app = new Application;
        $app->initialize();
    }

    /**
     * onConnect 事件回调
     * 当客户端连接上gateway进程时(TCP三次握手完毕时)触发
     *
     * @access public
     * @param  int       $client_id
     * @return void
     */
    public static function onConnect($client_id)
    {
        Gateway::sendToCurrentClient("Your client_id is $client_id");
    }

    /**
     * onWebSocketConnect 事件回调
     * 当客户端连接上gateway完成websocket握手时触发
     *
     * @param  integer  $client_id 断开连接的客户端client_id
     * @param  mixed    $data
     * @return void
     */
    public static function onWebSocketConnect($client_id, $data)
    {
        var_export($data);
    }

    /**
     * onMessage 事件回调
     * 当客户端发来数据(Gateway进程收到数据)后触发
     *
     * @access public
     * @param  int       $client_id
     * @param  mixed     $data
     * @return void
     */
    public static function onMessage($client_id, $data)
    {
        Gateway::sendToAll($data);
    }

    /**
     * onClose 事件回调 当用户断开连接时触发的方法
     *
     * @param  integer $client_id 断开连接的客户端client_id
     * @return void
     */
    public static function onClose($client_id)
    {
        GateWay::sendToAll("client[$client_id] logout\n");
    }

    /**
     * onWorkerStop 事件回调
     * 当businessWorker进程退出时触发。每个进程生命周期内都只会触发一次。
     *
     * @param  \Workerman\Worker    $businessWorker
     * @return void
     */
    public static function onWorkerStop(Worker $businessWorker)
    {
        echo "WorkerStop\n";
    }
}

 

修改配置监听文件config\gateway_worker.php

     // BusinsessWorker配置
    'businessWorker'        => [
        'name'         => 'BusinessWorker',
        'count'        => 1,
        'eventHandler' => '\app\workerman\Events', // 原来是\think\worker\Events,改成自己的监听文件位置
    ],
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

添加前端测试文件,这里使用的是vue,关于前端如何使用webSocket,网上到处都是,也很简单。

// vue测试代码片段
export default {
  data () {
    return {
      websocket: null
    }
  },

  mounted () {
    this.websocket = new WebSocket('ws://127.0.0.1:2348') // 使用gateway的地址

    this.websocket.onmessage = evt => {
      console.log(evt.data) // 打印接收的消息
    }
  }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

重启businessworker服务,运行vue,前端控制台会打印

Your client_id is 7f00000107d000000001
 
  • 1

这是在Events文件监听事件onConnect中的程序,当客户端连接时,向当前客户端发送信息,多开几个窗口,测试多客户端连接时的效果。

向指定客户端发送消息

首先需要明确的是,gatewayworker只能通过client_id识别客户端,每产生一次连接,就会生成一个client_id,即便是同一个页面,发生了多次连接,gatewayworker也会认为是不同的客户端。
实际业务中客户端往往是以用户id或其他形式的id作为区分,所以实际业务中需要将client_id和业务id进行绑定并做判断,这里做测试就不深入讨论了,直接用client_id进行测试

修改前端文件

// vue模板代码片段
<template>
  <el-row type="flex">
    <el-select v-model="selectClientId">
      <el-option v-for="(item, index) in clients" :key="index" :value="item" :label="item" />
    </el-select>
    <el-input v-model="message"></el-input>
    <el-button @click="submit">发送</el-button>
  </el-row>
</template>
 
// vue js代码片段
  data () {
    return {
      websocket: null,
      clients: [], // client用户列表
      selectClientId: '', // 选择的用户
      message: '' // 需要发送的消息
    }
  },
  methods: {
    submit () {
      const data = {
        client_id: this.selectClientId, // 指定的客户端id
        message: this.message
      }
      this.websocket.send(JSON.stringify(data))
    }
  },
  mounted () {
    this.websocket = new WebSocket('ws://127.0.0.1:2348') // 使用gateway的地址
    
    this.websocket.onmessage = evt => {
      const data = JSON.parse(evt.data)
      if (data.type === 'login') {
        this.clients.push(data.client_id)
      }
      console.log(data.message)
    }
  }
 

修改监听文件,修改了onConnect和onMessage两个监听回调

    # ...
    public static function onConnect($client_id)
    {
        // Gateway::sendToCurrentClient("Your client_id is $client_id");
        $message = [
            'type' => 'login',
            'client_id' => $client_id,
            'message' => 'user ' . $client_id . ' is login',
        ];
        Gateway::sendToAll(json_encode($message));
    }
    # ...
    public static function onMessage($client_id, $data)
    {
        // Gateway::sendToAll($data);
        $data = json_decode($data, true);
        $form_client = $client_id;
        $to_client = $data['client_id'];
        $message = $data['message'];
        $send_message = [
            'type' => 'message',
            'message' => "user {$form_client} send {$message} to you",
        ];
        if ($to_client) {
            // 如果有指定用户,则发送给指定用户
            Gateway::sendToClient($to_client, json_encode($send_message));
        } else {
            // 没有指定用户,发送给全部
            Gateway::sendToAll($data);
        }
    }
 

重启worker服务,测试效果
在这里插入图片描述

workerman的官方文档中明确指出不建议直接通过客户端发送消息,而是通过原来的框架处理业务逻辑
与ThinkPHP等框架结合

总体原则:

现有mvc框架项目与GatewayWorker独立部署互不干扰

所有的业务逻辑都由网站页面post/get到mvc框架中完成

GatewayWorker不接受客户端发来的数据,即GatewayWorker不处理任何业务逻辑,GatewayWorker仅仅当做一个单向的推送通道

仅当mvc框架需要向浏览器主动推送数据时才在mvc框架中调用Gateway的API(GatewayClient)完成推送

在TP框架中调用Gateway的API

workerman官方文档建议使用GatewayClient提供的API发送数据,这个需要额外安装composer require workerman/gatewayclient,使用方法在官方文档中有说明,和使用gateway一样。但在TP的实际测试中,无需安装也可以正常使用,这里使用的是GatewayWorker\Lib\Gateway,也不需要配置参数,可以直接使用。

TP处理业务逻辑的控制器

<?php

namespace app\index\controller;

use GatewayWorker\Lib\Gateway;
use think\Controller;

class Index extends Controller
{
    public function index()
    {
        $client_id = $this->request->get('client_id');
        $send_message = $this->request->get('message');
        $message = [
            'type' => 'message',
            'message' => $this->request->get('message'),
        ];
        Gateway::sendToClient($client_id, json_encode($message));
    }
}
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

浏览器直接访问或ajax访问效果一致,运行结果
在这里插入图片描述

至此,TP5.1中使用think-worker调试基本通过,剩下的就是根据实际业务逻辑进行处理了。

总结说明

在官方文档 与ThinkPHP等框架结合 的使用说明中和案例中发现,不需要在Events监听文件中写业务逻辑和判断,所有的业务逻辑都可以在TP框架中完成,Events的作用仅仅是将client_id告诉客户端。

而在tink-worker原来的Events文件中,当客户端连接时,就向当前客户端发送过一条信息"Your client_id is 7f00000107d000000001",使用正则匹配就能拿到client_id,无需更改文件。

那么TP5.1的think-worker的使用可以简化如下

  1. windows下修改gatewayworker的启动方式,Linux无需更改(我也没有测试)。
  2. php业务逻辑中使用GatewayWorker\Lib\Gateway调用gateway的API给客户端发送消息。

所以不需要过多的更改gateway的配置文件,也不需要额外的建立监听文件,就可以直接使用gateway了,当然windows环境下因为机制问题,所以更改了启动方式。饶了一大圈回来,发现think-worker的使用方式是如此简单,所以官方文档是觉得太简单了所以没有给使用说明的必要么????