name = 'EntryPoint'; $worker -> count = 20; $worker -> onWorkerStart = function(Worker $worker) { global $options, $dbconn; $dbconn = pg_connect(sprintf( "host=%s dbname=scada user=%s password=%s", $options['server_name'], $options['user'], $options['password'] )); }; $worker -> onMessage = function(TcpConnection $connection, Request $request) { global $options, $dbconn; // 先预处理POST内容 if ($request->header('content-type') === 'application/json') { $post = $request -> post(); if (isset($post['action'])) { $post = json_decode(json_encode($post, JSON_PRESERVE_ZERO_FRACTION)); } } else { $body = $request -> rawBody(); if ($body === "") { $response = new Response(200, [ 'Content-Type' => 'application/json;charset=utf-8', ], "空请求!"); $connection -> send($response); } else { $post = json_decode($request -> rawBody()); if (json_last_error() !== JSON_ERROR_NONE) { $response = new Response(200, [ 'Content-Type' => 'application/json;charset=utf-8', ], json_encode(array( 'action' => '错得太离谱以至于我也不知道这是什么动作', 'errcode' => 4001, 'errmsg' => 'POST的内容不是JSON,也并非可按照JSON解析的字符串!或者检查一下你的JSON是不是有注释?' ))); $connection -> send($response); } } } if (isset($post) and !(is_array($post) and count($post) === 0)) { // Axios:发送的POST是array if (isset($post -> action)) { if (str_ends_with($post -> action, 'node')) { // 用这种方式自动匹配调用和action相对应的方法 // 因为pg_insert()不需要action字段 $action = $post -> action; unset($post -> action); if ($action === 'add_node') { $task_connection = new AsyncTcpConnection('Text://127.0.0.1:1888'); $task_data = array( 'action' => 'add_node', 'data' => $post, ); $task_connection -> send(json_encode($task_data)); $task_connection -> onMessage = function(AsyncTcpConnection $task_connection, $task_result) use ($connection) { $task_connection -> close(); $connection -> send($task_result); }; // 执行异步连接 $task_connection->connect(); } else { $enode_configure = new ENodeConfigure($dbconn, post: $post); $res = $enode_configure -> $action(); if ($res === true) $connection -> send(json_encode(array( 'code' => 0, 'msg' => 'Success' ))); else if ($res === false) { $connection -> send(json_encode(array( 'code' => 1, 'msg' => '服务器内部逻辑错误,请联系开发者!' ))); } } } else if ($post -> action === 'set_node_data') { $data_capture = new EDataCapture($dbconn, post: $post); if ($data_capture -> check_res === 'WRONG_WORKING_SUBCLASS') { $response = new Response(200, [ 'Content-Type' => 'application/json;charset=utf-8', ], json_encode(array( 'action' => 'result_set_node_data', 'errcode' => 4002, 'errmsg' => '未登记过的工序单元!' ))); $connection -> send($response); } else if ($data_capture -> check_res === 'MISMATCH_TYPE') { $response = new Response(200, [ 'Content-Type' => 'application/json;charset=utf-8', ], json_encode(array( 'action' => 'result_set_node_data', 'errcode' => 4002, 'errmsg' => '节点编码和数值类型不匹配!' ))); $connection -> send($response); } $res = $data_capture -> set_node_data(); if ($res === true) { $connection -> send(json_encode(array( 'action' => 'result_set_node_data', 'errcode' => 0, 'errmsg' => '' ))); } else { $connection -> send(json_encode(array( 'action' => 'result_set_node_data', 'errcode' => 4002, 'errmsg' => 'ROLLBACKed: Bad data received (structure and/or values)' ))); } } else { // 有action,但是不知道是什么鬼动作 $response = new Response(200, [ 'Content-Type' => 'application/json;charset=utf-8', ], json_encode(array( 'action' => $post -> action, 'errcode' => 4002, 'errmsg' => '乜Q action吖?同朕check check佢' ))); $connection -> send($response); } } else { // action都无 $response = new Response(200, [ 'Content-Type' => 'application/json;charset=utf-8', ], json_encode(array( 'action' => '缺少action字段', 'errcode' => 4001, 'errmsg' => '你请求紧乜撚动作吖?' ))); $connection -> send($response); } } $get = $request -> get(); if (isset($get['query'])) { if ($get['query'] == 'nodes') { $enode_configure = new ENodeConfigure($dbconn, get: $get); $nodes = $enode_configure -> get_nodes(); if (is_null($nodes)) $connection -> send(json_encode(array( 'code' => 1, 'msg' => 'no node data yet' ))); else $connection -> send(json_encode(array( 'code' => 0, 'data' => $nodes ))); } else if ($get['query'] == 'working_subclasses') { $enode_configure = new ENodeConfigure($dbconn, get: $get); $working_subclasses = $enode_configure -> get_working_subclasses($dbconn); if (is_null($working_subclasses)) $connection -> send(json_encode(array( 'code' => 1, 'msg' => '还没有工序单元!' ))); else $connection -> send(json_encode(array( 'code' => 0, 'data' => $working_subclasses ))); } else if ($get['query'] == 'codes') { $enode_configure = new ENodeConfigure($dbconn, get: $get); $codes = $enode_configure -> get_codes_by_working_subclasses(); if (is_null($codes)) $connection -> send(json_encode(array( 'code' => 1, 'msg' => '服务器内部逻辑错误,请联系开发者!' ))); else $connection -> send(json_encode(array( 'code' => 0, 'data' => $codes ))); } else if ($get['query'] == 'node_data') { $data_capture = new EDataCapture($dbconn, get: $get); $data = $data_capture -> get_node_data(); if (is_null($data)) $connection -> send(json_encode(array( 'code' => 1, 'msg' => '服务器内部逻辑错误,请联系开发者!' ))); else if ($data === "TOO_MANY") $connection -> send(json_encode(array( 'code' => 1, 'msg' => '结果太多(超过二百万条),请收紧查询条件!' ))); else $connection -> send(json_encode(array( 'code' => 0, 'data' => $data ))); } else { $connection -> send(json_encode(array( 'code' => 1, 'msg' => '你请求紧乜撚嘢啊?' ))); } } }; $consumer = new Worker('Text://0.0.0.0:1888'); $consumer -> name = 'Consumer'; $consumer -> onWorkerStart = function(Worker $consumer) { global $options, $consumer_dbconn; $consumer_dbconn = pg_connect(sprintf( "host=%s dbname=scada user=%s password=%s", $options['server_name'], $options['user'], $options['password'] )); }; $consumer -> onMessage = function(TcpConnection $connection, $task_data) { global $consumer_dbconn; $task_data = json_decode($task_data); if ($task_data -> action === 'add_node') { $enode_configure = new ENodeConfigure($consumer_dbconn, post: $task_data -> data); $res = $enode_configure -> add_node(); if ($res === true) $connection -> send(json_encode(array( 'code' => 0, 'msg' => 'Success' ))); else if ($res === "REPLICATED") $connection -> send(json_encode(array( 'code' => 1, 'msg' => '同一工序单元内的节点编码不可重复!' ))); else if ($res === false) { $connection -> send(json_encode(array( 'code' => 1, 'msg' => '服务器内部逻辑错误,请联系开发者!' ))); } } }; Worker::runAll();