Files
EdgeManager/EdgeManager.php

425 lines
18 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
use Workerman\Protocols\Http\Request;
use Workerman\Protocols\Http\Response;
require_once __DIR__ . '/vendor/autoload.php';
use EdgeManager\EDataCapture\{ EDataCapture, ENodeConfigure };
use EdgeManager\EController\{ EConfigure, ECommand };
// Command-line switches: two value-less flags, the PostgreSQL connection
// parameters, and the two help switches.
$options = getopt(
    'h::',
    ['no_dup_code', 'relay_device_status', 'server_name:', 'port::', 'user:', 'password:', 'help::']
);

// Either help switch prints the usage text and ends the script before any
// database or worker setup happens.
if (array_key_exists('help', $options) || array_key_exists('h', $options)) {
    echo "EdgeManager使用说明
--no_dup_code 禁止code在不同的working subclass间复用
--relay_device_status 不判断是否是设备状态并转发到MES接口
--server_name pg实例的FQDN
--user pg实例的用户名
--port pg实例的端口号
--password pg实例的密码
-h, --help 显示此帮助信息
";
    exit;
}

// init_db() is defined outside this file; it receives the connection
// parameters, with the port falling back to 5432 when --port is absent.
init_db(
    $options['server_name'],
    $options['port'] ?? 5432,
    $options['user'],
    $options['password']
);
// HTTP front end: 20 worker processes listening on port 8888.
$worker = new Worker('http://0.0.0.0:8888');
$worker->name = 'EntryPoint';
$worker->count = 20;

/**
 * Opens one PostgreSQL connection per EntryPoint worker process and stores it
 * in the global $dbconn, which the request handler reads.
 *
 * @param Worker $worker The worker process that is starting.
 */
$worker->onWorkerStart = function (Worker $worker) {
    global $options, $dbconn;

    // libpq conninfo values must be single-quoted (with ' and \ escaped by a
    // backslash) when they are empty or contain spaces or quotes. Without the
    // quoting, such a password/user/host corrupts the whole connection string.
    $quote = static fn ($value): string => "'" . addcslashes((string) $value, "'\\") . "'";

    $dbconn = pg_connect(sprintf(
        "host=%s port=%s dbname=scada user=%s password=%s",
        $quote($options['server_name']),
        $quote($options['port'] ?? 5432),
        $quote($options['user']),
        $quote($options['password'])
    ));

    // Make a failed connection visible at start-up; every request handled by
    // this process would otherwise fail later with a less obvious error.
    if ($dbconn === false) {
        error_log('EntryPoint: pg_connect() failed, requests handled by this worker cannot reach the database');
    }
};
/**
 * HTTP request handler of the EntryPoint worker.
 *
 * A request may carry a JSON body with an `action` field (write operations)
 * and/or a `query` GET parameter (read operations):
 *   - `add_node`, `add_server` and `exec` are forwarded as tasks to the
 *     Consumer worker on 127.0.0.1:1888 and its answer is relayed back;
 *   - other actions ending in `node`, `server` or `device` are dispatched to
 *     the method of the same name on ENodeConfigure / EConfigure;
 *   - `set_node_data` is handled by EDataCapture;
 *   - `query=...` look-ups answer with `{code, data}` or `{code, msg}`.
 *
 * NOTE(review): a request that carries both an action and a `query` parameter
 * still produces two sends on the same connection, as before — confirm
 * whether that combination is ever used.
 *
 * @param TcpConnection $connection Client connection to answer on.
 * @param Request       $request    Parsed HTTP request.
 */
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    global $options, $dbconn;

    // NOTE(review): getopt() stores value-less flags as false, so passing
    // --no_dup_code / --relay_device_status yields false here while omitting
    // them yields true — confirm this polarity is what the callee classes expect.
    $no_dup_code = $options['no_dup_code'] ?? true;
    $relay_device_status = $options['relay_device_status'] ?? true;

    // Replies with an explicit Response object carrying a JSON content type.
    $send_response = static function (array $payload) use ($connection): void {
        $connection->send(new Response(200, [
            'Content-Type' => 'application/json;charset=utf-8',
        ], json_encode($payload)));
    };

    // Replies with the bare JSON string (no explicit Response object).
    $send_json = static function (array $payload) use ($connection): void {
        $connection->send(json_encode($payload));
    };

    // Reply used whenever an action is present but cannot be served.
    $reject_unknown_action = static function ($action) use ($send_response): void {
        $send_response([
            'action' => $action,
            'errcode' => 4002,
            'errmsg' => '乜Q action吖同朕check check佢',
        ]);
    };

    // Hands a task to the Consumer worker and relays its answer to the client.
    $forward_task = static function (string $action, $data) use ($connection): void {
        $task_connection = new AsyncTcpConnection('Text://127.0.0.1:1888');
        $task_connection->send(json_encode([
            'action' => $action,
            'data' => $data,
        ]));
        $task_connection->onMessage = function (AsyncTcpConnection $task_connection, $task_result) use ($connection) {
            $task_connection->close();
            $connection->send($task_result);
        };
        // Start the asynchronous connection.
        $task_connection->connect();
    };

    // ---- Pre-process the POST content -----------------------------------
    $get = $request->get();
    if ($request->header('content-type') === 'application/json') {
        $post = $request->post();
        if (isset($post['action'])) {
            // Round-trip through JSON to turn the array into nested objects.
            $post = json_decode(json_encode($post, JSON_PRESERVE_ZERO_FRACTION));
        }
    } else {
        $body = $request->rawBody();
        if ($body === "") {
            if (count($get) === 0) {
                $connection->send(new Response(200, [
                    'Content-Type' => 'application/json;charset=utf-8',
                ], "空请求!"));
                return;
            }
        } else {
            $post = json_decode($body);
            if (json_last_error() !== JSON_ERROR_NONE) {
                $send_response([
                    'action' => '错得太离谱以至于我也不知道这是什么动作',
                    'errcode' => 4001,
                    'errmsg' => 'POST的内容不是JSON也并非可按照JSON解析的字符串或者检查一下你的JSON是不是有注释',
                ]);
                // The request is already answered; do not fall through to the
                // query handling and send a second response.
                return;
            }
        }
    }

    // ---- Actions (JSON body) --------------------------------------------
    // A POST sent by Axios arrives as an array; an empty one means "no body".
    $has_post = isset($post) && !(is_array($post) && count($post) === 0);
    if ($has_post) {
        if (!isset($post->action)) {
            // No action at all.
            $send_response([
                'action' => '缺少action字段',
                'errcode' => 4001,
                'errmsg' => '你请求紧乜撚动作吖?',
            ]);
        } else {
            $action = $post->action;
            if (!is_string($action)) {
                // A non-string action (e.g. a JSON array) would make
                // str_ends_with() throw; treat it as an unknown action.
                $reject_unknown_action($action);
            } elseif (str_ends_with($action, 'node')) {
                // The action name doubles as the method name to call. It is
                // removed from the payload because pg_insert() does not need it.
                unset($post->action);
                if ($action === 'add_node') {
                    $forward_task('add_node', $post);
                } else {
                    $enode_configure = new ENodeConfigure($dbconn, no_dup_code: $no_dup_code, post: $post);
                    if (!is_callable([$enode_configure, $action])) {
                        // Client-supplied name that is not a callable method:
                        // answer instead of letting an Error escape the handler.
                        $reject_unknown_action($action);
                    } else {
                        $res = $enode_configure->$action();
                        if ($res === true) {
                            $send_json(['code' => 0, 'msg' => 'Success']);
                        } elseif ($res === false) {
                            $send_json(['code' => 1, 'msg' => '服务器内部逻辑错误,请联系开发者!']);
                        }
                    }
                }
            } elseif (str_ends_with($action, 'server') || str_ends_with($action, 'device')) {
                unset($post->action);
                if ($action === 'add_server') {
                    $forward_task('add_server', $post);
                } else {
                    $e_configure = new EConfigure($dbconn, post: $post);
                    if (!is_callable([$e_configure, $action])) {
                        $reject_unknown_action($action);
                    } else {
                        $res = $e_configure->$action();
                        if ($res === true) {
                            $send_json(['code' => 0, 'msg' => 'Success']);
                        } elseif ($res === "REMAINING") {
                            $send_json(['code' => 1, 'msg' => '该服务中仍有关联的设备,不允许删除!']);
                        } elseif ($res === false) {
                            $send_json(['code' => 1, 'msg' => '服务器内部逻辑错误,请联系开发者!']);
                        }
                    }
                }
            } elseif ($action === 'set_node_data') {
                $data_capture = new EDataCapture(
                    $dbconn,
                    no_dup_code: $no_dup_code,
                    relay_device_status: $relay_device_status,
                    post: $post
                );
                // Validation outcomes that must be reported instead of storing.
                $validation_errors = [
                    'WRONG_WORKING_SUBCLASS' => '工序单元有误,请检查是否未指定或未登记!',
                    'MISMATCH_TYPE' => '节点编码和数值类型不匹配!',
                    'NO_DEVICE_CODE' => 'device_code为必填字段',
                ];
                $check_res = $data_capture->check_res;
                if (is_string($check_res) && isset($validation_errors[$check_res])) {
                    // Report the validation failure and stop here: previously the
                    // code went on to call set_node_data() and sent a second
                    // response for the same request.
                    $send_response([
                        'action' => 'result_set_node_data',
                        'errcode' => 4002,
                        'errmsg' => $validation_errors[$check_res],
                    ]);
                } elseif ($data_capture->set_node_data() === true) {
                    $send_json([
                        'action' => 'result_set_node_data',
                        'errcode' => 0,
                        'errmsg' => '',
                    ]);
                } else {
                    $send_json([
                        'action' => 'result_set_node_data',
                        'errcode' => 4002,
                        'errmsg' => 'ROLLBACKed: Bad data received (structure and/or values)',
                    ]);
                }
            } elseif ($action === 'exec') {
                $forward_task('exec', $post);
            } else {
                // An action is present, but it is not one we know.
                $reject_unknown_action($action);
            }
        }
    }

    // ---- Queries (GET parameter) ----------------------------------------
    if (!isset($get['query'])) {
        return;
    }
    $query = $get['query'];

    // Common envelope: null means "nothing to return" and yields code 1 with
    // the given message, anything else is returned as data with code 0.
    $reply_query = static function ($result, string $empty_msg) use ($send_json): void {
        if (is_null($result)) {
            $send_json(['code' => 1, 'msg' => $empty_msg]);
        } else {
            $send_json(['code' => 0, 'data' => $result]);
        }
    };
    $node_configure = static fn () => new ENodeConfigure($dbconn, no_dup_code: $no_dup_code, get: $get);

    if ($query === 'nodes') {
        $reply_query($node_configure()->get_nodes(), '未添加过节点!');
    } elseif ($query === 'working_subclasses') {
        $reply_query($node_configure()->get_working_subclasses($dbconn), '还没有工序单元!');
    } elseif ($query === 'codes') {
        $reply_query($node_configure()->get_codes_by_working_subclasses(), '服务器内部逻辑错误,请联系开发者!');
    } elseif ($query === 'node_data') {
        $data_capture = new EDataCapture($dbconn, no_dup_code: $no_dup_code, get: $get);
        $data = $data_capture->get_node_data();
        if ($data === "TOO_MANY") {
            $send_json(['code' => 1, 'msg' => '结果太多(超过二百万条),请收紧查询条件!']);
        } else {
            $reply_query($data, '服务器内部逻辑错误,请联系开发者!');
        }
    } elseif ($query === 'servers') {
        $reply_query(EConfigure::get_servers($dbconn), '未添加过服务!');
    } elseif ($query === 'device') {
        $e_configure = new EConfigure($dbconn, get: $get);
        $reply_query($e_configure->get_device(), '未添加过设备!');
    } elseif ($query === 'device_list') {
        $e_configure = new EConfigure($dbconn, get: $get);
        $reply_query($e_configure->get_device_list(), '未添加过设备!');
    } else {
        $send_json(['code' => 1, 'msg' => '你请求紧乜撚嘢啊?']);
    }
};
// Task consumer speaking Workerman's Text protocol on port 1888.
// NOTE(review): this binds to 0.0.0.0, so the task port is reachable from
// other hosts; the EntryPoint worker only connects via 127.0.0.1 — confirm
// whether remote senders are intended before narrowing the bind address.
$consumer = new Worker('Text://0.0.0.0:1888');
$consumer->name = 'Consumer';

/**
 * Opens the PostgreSQL connection of the Consumer worker process and stores
 * it in the global $consumer_dbconn, which the task handler reads.
 *
 * @param Worker $consumer The worker process that is starting.
 */
$consumer->onWorkerStart = function (Worker $consumer) {
    global $options, $consumer_dbconn;

    // libpq conninfo values must be single-quoted (with ' and \ escaped by a
    // backslash) when they are empty or contain spaces or quotes. Without the
    // quoting, such a password/user/host corrupts the whole connection string.
    $quote = static fn ($value): string => "'" . addcslashes((string) $value, "'\\") . "'";

    $consumer_dbconn = pg_connect(sprintf(
        "host=%s port=%s dbname=scada user=%s password=%s",
        $quote($options['server_name']),
        $quote($options['port'] ?? 5432),
        $quote($options['user']),
        $quote($options['password'])
    ));

    // Make a failed connection visible at start-up; every task handled by
    // this process would otherwise fail later with a less obvious error.
    if ($consumer_dbconn === false) {
        error_log('Consumer: pg_connect() failed, tasks handled by this worker cannot reach the database');
    }
};
/**
 * Task handler of the Consumer worker.
 *
 * Each message is a JSON object with an `action` field:
 *   - `add_node` / `add_server`: run the matching method and answer with a
 *     `{code, msg}` JSON string;
 *   - `exec`: run ECommand::exec() and answer with Success;
 *   - `set_device_status`: POST the whole task as JSON to the configured HTTP
 *     endpoint; no answer is sent back on the task connection.
 *
 * @param TcpConnection $connection Connection the task arrived on.
 * @param string        $task_data  Raw JSON text of the task.
 */
$consumer->onMessage = function (TcpConnection $connection, $task_data) {
    // $options has to be imported as well: it was read below without being
    // declared global, so inside this closure it was always undefined, the
    // no_dup_code fallback always applied and the isset() test never matched.
    global $options, $consumer_dbconn;

    $task_data = json_decode($task_data);
    if (!is_object($task_data) || !isset($task_data->action)) {
        // Not a decodable task; nothing below could match it, so stop before
        // dereferencing a non-object.
        error_log('Consumer: ignoring a task that is not a JSON object with an action field');
        return;
    }

    // Sends a {code, msg} answer as a JSON string.
    $reply = static function (int $code, string $msg) use ($connection): void {
        $connection->send(json_encode([
            'code' => $code,
            'msg' => $msg,
        ]));
    };

    if ($task_data->action === 'add_node') {
        // NOTE(review): getopt() stores a value-less flag as false, so passing
        // --no_dup_code yields false here and omitting it yields true — confirm
        // this polarity is what ENodeConfigure expects.
        $enode_configure = new ENodeConfigure(
            $consumer_dbconn,
            no_dup_code: $options['no_dup_code'] ?? true,
            post: $task_data->data
        );
        $res = $enode_configure->add_node();
        if ($res === true) {
            $reply(0, 'Success');
        } elseif ($res === "REPLICATED") {
            $reply(1, isset($options['no_dup_code']) ? '节点编码不可重复!' : '同一工序单元内的节点编码不可重复!');
        } elseif ($res === false) {
            $reply(1, '服务器内部逻辑错误,请联系开发者!');
        }
    } elseif ($task_data->action === 'add_server') {
        $e_configure = new EConfigure($consumer_dbconn, post: $task_data->data);
        $res = $e_configure->add_server();
        if ($res === true) {
            $reply(0, 'Success');
        } elseif ($res === "REPLICATED") {
            $reply(1, '该地址或名称已注册过服务!');
        } elseif ($res === false) {
            $reply(1, '服务器内部逻辑错误,请联系开发者!');
        }
    } elseif ($task_data->action === 'exec') {
        $e_command = new ECommand($consumer_dbconn, $task_data->data);
        $e_command->exec();
        $reply(0, 'Success');
    } elseif ($task_data->action === 'set_device_status') {
        // Relay the task unchanged to the external HTTP endpoint. The response
        // is captured (RETURNTRANSFER) only so it is not printed; it is not used.
        $ch = curl_init();
        curl_setopt($ch, CURLOPT_URL, 'http://8.sctmes.com:22347');
        curl_setopt($ch, CURLOPT_POST, 1);
        curl_setopt($ch, CURLOPT_HTTPHEADER, array('Content-Type: application/json'));
        curl_setopt($ch, CURLOPT_HEADER, 1);
        curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($task_data));
        curl_setopt($ch, CURLOPT_TIMEOUT, 10);
        curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
        curl_exec($ch);
        curl_close($ch);
    }
};

// Start every worker registered above.
Worker::runAll();