5. Comunicação interna

Esta página descreve como cada serviço na dojot se comunica.

5.1. Componentes

Os principais componentes atuais na dojot são mostrados em Fig. 5.1.

[Auth]
[DeviceManager]
[Persister]
[History]
[DataBroker]
[FlowBroker]
[x509-identity-mgmt]

package "Databases" {
  [mongodb]
  [postgreSQL]
}
package "IoT agents" {
  [IoT MQTT]
  [IoT LoRa]
  [IoT sigfox]
  [IoT RabbitMQ]
}

[postgreSQL] <-- [Auth]
[postgreSQL] <-- [DeviceManager]
[postgreSQL] <- [Kong]
[postgreSQL] <-- [x509-identity-mgmt]
[mongodb] <- [Persister]
[mongodb] <-- [FlowBroker]
[mongodb] <-- [History]
[mongodb] <-- [x509-identity-mgmt]

Fig. 5.1 Componentes da Dojot

Eles são:

  • Auth: mecanismo de autenticação

  • DeviceManager: armazenamento de dispositivo e modelo.

  • Persister: componente que armazena todos os dados gerados por dispositivos.

  • History: componente que expõe todos os dados gerados por dispositivos.

  • DataBroker: lida com subjects e tópicos do Kafka, além de conexões socket.io.

  • Flowbroker: lida com fluxos (CRUD e execução de fluxo)

  • IoT agents: agentes para diferentes protocolos.

Cada serviço será descrito brevemente nesta página. Mais informações podem ser encontradas na documentação de cada componente.

5.2. Mensagens e autenticação

Existem dois meios pelos quais os componentes dojot podem se comunicar: via solicitações HTTP REST e via Kafka. Eles são destinados a diferentes propósitos.

As solicitações HTTP podem ser enviadas no momento da inicialização quando um componente deseja, por exemplo, informações sobre recursos específicos, como lista de dispositivos ou tenants. Para isso, eles devem saber qual componente possui qual recurso para recuperá-los corretamente. Isso significa - e isso é muito importante porque conduz as escolhas arquiteturais na dojot - que apenas um único serviço é responsável por recuperar modelos de dados para um recurso específico (observe que um serviço pode ter várias instâncias). Por exemplo, o DeviceManager é responsável por armazenar e recuperar o modelo de informações para dispositivos e modelos, FlowBroker para descrições de fluxo, History para dados históricos e assim por diante.

O Kafka, por outro lado, permite uma comunicação pouco acoplada entre instâncias de serviços. Isso significa que um produtor (quem envia uma mensagem) não sabe quais componentes receberão sua mensagem. Além disso, qualquer consumidor não sabe quem gerou a mensagem de que está sendo consumida. Isso permite que os dados sejam transmitidos com base em “interesses”: um consumidor está interessado em receber mensagens com um determinado “assunto” (subject) (mais sobre isso mais tarde) e os produtores enviarão mensagens para todos os componentes que estiverem interessados nele. Observe que esse mecanismo permite que vários serviços emitam mensagens com o mesmo “assunto” (subject), bem como vários serviços que consomem mensagens com o mesmo “assunto” (subject), sem soluções alternativas complicadas.

5.2.1. Enviando solicitações HTTP

Para enviar solicitações via HTTP, um serviço deve criar um token de acesso, descrito aqui. Não há outras considerações além de seguir a descrição da API associada a cada serviço. Isso pode ser visto na figura Fig. 5.2. Observe que todas as interações descritas aqui são abstrações das reais. Além disso, deve-se notar que essas interações são válidas apenas para componentes internos. Qualquer serviço externo deve usar o Kong como ponto de entrada.

actor Client
boundary Kong
control Auth

Client -> Kong: POST /auth \nBody={"admin", "p4ssw0rD"}
activate Kong
Kong -> Auth: POST /user \nBody={"admin", "p4ssw0rD"}
Auth --> Kong: JWT="873927dab"
Kong --> Client: JWT="873927dab"
deactivate Kong

Fig. 5.2 Autenticação inicial

Nesta figura, um cliente recupera um token de acesso para o usuário admin cuja senha é p4ssw0rd. Depois disso, um usuário pode enviar uma solicitação para as APIs HTTP usando-o. Isso é mostrado na Fig. Fig. 5.3. Nota: o mecanismo de autorização real é detalhado no documento Auth + API gateway (Kong).

actor Client
boundary Kong
control Auth
control DeviceManager
database PostgreSQL

Client -> Kong: POST /device \nHeaders="Authorization: Bearer JWT"\nBody={ device }
activate Kong
Kong -> Auth: POST /pep \nBody={"admin", "/device"}
Auth --> Kong: OK 200
Kong -> DeviceManager: POST /device \nHeaders="Authorization: JWT" \nBody={ "device" : "XYZ" }
activate DeviceManager
DeviceManager -> PostgreSQL: INSERT INTO ....
PostgreSQL --> DeviceManager: OK
DeviceManager --> Kong: OK 200
deactivate DeviceManager
Kong --> Client: OK 200
deactivate Kong

Fig. 5.3 Enviando mensagens para a API HTTP

Nesta figura, um cliente cria um novo dispositivo usando o token recuperado em Fig. 5.2. Essa solicitação é analisada pelo Kong, que chamará o Auth para verificar se o usuário definido no token tem permissão para POST para o endpoint /device. Somente após a aprovação dessa solicitação, o Kong a encaminhará para o DeviceManager.

5.2.2. Enviando mensagens via Kafka

Kafka usa uma abordagem bem diferente. Cada mensagem deve ser associada a um subject e um tenant. Isso é mostrado na Fig. 5.4;

control DeviceManager
control DataBroker
control Kafka

DeviceManager -> DataBroker: GET /topic/dojot.device-manager.devices \nHeaders="Authorization: Bearer JWT"
note left
  JWT contains the
  service associated
  to the subject
  (admin, for instance).
end note
activate DataBroker
DataBroker -> Kafka: CREATE TOPIC \nadmin.dojot.device-manager.devices\n{ "topic-profile": { ... } }
note left
  There's no need
  to recreate this
  topic if it is
  already created.
end note
Kafka -> DataBroker: OK
DataBroker --> DeviceManager: { "topic" : "admin.dojot.device-manager.devices" }
deactivate DataBroker
DeviceManager -> Kafka: SEND MESSAGE\n topic:admin.dojot.device-manager.devices\ndata: {"device": "XYZ", "event": "CREATE", ...}
Kafka --> DeviceManager: OK

Fig. 5.4 Recuperando tópicos do Kafka

Neste exemplo, o DeviceManager precisa publicar uma mensagem sobre um novo dispositivo. Para isso, ele envia uma solicitação ao DataBroker, indicando qual tenant (dentro do token JWT) e qual subject (dojot.device-manager.devices) deseja usar para enviar a mensagem.

Para entender melhor como tudo funciona, você pode verificar a documentação do Data Broker para o componente e API, os links estão em Componentes e APIs.

5.2.3. Inicialização dos tenants

Todos os componentes estão interessados em um conjunto de subjects, que serão usados para enviar ou receber mensagens do Kafka. Como a dojot agrupa tópicos do Kafka e tenants em subjects (um subjects será composto por um ou mais tópicos Kafka, cada um transmitindo mensagens para um tenant específico), o componente deve iniciar cada tenant antes de enviar ou receber mensagens. Isso é feito em duas fases: tempo de inicialização do componente e tempo de execução do componente.

Na primeira fase, um componente solicita ao Auth para recuperar todos os tenants configurados no momento. Está interessado, digamos, em consumir mensagens dos subject device-data e dojot.device-manager.devices. Portanto, ele solicitará ao DataBroker um tópico para cada tenant para cada subject. Com essa lista de tópicos, ele pode criar Produtores e Consumidores para enviar e receber mensagens através desses tópicos. Isso é mostrado em Fig. 5.5.

control Component
control Auth
control DataBroker
control Kafka

Component-> Auth: GET /tenants
Auth --> Component: {"tenants" : ["admin", "tenant1"]}
loop each $tenant in tenants
  Component -> DataBroker: GET /topic/device-data \nHeaders="Authorization: JWT[tenant]"
  DataBroker --> Component: {"topic" : "**$tenant**.device-data"}
  Component -> Kafka: SUBSCRIBE\ntopic:**$tenant**.device-data
  Kafka --> Component: OK
  Component -> DataBroker: GET /topic/dojot.device-manager.devices \nHeaders="Authorization: JWT[tenant]"
  DataBroker --> Component: {"topic" : "**$tenant**.device-data"}
  Component -> Kafka: SUBSCRIBE\ntopic: **$tenant**.device-data
  Kafka --> Component: OK
end

Fig. 5.5 Inicialização do tenants no início

A segunda fase inicia após a inicialização e seu objetivo é processar todas as mensagens recebidas pelo Kafka se subscrevendo no tópico dojot-management.dojot.tenancy. Isso incluirá qualquer tenant criado após todos os serviços estarem em funcionamento. Fig. 5.6 mostra como lidar com essas mensagens.

control Kafka
control Component
control DataBroker

Kafka -> Component: MESSAGE\ntopic:dojot-management.dojot.tenancy\nmessage: {"type": "CREATE", "tenant": "new-tenant"}
Component -> DataBroker: GET /topic/device-data\nHeaders: "Authorization: Bearer JWT"
note left
  JWT contains
  new-tenant
end note
DataBroker --> Component: OK {"topic" : "new-tenant.device-data"}
Component -> Kafka: SUBSCRIBE\ntopic: new-tenant.device-data
Kafka --> Component: OK
Component -> DataBroker: GET /topic/dojot.device-manager.devices\nHeaders: "Authorization: Bearer JWT"
note left
  JWT contains
  new tenant
end note
DataBroker --> Component: OK {"topic" : "new-tenant.dojot.device-manager.devices"}
Component -> Kafka: SUBSCRIBE\ntopic: new-tenant.dojot.device-manager.devices
Kafka --> Component: OK

Fig. 5.6 Inicialização do tenant

Todos os serviços que estão de alguma forma interessados em usar subjects devem executar este procedimento para receber corretamente todas as mensagens.

5.3. Auth + API gateway (Kong)

Auth é um serviço profundamente conectado ao Kong. É responsável pelo gerenciamento, autenticação e autorização do usuário. Como tal, é invocado pelo Kong sempre que uma solicitação é recebida por um de seus*endpoints* registrados. Esta seção detalha como isso é realizado e como eles funcionam juntos.

5.3.1. Configuração do Kong

Existem dois procedimentos de configuração ao iniciar o Kong na dojot:

  1. Migrando Dados Existentes

  2. Registrando endpoints e plugins de API.

A primeira tarefa é realizada simplesmente invocando o Kong com uma flag especial.

O segundo é executando um script de configuração. Seu único objetivo é registrar endpoints no Kong, tal como:

#create a service
curl  -sS -X PUT \
--url ${kong}/services/data-broker \
--data "name=data-broker" \
--data "url=http://data-broker:80"

#create a route to service
curl  -sS -X PUT \
--url ${kong}/services/data-broker/routes/data-broker_route \
--data 'paths=["/device/(.*)/latest", "/subscription"]' \
--data "strip_path=false"

Este comando registrará o endpoint /dispositivo/*/latest e /subscription e todas as solicitações serão encaminhadas para http//data-broker:80. Você pode verificar a documentação sobre como adicionar endpoints na documentação do Kong em Componentes e APIs.

Para alguns dos endpoints registrados, o script adicionará dois plugins aos endpoints selecionados:

  1. Geração JWT. A documentação para este plugin está disponível na Kong JWT plugin page.

  2. Configura um plugin que encaminhará todas as solicitações para o Auth para autenticar solicitações. Este plugin está disponível dentro do Kong repository.

A solicitação a seguir instala esses dois plugins na API do data-broker:

#pepkong - auth
curl  -sS  -X POST \
--url ${kong}/services/data-broker/plugins/ \
--data "name=pepkong" \
--data "config.pdpUrl=http://auth:5000/pdp"

#JWT generation
curl  -sS  -X POST \
--url ${kong}/services/data-broker/plugins/ \
--data "name=jwt"

5.3.1.1. Mensagens emitidas

O Auth emitirá apenas uma mensagem via Kafka para a criação do tenant:

{
  "type" : "CREATE",
  "tenant" : "XYZ"
}

E uma para exclusão do tenant:

{
  "type" : "DELETE",
  "tenant" : "XYZ"
}

Por padrão, essas mensagens são criadas no tópico dojot-management.dojot.tenancy do Kafka.

Este prefixo do tópico pode ser configurado, verifique a documentação do componente`Auth` em Componentes e APIs.

5.4. Device Manager

O DeviceManager armazena e recupera modelos de informações para dispositivos e modelos e algumas informações estáticas sobre eles também. Sempre que um dispositivo é criado, removido ou apenas editado, ele publica uma mensagem no Kafka. Depende apenas do DataBroker e Kafka pelos motivos já explicados neste documento.

A documentação do DeviceManager no GitHub ReadMe explica com mais detalhes todas as mensagens publicadas. Você pode encontrar o link em: Componentes e APIs.

5.5. Agente IoT

Os agentes de IoT recebem mensagens de dispositivos e os convertem em uma mensagem padrão a ser publicada no Kafka. Para fazer isso, eles podem querer saber quais dispositivos são criados para filtrar corretamente as mensagens que não são permitidas na dojot (usando, por exemplo, informações de segurança para bloquear mensagens de dispositivos não autorizados). Ele usará o subject device-data e a inicialização de tenants, conforme descrito em Inicialização dos tenants.

Após solicitar os tópicos para todos os tenants no subject device-data, o agente IoT começará a receber dados dos dispositivos. Como há várias maneiras pelas quais os dispositivos podem fazer isso, esta etapa não será detalhada nesta seção (isso depende muito de como cada agente de IoT funciona). No entanto, ele deve enviar uma mensagem para Kafka para informar outros componentes de todos os novos dados que o dispositivo acabou de enviar. Isso é mostrado na Fig. 5.7, neste caso, estamos usando o tenant admin.

control Kafka

IoTAgent -> Kafka: SEND MESSAGE\n topic: admin.device-data...\ndata: IoTAgentMessage
Kafka -> IoTAgent: OK

Fig. 5.7 Mensagem do agente de IoT para Kafka

Os dados enviados pelo agente de IoT têm a estrutura mostrada na Fig. 5.8.

class Metadata {
  + deviceid: string
  + tenant: string
  + timestamp: long int
 }

 class IoTAgentMessage {
   + metadata: Metadata
   + attrs: Dict<string, any>
 }

 IoTAgentMessage::metadata -> Metadata

Fig. 5.8 Estrutura de mensagens do agente IoT

Essa mensagem seria:

{
    "metadata": {
        "deviceid": "c6ea4b",
        "tenant": "admin",
        "timestamp": 1528226137452
    },
    "attrs": {
        "humidity": 60,
        "temperature" : 23
    }
}

5.6. Persister

Persister é um serviço muito simples, cujo único objetivo é receber mensagens dos dispositivos (usando o subject device-data) e armazená-las no MongoDB. Para isso, é realizado o procedimento de inicialização (detalhado em Bootstrapping tenants) e, sempre que uma nova mensagem é recebida, ele cria um novo documento Mongo e o armazena na coleção do dispositivo.

control Kafka
control Persister
database MongoDB

Kafka -> Persister: MESSAGE\ntopic: admin.device-data \nmessage: IoTAgentMessage
Persister -> MongoDB: NEW DOC { IoTAgentMessage }
MongoDB --> Persister: OK
Persister --> Kafka: COMMIT

Fig. 5.9 Persister

Este serviço é simples, pois é por design.

5.7. History

O History também é um serviço muito simples: sempre que um usuário ou aplicativo envia uma solicitação, ele consulta o MongoDB e cria uma mensagem adequada para enviar de volta ao usuário/aplicativo. Isso é mostrado na Fig. 5.10.

actor User
boundary Kong
control History
database MongoDB

User -> Kong: GET /device/history/efac?attr=temperature\nHeaders="Authorization: JWT"
activate Kong
Kong -> Kong: authorize
Kong -> History: GET /history/efac?attr=temperature\nHeaders="Authorization: JWT"
activate History
History -> MongoDB: db.efac.find({attr=temperature})
MongoDB --> History: doc1, doc2
History -> History: processDocs([doc1, doc2])
History --> Kong: OK\n{"efac":[\n\t{"temperature" : 10},\n\t{"temperature": 20}\n]}
deactivate History
Kong -> User: OK\n{"efac":[\n\t{"temperature" : 10},\n\t{"temperature": 20}\n]}
deactivate Kong

Fig. 5.10 History

5.8. Data Broker

O DataBroker possui algumas funcionalidades a mais do que apenas gerar tópicos para pares {tenant, subject}. Ele também servirá conexões socket.io para emitir mensagens em tempo real. Para fazer isso, ele recupera todos os tópicos para o subject device-data, assim como em qualquer outro componente interessado nos dados recebidos dos dispositivos. Assim que receber uma mensagem, ela será encaminhada para uma ‘sala’ (usando o vocabulário do socket.io) associada ao dispositivo e ao tenant associado. Portanto, todo cliente conectado a ele (como interfaces gráficas de usuário) receberão uma nova mensagem contendo todos os dados recebidos. Para obter mais informações sobre como abrir uma conexão socket.io com o DataBroker, consulte a documentação da API do DataBroker em Componentes e APIs.

Nota

As conexões socket.io em tempo real via Data Broker serão descontinuadas em versões futuras. Use o Kafka WS ao invés dele.

5.9. Autoridade Certificadora

A plataforma dojot possui internamente uma autoridade certificadora (CA) capaz de emitir certificados x.509 para que os dispositivos possam se comunicar com a plataforma através de um canal seguro (usando o protocolo TLS). Ao requisitar um certificado para a plataforma, é necessário informar um CSR, o qual passará por uma série de validações até chegar na Autoridade Certificadora interna, que por sua vez, se todas as verificações passarem com sucesso, assinará um certificado e vinculará este certificado ao registro do dispositivo. O componente x509-identity-mgmt é responsável por oferecer os serviços relacionados a certificados para dispositivos.

5.10. Kafka WS

O serviço Kafka WS permite que os usuários recuperem dados condicionais e/ou parciais em tempo real de um determinado tópico da dojot em um Cluster Kafka. Ele funciona com conexões puras de websocket, para que se possam criar clientes websocket em qualquer linguagem desejada, desde que eles suportem RFC 6455.

5.10.1. Conectando com o serviço

A conexão é realizada em dois passos: primeiro é obtido um ticket de uso único via requisição HTTP, e depois o cliente se conecta ao serviço via websocket passando-o como parâmetro.

5.10.1.1. Primeiro passo: obter um ticket de uso único

Um ticket permite o usúario se subscrever em um tópico da dojot. Para obter o ticket é necessário ter um token JWT gerado pelo serviço de Autenticação/Autorização da plataforma. A requisição HTTP para obter um ticket deve ser realizada usando o verbo GET para o endpoint <base-url>/kafka-ws/v1/ticket. A requisição deve ter o cabeçalho Authorization com o token JWT obtido anteriormente como valor. Exemplo:

GET <base-url>/kafka-ws/v1/ticket
Authorization: Bearer [Encoded JWT]

O componente responde com a seguinte sintaxe:

HTTP/1.1 200 OK
Content-type: application/json
{
  "ticket": "[an opaque ticket of 64 hexadecimal characters]"
}

Nota: No contexto de um deployment da dojot, o token é providenciado pelo serviço de Autenticação e é validado pelo Gateway API antes de redirecionar a conexão para o Kafka WS. Portanto, nenhuma validação é feita pelo Kafka WS.

5.10.1.2. Segundo passo: Estabelecer a conexão websocket

A conexão é feita via websocket pura, usando a URI <base-url>/kafka-ws/v1/topics/:topic. Você deve passar o ticket gerado anteriormente como parâmetro para esta URI. Também é possivel passar opções e filtros condicionais como parâmetros para esta URI.

5.10.2. Comportamento ao solicitar um ticket e uma conexão websocket

Abaixo podemos entender o comportamento do serviço Kafka WS quando um usuário (por meio de um user agent) solicita um ticket para estabelecer uma comunicação via websocket com Kafka WS.

Observe que quando o usuário solicita um novo ticket, o Kafka WS extrai algumas informações do token de acesso do usuário (JWT) e gera um payload assinado, para ser usado posteriormente na decisão de autorizar (ou não) a conexão via websocket. A partir do payload, é gerado um ticket e os dois são armazenados no Redis, onde o ticket é a chave para obter o payload. Um TTL é definido pelo Kafka WS, então o usuário deve usar o ticket dentro do tempo estabelecido, caso contrário, o Redis apaga automaticamente o ticket e o payload.

Após obter o ticket, o usuário faz uma solicitação HTTP ao Kafka WS requisitando um upgrade de protocolo para se comunicar via websocket. Como a especificação dessa solicitação HTTP limita o uso de cabeçalhos adicionais, é necessário enviar o ticket pela URL, para que possa ser validado pelo Kafka WS antes de autorizar o upgrade.

Dado que o ticket esteja válido, ou seja, corresponde a uma entrada no Redis, o Kafka WS recupera o payload relacionado ao ticket, verifica sua integridade e exclui essa entrada no Redis para que o ticket não possa ser usado novamente.

Com o payload é possível tomar a decisão de autorizar ou não o upgrade para websocket. Se a autorização for concedida, o Kafka WS abre um canal de subscrição com base em um tópico específico no Kafka. A partir daí, o upgrade para websocket é estabelecido e o usuário começa a receber os dados à medida que vão sendo publicados no Kafka.

actor User
boundary Kong
control "Kafka-WS"
database Redis
control Kafka

group Get Ticket
    User -> Kong: GET /kafka-ws/v1/ticket\nHeaders="Authorization: JWT"
    Kong -> Kong: Checks JWT
    Kong -> "Kafka-WS" : Request a ticket
    "Kafka-WS" -> "Kafka-WS" : Sign the payload and\ngenerate a ticket for it
    "Kafka-WS" -> Redis : Register the ticket and\npayload with a TTL
    "Kafka-WS"<-- Redis : Sucess
    User <-- "Kafka-WS" : Returns the newly generated ticket
end

group Connect via websocket
    User -> Kong: Upgrade HTTP to websocket\n(ticket in the URL)
    Kong -> "Kafka-WS" : Forward the ticket
    "Kafka-WS" -> Redis : Recovers payload (if any)
    "Kafka-WS"<-- Redis : Payload found
    "Kafka-WS" -> "Kafka-WS" : Checks the payload
    "Kafka-WS" -> Kafka : Subscrive to kafka topic\n(Using the payload)
    "Kafka-WS" <-- Kafka : Sucess
    User <-- "Kafka-WS" : Upgrade to websocket accepted\nConnected!
    "Kafka-WS" <-- Kafka : New data in the topic
    User <-- "Kafka-WS" : Returns data
    "Kafka-WS" <-- Kafka : [...]
    User <-- "Kafka-WS" : [...]
    "Kafka-WS" <-- Kafka : [...]
    User <-- "Kafka-WS" : [...]
end

Fig. 5.11 Obtenção de ticket e conexão via websocket