event_kafka Module


Table of Contents

1. Admin Guide
1.1. Overview
1.2. Kafka socket syntax
1.3. Kafka events syntax
1.4. Dependencies
1.4.1. OpenSIPS Modules
1.4.2. External Libraries or Applications
1.5. Exported Parameters
1.5.1. broker_id (string)
1.6. Exported Functions
1.6.1. kafka_publish(broker_id, message, [key], [report_route])
1.7. Examples
2. Contributors
2.1. By Commit Statistics
2.2. By Commit Activity
3. Documentation
3.1. Contributors

List of Tables

2.1. Top contributors by DevScore(1), authored commits(2) and lines added/removed(3)
2.2. Most recently active contributors(1) to this module

List of Examples

1.1. Set broker_id parameter
1.2. kafka_publish() function usage
1.3. E_PIKE_BLOCKED event
1.4. Kafka socket

Chapter 1. Admin Guide

1.1. Overview

This module is an implementation of an Apache Kafka producer. It serves as a transport backend for the Event Interface and also provides a stand-alone connector to be used from the OpenSIPS script in order to publish messages to Kafka brokers.

1.2. Kafka socket syntax

'kafka:' brokers '/' topic ['?' properties]

Meaning of the socket fields:

  • brokers - comma-separated list of the addresses (as host:port) of the Kafka brokers to connect to. These are the "bootstrap" servers used by the client to discover the Kafka cluster. This corresponds to the bootstrap.servers / metadata.broker.list configuration property.

  • topic - Kafka topic used to publish messages to.

  • properties - configuration properties to be transparently passed to the Kafka client library. The syntax is:

    'g.'|'t.' property '=' value ['&' 'g.'|'t.' property '=' value] ...

    The g. or t. prefix before each property name specifies whether it is a global or a topic-level property, as classified by the Kafka library. Documentation for the supported properties can be found in the librdkafka CONFIGURATION.md file.

    Note that some library properties have the topic. prefix as part of their name, but still fall under the global category.

    key=callid is an extra property that is not passed to the Kafka library but is interpreted by OpenSIPS itself. When this property is set, the record published to Kafka also includes the Call-ID of the current SIP message as its key.
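
As an illustration of how the socket fields combine, the sketch below subscribes a Kafka socket to an event through the core subscribe_event() function. It assumes that the module raising E_PIKE_BLOCKED (pike) is loaded; the broker addresses, topic name and property values are placeholders chosen for this example, not recommended settings:

	...
	startup_route {
		# two bootstrap brokers, one global (g.) and one topic-level (t.) property
		subscribe_event("E_PIKE_BLOCKED", "kafka:10.0.0.1:9092,10.0.0.2:9092/pike_events?g.linger.ms=100&t.acks=all");
	}
	...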

1.3. Kafka events syntax

The event payload is formatted as a JSON-RPC notification, with the event name as the method field and the event parameters as the params field.

The record published to Kafka will also include the Call-ID of the current SIP message as key, if the key=callid property is provided in the event socket.
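
A minimal sketch of a script-raised event delivered over Kafka is given below. It assumes the core raise_event() and subscribe_event() functions; the event name E_CALL_SEEN, its caller parameter, the AVP names and the socket are hypothetical and only serve the example:

	...
	startup_route {
		# key=callid asks for the Call-ID of the SIP message as record key
		subscribe_event("E_CALL_SEEN", "kafka:127.0.0.1:9092/calls?key=callid");
	}
	...
	route {
		...
		$avp(ev_attr) = "caller";
		$avp(ev_val) = $fU;
		# the event name becomes the method field, the caller parameter goes in params
		raise_event("E_CALL_SEEN", $avp(ev_attr), $avp(ev_val));
		...
	}
	...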

1.4. Dependencies

1.4.1. OpenSIPS Modules

The following modules must be loaded before this module:

  • none.

1.4.2. External Libraries or Applications

The following libraries or applications must be installed before running OpenSIPS with this module loaded:

  • librdkafka-dev

librdkafka-dev can be installed from the Confluent APT or YUM repositories.

1.5. Exported Parameters

1.5.1. broker_id (string)

This parameter specifies the configuration for a Kafka broker (or cluster) that can be used to publish messages directly from the script, using the kafka_publish() function.

The format of the parameter is: [ID]kafka_socket, where ID is an identifier for this broker instance and kafka_socket is a specification similar to the Kafka socket syntax.

The key=callid property does not have an effect for brokers configured through this parameter.

This parameter can be set multiple times.

Example 1.1. Set broker_id parameter

...
modparam("event_kafka", "broker_id", "[k1]127.0.0.1:9092/topic1?g.linger.ms=100&t.acks=all")
...
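
Since the parameter can be set multiple times, several brokers (or clusters) may be defined side by side, each under its own ID. In the sketch below, the IDs, addresses, topics and properties are illustrative only:

	...
	loadmodule "event_kafka.so"
	# single local broker
	modparam("event_kafka", "broker_id", "[local]127.0.0.1:9092/cdrs")
	# two bootstrap servers of one cluster, with a topic-level property
	modparam("event_kafka", "broker_id", "[cluster]10.0.0.1:9092,10.0.0.2:9092/alerts?t.acks=all")
	...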

1.6. Exported Functions

1.6.1. kafka_publish(broker_id, message, [key], [report_route])

Publishes a message to a Kafka broker (or cluster). Since the actual send operation is asynchronous, a report route may be provided in order to check the message delivery status.

Returns 1 if the message was successfully queued for sending or -1 otherwise.

This function can be used from any route.

The function has the following parameters:

  • broker_id (string) - the ID of the Kafka broker (or cluster). Must be one of the IDs defined through the broker_id modparam.

  • message (string) - the payload of the Kafka message to publish.

  • key (string, optional) - the key of the Kafka record to publish.

  • report_route (string, static, optional) - name of a script route to be executed when the message delivery status is available. Information about the message publishing will be available in this route through the following AVP variables:

    • $avp(kafka_id) - broker ID

    • $avp(kafka_status) - delivery status, 0 if successful, -1 otherwise

    • $avp(kafka_key) - message key

    • $avp(kafka_msg) - message payload

Example 1.2. kafka_publish() function usage

	...
	$var(msg) = "my msg content";
	kafka_publish("k1", $var(msg), $ci, "kafka_report");
	...
	route[kafka_report] {
		xlog("Delivery status: $avp(kafka_status) for broker: $avp(kafka_id)\n");
	}
	...
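
Because the return code only tells whether the message was queued, a script may want to check both the return code and the delivery status reported later. The following is a sketch under that assumption, using a k1 broker ID defined through the broker_id parameter; the payload, route name and log texts are arbitrary:

	...
	$var(msg) = "call from " + $fu;
	# a negative return code means the message could not be queued
	if (!kafka_publish("k1", $var(msg), $ci, "kafka_report"))
		xlog("L_ERR", "failed to queue Kafka message for $ci\n");
	...
	route[kafka_report] {
		# runs once the delivery status is known; 0 means delivered
		if ($avp(kafka_status) != 0)
			xlog("L_WARN", "delivery to broker $avp(kafka_id) failed, key: $avp(kafka_key)\n");
	}
	...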
	

1.7. Examples

Example 1.3. E_PIKE_BLOCKED event


{
  "jsonrpc": "2.0",
  "method": "E_PIKE_BLOCKED",
  "params": {
    "ip": "192.168.2.11"
  }
}


Example 1.4. Kafka socket


	kafka:127.0.0.1:9092/topic1?t.message.timeout.ms=1000&key=callid


Chapter 2. Contributors

2.1. By Commit Statistics

Table 2.1. Top contributors by DevScore(1), authored commits(2) and lines added/removed(3)

Name                                 DevScore  Commits  Lines ++  Lines --
1. Vlad Patrascu (@rvlad-patrascu)   21        4        1908      16

(1) DevScore = author_commits + author_lines_added / (project_lines_added / project_commits) + author_lines_deleted / (project_lines_deleted / project_commits)

(2) including any documentation-related commits, excluding merge commits. Regarding imported patches/code, we do our best to count the work on behalf of the proper owner, as per the "fix_authors" and "mod_renames" arrays in opensips/doc/build-contrib.sh. If you identify any patches/commits which do not get properly attributed to you, please submit a pull request which extends "fix_authors" and/or "mod_renames".

(3) ignoring whitespace edits, renamed files and auto-generated files

2.2. By Commit Activity

Table 2.2. Most recently active contributors(1) to this module

Name                                 Commit Activity
1. Vlad Patrascu (@rvlad-patrascu)   Aug 2020 - Mar 2021

(1) including any documentation-related commits, excluding merge commits

Chapter 3. Documentation

3.1. Contributors

Last edited by: Vlad Patrascu (@rvlad-patrascu).

Documentation Copyrights:

Copyright 2020 www.opensips-solutions.com