Python | A simple prototype for tracking human movement

At times, social networking sites, job portals and even governments need to keep tabs on the movement of people from one place to another (or from one nation to another). Governments do this analysis on a regular basis to check the rate of urbanization.

Assumptions: Let’s take the example of a social networking site. Here I have assumed that we have the following table (in Hadoop or any other Big Data framework of your choice) holding the data of people who moved from one place to another.

id                                           int #Autogenerated ID
uid                                         int #User ID
some_other_id                  int
cur_location                       int  #Location ID mapped to “Location”  table

So every time a user updates his/her location, we do an INSERT into the database rather than an UPDATE. This way we have a record of the user’s movement in chronological order, which can be retrieved using his/her “User ID”.

Architecture:

Retrieve data [Hadoop Layer using Hive]

Process it into meaningful data [Python]

Store results [MySQL] (So that it is easy to use for displaying purposes)

Python Mining Engine:

Here is a simple Python program I wrote for this simple application. In real-world scenarios you might have to use or invent some complex data mining algorithms. For the purposes of this blog post, I successfully used the Python script to process 35,000 records. Here is the complete code (you can also check it out here):

# Module metadata: author credit and script version.
__author__ = 'Abhay Gupta'
__version__ = 0.1  # NOTE(review): stored as a float, not a version string.

import pyhs2
import time
import MySQLdb
import datetime
import sys
import pytz

def DATE_TIME():
    """Return the current time as a timezone-aware datetime in 'Asia/Calcutta'."""
    india_zone = pytz.timezone('Asia/Calcutta')
    return datetime.datetime.now(india_zone)

def FORMATTED_TIME():
    """Return the current DATE_TIME() wall-clock time truncated to whole seconds.

    Returns:
        A naive ``datetime.datetime`` (no tzinfo) carrying the same year,
        month, day, hour, minute and second as ``DATE_TIME()``.
    """
    # Strip the fields directly instead of round-tripping through
    # str()/strptime(). The string form omits the fractional part when the
    # microsecond is exactly 0, so splitting on '.' would leave the UTC offset
    # in the text and strptime() would raise ValueError.
    return DATE_TIME().replace(microsecond=0, tzinfo=None)

#Spinner
def DrawSpinner(counter):
    """Write one frame of a four-frame text spinner to stdout.

    The frame is selected by ``counter % 4`` in the order ``/``, ``-``,
    ``\\``, ``|``. After the frame is flushed a backspace is written so that
    the next output lands on top of it.
    """
    frames = {0: "/", 1: "-", 2: "\\", 3: "|"}
    frame = frames.get(counter % 4)
    if frame is not None:
        sys.stdout.write(frame)
    sys.stdout.flush()
    sys.stdout.write('\b')

#Generator
def neighourhood(iterable):
    """Yield ``(previous, item, following)`` for every item of *iterable*.

    ``previous`` is None for the first item and ``following`` is None for the
    last one. An empty iterable yields nothing.
    """
    iterator = iter(iterable)
    try:
        # Builtin next() instead of the iterator's .next() method.
        item = next(iterator)
    except StopIteration:
        # Finish cleanly on empty input instead of letting StopIteration
        # escape from the generator body.
        return
    prev = None
    for following in iterator:
        yield (prev, item, following)
        prev = item
        item = following
    yield (prev, item, None)

#hive cursor
def get_cursor():
    """Connect to the Hive database 'test' through pyhs2 and return a cursor."""
    hive_settings = dict(
        host='',
        port=10000,
        authMechanism="PLAIN",
        user='hadoop',
        password='',
        database='test',
    )
    connection = pyhs2.connect(**hive_settings)
    return connection.cursor()

def get_mysql_cursor():
    """Connect to the MySQL database 'test' as user 'db' and return a cursor."""
    mysql_settings = dict(user='db', passwd='', host='', db='test')
    connection = MySQLdb.connect(**mysql_settings)
    return connection.cursor()

def get_records():
    """Return every row of ``user_location_history`` read through the Hive cursor."""
    hive_cursor = get_cursor()
    hive_cursor.execute("select * from user_location_history")
    rows = hive_cursor.fetchall()
    return rows

def get_user_movement():
    """Group the fetched location history by user.

    Returns:
        dict mapping ``record[1]`` to the list of ``int(record[3])`` values
        for that key, in the order the records were fetched. Per the table
        layout described in the post these columns are the user id and the
        current location id.
    """
    location_dict = {}
    for counter, record in enumerate(get_records(), 1):
        DrawSpinner(counter)
        # setdefault() does the "create the list on first sight" step in one
        # call, replacing the deprecated dict.has_key() test.
        location_dict.setdefault(record[1], []).append(int(record[3]))
    return location_dict

#For performance improvements use list instead of dictionary as we don't need count of the users here
def prepare_movement_data():
    """Count how many users share each distinct movement path.

    Returns:
        dict mapping a movement path (tuple of location ids, in recorded
        order) to the number of users whose history is exactly that path.
        Users whose history contains only one distinct location are skipped.
    """
    city_movement_data_dict = {}
    user_location_dict = get_user_movement()
    # Only the paths matter here; the user ids are not needed for the count.
    for counter, user_movement_path in enumerate(user_location_dict.values(), 1):
        DrawSpinner(counter)
        if len(set(user_movement_path)) > 1:
            # Lists are unhashable, so the path is keyed as a tuple.
            path_key = tuple(user_movement_path)
            city_movement_data_dict[path_key] = city_movement_data_dict.get(path_key, 0) + 1
    return city_movement_data_dict

def store_mining_results(unique_movement_map_tuple):
    """Record every step of one movement path in ``user_flow_location``.

    For each consecutive ``(previous, current)`` pair of the path, the row
    with that ``location_from``/``location_to`` has its ``count`` incremented;
    if there is no such row, one is inserted with a count of 1.

    Args:
        unique_movement_map_tuple: sequence of location ids in the order the
            user visited them.
    """
    cur = get_mysql_cursor()
    try:
        for prev, current, following in neighourhood(unique_movement_map_tuple):
            # The first element of a path has no predecessor, so there is no
            # transition to record for it.
            if prev is None or current is None:
                continue
            # Increment in the database by column name. This keeps the count
            # independent of the column order of "select *" (the same column
            # index used to be read both as location_to and as the count) and
            # avoids re-reading the whole table for every step.
            cur.execute(
                'update user_flow_location set count = count + 1'
                ' where location_from = %s and location_to = %s',
                (prev, current))
            if cur.rowcount:
                continue
            # No existing row for this transition: insert it.
            # NOTE(review): the "current != following" guard is kept from the
            # previous logic; it skips the insert when the user's next entry
            # repeats the current location -- confirm this is intended.
            if prev != current and current != following:
                cur.execute(
                    'insert into user_flow_location'
                    '(location_from, location_to, count) values(%s, %s, 1)',
                    (prev, current))
        # NOTE(review): nothing here commits; whether the writes persist
        # depends on how the connection behind the cursor is configured.
    finally:
        # Release the cursor even when a statement fails.
        cur.close()

if __name__ == '__main__':
        start_time = time.time()
        #spinner = spinning_cursor()
        print 'Preparing data...'
        city_movement_data_dict = prepare_movement_data()
        sys.stdout.flush()
        sys.stdout.write('\b')
        print '\nData prepared for mining.'
        print 'Processing data and storing results...'
        counter = 0
        for unique_movement_map, unique_user_count in city_movement_data_dict.iteritems():
                #print str(unique_movement_map), ' : ', str(unique_user_count), ' user(s) relocated through this path'
                store_mining_results(unique_movement_map)
                counter = counter + 1
                DrawSpinner(counter)
        sys.stdout.flush()
        sys.stdout.write('\b')
        end_time = time.time()
        print '\nData mining complete!'
        print '\nTotal execution time = ', str(end_time - start_time), ' seconds\n'

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s