2 years ago

#69555

test-img

irr_pie

Pyflink session window aggregation by separate keys

I'm trying to wrap my head around the pyflink datastream api. My use case is the following: The source is a kinesis datastream consisting of the following:

cookie cluster dim0 dim1 dim2 time_event
1 1 5 5 5 1min
1 2 1 0 6 30min
2 1 1 2 3 45min
1 1 10 10 15 70min
2 1 5 5 10 120min

I want to create a session window aggregation with a gap of 60 minutes, calculating the mean for each cookie-cluster combination. The window assignment should be based on the cookie, the aggregation based on cookie and cluster.

The result would therefore be like this (each row being forwarded immediately):

cookie cluster dim0 dim1 dim2 time_event
1 1 5 5 5 1min
1 2 1 0 6 30min
2 1 1 2 3 45min
1 1 7.5 7.5 10 70 min
2 1 5 5 10 120min

Expressed in SQL, for a new record I'd like to perform this aggregation:

INSERT INTO `input` (`cookie`, `cluster`, `dim0`, `dim1`, `dim2`, `time_event`) VALUES
    ("1", "1", 0, 0, 0, 125)

WITH RECURSIVE by_key AS (
    SELECT *,
    (time_event - lag(time_event) over (partition by cookie order by time_event)) as "time_passed"
    FROM input
    WHERE cookie = "1"
    ),
new_session AS (
    SELECT *, 
    CASE WHEN time_passed > 60 THEN 1 ELSE 0 END as "new_session"
    FROM by_key),
by_session AS (
    SELECT *, SUM(new_session) OVER(partition by cookie order by time_event) as "session_number"
    FROM new_session)
SELECT cookie, cluster, avg(dim0), avg(dim1), avg(dim2), max(time_event)
    FROM by_session
    WHERE cluster = "1"
    GROUP BY session_number
    ORDER BY session_number DESC
    LIMIT 1

I tried to accomplish this with the table api, but I need the results to be updated as soon as a new record is added to a cookie-cluster combination. This is my first project with flink, and the datastream API is an entirely different beast, especially since a lot of stuff is not included yet for python.

My current approach looks like this:

  1. Create a table from the kinesis datastream (datastream has no kinesis connector)
  2. Convert it to a datastream to perform the aggregation. From what I've read, watermarks are propagated and the resulting row objects contain the column names, i.e. I can handle them like a python dictionary. Please correct me, if I'm wrong on this.
  3. Key the data stream by the cookie.
  4. Window with a custom SessionWindowsAssigner, borrowing from the Table API. I'm working on a seperate post on that.
  5. Process the windows by calculating the mean for each cluster
table_env = StreamTableEnvironment.create(stream_env, environment_settings=env_settings)
table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region, stream_initpos)
    )
ds = table_env.to_append_stream(input_table_name)
ds.key_by(lambda r: r["cookie"])\
  .window(SessionWindowAssigner(session_gap=60, is_event_time=True)\
  .trigger(OnElementTrigger()).\
  .process(MeanWindowProcessFunction())

My basic idea for the ProcessWindowFunction would go like this:

class MeanWindowProcessFunction(ProcessWindowFunction[Dict, Dict, str, TimeWindow]):

    def process(self,
                key: str,
                content: ProcessWindowFunction.Context,
                elements: Iterable) -> Iterable[Dict]:

            clusters = {}
            cluster_records = {}
            for element in inputs:
                if element["cluster"] not in clusters:
                    clusters[element["cluster"]] = {key: val for key, val in element.as_dict().items()}
                    cluster_records[element["cluster"]] = 0
                else:
                    for dim in range(3):
                        clusters[element["cluster"]][f"dim{dim}"] += element[f"dim{dim}"]
    
                clusters[element["cluster"]]["time_event"] = element["time_event"]
                cluster_records[element["cluster"]] += 1
    
            for cluster in clusters.keys():
                for dim in range(3):
                    clusters[cluster][f"dim{dim}"] /= cluster_records[cluster]

            return clusters.values()

    def clear(self, context: 'ProcessWindowFunction.Context') -> None:
        pass
  • Is this the right approach for this problem?
  • Do I need to consider anything else for the ProcessWindowFunction, like actually implementing the clear method?

I'd be very grateful for any help, or any more elaborate examples of windowed analytics applications in pyflink. Thank you!

python

apache-flink

flink-streaming

pyflink

amazon-kinesis

0 Answers

Your Answer

Accepted video resources