Sunday, February 19, 2023

MLOps - simulating a streaming data set

The approach is below. It assumes the MNIST dataset is used.

import os
import pickle

from keras.datasets import mnist

img_rows, img_cols = 28, 28

(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Set the first 20000 train samples apart; they will serve as streaming data later on
x_stream = x_train[:20000]
y_stream = y_train[:20000]

x_train = x_train[20000:]
y_train = y_train[20000:]

stream_sample = [x_stream, y_stream]

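stream_sample holds the samples that will be streamed later on. Write it to a pickle file (the path is read from the function's keyword arguments, kwargs):

with open(os.getcwd() + kwargs['path_stream_sample'], "wb") as f:
    pickle.dump(stream_sample, f)

The stream sample is now stored in the pickle file.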
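To try the hold-out step without downloading MNIST, here is a minimal sketch that swaps in random uint8 arrays with the shape of MNIST's training set (60,000 images of 28 x 28 pixels). It writes the stream sample to a temporary file using context managers and loads it back the way the producer does later. The synthetic data and the file name stream_sample.p are assumptions for illustration only.

```python
import os
import pickle
import tempfile

import numpy as np

# Synthetic stand-in for the MNIST training set (assumption: same shapes and dtype, random pixels)
rng = np.random.default_rng(0)
x_all = rng.integers(0, 256, size=(60000, 28, 28), dtype=np.uint8)
y_all = rng.integers(0, 10, size=60000, dtype=np.uint8)

# Hold out the first 20000 observations as the future stream, keep the rest for training
n_stream = 20000
x_stream, y_stream = x_all[:n_stream], y_all[:n_stream]
x_train, y_train = x_all[n_stream:], y_all[n_stream:]

# Hypothetical file location; the post reads it from kwargs['path_stream_sample']
path = os.path.join(tempfile.mkdtemp(), "stream_sample.p")
with open(path, "wb") as f:
    pickle.dump([x_stream, y_stream], f)

# Load it back, as the producer will do before streaming
with open(path, "rb") as f:
    loaded_x, loaded_y = pickle.load(f)

print(x_train.shape, loaded_x.shape)  # (40000, 28, 28) (20000, 28, 28)
```

Using with blocks closes the file handles deterministically, which matters when the same file is reopened by another task shortly afterwards.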

Now let's construct the model:

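from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense

num_classes = 10                                  # digits 0-9
input_shape = (img_rows, img_cols, 1)             # 28 x 28 grayscale images, channels last

# Add the channel axis the Conv2D layers expect: (n, 28, 28) -> (n, 28, 28, 1)
x_train = x_train.reshape(-1, img_rows, img_cols, 1)
x_test = x_test.reshape(-1, img_rows, img_cols, 1)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())                              # flatten the feature maps before the dense layers
model.add(Dense(128, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

# Labels are integers 0-9, hence sparse categorical cross-entropy; the optimizer is an assumed choice
model.compile(loss='sparse_categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])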
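As a sanity check on the architecture, the output shape and parameter count of each layer can be worked out by hand: an unpadded 3 x 3 convolution with stride 1 shrinks each side by 2, 2 x 2 max pooling halves it, and a layer's parameters are its weights plus one bias per output unit. The sketch below is plain arithmetic (no Keras needed); the layer names are made up for readability, and the totals should match what model.summary() reports for the network above.

```python
def conv_out(size: int, kernel: int) -> int:
    """Spatial size after a 'valid' (unpadded) convolution with stride 1."""
    return size - kernel + 1


def pool_out(size: int, pool: int) -> int:
    """Spatial size after non-overlapping max pooling (stride equals pool size)."""
    return size // pool


def layer_summary(side: int = 28, channels: int = 1, num_classes: int = 10):
    """Return (layer name, output shape, parameter count) for the CNN above."""
    rows = []
    s = conv_out(side, 3)                                  # Conv2D(32, 3x3): 28 -> 26
    rows.append(("conv2d_32", (s, s, 32), 3 * 3 * channels * 32 + 32))
    s = conv_out(s, 3)                                     # Conv2D(64, 3x3): 26 -> 24
    rows.append(("conv2d_64", (s, s, 64), 3 * 3 * 32 * 64 + 64))
    s = pool_out(s, 2)                                     # MaxPooling2D(2x2): 24 -> 12
    rows.append(("max_pooling2d", (s, s, 64), 0))
    flat = s * s * 64                                      # Flatten: 12 * 12 * 64
    rows.append(("flatten", (flat,), 0))
    rows.append(("dense_128", (128,), flat * 128 + 128))
    rows.append(("dense_out", (num_classes,), 128 * num_classes + num_classes))
    return rows


for name, shape, params in layer_summary():
    print(f"{name:15s} {str(shape):15s} {params:>9,d}")
print("total params:", sum(p for _, _, p in layer_summary()))
```

Almost all of the parameters sit in the first Dense layer (9216 x 128 weights), which is why the flattened size matters.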




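Now fit the model:

batch_size = 128                                  # assumed value
epochs = 3                                        # assumed value

model.fit(x_train, y_train,
          batch_size=batch_size,
          epochs=epochs,
          verbose=1,
          validation_data=(x_test, y_test))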

# Now evaluate the model
score = model.evaluate(x_test, y_test, verbose=0)
print('Test - loss:', score[0])
print('Test - accuracy:', score[1])

model.save(os.getcwd() + kwargs['initial_model_path'])   # save the initial model

With both the stream set and the trained model in place, let's feed in additional data.

For this, we use Kafka.


Kafka is one of the go-to platforms for dealing with streaming data. Its framework consists of three players: 1) brokers, 2) producers, and 3) consumers.

A broker is an instance of a Kafka server (also known as a Kafka node) that hosts named streams of records, which are called topics. A broker takes in messages from producers and stores them in a topic. In turn, it enables consumers to fetch messages from that topic.

In its simplest form, a single producer pushes messages to one end of a topic, while a single consumer (for example, an app) fetches messages from the other end. In our case, with Kafka running locally, a single-producer, single-consumer setup like this (shown below) does the trick.
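To make the roles concrete, here is a toy, in-memory model of the setup just described: the broker keeps each topic as an append-only list, the producer appends to the end, and the consumer reads from its own position (offset) without deleting anything. This is a conceptual illustration only; ToyBroker and ToyConsumer are hypothetical names, and none of this is the Kafka or kafka-python API.

```python
from collections import defaultdict
from typing import Any, Dict, List


class ToyBroker:
    """Toy stand-in for a broker: each topic is an append-only list of messages."""

    def __init__(self) -> None:
        self.topics: Dict[str, List[Any]] = defaultdict(list)

    def append(self, topic: str, value: Any) -> int:
        """Store a message at the end of the topic and return its offset."""
        self.topics[topic].append(value)
        return len(self.topics[topic]) - 1

    def read(self, topic: str, offset: int, max_records: int = 10) -> List[Any]:
        """Return up to max_records messages starting at offset; nothing is removed."""
        return self.topics[topic][offset:offset + max_records]


class ToyConsumer:
    """Tracks its own position in the topic, as a Kafka consumer does."""

    def __init__(self, broker: ToyBroker, topic: str) -> None:
        self.broker = broker
        self.topic = topic
        self.offset = 0  # start at the beginning, comparable to auto_offset_reset='earliest'

    def poll(self, max_records: int = 10) -> List[Any]:
        batch = self.broker.read(self.topic, self.offset, max_records)
        self.offset += len(batch)  # like committing: the next poll continues after these messages
        return batch


broker = ToyBroker()
for digit in [3, 1, 4]:
    broker.append("TopicA", digit)   # producer side

consumer = ToyConsumer(broker, "TopicA")
print(consumer.poll(max_records=2))  # [3, 1]
print(consumer.poll())               # [4]
print(consumer.poll())               # []
```

Because messages stay in the topic and each consumer only moves its own offset, a second consumer could read the same messages again from the start.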

With the help of the Kafka-Python API we can now simulate a data stream by constructing a Producer that publishes messages to the topic. 

import os
import pickle
import random
from json import dumps

from kafka import KafkaProducer


def generate_stream(**kwargs):

    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],                      # set up Producer
                             value_serializer=lambda x: dumps(x).encode('utf-8'))

    with open(os.getcwd() + kwargs['path_stream_sample'], "rb") as f:                  # load stream sample file
        stream_sample = pickle.load(f)

    # the stream sample consists of 20000 observations; 200 of them are selected at random
    rand = random.sample(range(0, 20000), 200)

    x_new = stream_sample[0]
    y_new = stream_sample[1]

    print('Partitions: ', producer.partitions_for('TopicA'))

    for i in rand:
        json_comb = encode_to_json(x_new[i], y_new[i])     # pick observation and encode to JSON
        producer.send('TopicA', value=json_comb)           # send encoded observation to Kafka topic
        print("Sent number: {}".format(y_new[i]))

    producer.flush()                                       # send() is asynchronous; wait until all messages are delivered
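The producer relies on a helper encode_to_json (and the consumer on its counterpart decode_json) whose code isn't shown. Below is a minimal sketch of what such a pair could look like, assuming an observation is a 28 x 28 uint8 array plus an integer label. Both functions are hypothetical. Here encode_to_json returns a JSON string, so the producer's value_serializer wraps it once more and the consumer's value_deserializer removes that outer layer before decode_json is called.

```python
import json

import numpy as np


def encode_to_json(x, y) -> str:
    """Hypothetical helper: serialise one image and its label to a JSON string.

    NumPy arrays and NumPy integers are not JSON-serialisable, so the image is
    converted to nested lists and the label to a plain int.
    """
    return json.dumps({"x": np.asarray(x).tolist(), "y": int(y)})


def decode_json(json_str: str):
    """Hypothetical inverse of encode_to_json: rebuild the image array and the label."""
    data = json.loads(json_str)
    x = np.array(data["x"], dtype=np.uint8)  # MNIST pixel values fit in 0..255
    y = data["y"]
    return x, y
```

A round trip (encode, then decode) should return an array equal to the original image and the same label.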



The next task is to receive the streamed data.

To fetch the data from the Kafka topic, we turn again to the Kafka-Python API, this time to construct a Consumer. The Consumer is wrapped in a function that retrieves observations from the topic one by one, converts each one back from JSON to its original format, and groups them into NumPy arrays that are stored (in pickle format) in the to_use_for_training folder.

import os
import pickle
import time
from json import loads

import numpy as np
from kafka import KafkaConsumer


def get_data_from_kafka(**kwargs):

    consumer = KafkaConsumer(
        kwargs['topic'],                                # specify topic to consume from
        bootstrap_servers=['kafka:9092'],               # assumed: same broker as the producer
        group_id='stream_consumer',                     # hypothetical group name; committed offsets are tracked per group
        consumer_timeout_ms=3000,                       # stop iterating if nothing has been fetched for 3 secs (e.g. in case of an empty topic)
        auto_offset_reset='earliest',                   # start from the earliest offset if there is no valid committed offset
        enable_auto_commit=True,                        # offsets are committed automatically by the consumer
        value_deserializer=lambda x: loads(x.decode('utf-8')))
    print('Consumer constructed')

    try:
        xs = []
        ys = []

        for message in consumer:                        # loop over messages
            print("Offset: ", message.offset)
            message = message.value
            x, y = decode_json(message)                 # decode JSON
            xs.append(x)
            ys.append(y)
            print('Image retrieved from topic')

        xs = np.array(xs).reshape(-1, 28, 28, 1)        # put Xs in the right shape for our CNN
        ys = np.array(ys).reshape(-1)                   # put ys in the right shape for our CNN

        new_samples = [xs, ys]
        path = os.getcwd() + kwargs['path_new_data'] + time.strftime("%Y%m%d_%H%M") + "_new_samples.p"
        with open(path, "wb") as f:                     # write data
            pickle.dump(new_samples, f)
        print(str(xs.shape[0]) + ' new samples retrieved')

    except Exception as e:
        print('Error: ' + str(e))

    finally:
        consumer.close()
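The decode-and-stack part of the consumer loop does not depend on Kafka itself. A minimal sketch, assuming you factor it into a hypothetical helper collect_samples that takes any iterable of message values plus a decode function, lets you test the reshaping with a plain list instead of a running broker:

```python
from typing import Any, Callable, Iterable, Tuple

import numpy as np


def collect_samples(values: Iterable[Any],
                    decode: Callable[[Any], Tuple[Any, Any]]) -> Tuple[np.ndarray, np.ndarray]:
    """Hypothetical refactor of the consumer loop: decode each message value
    and stack the results into CNN-ready arrays."""
    xs, ys = [], []
    for value in values:
        x, y = decode(value)
        xs.append(x)
        ys.append(y)
    xs = np.array(xs).reshape(-1, 28, 28, 1)  # (n, 28, 28, 1); also works for n = 0
    ys = np.array(ys).reshape(-1)              # (n,) integer labels
    return xs, ys
```

Inside get_data_from_kafka this would be called as collect_samples((m.value for m in consumer), decode_json); in a unit test, the values can simply be a list and decode a small lambda.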

The update_model function does most of the heavy lifting:

- it takes in the data we fetched from the Kafka topic
- it loads the current model and measures how it scores on the test set*
- it runs a number of epochs of gradient descent on the new data and adjusts the model's weights accordingly**
- it then tests whether the adjusted model scores better on the test set than the current version. If it does, the adjusted model replaces the current version, which is moved to a model archive; if it doesn't, the current version is kept
- in addition, it moves the data used for updating the model to the used_for_training folder and logs a set of metrics for each update run to MLflow
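The steps above can be sketched in a framework-agnostic way. This is a minimal sketch under stated assumptions, not the post's implementation: update_model here receives the model and two callables (fine_tune and evaluate, both hypothetical), keeps whichever version scores better on the test set, and appends the replaced version to an archive list. move_used_data is a hypothetical helper for moving pickle files from one folder to another, such as from to_use_for_training to used_for_training. The MLflow logging step is left out.

```python
import shutil
from pathlib import Path
from typing import Any, Callable, List, Tuple


def update_model(
    current_model: Any,
    new_data: Tuple[Any, Any],
    test_data: Tuple[Any, Any],
    fine_tune: Callable[[Any, Any, Any], Any],
    evaluate: Callable[[Any, Any, Any], float],
    archive: List[Any],
) -> Tuple[Any, bool]:
    """Hypothetical sketch of the update decision.

    fine_tune(model, x, y) must return an adjusted copy and leave `model` untouched.
    evaluate(model, x, y) returns a score where higher is better (e.g. test accuracy).
    Returns the model to keep and whether it replaced the current one.
    """
    x_new, y_new = new_data
    x_test, y_test = test_data
    current_score = evaluate(current_model, x_test, y_test)   # how the current version scores
    candidate = fine_tune(current_model, x_new, y_new)        # a few epochs on the new data
    candidate_score = evaluate(candidate, x_test, y_test)
    if candidate_score > current_score:
        archive.append(current_model)                         # archive the replaced version
        return candidate, True
    return current_model, False                               # stick to the current version


def move_used_data(src_dir: str, dst_dir: str) -> List[Path]:
    """Move every pickle file (*.p) from src_dir to dst_dir, creating dst_dir if needed."""
    dst = Path(dst_dir)
    dst.mkdir(parents=True, exist_ok=True)
    moved = []
    for path in sorted(Path(src_dir).glob("*.p")):
        moved.append(Path(shutil.move(str(path), str(dst / path.name))))
    return moved
```

With Keras, fine_tune could clone the model with keras.models.clone_model, copy the weights via set_weights(model.get_weights()), compile the clone and fit it on the new samples for a few epochs, while evaluate could return model.evaluate(x, y, verbose=0)[1]. Metrics for each run could then be logged with mlflow.log_metric.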


