CODESYS Control(Runtime)の変数をPythonで取得2 External Call(Unix Domain Socket)

External CallはUnixドメインソケット(UDS)を使ってスキャン同期で動作し、値が1つなら一回の読書で50us程度の高速通信です。 External Callは同一ボード内の同一Linux OS内通信です。 タイミングが重要な制御用の情報やデバッグなどでは役に立ちそうです。 しかし、PLCプログラム側にも記述が必要で、データ構造は持たない値しか送受信できずデータ管理が大変なのでBig Dataとしては使いづらい感じです。 CODESYS Developmentに入っていたExampleを参考に作成しました。(ここにありました。 C:\Program Files\CODESYS 3.5.19.50\CODESYS\CODESYS Control SL Extension Package\4.11.0.0\Examples\ExternalCall\UDSExternalCallExample.project)

External Callを使うためには

SL Extension packageが必要です。 しかしこれは、Linux ARM64 SLをインストールしたら自動的にインストールされていました。

PLC側設定

  1. ライブラリ設定
  2. ファンクションブロック作成
  3. プログラム(POU)作成~MainTaskに追加

ライブラリ追加

  • まず、使用するライブラリを追加しなければなりません。
  • 左のナビ画面から『ライブラリマネージャー』をダブルクリックすると『ライブラリマネージャー』画面が開きます。
  • 『ライブラリの追加』をクリックしてライブラリ追加のダイアログボックスを開きます。
  • そこで検索部分に赤枠内の『CmpApp』、『CmpErrors』、『CmpEventMgr』などを追加します。(どこまで必要だったか覚えてないですが、CmpApp、CmpErrorsがないとビルドでエラーになりました。 逆にエラーになったら一つづつライブラリ追加するのも手です。)

ファンクションブロック作成

  • 左のナビ画面の『Application』上で右クリックして『オブジェクトの追加』~『POU...』を選択すると以下のダイアログボックスが表示されます。
  • ファンクションブロック名を入れます。 私は『myExternalFunction』と言う名前にしています。
  • タイプは『ファンクションブロック』を選択します。
  • 実装言語は『構造化テキスト(ST)』を選択して
  • 追加』します。

ファンクションブロックのヘッダー部分

以下を張り付ける。

// Function Block using Unix Domain Sockets.
// Input parameters are added before the effective call and output parameters are retrieved after the call.
// Names and types of the parameters are transferred via Unix Domain Sockets to a corresponding remote server (see uds_external_function.py as example for implementation of the external function).
FUNCTION_BLOCK myExternalFunction EXTENDS EXTAPI.UDSExternalFunction
VAR_INPUT
	diIn1 : DINT;
	diIn2 : DINT;
	diIn3 : DINT;
	diIn4 : WSTRING;	
END_VAR
VAR_OUTPUT
	diOut: DINT;
END_VAR
VAR
	sValue : STRING;
END_VAR

ファンクションブロックのボディー部分

以下を張り付ける。

THIS^.fctname := 'myExternalFunction';
SUPER^();

Result := THIS^.AddParameter('parameterIn1', 'DINT', TO_STRING(diIn1));
IF Result <> Errors.ERR_OK THEN
	RETURN;
END_IF

Result := THIS^.AddParameter('parameterIn2', 'DINT', TO_STRING(diIn2));
IF Result <> Errors.ERR_OK THEN
	RETURN;
END_IF

Result := THIS^.AddParameter('parameterIn3', 'DINT', TO_STRING(diIn3));
IF Result <> Errors.ERR_OK THEN
	RETURN;
END_IF

Result := THIS^.AddParameter('parameterIn4', 'WSTRING', TO_STRING(diIn4));
IF Result <> Errors.ERR_OK THEN
	RETURN;
END_IF

Result := THIS^.Call();
IF Result <> Errors.ERR_OK THEN
	RETURN;
END_IF

Result := THIS^.GetParameter('parameterOut', 'DINT', ADR(sValue));
IF Result <> Errors.ERR_OK THEN
	RETURN;
END_IF

diOut := TO_DINT(sValue);

PLCプログラムの作成

  • 左のナビ画面の『Application』上で右クリックして『オブジェクトの追加』~『POU...』を選択すると以下のダイアログボックスが表示されます。
  • POU名を入れます。 私は『PLC?PRG_1』と言う名前にしています。
  • タイプは『プログラム』を選択します。
  • 実装言語はここも『構造化テキスト(ST)』を選択して
  • 追加』します。

POUのヘッダー部分

以下を張り付ける。

PROGRAM PLC_PRG_1
VAR
	_myExternalFunction : myExternalFunction; // Instance of example implementation of external function call via Unix Domain Sockets
	diParamIn1 : DINT; // First input parameter of the external function
	diParamIn2 : DINT := 22; // Second input parameter of the external function
	diParamIn3 : DINT := 12345;
	diParamIn4 : WSTRING := "hello ichiri";
	diParamOut : DINT; // Output parameter of the external function	
	Result : SysTypes.RTS_IEC_RESULT;
END_VAR

POUのボディー部分

以下を張り付ける。

_myExternalFunction(
	diIn1 := diParamIn1,
	diIn2 := diParamIn2,
	diIn3 := diParamIn3,
	diIn4 := diParamIn4,
	diOut => diParamOut,
	Result => Result);
	
diParamIn1 := diParamIn1 + 1;

PLC_PRG_1をMainTaskに追加する

  • 左のナビ画面のApplication直下のPLC_PRG_1をコピペで、MainTaskに張り付ける。
  • MainTaskのPLC_PRGやApplication直下のMyExecPythonFunc、_PythonExecuteは関係無いので無視してください。
  • 必要なのはmyExternalFunction(FB)とPLC_PRG_1の2ファイルだけで、PLC_PRG_1だけMainTaskに追加します。

ここまで出来たら、CODESYS Control(Runtime)を起動して、CODESYS Developmentでログインして、上記プログラムをビルド~転送~運転にします。(エラーが出たら、記述間違いか、ライブラリーが足りないのだと思います。)

Python側設定

作業directory作成

/var/run/codesysextension/extfuncsを作成し、所有者を変更して、実行可能形式に変更しておきます。 Pythonプログラム内でも作成して使用者変更しようとしていますが、何故か『Permission denied』になりました。

$ sudo mkdir -p/var/run/codesysextension/extfuncs
$ sudo chown petalinux:petalinux /var/run/codesysextension/extfuncs
$ sudo chmod +x /var/run/codesysextension/extfuncs

Pythonプログラム

  • 足りないライブラリがあれば、pip3 installでインストールしておく。
  • ファイル名は私の場合は『uds_external_function.py』としています。
uds_external_function.py
import socket
import sys
import struct
import threading
import fcntl, os
import errno
import time

# This class helps to connect to a CODESYS Control runtime
# and provide an implementation of an external function
class ExternalFunctionBase(threading.Thread):

    def __init__(self):
        self.sock = None
        threading.Thread.__init__(self)
        self.functionName = self.__class__.__name__
        self.dir = '/var/run/codesysextension/extfuncs'
        self.endpoint = os.path.join(self.dir, self.functionName + '.sock')
        try:
            getattr(self, 'Call')
        except:
            print('Your class does not have a method with name "Call"')
            return None

    def run(self):

        try:
            if not os.path.exists(self.dir):
                os.makedirs(self.dir)
        except socket.error as msg:
            if msg.errno == 13:
                print("Could not create " + self.dir + ": permission denied.")
                print("Either run this script with sufficient permissions or create the directory with sufficient permissions (e.g. sudo mkdir " + self.dir + "; sudo chown $(whoami) " + self.dir + ")")
            else:
                print(msg)
            return False

        try:
            os.unlink(self.endpoint)
        except socket.error as msg:
            if msg.errno != 13 and msg.errno != 2:
                print(msg)
                return False

        try:
            # create unix domain socket
            self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            print("Starting server for external function " + self.functionName)
            # bind the socket to CODESYS runtime
            self.sock.bind(self.endpoint)
            self.sock.listen(1)
        except socket.error as msg:
            if msg.errno == 13:
                print("Could not bind socket: permission denied on " + self.endpoint)
                print("Either run this script with sufficient permissions or change permissions on " + self.dir + " (e.g. sudo chown $(whoami) " + self.dir + ")")
            else:
                print(msg)
            return False

        while True:
            try:
                connection, client_address = self.sock.accept()
            except socket.error as msg:
                if self.sock != None:
                    self.sock.close()
                    return False

            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
            while True:
                try:
                    request = connection.recv(1000)
                    if request:
                        dictParams = {}
                        format = "IiI"
                        msgId, msgType, rcvDataSize = struct.unpack_from(format, request)
                        #print('Request received: MsgId: %u, MsgType: %d, dataSize: %u' % (msgId, msgType, rcvDataSize))
                        if rcvDataSize > 0:
                            format = "%ip" % rcvDataSize
                            data = request[12:] #12 bytes header
                            if not data:
                                print("Unexpected data size")
                                break
                            liParamsRaw = [x.decode("UTF-8") for x in data.split(b'\x00')]
                            for param in liParamsRaw:
                                if ':=' in param:
                                    paramName, paramLiteral = param.split(':=')
                                    paramType, paramValue = paramLiteral.split('#')
                                    dictParams[paramName] = paramType, paramValue

                        #print('Call function "' + self.functionName + '" with parameters ' + str(dictParams))
                        func = getattr(self, 'Call')
                        # Call the external function
                        dictRetParams = func(dictParams)
                        if dictRetParams:
                            params = b''
                            #print('Return parameters "' + str(dictRetParams))
                            for paramName, (paramType, paramValue) in dictRetParams.items():
                                params += (str(paramName) + ':=' + str(paramType) + '#' + str(paramValue)).encode() + b'\00'
                            format = "IiII"
                            response = struct.pack(format, msgId, msgType, len(params), 0) + params
                        else:
                            # Return error code (here: 1=ERR_FAILED)
                            format = "IiII"
                            response = struct.pack(format, msgId, msgType, 0, 1)
                        connection.sendall(response)
                        continue
                    else:
                        break
                except socket.error as msg:
                    if msg.errno == errno.EAGAIN or msg.errno == errno.EWOULDBLOCK or msg.errno == None:
                        continue
                    else:
                        print(msg)
                        break
# end of class

# The name of the class defines the name of the external function
class myExternalFunction(ExternalFunctionBase):

    # This is the effective call of the external function.
    # Input parameters are passed as dict of tuple of strings (paramType, paramValue) with parameter name as index.
    # E.g. {'parameterIn1': ('DINT', '11'), 'parameterIn2': ('DINT', '22')}
    # Return parameters must be in the same format.
    def __init__(self):
        self.i = 0
        ExternalFunctionBase.__init__(self)
        self.TimeStart=0
        self.TimeEnd=0

    def Call(self, dictParams):
        in1 = int(dictParams['parameterIn1'][1])
        in2 = int(dictParams['parameterIn2'][1])
        dictRetParams = {}
        dictRetParams['parameterOut'] = 'DINT', in1 + in2
        self.i= self.i + 1
        self.TimeEnd=time.perf_counter_ns()

        print(self.i,"*****",dictParams,dictRetParams,"TimeSpent:", self.TimeEnd-self.TimeStart)
        self.TimeStart = self.TimeEnd

        return dictRetParams

# example for external function:
if __name__ == "__main__":

    extFunc = myExternalFunction()
    extFunc.start()

Pythonプログラム実行

sudoを付けないとエラーとなったので付けています。

uds_external_function.py
$ sudo python uds_external_function.py
  • 以下はプログラムを実行した結果です。
  • ParameterIn1やParameterIn2等、PLCプログラムとPythonプログラムで指定した値が入ってきます。
  • DINT、WSTRINGもPLCプログラムで定義てPythonプログラムで
  • PLCプログラム実行スキャンタイム20msに設定しているのでTimeSpentはほぼ20msとなっています。

スキャンタイムの変更

  • CODESYS Developmentで『オフライン』にして
  • 左のナビ画面の『MainTask』をダブルクリックして
  • 以下の周期を1にすると1msとなります。
  • 1msとすると、上記のPythonプログラムで左の番号が1秒ごとに1000増えていきます。 また一番右の数字はns単位ですが1msに近くなります。(上記図はほぼ20msになっています)

参考 サンプルプログラム実行時の注意点

ODESYS Developmentに入っていたExampleを(C:\Program Files\CODESYS 3.5.19.50\CODESYS\CODESYS Control SL Extension Package\4.11.0.0\Examples\ExternalCall\UDSExternalCallExample.project)を開いて実行しようとすると、ビルドでエラーとなった。 そこで、以下2点をする無事ビルドできました。

  1. 『ライブラリマネージャー』から『不足ライブラリ』をクリックすると多くのライブラリがダウンロードされエラーがなくなった。 ダウンロードするたびに、ビルドを押して更新して、エラーが出たら再度『不足ライブラリ』でライブラリを追加するとエラーなくビルドできた。
  2. それでも足りなかったようで、不足ライブラリでも解決できなかったのは、ライブラリーマネージャーで『ライブラリ追加』でCmpErrorsを追加しました。

コメント