#!/usr/bin/perl
# Parser for MQTT messages.
# Version: 1.0.2
#
# Subscribes to every topic through mosquitto_sub, keeps only the messages
# published under stat/# and tele/#, and records selected values (the LWT
# state and the ENERGY "Yesterday"/"Today" counters) both in a debug log
# file and in the mqttparser database table.

use strict;
use warnings;

use JSON qw( decode_json );
use DBI;
use Proc::Daemon;

# For debugging set $daemon = 0 so the script does not detach as a daemon.
my $daemon = 1;

# Cleared by the TERM handler to request shutdown.
my $continue = 1;

# PID of the mosquitto_sub child while it is running; the TERM handler uses
# it to unblock the read loop.
my $sub_pid;

if ($daemon == 1) {
    Proc::Daemon::Init;

    # A blocked read on the subscriber pipe is restarted after the handler
    # returns, so the flag alone would not end the loop: terminate the child
    # as well, which makes the pipe reach end-of-file.
    $SIG{TERM} = sub {
        $continue = 0;
        kill 'TERM', $sub_pid if $sub_pid;
    };

    # Overwrite rather than append, so the file only ever holds the PID of
    # the running instance.
    open my $pid_fh, '>', '/var/run/mqttparser.pid'
        or die "Couldn't open file: $!";
    print {$pid_fh} "$$\n";
    close $pid_fh;
}

# Read the configuration file.
my $config = '/etc/mqttparser.conf';
my ($db, $dbuser, $dbpass, $dbhost, $mqttuser, $mqttpass)
    = read_config($config);

my $dbh = DBI->connect("DBI:mysql:$db:$dbhost", $dbuser, $dbpass)
    or die "Could not connect to database $db on $dbhost: $DBI::errstr";

# List-form pipe open: no shell is involved, so credentials containing
# spaces or shell metacharacters are passed through unchanged.
$sub_pid = open my $SUB, '-|', '/usr/bin/mosquitto_sub',
    '-u', $mqttuser, '-P', $mqttpass, '-v', '-t', '#'
    or die "Could not start /usr/bin/mosquitto_sub: $!";

while ($continue && defined(my $mqttMessage = <$SUB>)) {
    handle_message($mqttMessage);
}

close $SUB;

# The child has been reaped by close; forget its PID so a later TERM cannot
# signal an unrelated process that reused the number.
$sub_pid = undef;

$dbh->disconnect;

# In daemon mode stay alive until TERM arrives, as before, but sleep instead
# of spinning so an idle daemon does not consume a full CPU core.
if ($daemon == 1) {
    sleep 1 while $continue;
}

# read_config($path)
#
# Reads the configuration file at $path. The file is positional: rows 1..6
# hold, in order, the database name, database user, database password,
# database host, MQTT user and MQTT password, each written as "key=value".
# Only the text after the first "=" is used, so values may contain "=".
#
# Returns the six values as a list. Dies when the file cannot be opened or
# when one of the first six rows has no "=".
sub read_config {
    my ($path) = @_;

    open my $cfg_fh, '<', $path or die "Could not open $path: $!";

    my @values;
    while (my $line = <$cfg_fh>) {
        $line =~ s/\r?\n\z//;
        my (undef, $value) = split /=/, $line, 2;
        push @values, $value;
    }
    close $cfg_fh;

    for my $row (0 .. 5) {
        die "Missing value on row " . ($row + 1) . " of $path\n"
            unless defined $values[$row];
    }

    return @values[0 .. 5];
}

# handle_message($message)
#
# Processes one line printed by "mosquitto_sub -v", i.e. "<topic> <payload>".
# Only topics whose first segment is exactly "stat" or "tele" are considered.
# The second topic segment identifies the device and the third selects the
# action:
#   LWT    (payload without JSON) - the payload is recorded as "LWT".
#   ENERGY (JSON payload)         - the "Yesterday" and "Today" members are
#                                   recorded.
# Everything else is ignored. Returns nothing.
sub handle_message {
    my ($message) = @_;

    # Drop the line terminator so it does not end up in recorded values.
    chomp $message;

    my $brace = index($message, '{');

    my $channel;
    my $payload;
    if ($brace == -1) {
        # No JSON in the message: split topic and payload at the first space.
        ($channel, $payload) = split / /, $message, 2;
    }
    else {
        # JSON payload: the topic is everything before the first "{".
        $channel = substr($message, 0, $brace);
        $channel =~ s/ //g;    # remove the separating space
    }
    return unless defined $channel;

    my ($root, $topic, $queue) = split m{/}, $channel;
    return unless defined $root && ($root eq 'stat' || $root eq 'tele');
    return unless defined $topic && defined $queue;

    # Select the channels whose data is saved.
    if ($brace == -1) {
        if ($queue eq 'LWT') {
            record_value($topic, 'LWT', $payload);
        }
        # Other channels without JSON are not of interest.
        return;
    }

    if ($queue eq 'ENERGY') {
        # Example:
        # tele/mng9184-1657/ENERGY {"Time":"2018-03-04T13:18:39","Total":1376.995,"Yesterday":23.704,"Today":16.450,"Period":102,"Power":1255,"Factor":0.99,"Voltage":237,"Current":5.298}
        my $end = rindex($message, '}');
        return if $end < $brace;

        # substr takes a length, not an end offset.
        my $json = substr($message, $brace, $end - $brace + 1);

        # A malformed payload must not take the whole daemon down.
        my $decoded = eval { decode_json($json) };
        if (ref $decoded ne 'HASH') {
            warn "Ignoring undecodable ENERGY payload on $channel\n";
            return;
        }

        record_value($topic, 'Yesterday', $decoded->{'Yesterday'});
        record_value($topic, 'Today',     $decoded->{'Today'});
    }

    # The ESP channel is recognised but nothing is recorded for it yet.

    return;
}

# record_value($topic, $camp, $valoare)
#
# Records one field/value pair for a device in both the log file and the
# database.
sub record_value {
    my ($topic, $camp, $valoare) = @_;

    getdevinfoFILE($topic, $camp, $valoare);
    getdevinfoDB($topic, $camp, $valoare);

    return;
}

# _text($value)
#
# Returns $value, or the empty string when it is undefined, so that
# interpolating it does not raise an "uninitialized" warning.
sub _text {
    my ($value) = @_;

    return defined $value ? $value : '';
}

# _split_topic($topic)
#
# Splits a device topic of the form "<user>-<devid>" at "-" and returns the
# first two parts; a missing part is returned as the empty string.
sub _split_topic {
    my ($topic) = @_;

    my ($user, $devid) = split /-/, $topic;

    return (_text($user), _text($devid));
}

# getdevinfoFILE($topic, $camp, $valoare)
#
# Appends one line describing the update (local time, user, device id, field
# name, value) to the development log file. Dies when the file cannot be
# opened for appending.
sub getdevinfoFILE {
    my ($topic, $camp, $valoare) = @_;

    my $cale = '/admin/scripts/mqtt-perl/mqttparser.devel.txt';
    my $tm   = localtime;
    my ($user, $devid) = _split_topic($topic);
    $valoare = _text($valoare);

    open my $fh, '>>', $cale or die "Cannot open $cale: $!";
    print {$fh} "$tm: Update FILE getdevinfo: $user - $devid - $camp - $valoare\n";
    close $fh;

    return;
}

# getdevinfoDB($topic, $camp, $valoare)
#
# Inserts one row into the mqttparser table: '0', the local time as text, and
# a "user - devid - field - value" description. Uses the file-level $dbh
# handle. Dies when the statement cannot be prepared.
sub getdevinfoDB {
    my ($topic, $camp, $valoare) = @_;

    my $query    = "insert into mqttparser values (?, ?, ?)";
    my $sqlQuery = $dbh->prepare($query)
        or die "Can't prepare $query: " . $dbh->errstr . "\n";

    my $tm = localtime;
    my ($user, $devid) = _split_topic($topic);
    $valoare = _text($valoare);

    $sqlQuery->execute('0', "$tm", "$user - $devid - $camp - $valoare");

    return;
}