CODESYSを使う場合のみの例です。 皆さんはスキップして『その4』に進んでください。
記録のみなので詳細は説明していません。
- IoT CoreにPublishする前に、デバイス内PythonにデータをZeroMQでRequest(REQ)します。
- ZeroMQの相手がサーバー(bind)でポート番号50000で待っています。
- ZeroMQはpyzmqのcurveを使って暗号化された公開鍵通信します。
- 公開鍵はmmapで交換しています。
- デバイス内PythonがZeroMQのReply(REP)で送ってきたメッセージを受け取り後、thing nameをメッセージに入れて、そのメッセージをjson形式にに戻してPublishしています。
~/greengrassv2/artifacts/PublishClassMethod/main.py
import zmq,mmap,json import time import signal,os import subprocess import awsiot.greengrasscoreipc from awsiot.greengrasscoreipc.model import (QOS,PublishToIoTCoreRequest) import greengrasssdk print("************ichiri COMPONENT STARTED ************") def signal_handler(sig, frame): if sig==signal.SIGINT: print('Ctrl+C was pressed. Terminate this program.') if sig==signal.SIGTSTP: print('Ctrl+Z was pressed. Terminate this program.') os._exit(0) signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTSTP, signal_handler) ''' try: _result=subprocess.run(["sudo", "usermod", "-aG", "petalinux",os.getlogin()],capture_output=True,text=True) print("************ichiri SUBPROCESS RESULT:",_result.stdout) except Exception as e: print("ichiri ERROR:",e) ''' print("**123***********",zmq.__version__,"*****************") print("**********************",os.getenv("AWS_IOT_THING_NAME")) context = zmq.Context() socket = context.socket(zmq.REQ) # client_secret_key = b"client_secret_key" # クライアントの秘密鍵 client_public_key, client_secret_key = zmq.curve_keypair() # クライアントの公開鍵と秘密鍵を生成 socket.curve_secretkey = client_secret_key socket.curve_publickey = client_public_key file_path='/home/petalinux/example.txt' with open(file_path, 'r+b') as file: mm = mmap.mmap(file.fileno(),0, access=mmap.ACCESS_READ) socket.curve_serverkey = mm.readline() mm.close() socket.connect("tcp://localhost:50000") TIMEOUT = 10 topic = "topic" qos = QOS.AT_LEAST_ONCE ipc_client = awsiot.greengrasscoreipc.connect() #client = greengrasssdk.client('iot-data') #thing_name = context.client.thing_name #print("Device Name: ", thing_name) print("***********PUBLISHER START **************") for i in range(100): start=time.time() socket.send(b"Hello results") message = json.loads(socket.recv()) print("Received reply: %s" % message) message["thingname"]=os.getenv("AWS_IOT_THING_NAME") request = PublishToIoTCoreRequest() request.topic_name = topic request.payload = json.dumps(message).encode('utf-8') #request.payload = json.dumps(message).encode('utf-8') request.qos = qos operation = ipc_client.new_publish_to_iot_core() operation.activate(request) future = operation.get_response() future.result(TIMEOUT) print("? publish :{}".format(message)) print(time.time()-start,"sec") time.sleep(5) socket.disconnect("tcp://localhost:50000") print("? publisher finish. ******* ichiri MESSAGE ********")
~/greengrassv2/artifacts/PublishClassMethod/recipe.json
{ "RecipeFormatVersion": "2020-01-25", "ComponentName": "com.example.PublishClassMethod", "ComponentVersion": "1.3.0", "ComponentType": "aws.greengrass.generic", "ComponentDescription": "This publishes value to topic.", "ComponentPublisher": "ichiri", "ComponentConfiguration": { "DefaultConfiguration": { "accessControl": { "aws.greengrass.ipc.mqttproxy": { "com.example.Publisher:publish:1": { "operations": [ "aws.greengrass#PublishToIoTCore" ], "resources": [ "topic" ] } } } } }, "Manifests": [ { "Lifecycle": { "install": "python3 -m pip install --user awsiotsdk pyzmq greengrasssdk --upgrade", "run": "python3 -u {artifacts:decompressedPath}/com.example.PublishClassMethod/main.py" }, "Artifacts": [ { "Uri": "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/com.example.PublishClassMethod.zip", "Unarchive": "ZIP" } ] }] }
~/greengrassv2/artifacts/PublishClassMethod/gdk-config.json
{ "component": { "com.example.PublishClassMethod": { "author": "ichiri", "version": "NEXT_PATCH", "build": { "build_system" : "zip" }, "publish": { "region": "ap-northeast-1", "bucket": "mybucket-tokyo-ichiri" } } }, "gdk_version": "1.1.0" }
同じデバイス内のPythonプログラム
CODESYSからOPC UAで変数取得。 詳しくは『CODESYS Control(Runtime)の変数をPythonで取得1 OPC UA』を参照
- ZeroMQサーバをポート50000で立上、秘密鍵、公開鍵を作成して、
- 公開鍵をmmapで/home/petalinux/example.txtに入れる。 GreengrassのコンポーネントのPythonがこの公開鍵を受け取る。
- Greengrass内のコンポーネントのPythonからZeroMQ REQを受信したら
- OPC UAでCODESYSから変数を読み出す
- 変数のデータ構造をDynamoDBにフィットしやすく後にBig Data処理しやすいJSON形式(最初はDICTにいれ最終JSON)に変更する。 時刻もpythonのdatetimeオブジェクトでなく、文字列でISO8601 UTC形式に変更する。
- そしてZqroMQのREPで送信
~opc_zmq_rep.py
from opcua import Client,ua import time import os,sys,traceback import zmq import mmap from datetime import datetime, timedelta import json #import signal from signal import signal, SIGPIPE, SIG_DFL,SIGINT, SIGTSTP def signal_handler(sig, frame): if sig==SIGINT: print('Ctrl+C was pressed. Terminate this program.') if sig==SIGTSTP: print('Ctrl+Z was pressed. Terminate this program.') os._exit(0) signal(SIGINT, signal_handler) signal(SIGTSTP, signal_handler) #signal(SIGPIPE,SIG_DFL) # Establish connection to the OPC UA server url = "opc.tcp://127.0.0.1:4840" # URL of your CODESYS OPC UA server client = Client(url) client.connect() print("Connected to OPC UA server") # ZeroMQ server start context = zmq.Context() socket = context.socket(zmq.REP) server_public_key, server_secret_key = zmq.curve_keypair() print(server_secret_key) socket.curve_secretkey = server_secret_key socket.curve_publickey = server_public_key _pub_key_size=len(server_public_key) print(_pub_key_size) file_path = '/home/petalinux/example.txt' if not os.path.exists(file_path): with open(file_path, 'wb') as file: file.write(40*b"\0") with open(file_path, 'r+b') as file: mm = mmap.mmap(file.fileno(), _pub_key_size, access=mmap.ACCESS_WRITE) mm[0:_pub_key_size] = server_public_key print("server pub key",server_public_key) print("ichiri reads:",mm.readline()) mm.close() socket.curve_server = True try: socket.bind("tcp://*:50000") except zmq.error.ZMQError as e: print("ZMQError:", e) os._exit(1) # Read a specific node (replace 'ns=2;s=YourNode' with your actual node) nodeids = [ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.PLC_PRG.VarTimeCur0"), ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.PLC_PRG.VarTimeCur1"), ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyVariable"), ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyString"), ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyCount")] send_dict={ "thingname":None, "timestamp":None, "variable":{}, "id":"ichiri's" } while True: message = socket.recv() start=time.time() print("zmq received: %s" % message) try: try: results = client.uaclient.get_attributes(nodeids, ua.AttributeIds.Value) send_dict["timestamp"]=str(results[0].SourceTimestamp) #send_dict={"timestamp":str(result[0].SourceTimestamp-timedelta(hours=9))} except Exception as e: print("ERROR right after get_attributes:",e) client = Client(url) client.connect() results = client.uaclient.get_attributes(nodeids, ua.AttributeIds.Value) for _id, _result in zip(nodeids, results): _id_pos=_id.Identifier.find("Application")+len("Application.") _timestamp=str(_result.SourceTimestamp) send_dict["variable"][_id.Identifier[_id_pos:]]={ "val":_result.Value.Value, "type":_result.Value._variantType.name, } except Exception as e: exc_type, exc_value, exc_traceback = sys.exc_info() traceback.print_tb(exc_traceback) # スタックトレースを出力 print("ichiri ERROR HAPPENED:",e) print(send_dict) socket.send(json.dumps(send_dict).encode()) end=time.time() print((end-start),"sec")
コメント