Setting up the Log Forwarder

The instructions in this section apply only to the Serverless approach for installing the Big Data Protector.

In the native EMR setup, Protegrity processes can be managed directly on the cluster nodes. In the containerized EMR Serverless environment, this level of control is limited. As a result, the logs must be redirected to either Amazon S3 or CloudWatch. In this setup, a CloudWatch Logs subscription filter streams the relevant log entries into Amazon Kinesis Data Streams. A Lambda function then processes the Kinesis batches, extracts the Protegrity audit JSON lines, builds an OpenSearch Bulk (_bulk) payload, and sends it to the ESA endpoint.
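The following Python sketch illustrates the first step of that Lambda processing: decoding a Kinesis batch written by a CloudWatch Logs subscription filter. It assumes the standard subscription payload (base64-encoded, gzip-compressed JSON with a logEvents list) and skips CONTROL_MESSAGE records, which CloudWatch Logs may emit to check that the destination is reachable. The function name iter_log_messages is hypothetical; this is not the code produced by the configurator script.

```python
import base64
import gzip
import json
from typing import Iterator


def iter_log_messages(event: dict) -> Iterator[str]:
    """Yield the raw log message strings contained in a Kinesis-triggered Lambda event.

    Each Kinesis record written by a CloudWatch Logs subscription filter carries
    base64-encoded, gzip-compressed JSON with a "logEvents" list.
    """
    for record in event.get("Records", []):
        compressed = base64.b64decode(record["kinesis"]["data"])
        payload = json.loads(gzip.decompress(compressed))
        # CONTROL_MESSAGE records only check that the destination is reachable.
        if payload.get("messageType") != "DATA_MESSAGE":
            continue
        for log_event in payload.get("logEvents", []):
            yield log_event["message"]
```

In a handler, each yielded message would then be checked for a Protegrity audit JSON line before it is added to the bulk payload.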

Note: CloudWatch log lines do not always appear instantly. Some delay is expected.

Important: The logging functionality works only when jobs are submitted through the AWS CLI using the aws emr-serverless start-job-run command. A sample command is listed below.

aws emr-serverless start-job-run \
  --region <region_name> \
  --application-id <application_id> \
  --execution-role-arn arn:aws:iam::<Account_ID>:role/EMR-Serverless-Execution-Role \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://<script_path>/<script_name>.py"
    }
  }' \
  --configuration-overrides '{
    "monitoringConfiguration": {
      "cloudWatchLoggingConfiguration": {
        "enabled": true,
        "logGroupName": "<log_group_name>",
        "logStreamNamePrefix": "emrs",
        "logTypes": {
          "SPARK_DRIVER": ["STDOUT","STDERR"],
          "SPARK_EXECUTOR": ["STDOUT","STDERR"]
        }
      }
    }
  }'
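Hand-editing the inline JSON in this command is error-prone. As an optional alternative, the following Python sketch builds the same --configuration-overrides value with json.dumps, so the result is always valid JSON. The function name and the default prefix (emrs, taken from the sample) are illustrative.

```python
import json


def build_configuration_overrides(log_group_name: str, stream_prefix: str = "emrs") -> str:
    """Return the --configuration-overrides JSON used in the sample command."""
    # Send STDOUT and STDERR from both the driver and the executors to CloudWatch.
    log_types = {
        "SPARK_DRIVER": ["STDOUT", "STDERR"],
        "SPARK_EXECUTOR": ["STDOUT", "STDERR"],
    }
    overrides = {
        "monitoringConfiguration": {
            "cloudWatchLoggingConfiguration": {
                "enabled": True,
                "logGroupName": log_group_name,
                "logStreamNamePrefix": stream_prefix,
                "logTypes": log_types,
            }
        }
    }
    return json.dumps(overrides)


if __name__ == "__main__":
    print(build_configuration_overrides("<log_group_name>"))
```

The printed string can be passed to the --configuration-overrides option in place of the inline JSON.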

Note: When a job is run from the AWS web console, only driver logs are generated. To generate both driver and executor logs in the CloudWatch log group, run jobs only through the AWS CLI.

Prerequisites

The Lambda function can reach the ESA

The ESA resides in a private network. Therefore, the Lambda function must run in a VPC subnet that has a network route to the ESA IP address, for example, through a VPN, a transit gateway (TGW), VPC peering, or by being in the same network. Ensure the following:

  • The Lambda function is attached to a VPC subnet that can route to the ESA IP address.
  • The security group egress rules allow TCP port 9200 to the ESA IP address.
  • The network ACLs (NACLs) allow this traffic, including the return traffic, because NACLs are stateless.
  • The TLS CA certificate is available to the Lambda function.
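One way to confirm the network items above is a short TCP check run from inside the Lambda function, for example from a temporary test handler. The following sketch uses only the Python standard library; the handler and the <ESA_IP_Address> value are placeholders. A successful connection confirms routing, security group, and NACL settings for port 9200, but not the TLS configuration.

```python
import socket


def can_reach(host: str, port: int = 9200, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, DNS failures, and unreachable networks.
        return False


def lambda_handler(event, context):
    # Hypothetical temporary test handler; replace the placeholder with the ESA IP address.
    return {"esa_reachable": can_reach("<ESA_IP_Address>", 9200)}
```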

The Lambda function can access the Kinesis stream

The Lambda function that reads from Kinesis must be able to reach the Kinesis API endpoints, either through a NAT gateway or through VPC endpoints. If a NAT gateway is available, the VPC endpoints are not required.

CloudWatch Logs can deliver logs to the Kinesis stream

CloudWatch Logs must be able to write the log events from the CloudWatch log group into the Kinesis stream. This requires the IAM role described in Creating the IAM Role.

EMR Serverless can send logs to the CloudWatch log group

The EMR Serverless application must be able to send its logs to the CloudWatch log group.

Creating the Kinesis Data Stream

  1. Log in to the AWS console.

  2. Navigate to the Amazon Kinesis page.

  3. Click Data streams.

  4. Click Create Data stream.

  5. In the Data stream name box, enter a name to identify the stream.

  6. Under Capacity mode, select the required mode.

    Note: If you select Provisioned mode, start with 1 shard. You can increase the number of shards later.

  7. Click Create data stream.

  8. After the data stream is created, open the data stream.

  9. Note the ARN.

    Note: The default retention period is 24 hours. To increase the retention period, set the required duration in the Retention period box under the Configuration tab.
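When sizing a Provisioned stream, a rough shard count can be derived from the per-shard write limits of Kinesis Data Streams: 1 MiB per second or 1,000 records per second, whichever is reached first. The sketch below is a back-of-the-envelope estimate; the log rates are inputs that you measure or assume, not values from this guide.

```python
import math

# Per-shard write limits for Kinesis Data Streams (Provisioned mode).
SHARD_WRITE_BYTES_PER_SEC = 1024 * 1024  # 1 MiB/s
SHARD_WRITE_RECORDS_PER_SEC = 1000


def estimate_shards(bytes_per_sec: float, records_per_sec: float) -> int:
    """Return the minimum shard count that covers both write limits (at least 1)."""
    by_bytes = math.ceil(bytes_per_sec / SHARD_WRITE_BYTES_PER_SEC)
    by_records = math.ceil(records_per_sec / SHARD_WRITE_RECORDS_PER_SEC)
    return max(1, by_bytes, by_records)


# Example: 3 MiB/s of log data arriving in 500 records/s needs 3 shards.
print(estimate_shards(3 * 1024 * 1024, 500))
```

The byte limit usually dominates for log traffic, so measuring IncomingBytes on the stream's Monitoring tab is a practical way to revisit the estimate.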

Creating the IAM Role

CloudWatch Logs requires permission to write log events into the Kinesis stream. Create an IAM role that grants CloudWatch Logs this permission.

  1. To create the role, log in to the AWS console.
  2. Navigate to IAM > Roles > Create role.
  3. Set the Trusted entity as AWS service.
  4. Set the Use case as CloudWatch Events.
  5. Set a Name for the role.
  6. Add a permissions policy that allows writing to the Kinesis stream. A sample policy is listed below. Replace emr-protegrity-audit-stream with the name of the stream that you created.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutToKinesis",
          "Effect": "Allow",
          "Action": [
            "kinesis:PutRecord",
            "kinesis:PutRecords"
          ],
          "Resource": "arn:aws:kinesis:<region_name>:<Account_ID>:stream/emr-protegrity-audit-stream"
        }
      ]
    }
    
  7. Ensure that the trust policy allows the CloudWatch Logs service to assume the role.
    {
    "Version": "2012-10-17",
    "Statement": [
     {
       "Effect": "Allow",
       "Principal": { "Service": "logs.<region_name>.amazonaws.com" },
       "Action": "sts:AssumeRole"
     }
    ]
    }
    

Creating the CloudWatch Log group

  1. Log in to the AWS console.
  2. Navigate to the CloudWatch page.
  3. Navigate to Logs > Log management.
  4. Click Create log group.
  5. In the Log group name box, enter a name to identify the group, using the following format:
    /aws/<log_group_name>
    
  6. From the Retention setting list, select the required option.
  7. From the Log class list, select the required option.
  8. Click Create.

Note: Ensure that the required IAM permissions are assigned for the log group. The EMR Serverless application execution role must have permissions to access the CloudWatch log group created above.

Creating the CloudWatch Logs Subscription Filter

  1. Log in to the AWS console.
  2. Navigate to the CloudWatch page.
  3. Navigate to Logs > Log management.
  4. Select the CloudWatch log group that you created.
  5. Select Actions > Create subscription filter.
  6. Select the required Destination account.
  7. Under Kinesis data stream, select the stream that you created.
  8. Under IAM role, select the role that you created to allow CloudWatch Logs to write to the Kinesis stream.
  9. If the Protegrity JSON lines contain the string logtype, specify logtype as the filter pattern.

    Note: If the JSON is embedded in other text, filter on a unique token, such as correlationid or protection.

  10. Click Start streaming.

Note: CloudWatch Logs allows only a limited number of subscription filters per log group. The default limit is two subscription filters per log group.
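A filter pattern only selects log events, so the Lambda function still has to locate the JSON document inside each message, which may be preceded by other text such as a logger prefix. The following Python sketch assumes that each audit record is a single JSON object on one line and uses json.JSONDecoder.raw_decode to try parsing from each opening brace in turn. The function name and the must_contain pre-filter (similar in spirit to the ONLY_MATCH_SUBSTRING variable) are hypothetical.

```python
import json
from typing import Optional

_DECODER = json.JSONDecoder()


def extract_json(message: str, must_contain: Optional[str] = None) -> Optional[dict]:
    """Return the first JSON object embedded in a log message, or None."""
    # Optional cheap pre-filter, similar in spirit to a CloudWatch filter pattern.
    if must_contain and must_contain not in message:
        return None
    start = message.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value starting at index start and
            # ignores any trailing text after it.
            obj, _end = _DECODER.raw_decode(message, start)
            return obj
        except json.JSONDecodeError:
            # Not valid JSON at this brace; try the next one.
            start = message.find("{", start + 1)
    return None
```

Messages for which extract_json returns None would simply be skipped, which is one way a too-strict filter can lead to the docs_seen=0 symptom described under Troubleshooting.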

Creating the Lambda Function

The Lambda function sends the logs from the Kinesis stream to the ESA.
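An OpenSearch _bulk request body is newline-delimited JSON: an action line, then the document, each ending with a newline. The following Python sketch splits documents into bulk bodies that stay under a byte budget such as BULK_MAX_BYTES. It assumes that the index is given in the request URL (as in the ESA_BULK_URL value), so every action line can be an empty {"index":{}}. The function name is hypothetical, and the generated lambda_function.py may build the payload differently.

```python
import json
from typing import Iterable, Iterator

ACTION_LINE = b'{"index":{}}\n'  # the index name comes from the _bulk URL path


def bulk_bodies(docs: Iterable[dict], max_bytes: int = 5 * 1024 * 1024) -> Iterator[bytes]:
    """Yield NDJSON _bulk bodies no larger than max_bytes, unless one entry alone exceeds it."""
    chunk = bytearray()
    for doc in docs:
        entry = ACTION_LINE + json.dumps(doc, separators=(",", ":")).encode("utf-8") + b"\n"
        # Flush the current body before it would exceed the byte budget.
        if chunk and len(chunk) + len(entry) > max_bytes:
            yield bytes(chunk)
            chunk = bytearray()
        chunk += entry  # a single oversized entry is still sent on its own
    if chunk:
        yield bytes(chunk)
```

Each yielded body would be sent as one POST to the ESA_BULK_URL endpoint with the Content-Type application/x-ndjson.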

  1. Log in to the AWS console.
  2. Navigate to the Lambda page.
  3. To create a function, click Create function.
  4. Select the Author from scratch option.
  5. In the Function name box, enter a name to identify the function.
  6. From the Runtime list, select the required language, such as Python.
  7. Under Execution role, select the Create a new role with basic Lambda permissions option.
  8. Click Create function.

    Note: Ensure that the Lambda function has access to the Kinesis stream and to SQS. The function must also have the permissions granted by the AWSLambdaBasicExecutionRole and AWSLambdaVPCAccessExecutionRole managed policies.

Attaching a VPC to the Lambda Function

  1. To edit the function and attach a VPC, on the Lambda page, click the function name.
  2. Click the Configuration tab.
  3. From the left pane, click VPC.
  4. To modify the configuration, click Edit.
  5. From the VPC list, select the required VPC.
  6. From the Subnets list, select the required subnet.

    Note: Ensure the subnet can connect to the ESA IP address.

  7. From the Security groups list, select the group that allows egress to the ESA IP address.
  8. To persist the changes, click Save.

    Note: If a Lambda function is attached to a VPC that has no NAT gateway or VPC endpoints, the function might be unable to call AWS APIs, including the Kinesis API.

Adding a Kinesis Trigger to the Lambda Function

  1. To add a Kinesis trigger to the Lambda function, click the Triggers tab.
  2. Click Add trigger.
  3. From the Trigger configuration list, select the source as Kinesis.
  4. From the Kinesis stream list, select the required stream.
  5. In the Batch size box, enter 200.
  6. In the Batch window box, enter a value from 1 to 5 seconds.
  7. Click Add.
  8. To configure the retry behavior, navigate to the Lambda page.
  9. Click Event source mappings.
  10. Click the required Kinesis trigger.
  11. Click the Configuration tab.
  12. Enable the Bisect batch on function error feature.
  13. Set the Maximum retry attempts to 10 or more.
  14. Set the Maximum record age to a longer duration.
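The same trigger settings can also be expressed as parameters of the Lambda CreateEventSourceMapping API. The sketch below only assembles and range-checks keyword arguments for boto3's create_event_source_mapping; the function name is hypothetical, StartingPosition "LATEST" is an assumption (process only new records), and the one-day record age is an example value chosen to match the default 24-hour stream retention.

```python
def kinesis_trigger_params(stream_arn: str, function_name: str,
                           batch_window_sec: int = 5, max_retries: int = 10,
                           max_record_age_sec: int = 86400) -> dict:
    """Build keyword arguments for Lambda CreateEventSourceMapping with a Kinesis source."""
    if not 1 <= batch_window_sec <= 5:
        raise ValueError("this guide recommends a batch window of 1 to 5 seconds")
    if not 10 <= max_retries <= 10000:
        raise ValueError("this guide recommends 10 or more retry attempts (API maximum 10000)")
    if not 60 <= max_record_age_sec <= 604800:
        raise ValueError("this sketch expects a record age of 60 to 604800 seconds")
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",  # assumption: process only new records
        "BatchSize": 200,
        "MaximumBatchingWindowInSeconds": batch_window_sec,
        "BisectBatchOnFunctionError": True,
        "MaximumRetryAttempts": max_retries,
        "MaximumRecordAgeInSeconds": max_record_age_sec,
    }
```

With boto3 installed and AWS credentials configured, the result could be passed as boto3.client("lambda").create_event_source_mapping(**params).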

Providing the CA.pem File to the Lambda Function

The CA.pem file must be provided to the Lambda function, because the curl component requires this certificate for TLS verification. The optimal and most secure approach is to store the contents of the CA.pem file in AWS Secrets Manager.

Downloading the CA.pem File

  1. Log in to the ESA from a terminal as a user with the required permissions.

  2. Navigate to the /etc/ksa/certificates/plug/ directory.

  3. Download the CA.pem file from this directory.

  4. After the certificate is downloaded, open the PEM file in any text editor.

  5. Replace all newlines with the escaped newline sequence \n.

  6. To escape the newlines from the command line, use one of the following commands, depending on the operating system:

    For Linux:

    awk 'NF {printf "%s\\n",$0;}' CA.pem > output.txt
    

    For Windows PowerShell:

    (Get-Content '.\CA.pem') -join '\n' | Set-Content 'output.txt'
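If neither shell is convenient, Python can produce the same escaped output as the awk command: blank lines are dropped and each remaining line is followed by a literal \n. The sketch also includes the reverse step, which restores real newlines; both helper names are hypothetical.

```python
def escape_pem(pem_text: str) -> str:
    """Join non-empty PEM lines, each followed by a literal backslash-n (like the awk command)."""
    return "".join(line + "\\n" for line in pem_text.splitlines() if line.strip())


def unescape_pem(value: str) -> str:
    """Turn literal backslash-n sequences back into real newlines."""
    return value.replace("\\n", "\n")
```

For example, the result of escape_pem applied to the text of CA.pem can be written to output.txt and then stored in Secrets Manager.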
    

Storing the Certificates

  1. Log in to the AWS console.
  2. Navigate to the Secrets Manager page.
  3. Click Store a new secret.
  4. Under Secret type, select Other type of secret.
  5. In the Key box, enter ca_pem.
  6. In the Value box, enter the escaped contents of the CA.pem file, that is, the contents of output.txt.
  7. Click Next.
  8. Enter a name to identify the secret.
  9. Click Next.
  10. Click Store.
  11. Note the Secret ARN.
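Inside the Lambda function, the stored value can be read back with the Secrets Manager GetSecretValue API and turned into a TLS context. In this Python sketch the parsing is kept separate from the AWS call so that it can be tested locally. The helper names are hypothetical, boto3 is assumed to be available in the Lambda Python runtime, and the generated lambda_function.py may load the certificate differently, since this guide states that it uses a curl component.

```python
import json
import ssl


def ca_pem_from_secret_string(secret_string: str, key: str = "ca_pem") -> str:
    """Extract the CA certificate from a key/value secret and restore real newlines."""
    value = json.loads(secret_string)[key]
    pem = value.replace("\\n", "\n")  # undo the escaping applied before storing
    if "-----BEGIN CERTIFICATE-----" not in pem:
        raise ValueError(f"secret key {key!r} does not contain a PEM certificate")
    return pem


def fetch_ca_pem(secret_id: str, key: str = "ca_pem") -> str:
    """Fetch the secret from Secrets Manager (requires boto3 and IAM access)."""
    import boto3  # included in the AWS Lambda Python runtime

    response = boto3.client("secretsmanager").get_secret_value(SecretId=secret_id)
    return ca_pem_from_secret_string(response["SecretString"], key)


def esa_ssl_context(ca_pem: str) -> ssl.SSLContext:
    """Create a client TLS context that trusts only the given CA certificate(s)."""
    return ssl.create_default_context(cadata=ca_pem)
```

Validating the PEM header early gives a clearer error than a later TLS failure when the secret is empty or malformed.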

Setting up the Lambda Function

To set up the Lambda function:

  1. Log in to the AWS console.
  2. Navigate to the Lambda page.
  3. Click the required function.
  4. Click the Code tab.
  5. Click the lambda_function.py file.
  6. Replace its contents with the code from the lambda_function.py file that was generated by the configurator script.
  7. Click Deploy.
  8. Click the Configuration tab.
  9. From the left pane, click Permissions.
  10. Click the Role name to open the Role page.
  11. From the Add permissions list, select Create inline policy.
  12. Under Policy editor, select JSON.
  13. Paste the following policy. Replace <Secret_ARN> with the secret ARN that you noted when storing the certificate.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowGetSpecificSecret",
          "Effect": "Allow",
          "Action": [
            "secretsmanager:GetSecretValue",
            "secretsmanager:DescribeSecret"
          ],
          "Resource": "<Secret_ARN>"
        }
      ]
    }
    
  14. Click Next.
  15. In the Policy name box, enter a name for the policy.
  16. Click Create.
  17. Navigate to the Lambda page.
  18. Click the required function.
  19. Click the Configuration tab, and then from the left pane, click Environment variables.
  20. Click Edit and add the following environment variables as key-value pairs:
    ESA_BULK_URL = https://<ESA_IP_Address>:9200/pty_insight_audit/_bulk?pipeline=logs_pipeline
    ESA_CA_SECRET_ID = <ARN_of_the_Secret_from_Secret_Manager>
    ESA_CA_SECRET_JSON_KEY = ca_pem
    ONLY_MATCH_SUBSTRING = "logtype" (optional extra filter)
    BULK_MAX_BYTES = 5242880 (5 MB)
    HTTP_TIMEOUT_SEC = 120
  21. To persist the changes, click Save.
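A function that uses these variables would typically parse them once at cold start. The sketch below shows one way to do that in Python, with defaults taken from the values listed above; the Config class, its field names, and load_config are hypothetical and do not describe the generated function.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Config:
    esa_bulk_url: str
    ca_secret_id: str
    ca_secret_json_key: str
    only_match_substring: Optional[str]
    bulk_max_bytes: int
    http_timeout_sec: int


def load_config(env: Mapping[str, str] = os.environ) -> Config:
    """Read the forwarder settings from environment variables."""
    return Config(
        esa_bulk_url=env["ESA_BULK_URL"],  # required: raises KeyError if missing
        ca_secret_id=env["ESA_CA_SECRET_ID"],  # required
        ca_secret_json_key=env.get("ESA_CA_SECRET_JSON_KEY", "ca_pem"),
        # An empty or unset value disables the optional substring filter.
        only_match_substring=env.get("ONLY_MATCH_SUBSTRING") or None,
        bulk_max_bytes=int(env.get("BULK_MAX_BYTES", "5242880")),
        http_timeout_sec=int(env.get("HTTP_TIMEOUT_SEC", "120")),
    )
```

Failing fast on the two required variables surfaces a misconfiguration in the first invocation's log instead of as a later connection error.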

Troubleshooting

Validate each hop before moving to the next. Most issues are isolated to one hop.

Verify logs are reaching CloudWatch (EMR → CloudWatch)

Where to check:

  • CloudWatch Logs → Log groups → /aws/<log_group_name>
  • Open the latest log stream.

What to check:

  • New log events should appear while the EMR Serverless job is running.
  • If you do not see new events, the problem is upstream (EMR monitoring config or EMR execution role permissions).

If this fails:

  • Confirm the EMR Serverless job run has CloudWatch logging enabled.
  • Confirm the execution role attached to the job/application has permissions to write to the log group/streams.

Verify CloudWatch Subscription Filter is configured (CloudWatch → Kinesis)

Where to check:

  • CloudWatch Logs → Log groups → /aws/<log_group_name> → Subscription filters

What to check:

  • A subscription filter exists.
  • Destination is the correct Kinesis Data Stream.
  • The filter pattern matches your logs.

Recommended test:

  • Temporarily set a permissive filter (for testing):
    • Match all: ""
    • Or minimal match: “logtype”
  • Save and observe whether data begins flowing into Kinesis.

If this fails:

  • The most common cause is missing IAM permissions for CloudWatch Logs to write records into Kinesis (the destination access role or resource policy).

Verify Kinesis is receiving events (Kinesis ingestion)

Where to check:

  • Kinesis → Data streams → → Monitoring

What to check:

  • IncomingRecords should be greater than 0 during active logging.
  • IncomingBytes should also increase.

If this fails:

  • The CloudWatch subscription filter is not delivering records. Possible causes include an incorrect destination stream, an incorrect filter pattern, or missing permissions.

Verify Lambda Function is triggered (Kinesis → Lambda)

Where to check:

  • Lambda → → Configuration → Triggers
  • Lambda → Monitor

What to check:

  • Kinesis trigger exists and is Enabled.
  • Monitor metrics:
    • Invocations should increase.
    • Errors should be 0 (or very low).

If this fails:

  • The trigger (event source mapping) may be disabled, misconfigured, or pointing to the wrong stream.

Validate Lambda processing and payload (Lambda internal validation)

Where to check:

  • CloudWatch Logs → Log groups → /aws/lambda/

What to check:

  • Confirm Lambda is actually parsing events:
    • docs_seen= should be > 0
    • bulk_calls= should be >= 1 when data exists
  • Confirm outbound calls:
    • Log should show ESA HTTP status=200
    • ESA bulk response should not show errors:true

Common failure patterns:

  • TLS/CA errors
    • NO_CERTIFICATE - indicates the CA.pem file loaded from Secrets Manager is empty/malformed.
    • CERTIFICATE_VERIFY_FAILED - indicates incorrect CA chain or wrong certificate for the ESA endpoint.
  • Filtering too strict
    • If docs_seen=0, your ONLY_MATCH_SUBSTRING or JSON-line parsing is skipping everything.

Validate ESA ingestion (Lambda → ESA)

Where to check:

  • Lambda log output for ESA bulk response.
  • ESA/OpenSearch logs (if accessible).
  • Index / pipeline configuration.

What to check:

  • Bulk response should show:
    • errors: false
    • Successful item status (2xx)
  • If errors: true, inspect the first error item:
    • Strict mapping exceptions indicate that you are sending fields that the index mapping does not allow.
    • Pipeline errors indicate that the ingest pipeline expects different fields or types.
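The bulk response check can be scripted. The Python sketch below walks the standard _bulk response shape (a top-level errors flag and an items list in which each item is keyed by its action and carries a status and, on failure, an error object) and returns the first failed item. The function name is hypothetical.

```python
import json
from typing import Optional


def first_bulk_error(response_body: str) -> Optional[dict]:
    """Return details of the first failed item in a _bulk response, or None."""
    response = json.loads(response_body)
    if not response.get("errors"):
        return None
    for item in response.get("items", []):
        # Each item has a single key such as "index" or "create".
        for action, result in item.items():
            status = result.get("status", 0)
            if not 200 <= status < 300:
                return {"action": action, "status": status, "error": result.get("error")}
    return None
```

The returned error object's type and reason fields usually identify whether the failure is a mapping or a pipeline problem.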

Quick Diagnosis Rules

  • CloudWatch log streams have events, but Kinesis IncomingRecords=0 → Subscription filter / IAM permissions / wrong destination stream.
  • Kinesis has IncomingRecords>0, but Lambda Invocations=0 → Kinesis trigger (event source mapping) disabled/misconfigured.
  • Lambda is invoked, but ESA is not receiving logs → TLS/CA issue, ESA bulk endpoint issue, pipeline/mapping errors, or filter logic dropping events.
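The quick diagnosis rules can be encoded as a small decision function, for example in a script that reads the metrics named above. The inputs are the values you observe at each hop; the function and argument names are illustrative only.

```python
def diagnose(cloudwatch_events: bool, kinesis_incoming_records: int,
             lambda_invocations: int, esa_receiving: bool) -> str:
    """Map observed per-hop signals to the most likely failing hop, checked in pipeline order."""
    if not cloudwatch_events:
        return "EMR -> CloudWatch: check the monitoring configuration and execution role"
    if kinesis_incoming_records == 0:
        return "CloudWatch -> Kinesis: check the subscription filter, IAM role, and destination stream"
    if lambda_invocations == 0:
        return "Kinesis -> Lambda: check that the event source mapping is enabled and correct"
    if not esa_receiving:
        return "Lambda -> ESA: check TLS/CA, the bulk endpoint, pipeline/mapping errors, and filtering"
    return "All hops look healthy"
```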

Last modified : January 13, 2026