How to send a websocket action after receiving a DynamoDB Stream event

I am having a hard time with using a dynamodb stream to send data to the client via websockets. I have streams working and i have websockets working separetly, but not together.

This is my yml stack. Please show me how you’d craft a NODEjs runtime function to receive a stream event, then send a websocket connection to the clients.

service: itsm-stream-demo3

provider:
name: aws
runtime: nodejs10.x

environment:
websocketIDTable:
Ref: websocketIDTableNew1
demoTable:
Ref: demoTableNew1

iamRoleStatements:
- Effect: Allow
Action:
- “dynamodb:PutItem”
- “dynamodb:GetItem”
- “dynamodb:DeleteItem”
- “dynamodb:Scan”
Resource:
- Fn::GetAtt: [websocketIDTableNew1, Arn]
- Effect: Allow
Action:
- “execute-api:ManageConnections”
Resource:
- “arn:aws:execute-api:::**/@connections/*”

- Effect: Allow
  Action:
    - dynamodb:Query
    - dynamodb:Scan
    - dynamodb:GetItem
    - dynamodb:PutItem
    - dynamodb:UpdateItem
    - dynamodb:DeleteItem
    - dynamodb:DescribeStream
    - dynamodb:GetRecords
    - dynamodb:GetShardIterator
    - dynamodb:ListStreams

  Resource:
    - Fn::GetAtt: [demoTableNew1, Arn]

functions:
connectionHandler:
handler: handler/handler.connectionHandler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect

defaultHandler:
handler: handler/handler.defaultHandler
events:
- websocket:
route: $default

sendMessageHandler:
handler: handler/handler.sendMessageHandler
events:
- websocket:
route: sendMessage

getStream:
handler: handler/handler.getStream
events:
- stream:
type: dynamodb
batchSize: 1
startingPosition: LATEST
arn:
Fn::GetAtt:
- demoTableNew1
- StreamArn

resources:
Resources:
websocketIDTableNew1:
Type: “AWS::DynamoDB::Table”
Properties:
AttributeDefinitions:
- AttributeName: “connectionId”
AttributeType: “S”
KeySchema:
- AttributeName: “connectionId”
KeyType: “HASH”
BillingMode: PAY_PER_REQUEST
TableName: websocketIDTableNew1

demoTableNew1:
  Type: AWS::DynamoDB::Table
  Properties:
    BillingMode: PAY_PER_REQUEST
    TableName: demoTableNew1
    StreamSpecification:
      StreamViewType: NEW_AND_OLD_IMAGES
    AttributeDefinitions:
      - AttributeName: pk
        AttributeType: S
      - AttributeName: sk
        AttributeType: S
    KeySchema:
      - AttributeName: pk
        KeyType: HASH
      - AttributeName: sk
        KeyType: RANGE
    GlobalSecondaryIndexes:
      - IndexName: reverse
        KeySchema:
          - AttributeName: sk
            KeyType: HASH
          - AttributeName: pk
            KeyType: RANGE
        Projection:
          ProjectionType: ALL

This is my handler.

“use strict”;

const AWS = require(“aws-sdk”);
const dynamo = new AWS.DynamoDB.DocumentClient();

require(“aws-sdk/clients/apigatewaymanagementapi”);

const successfullResponse = {
statusCode: 200,
body: “everything is alright”
};

const connectionHandler = (event, context, callback) => {
console.log(event);

if (event.requestContext.eventType === “CONNECT”) {
// Handle connection
addConnection(event.requestContext.connectionId)
.then(() => {
callback(null, successfullResponse);
})
.catch(err => {
console.log(err);
callback(null, JSON.stringify(err));
});
} else if (event.requestContext.eventType === “DISCONNECT”) {
// Handle disconnection
deleteConnection(event.requestContext.connectionId)
.then(() => {
callback(null, successfullResponse);
})
.catch(err => {
console.log(err);
callback(null, {
statusCode: 500,
body: "Failed to connect: " + JSON.stringify(err)
});
});
}
};

// THIS ONE DOESNT DO ANYHTING
const defaultHandler = (event, context, callback) => {
console.log(“defaultHandler was called”);
console.log(event);

callback(null, {
statusCode: 200,
body: “defaultHandler”
});
};

const sendMessageHandler = (event, context, callback) => {
console.log(“EVENT”, JSON.stringify(event));
sendMessageToAllConnected(event)
.then(() => {
callback(null, successfullResponse);
})
.catch(err => {
callback(null, JSON.stringify(err));
});
};

const sendMessageToAllConnected = event =>
getConnectionIds().then(connectionData =>
connectionData.Items.map(connectionId =>
send(event, connectionId.connectionId)
)
);

const getConnectionIds = () => {
const params = {
TableName: process.env.websocketIDTable,
ProjectionExpression: “connectionId”
};

return dynamo.scan(params).promise();
};

const send = (event, connectionId) => {
const body = JSON.parse(event.body);
const postData = body.data;

const endpoint =
event.requestContext.domainName + “/” + event.requestContext.stage;
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: “2018-11-29”,
endpoint: endpoint
});

const params = {
ConnectionId: connectionId,
Data: postData
};
return apigwManagementApi.postToConnection(params).promise();
};

const addConnection = connectionId => {
const params = {
TableName: process.env.websocketIDTable,
Item: {
connectionId: connectionId
}
};

return dynamo.put(params).promise();
};

const deleteConnection = connectionId => {
const params = {
TableName: process.env.websocketIDTable,
Key: {
connectionId: connectionId
}
};

return dynamo.delete(params).promise();
};

const getStream = (event, context, callback) => {
const params = {
TableName: process.env.demoTable
};

console.log(“EVENT”, JSON.stringify(event));

dynamo.scan(params, (error, result) => {
// handle potential errors
if (error) {
console.error(error);
callback(new Error(“Error scanning db”));
return;
}

console.log("STREAM RESULT", JSON.stringify(result));
console.log("STREAM WHAT CHANGED", JSON.stringify(event.Records));
// create a response

const someEvent = {
  requestContext: {
    routeKey: "sendMessage",
    eventType: "MESSAGE",
    messageDirection: "IN",
    stage: "dev",
    domainName: "r2a5gx3500.execute-api.us-east-1.amazonaws.com",
    body: {
      action: "sendMessage",
      data: JSON.stringify(event.Records)
    }
  }
};
sendMessageHandler(someEvent, context, callback);

});
};

module.exports.defaultHandler = defaultHandler;
module.exports.connectionHandler = connectionHandler;
module.exports.sendMessageHandler = sendMessageHandler;
module.exports.getStream = getStream;