Sunday, February 19, 2023

MLOps - simulating a streaming new data set

The approach is below. It assumes the use of the MNIST dataset.

import os
import pickle
import logging

from keras.datasets import mnist

img_rows, img_cols = 28, 28

(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Set train samples apart that will serve as streaming data later on
x_stream = x_train[:20000]
y_stream = y_train[:20000]
x_train = x_train[20000:]
y_train = y_train[20000:]

stream_sample = [x_stream, y_stream]

The stream_sample list holds the observations that will later be replayed as streaming data (the paths and training settings used throughout these snippets come from a kwargs dict, sketched further down).


pickle.dump(stream_sample, open(os.getcwd() + kwargs['path_stream_sample'], "wb"))


The stream sample is now written to the pickle file 
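One step the snippets above skip is preprocessing: the model constructed below refers to input_shape and num_classes without defining them, and expects images of shape (28, 28, 1) with one-hot encoded labels. A minimal sketch, assuming the usual MNIST preprocessing (this part is not shown in the original snippet), could look like this:

from keras.utils import to_categorical

num_classes = 10                               # MNIST has 10 digit classes
input_shape = (img_rows, img_cols, 1)          # single grayscale channel

# reshape to (n, 28, 28, 1), scale pixels to [0, 1] and one-hot encode the labels
x_train = x_train.reshape(-1, img_rows, img_cols, 1).astype('float32') / 255
x_test = x_test.reshape(-1, img_rows, img_cols, 1).astype('float32') / 255
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)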


Now let's construct the model.



import keras
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dropout, Flatten, Dense

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss=keras.losses.categorical_crossentropy,
              optimizer=keras.optimizers.Adadelta(),
              metrics=['accuracy'])


Fitting the model then looks as follows.



model.fit(x_train, y_train,
          batch_size=kwargs['batch_size'],
          epochs=kwargs['epochs'],
          verbose=1,
          validation_data=(x_test, y_test))



# now evaluate the model
score = model.evaluate(x_test, y_test, verbose=0)

logging.info('Test - loss: %s', score[0])
logging.info('Test - accuracy: %s', score[1])

model.save(os.getcwd() + kwargs['initial_model_path'])
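Note that these snippets read their settings from a kwargs dict (batch_size, epochs, the paths, and further down the Kafka topic and client). How that dict is filled is not shown here; a hypothetical config that matches the keys used above and below could be:

# hypothetical values - the keys match the kwargs used in the snippets, the paths are placeholders
config = {
    'path_stream_sample': '/data/stream_sample.p',
    'path_new_data': '/data/to_use_for_training/',
    'initial_model_path': '/models/current_model.h5',
    'topic': 'TopicA',
    'client': 'kafka:9092',
    'batch_size': 128,
    'epochs': 12,
}

In the referenced blog the tasks are orchestrated with Airflow, which can hand these values to each task; for a manual run you could simply call the functions further down with **config.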



Now that we have both the stream set and a trained model, let's feed the model additional data.

For this, we use Kafka.

 

Kafka is one of the go-to platforms when you have to deal with streaming data. Its framework consists of three players: 1) brokers, 2) producers, and 3) consumers.


A broker is an instance of a Kafka server (also known as a Kafka node) that hosts named streams of records, called topics. A broker takes in messages from producers and stores them in a topic, and in turn lets consumers fetch messages from that topic.
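As a side note, creating such a topic on a local broker can be done with the kafka-python admin client. A small sketch, assuming a single broker reachable at kafka:9092 as used by the Producer further down (in many local setups the topic is simply auto-created on first use):

from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['kafka:9092'])

# one partition and no replication is enough for a local, single-broker setup
admin.create_topics(new_topics=[NewTopic(name='TopicA',
                                         num_partitions=1,
                                         replication_factor=1)])
admin.close()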



In its simplest form, you have a single producer pushing messages to one end of a topic, whilst a single consumer (for example an app) fetches messages from the other end. In our case, with Kafka running locally, such a single producer/consumer setup does the trick.


With the help of the Kafka-Python API we can now simulate a data stream by constructing a Producer that publishes messages to the topic. 



import random
from json import dumps
from time import sleep

from kafka import KafkaProducer


def generate_stream(**kwargs):

    producer = KafkaProducer(bootstrap_servers=['kafka:9092'],                              # set up Producer
                             value_serializer=lambda x: dumps(x).encode('utf-8'))

    stream_sample = pickle.load(open(os.getcwd() + kwargs['path_stream_sample'], "rb"))     # load stream sample file

    rand = random.sample(range(0, 20000), 200)       # the stream sample holds 20000 observations - select 200 of them at random

    x_new = stream_sample[0]
    y_new = stream_sample[1]

    logging.info('Partitions: %s', producer.partitions_for('TopicA'))

    for i in rand:
        json_comb = encode_to_json(x_new[i], y_new[i])                                      # pick observation and encode to JSON
        producer.send('TopicA', value=json_comb)                                            # send encoded observation to Kafka topic
        logging.info("Sent number: {}".format(y_new[i]))
        sleep(1)

    producer.close()
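The encode_to_json and decode_json helpers used by the Producer above and the Consumer below are not shown in this post; a minimal hypothetical pair that fits the serializers used here could look like this:

import numpy as np

def encode_to_json(x, y):
    # hypothetical helper: turn a single (image, label) pair into a JSON-serializable dict
    return {'x': x.tolist(), 'y': int(y)}

def decode_json(message):
    # hypothetical counterpart: rebuild the NumPy image and integer label from the dict
    return np.array(message['x'], dtype=np.uint8), message['y']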




The next task is to receive the streamed data.


To fetch the data from the Kafka topic, we turn again to the Kafka-Python API to construct a Consumer. The Consumer is wrapped in a function that sequentially retrieves observations from the topic, converts them back from JSON to their original format, and groups them together in a NumPy array that is stored (in pickle format) in the to_use_for_training folder.



import time
import numpy as np
from json import loads

from kafka import KafkaConsumer


def get_data_from_kafka(**kwargs):

    consumer = KafkaConsumer(
        kwargs['topic'],                                # specify topic to consume from
        bootstrap_servers=[kwargs['client']],
        consumer_timeout_ms=3000,                       # break the connection if the consumer hasn't fetched anything for 3 secs (e.g. in case of an empty topic)
        auto_offset_reset='earliest',                   # automatically reset the offset to the earliest offset (should the current offset be deleted or anything)
        enable_auto_commit=True,                        # offsets are committed automatically by the consumer
        # group_id='my-group',
        value_deserializer=lambda x: loads(x.decode('utf-8')))

    logging.info('Consumer constructed')

    try:

        xs = []
        ys = []

        for message in consumer:                        # loop over messages

            logging.info("Offset: %s", message.offset)
            message = message.value
            x, y = decode_json(message)                 # decode JSON

            xs.append(x)
            ys.append(y)

            logging.info('Image retrieved from topic')

        xs = np.array(xs).reshape(-1, 28, 28, 1)        # put Xs in the right shape for our CNN
        ys = np.array(ys).reshape(-1)                   # put ys in the right shape for our CNN

        new_samples = [xs, ys]

        pickle.dump(new_samples, open(os.getcwd() + kwargs['path_new_data'] + str(time.strftime("%Y%m%d_%H%M")) + "_new_samples.p", "wb"))     # write data

        logging.info(str(xs.shape[0]) + ' new samples retrieved')

        consumer.close()

    except Exception as e:
        print(e)
        logging.info('Error: %s', e)
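With a config dict like the one sketched earlier, the two functions can be exercised manually (in the full setup they run as scheduled pipeline tasks):

generate_stream(**config)        # pushes 200 random observations to the topic, roughly one per second
get_data_from_kafka(**config)    # drains the topic and writes a new_samples pickle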



The update_model function in update_functions.py does most of the heavy lifting:


- it takes in the data we fetched from the Kafka topic

- it loads the current model and gauges how it scores on the test set

- it runs a number of epochs of gradient descent on the new data and adjusts the weights of the model accordingly

- it then tests whether the adjusted model scores better on the test set than the current version: if it does, it replaces the current version and moves the latter to a model archive; if it doesn't, it sticks with the current version of the model

- in addition, it moves the data it used for updating the model to the used_for_training folder and logs a set of metrics for each update run to MLFlow
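The post does not include update_model itself; a rough sketch of the logic described above, using hypothetical config keys such as path_test_set, update_epochs, path_used_data and path_model_archive, and leaving out the MLFlow logging, could look like this:

import glob
import shutil

from keras.models import load_model
from keras.utils import to_categorical


def update_model(**kwargs):
    # load the current model and the test set it is scored against
    model = load_model(os.getcwd() + kwargs['initial_model_path'])
    x_test, y_test = pickle.load(open(os.getcwd() + kwargs['path_test_set'], "rb"))

    current_score = model.evaluate(x_test, y_test, verbose=0)[1]          # current test accuracy

    # gather the new samples fetched from the Kafka topic
    new_files = glob.glob(os.getcwd() + kwargs['path_new_data'] + "*_new_samples.p")

    for file in new_files:
        x_new, y_new = pickle.load(open(file, "rb"))

        # bring the new data in line with the training setup and run a few epochs of gradient descent
        x_new = x_new.astype('float32') / 255
        model.fit(x_new, to_categorical(y_new, 10),
                  batch_size=kwargs['batch_size'],
                  epochs=kwargs['update_epochs'],
                  verbose=0)

        shutil.move(file, os.getcwd() + kwargs['path_used_data'])         # move data to the used_for_training folder

    new_score = model.evaluate(x_test, y_test, verbose=0)[1]              # test accuracy of the adjusted model

    # keep whichever version scores better on the test set
    if new_score > current_score:
        shutil.move(os.getcwd() + kwargs['initial_model_path'],
                    os.getcwd() + kwargs['path_model_archive'])           # archive the previous version
        model.save(os.getcwd() + kwargs['initial_model_path'])
        logging.info('Model updated: test accuracy %s -> %s', current_score, new_score)
    else:
        logging.info('Model kept: test accuracy %s (candidate scored %s)', current_score, new_score)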


References:

https://www.vantage-ai.com/en/blog/keeping-your-ml-model-in-shape-with-kafka-airflow-and-mlflow
