Key-Value - Output
Continuing from the previous tutorial, which read key-value records as input, this tutorial writes records to a topic as key-values. We can view this as the reverse of the previous tutorial.
Prerequisites
This guide uses a local Fluvio cluster. If you need to install it, please follow the installation instructions here.
Dataflow
Overview
For this example, we will write a dataflow that tracks books in a library system. It will take a book from the source and return the book as a value combined with a barcode as the key. We will write a short hashing function that takes the title and hashes it into a barcode that will serve as the key for our sink.
Define the types
Like the previous example, we will need to define our types. This time, it's a little shorter.
types:
  book:
    type: object
    properties:
      name:
        type: string
      year:
        type: u32
      author:
        type: string
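For orientation, the book type above corresponds roughly to the Rust struct below, which is why the transform later in this guide refers to Book and reads book.name. This is a hand-written sketch: the code SDF actually generates for the type (including any derives or attributes) is not shown in this guide, so the definition is an assumption for illustration only.

```rust
// Hand-written approximation of the `book` type from the dataflow.
// The struct SDF generates may differ; this is an assumption.
#[derive(Debug, Clone, PartialEq)]
struct Book {
    name: String,
    year: u32,
    author: String,
}

fn main() {
    // One of the sample records used later in this guide.
    let book = Book {
        name: "Rust Cookbook".to_string(),
        year: 2020,
        author: "Steve Klabnik and Carol Nichols".to_string(),
    };
    println!("{:?}", book);
}
```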
Topic List
The following is our list of topics.
topics:
  new-book:
    schema:
      value:
        type: book
  books-in-system:
    schema:
      key:
        type: string
      value:
        type: book
Source
The new-book topic is the source topic.
Sink
The books-in-system topic is the sink topic. It contains a string key and a book object as the value.
Transform
We will apply a map transform that turns each Book entry into a key-value pair of type (Option<String>, Book).
transforms:
  - operator: map
    run: |
      fn newbook(book:Book) -> Result<(Option<String>,Book)>{
        let mut p:u32 = 53;
        let m:u32 = 1000000009;
        let mut hash:u32 = 0;
        for c in book.name.chars() {
          hash = (hash + (c as u32)*p) % m;
          p = (p*53) % m;
        }
        Ok((Some(hash.to_string()),book))
      }
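The function above computes a polynomial hash over the characters of the book's name and returns it, converted to a string, as the key (the barcode). The book itself is passed through unchanged as the value.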
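To experiment with the hashing step outside of a dataflow, the same polynomial hash can be sketched as a standalone Rust function. The function name barcode and the constant names are hypothetical, and this version deliberately widens the arithmetic to u64. The inline function works in u32, where the products can exceed the u32 range for longer titles, so this sketch may produce different keys from the ones shown later in this guide. For short titles, where no product exceeds the u32 range, both versions agree.

```rust
/// Polynomial hash over the characters of `title`, using u64 so that
/// the intermediate products cannot overflow. `barcode` is a hypothetical
/// name; the dataflow itself uses the inline `newbook` function.
fn barcode(title: &str) -> u64 {
    const BASE: u64 = 53;
    const MODULUS: u64 = 1_000_000_009;

    let mut power = BASE; // BASE^(i+1) mod MODULUS for the i-th character
    let mut hash = 0u64;
    for c in title.chars() {
        hash = (hash + (c as u64) * power) % MODULUS;
        power = (power * BASE) % MODULUS;
    }
    hash
}

fn main() {
    // 'a' is 97, so the hash of "a" is 97 * 53 = 5141.
    println!("{}", barcode("a"));
    // The key sent to the sink would be the decimal string form.
    println!("{}", barcode("ab").to_string());
}
```

Because the hash depends only on the title, two books with the same name always receive the same key, and distinct titles can still collide.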
Running the Example
Full Code
Copy and paste the following config and save it as dataflow.yaml.
apiVersion: 0.5.0
meta:
  name: key-value-output
  version: 0.1.0
  namespace: example
config:
  converter: json
  consumer:
    default_starting_offset:
      value: 0
      position: End
types:
  book:
    type: object
    properties:
      name:
        type: string
      year:
        type: u32
      author:
        type: string
topics:
  new-book:
    schema:
      value:
        type: book
  books-in-system:
    schema:
      key:
        type: string
      value:
        type: book
services:
  addbook:
    sources:
      - type: topic
        id: new-book
    transforms:
      - operator: map
        run: |
          fn newbook(book:Book) -> Result<(Option<String>,Book)>{
            let mut p:u32 = 53;
            let m:u32 = 1000000009;
            let mut hash:u32 = 0;
            for c in book.name.chars() {
              hash = (hash + (c as u32)*p) % m;
              p = (p*53) % m;
            }
            Ok((Some(hash.to_string()),book))
          }
    sinks:
      - type: topic
        id: books-in-system
Running SDF
To run the example:
$ sdf run --ephemeral
Produce data
We will produce some data by first writing it into a file named book.txt.
{ "author": "Brian W. Kernighan and Dennis M. Ritchie", "name": "C Programming Language", "year": 1988 }
{ "author": "Anany Levitin", "name": "Introduction to the Design and Analysis of Algorithms", "year": 2001 }
{ "author": "Steve Klabnik and Carol Nichols", "name": "The Rust Programming Language", "year": 2018 }
{ "author": "Steve Klabnik and Carol Nichols", "name": "Rust Cookbook", "year": 2020 }
We can produce the data from the file, then read the source topic back to confirm that the records arrived:
$ fluvio produce new-book -f book.txt
$ fluvio consume new-book -Bdk
Consume data
To consume the data from the sink topic:
$ fluvio consume books-in-system -Bdk
[439139953] {"author":"Brian W. Kernighan and Dennis M. Ritchie","name":"C Programming Language","year":1988}
[747501183] {"author":"Anany Levitin","name":"Introduction to the Design and Analysis of Algorithms","year":2001}
[305905177] {"author":"Steve Klabnik and Carol Nichols","name":"The Rust Programming Language","year":2018}
[605191240] {"author":"Steve Klabnik and Carol Nichols","name":"Rust Cookbook","year":2020}
We can see that each record is printed as its key (the hash value, in square brackets) followed by the book data.
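If you want to post-process this output, for example to check that every record carries a key, a small helper can split each printed line back into its key and value. The sketch below assumes only the `[key] value` layout visible in the output above; the function name split_record is hypothetical and it does not parse the JSON value.

```rust
/// Splits a consumer output line of the form `[key] value` into
/// (key, value). Returns None if the line has no bracketed key.
fn split_record(line: &str) -> Option<(&str, &str)> {
    let rest = line.strip_prefix('[')?;
    let (key, value) = rest.split_once(']')?;
    Some((key, value.trim_start()))
}

fn main() {
    // A shortened line in the same layout as the output above.
    let line = r#"[605191240] {"name":"Rust Cookbook","year":2020}"#;
    match split_record(line) {
        Some((key, value)) => println!("key={key} value={value}"),
        None => println!("no key found"),
    }
}
```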
Cleanup
Exit the sdf terminal and clean up. The --force flag removes the topics:
$ sdf clean --force
Conclusion
This how-to focused on using key-values as output. The following page contains another example, using key-values as inputs.