AWS Greengrass~DynamoDBやってみる その3の2 CODESYS ZeroMQ

CODESYSを使う場合のみの例です。 皆さんはスキップして『その4』に進んでください。

記録のみなので詳細は説明していません。

  1. IoT CoreにPublishする前に、デバイス内PythonにデータをZeroMQでRequest(REQ)します。
  2. ZeroMQの相手がサーバー(bind)でポート番号50000で待っています。
  3. ZeroMQはpyzmqのcurveを使って暗号化された公開鍵通信します。
  4. 公開鍵はmmapで交換しています。
  5. デバイス内PythonがZeroMQのReply(REP)で送ってきたメッセージを受け取り後、thing nameをメッセージに入れて、そのメッセージをjson形式にに戻してPublishしています。
~/greengrassv2/artifacts/PublishClassMethod/main.py
import zmq,mmap,json
import time
import signal,os
import subprocess
import awsiot.greengrasscoreipc
from awsiot.greengrasscoreipc.model import (QOS,PublishToIoTCoreRequest)
import greengrasssdk


print("************ichiri COMPONENT STARTED ************")
def signal_handler(sig, frame):
    if sig==signal.SIGINT:
        print('Ctrl+C was pressed. Terminate this program.')
    if sig==signal.SIGTSTP:
        print('Ctrl+Z was pressed. Terminate this program.')
    os._exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTSTP, signal_handler)
'''
try:
    _result=subprocess.run(["sudo", "usermod", "-aG", "petalinux",os.getlogin()],capture_output=True,text=True)
    print("************ichiri SUBPROCESS RESULT:",_result.stdout)
except Exception as e:
    print("ichiri ERROR:",e)
'''

print("**123***********",zmq.__version__,"*****************")
print("**********************",os.getenv("AWS_IOT_THING_NAME"))
context = zmq.Context()
socket = context.socket(zmq.REQ)
# client_secret_key = b"client_secret_key"  # クライアントの秘密鍵
client_public_key, client_secret_key = zmq.curve_keypair()  # クライアントの公開鍵と秘密鍵を生成
socket.curve_secretkey = client_secret_key
socket.curve_publickey = client_public_key
file_path='/home/petalinux/example.txt'
with open(file_path, 'r+b') as file:
    mm = mmap.mmap(file.fileno(),0, access=mmap.ACCESS_READ)
    socket.curve_serverkey  = mm.readline()
    mm.close()
socket.connect("tcp://localhost:50000")

TIMEOUT = 10
topic = "topic"
qos = QOS.AT_LEAST_ONCE

ipc_client = awsiot.greengrasscoreipc.connect()

#client = greengrasssdk.client('iot-data')
#thing_name = context.client.thing_name
#print("Device Name: ", thing_name)
print("***********PUBLISHER START **************")
for i in range(100):
    start=time.time()
    socket.send(b"Hello results")
    message = json.loads(socket.recv())
    print("Received reply: %s" % message)
    message["thingname"]=os.getenv("AWS_IOT_THING_NAME")
    request = PublishToIoTCoreRequest()
    request.topic_name = topic
    request.payload = json.dumps(message).encode('utf-8')
    #request.payload = json.dumps(message).encode('utf-8')
    request.qos = qos

    operation = ipc_client.new_publish_to_iot_core()
    operation.activate(request)
    future = operation.get_response()
    future.result(TIMEOUT)
    print("? publish :{}".format(message))
    print(time.time()-start,"sec")
    time.sleep(5)
socket.disconnect("tcp://localhost:50000")
print("? publisher finish. ******* ichiri MESSAGE ********")
~/greengrassv2/artifacts/PublishClassMethod/recipe.json
{
    "RecipeFormatVersion": "2020-01-25",
    "ComponentName": "com.example.PublishClassMethod",
    "ComponentVersion": "1.3.0",
    "ComponentType": "aws.greengrass.generic",
    "ComponentDescription": "This publishes value to topic.",
    "ComponentPublisher": "ichiri",
    "ComponentConfiguration": {
        "DefaultConfiguration": {
          "accessControl": {
            "aws.greengrass.ipc.mqttproxy": {
              "com.example.Publisher:publish:1": {
                "operations": [
                  "aws.greengrass#PublishToIoTCore"
                ],
                "resources": [
                  "topic"
                ]
              }
            }
          }
        }
    },
    "Manifests": [
      {
        "Lifecycle": {
          "install": "python3 -m pip install --user awsiotsdk pyzmq greengrasssdk --upgrade",
          "run": "python3 -u {artifacts:decompressedPath}/com.example.PublishClassMethod/main.py"
        },
        "Artifacts": [
          {
            "Uri": "s3://BUCKET_NAME/COMPONENT_NAME/COMPONENT_VERSION/com.example.PublishClassMethod.zip",
            "Unarchive": "ZIP"
          }
        ]
      }]
  }
~/greengrassv2/artifacts/PublishClassMethod/gdk-config.json
{
    "component": {
      "com.example.PublishClassMethod": {
        "author": "ichiri",
        "version": "NEXT_PATCH",
        "build": {
          "build_system" : "zip"
        },
        "publish": {
          "region": "ap-northeast-1",
          "bucket": "mybucket-tokyo-ichiri"
        }
      }
    },
    "gdk_version": "1.1.0"
  }

同じデバイス内のPythonプログラム

CODESYSからOPC UAで変数取得。 詳しくは『CODESYS Control(Runtime)の変数をPythonで取得1 OPC UA』を参照

  1. ZeroMQサーバをポート50000で立上、秘密鍵、公開鍵を作成して、
  2. 公開鍵をmmapで/home/petalinux/example.txtに入れる。 GreengrassのコンポーネントのPythonがこの公開鍵を受け取る。
  3. Greengrass内のコンポーネントのPythonからZeroMQ REQを受信したら
  4. OPC UAでCODESYSから変数を読み出す
  5. 変数のデータ構造をDynamoDBにフィットしやすく後にBig Data処理しやすいJSON形式(最初はDICTにいれ最終JSON)に変更する。 時刻もpythonのdatetimeオブジェクトでなく、文字列でISO8601 UTC形式に変更する。
  6. そしてZqroMQのREPで送信
~opc_zmq_rep.py
from opcua import Client,ua
import time
import os,sys,traceback
import zmq
import mmap
from datetime import datetime, timedelta
import json
#import signal
from signal import signal, SIGPIPE, SIG_DFL,SIGINT, SIGTSTP

def signal_handler(sig, frame):
    if sig==SIGINT:
        print('Ctrl+C was pressed. Terminate this program.')
    if sig==SIGTSTP:
        print('Ctrl+Z was pressed. Terminate this program.')
    os._exit(0)

signal(SIGINT, signal_handler)
signal(SIGTSTP, signal_handler)
#signal(SIGPIPE,SIG_DFL)

# Establish connection to the OPC UA server
url = "opc.tcp://127.0.0.1:4840"  # URL of your CODESYS OPC UA server
client = Client(url)
client.connect()
print("Connected to OPC UA server")
# ZeroMQ server start
context = zmq.Context()
socket = context.socket(zmq.REP)
server_public_key, server_secret_key = zmq.curve_keypair()
print(server_secret_key)

socket.curve_secretkey = server_secret_key
socket.curve_publickey = server_public_key
_pub_key_size=len(server_public_key)
print(_pub_key_size)
file_path = '/home/petalinux/example.txt'

if not os.path.exists(file_path):
    with open(file_path, 'wb') as file:
        file.write(40*b"\0")
with open(file_path, 'r+b') as file:
    mm = mmap.mmap(file.fileno(), _pub_key_size, access=mmap.ACCESS_WRITE)
    mm[0:_pub_key_size] = server_public_key
    print("server pub key",server_public_key)
    print("ichiri reads:",mm.readline())
    mm.close()
socket.curve_server = True
try:
    socket.bind("tcp://*:50000")
except zmq.error.ZMQError as e:
    print("ZMQError:", e)
    os._exit(1)

# Read a specific node (replace 'ns=2;s=YourNode' with your actual node)
nodeids = [ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.PLC_PRG.VarTimeCur0"),
            ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.PLC_PRG.VarTimeCur1"),
            ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyVariable"),
            ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyString"),
            ua.NodeId.from_string("ns=4;s=|var|CODESYS Control for Linux ARM64 SL.Application.GVL.MyCount")]

send_dict={
        "thingname":None,
        "timestamp":None,
        "variable":{},
        "id":"ichiri's"
        }

while True:
    message = socket.recv()
    start=time.time()
    print("zmq received: %s" % message)

    try:
        try:
            results = client.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)
            send_dict["timestamp"]=str(results[0].SourceTimestamp)
            #send_dict={"timestamp":str(result[0].SourceTimestamp-timedelta(hours=9))}
        except Exception as e:
            print("ERROR right after get_attributes:",e)
            client = Client(url)
            client.connect()
            results = client.uaclient.get_attributes(nodeids, ua.AttributeIds.Value)

        for _id, _result in zip(nodeids, results):
            _id_pos=_id.Identifier.find("Application")+len("Application.")
            _timestamp=str(_result.SourceTimestamp)
            send_dict["variable"][_id.Identifier[_id_pos:]]={
                "val":_result.Value.Value,
                "type":_result.Value._variantType.name,
                }
    except Exception as e:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        traceback.print_tb(exc_traceback)  # スタックトレースを出力
        print("ichiri ERROR HAPPENED:",e)

    print(send_dict)
    socket.send(json.dumps(send_dict).encode())
    end=time.time()
    print((end-start),"sec")

コメント