CODESYSを使う場合のみの例です。 皆さんはスキップして『その4』に進んでください。
記録のみなので詳細は説明していません。
- IoT CoreにPublishする前に、デバイス内PythonにデータをZeroMQでRequest(REQ)します。
- ZeroMQの相手がサーバー(bind)でポート番号50000で待っています。
- ZeroMQはpyzmqのcurveを使って暗号化された公開鍵通信します。
- 公開鍵はmmapで交換しています。
- デバイス内PythonがZeroMQのReply(REP)で送ってきたメッセージを受け取り後、thing nameをメッセージに入れて、そのメッセージをjson形式にに戻してPublishしています。
~/greengrassv2/artifacts/PublishClassMethod/main.py
import zmq,mmap,json
import time
import signal,os
import subprocess
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (QOS,PublishToIoTCoreRequest)
import greengrasssdk
print("************ichiri COMPONENT STARTED ************")
def signal_handler(sig, frame):
if sig==signal.SIGINT:
print('Ctrl+C was pressed. Terminate this program.')
if sig==signal.SIGTSTP:
print('Ctrl+Z was pressed. Terminate this program.')
os._exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTSTP, signal_handler)
'''
try:
_result=subprocess.run(["sudo", "usermod", "-aG", "petalinux",os.getlogin()],capture_output=True,text=True)
print("************ichiri SUBPROCESS RESULT:",_result.stdout)
except Exception as e:
print("ichiri ERROR:",e)
'''
print("**123***********",zmq.__version__,"*****************")
print("**********************",os.getenv("AWS_IOT_THING_NAME"))
context = zmq.Context()
socket = context.socket(zmq.REQ)
# client_secret_key = b"client_secret_key" # クライアントの秘密鍵
client_public_key, client_secret_key = zmq.curve_keypair() # クライアントの公開鍵と秘密鍵を生成
socket.curve_secretkey = client_secret_key
socket.curve_publickey = client_public_key
file_path='/home/petalinux/example.txt'
with open(file_path, 'r+b') as file:
mm = mmap.mmap(file.fileno(),0, access=mmap.ACCESS_READ)
socket.curve_serverkey = mm.readline()
mm.close()
socket.connect("tcp://localhost:50000")
TIMEOUT = 10
topic = "topic"
qos = QOS.AT_LEAST_ONCE
ipc_client = awsiot.greengrasscoreipc.connect()
#client = greengrasssdk.client('iot-data')
#thing_name = context.client.thing_name
#print("Device Name: ", thing_name)
print("***********PUBLISHER START **************")
for i in range(100):
start=time.time()
socket.send(b"Hello results")
message = json.loads(socket.recv())
print("Received reply: %s" % message)
message["thingname"]=os.getenv("AWS_IOT_THING_NAME")
request = PublishToIoTCoreRequest()
request.topic_name = topic
request.payload = json.dumps(message).encode('utf-8')
#request.payload = json.dumps(message).encode('utf-8')
request.qos = qos
operation = ipc_client.new_publish_to_iot_core()
operation.activate(request)
future = operation.get_response()
future.result(TIMEOUT)
print("? publish :{}".format(message))
print(time.time()-start,"sec")
time.sleep(5)
socket.disconnect("tcp://localhost:50000")
print("? publisher finish. ******* ichiri MESSAGE ********")
~/greengrassv2/artifacts/PublishClassMethod/recipe.json
{
"RecipeFormatVersion": "2020-01-25",
"ComponentName": "com.example.PublishClassMethod",
"ComponentVersion": "1.3.0",
"ComponentType": "aws.greengrass.generic",
"ComponentDescription": "This publishes value to topic.",
"ComponentPublisher": "ichiri",
"ComponentConfiguration": {
"DefaultConfiguration": {
"accessControl": {
"aws.greengrass.ipc.mqttproxy": {
"com.example.Publisher:publish:1": {
"operations": [
"aws.greengrass#PublishToIoTCore"
],
"resources": [
"topic"
]
}
}
}
}
},
"Manifests": [
{
"Lifecycle": {
"install": "python3 -m pip install --user awsiotsdk pyzmq greengrasssdk --upgrade",
"run": "python3 -u {artifacts:decompressedPath}/com.example.PublishClassMethod/main.py"
},
"Artifacts": [
{
"Uri": "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/com.example.PublishClassMethod.zip",
"Unarchive": "ZIP"
}
]
}]
}
~/greengrassv2/artifacts/PublishClassMethod/gdk-config.json
{
"component": {
"com.example.PublishClassMethod": {
"author": "ichiri",
"version": "NEXT_PATCH",
"build": {
"build_system" : "zip"
},
"publish": {
"region": "ap-northeast-1",
"bucket": "mybucket-tokyo-ichiri"
}
}
},
"gdk_version": "1.1.0"
}
同じデバイス内のPythonプログラム
CODESYSからOPC UAで変数取得。 詳しくは『CODESYS Control(Runtime)の変数をPythonで取得1 OPC UA』を参照
- ZeroMQサーバをポート50000で立上、秘密鍵、公開鍵を作成して、
- 公開鍵をmmapで/home/petalinux/example.txtに入れる。 GreengrassのコンポーネントのPythonがこの公開鍵を受け取る。
- Greengrass内のコンポーネントのPythonからZeroMQ REQを受信したら
- OPC UAでCODESYSから変数を読み出す
- 変数のデータ構造をDynamoDBにフィットしやすく後にBig Data処理しやすいJSON形式(最初はDICTにいれ最終JSON)に変更する。 時刻もpythonのdatetimeオブジェクトでなく、文字列でISO8601 UTC形式に変更する。
- そしてZqroMQのREPで送信
~opc_zmq_rep.py
from opcua import Client,ua
import time
import os,sys,traceback
import zmq
import mmap
from datetime import datetime, timedelta
import json
#import signal
from signal import signal, SIGPIPE, SIG_DFL,SIGINT, SIGTSTP
def signal_handler(sig, frame):
if sig==SIGINT:
print('Ctrl+C was pressed. Terminate this program.')
if sig==SIGTSTP:
print('Ctrl+Z was pressed. Terminate this program.')
os._exit(0)
signal(SIGINT, signal_handler)
signal(SIGTSTP, signal_handler)
#signal(SIGPIPE,SIG_DFL)
# Establish connection to the OPC UA server
url = "opc.tcp://127.0.0.1:4840" # URL of your CODESYS OPC UA server
client = Client(url)
client.connect()
print("Connected to OPC UA server")
# ZeroMQ server start
context = zmq.Context()
socket = context.socket(zmq.REP)
server_public_key, server_secret_key = zmq.curve_keypair()
print(server_secret_key)
socket.curve_secretkey = server_secret_key
socket.curve_publickey = server_public_key
_pub_key_size=len(server_public_key)
print(_pub_key_size)
file_path = '/home/petalinux/example.txt'
if not os.path.exists(file_path):
with open(file_path, 'wb') as file:
file.write(40*b"\0")
with open(file_path, 'r+b') as file:
mm = mmap.mmap(file.fileno(), _pub_key_size, access=mmap.ACCESS_WRITE)
mm[0:_pub_key_size] = server_public_key
print("server pub key",server_public_key)
print("ichiri reads:",mm.readline())
mm.close()
socket.curve_server = True
try:
socket.bind("tcp://*:50000")
except zmq.error.ZMQError as e:
print("ZMQError:", e)
os._exit(1)
# Read a specific node (replace 'ns=2;s=YourNode' with your actual node)
nodeids = [ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.PLC_PRG.VarTimeCur0"),
ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.PLC_PRG.VarTimeCur1"),
ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyVariable"),
ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyString"),
ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyCount")]
send_dict={
"thingname":None,
"timestamp":None,
"variable":{},
"id":"ichiri's"
}
while True:
message = socket.recv()
start=time.time()
print("zmq received: %s" % message)
try:
try:
results = client.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)
send_dict["timestamp"]=str(results[0].SourceTimestamp)
#send_dict={"timestamp":str(result[0].SourceTimestamp-timedelta(hours=9))}
except Exception as e:
print("ERROR right after get_attributes:",e)
client = Client(url)
client.connect()
results = client.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)
for _id, _result in zip(nodeids, results):
_id_pos=_id.Identifier.find("Application")+len("Application.")
_timestamp=str(_result.SourceTimestamp)
send_dict["variable"][_id.Identifier[_id_pos:]]={
"val":_result.Value.Value,
"type":_result.Value._variantType.name,
}
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_tb(exc_traceback) # スタックトレースを出力
print("ichiri ERROR HAPPENED:",e)
print(send_dict)
socket.send(json.dumps(send_dict).encode())
end=time.time()
print((end-start),"sec")

コメント